首页 > 编程 > .NET > 正文

.NET可复用TCP通信层之消息分派器组件

2024-07-10 13:04:47
字体:
来源:转载
供稿:网友


  上一篇主要讲到了tcp通信层中的核心组件――tcp组件的实现,tcp组件是整个通信层的消息驱动源,甚至,可以将tcp组件看作是我们整个服务器系统的消息驱动源,消息处理过程从这里引发。类似的消息驱动源还有发布的webservice接口、remoting接口等。今天我们需要关注的是tcp通信层中的“中央”组件――消息分派器组件itcpreqstreamdispatcher,大家已经从前文的组件关系图中看到了消息分派器的大致位置和作用了,它是tcp通信组件和消息处理器之间的“桥梁”。我们再对前文描述的通信层组件之间关系的一段话回顾一下:

  “当网络(tcp)组件从某个tcp连接上接收到一个请求时,会将请求转发给消息分派器,消息分派器通过idatastreamhelper组件获取请求消息的类型,然后根据此类型要求处理器工厂创建对应类型的请求处理器,请求处理器处理请求并返回结果。接下来再由网络组件把结果返回给终端用户。在消息分派器进行请求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重组、消息验证等。”

  上面的描述中已经体现出了消息分派器的主要职责,在理解了消息分派器职责的基础上,我们可以进一步来看看消息分派器的定义和实现了。 
  二.消息分派器组件
  1.消息分派器组件接口的定义

  消息分派器的接口很简单:

    public interface itcpreqstreamdispatcher : ireqeststreamdispatcher
    {       
        arraylist dealrequestmessage(requestdata requestdata ,out byte[] leftdata ,ref requestvalidation validation) ;//同步回复
        bool      dealrequestmessage(requestdata requestdata , networkstream userstream ,out byte[] leftdata) ; //异步回复       
    }

  这个接口只有两个方法,第二个方法用于异步发送回复(即绕开tcp组件发送回复),该方法的核心部分可以由第一个方法实现,我们把注意力放在第一个方法上,而tcp组件与消息分派器进行交互的也正是第一个方法。我先解释一下这个方法的几个参数的含义:

  requestdata是对请求消息的封装:

    //从网络接收到的原始数据的封装
    public class requestdata
    {
        public int  connectid = 0 ;
        public bool isfirstmsg = false ; //标志是否为连接建立后的第一条消息
        public byte[] buff     = null ; //接收数据缓冲区 ,可能其头部包含上次未处理完的数据
        public int validcount  = 0 ; //缓冲区中有效字节的个数 >= 本次接收的字节数       
    }

  前面已经提到过,connectid用于标志每一个tcp连接,isfirstmsg用于表明是否为tcp连接建立后的第一个消息,因为我们可能需要对第一个消息进行额外的验证,比如,果第一个消息不是登录请求,就关闭该tcp连接。

  第二个参数leftdata,表示requestdata.buff中的数据经过消息分裂器分裂之后余下的数据(一条非完整的消息),这些数据被tcp组件用来放在下一次收到的数据的头部进行消息重组。

  第三个参数validation,是个ref参数,用于通知tcp组件对消息验证的结果,如果验证失败,tcp组件将关闭对应的tcp连接。

  该方法的返回值是回复的集合,每一个回复对应一个请求,而requestdata.buff中的数据可能分裂成多个请求。另外要注意,有些请求可能是没有回复消息的。

  在我们的tcp组件的两种实现中,都可以看到类似下面的与消息分派器交互的语句:

                //处理请求   
                byte[] leftdata = null ;               
                arraylist repondlist = this.messagedispatcher.dealrequestmessage(key.requestdata  ,out leftdata , ref key.validation) ;
                if(this.validaterequest)
                {
                    if(key.validation.gotocloseconnection)
                    {
                        this.disposeoneconnection(streamhashcode ,key.validation.cause) ;
                    }
                }
 
  2.消息分派器组件基本元素的实现

  正如在实现tcp组件之前需要构建一些基本元素,在实现消息分派器之前也是如此,用于支持消息分派器实现的基本元素包括:idatastreamhelper、消息分裂器、消息处理器工厂、itcpstreamdispatcherhook等。

  (1)idatastreamhelper消息分裂器

  idatastreamhelper,前文中已经提到,idatastreamhelper用于从请求/回复消息中提取消息的“元数据”,并提供一些辅助方法,每个特定的应用,它们对idatastreamhelper的实现可能是不一样的。idatastreamhelper接口定义如下:

     /// <summary>
    /// idatastreamhelper 通信协议的面向流辅助设施。
    /// </summary>
    public interface idatastreamhelper :istringencoder
    {
        int maxrecievebuffsize{get ;} //接收缓冲区的大小
        int messageheaderlength{get ;} //消息头的长度
        int offsetoflengthfield{get ;} //表示消息长度的字段在消息头中的偏移
        idatastreamheader parsemessageheader(byte[] data ,int offset) ; //解析消息头
        lengthtypeinheader lengthtypeinheader{get ;}
        byte[] getrespondwhenfailure(byte[] reqdata ,servicefailuretype failtype) ;    //根据服务失败类型获取失败回复消息
        byte[] getrespondwhenfailure(byte[] reqdata ,string errormsg) ;           
    }
    /// <summary>
    /// stringencoder 限定字符串编码格式
    /// </summary>
    public interface istringencoder
    {
        string getstrfromstream(byte[] stream ,int offset ,int len) ;
        byte[] getbytesfromstr(string ss) ;
    }
    /// <summary>
    /// servicefailuretype 服务失败类型
    /// </summary>
    public enum servicefailuretype
    {
        invalidmessge ,parsefailure ,handlefailure ,servicestopped ,serviceisnotexit ,serverisbusy
    }

  idatastreamheader即是我们所说的消息的“元数据”,如其名所示,它也是消息的“消息头”。请让我补充说明一下,依照我的经验,消息由消息头header和消息主体body组成,消息头用于存放消息的“元数据”等信息,而消息主体用于存放与特定请求相关的数据。消息头的长度固定,比如都是64字节或都是128字节。请求消息和回复消息公用相同格式的消息头。我们来看看消息头接口idatastreamheader的定义:

    public interface idatastreamheader
    {
        int messagelength    {get ;set ;} //本消息长度
        int typekey            {get ;set ;} //请求的目录类型
        int servicekey        {get ;set ;} //请求类型
        int serviceitemindex{get ;set ;} //请求细分索引
        int randomnum        {get ;set ;} //用于将回复与请求一一对应起来       
        int result            {get ;set ;} //服务结果   
   
        string userid        {get ;set ;} //发出请求的用户编号

        byte[] todatastream() ;              //将消息头转化为流,流的长度位消息头的长度
        void   todatastream(byte[] buff ,int offset);   
    }

  需要解释一下typekey、servicekey、serviceitemindex,我们实际上将服务类型分为三级,可以举个不太恰当的例子让大家有个感性的认识。比如,生活中的衣、食、住、行可以作为不同的typekey,而“衣”中的春装、冬装可作为servicekey,而“春装”中的t恤、夹克可作为serviceitemindex。对于服务的类型,你可以根据自己的意愿分成任意层级,但据我的经验,通常情况下,三层已经够用了。

  (2)消息分裂器

  前面已经多次提到消息分裂器messagesplitter,它用于将接收缓冲区中的数据分裂成一个个完整的消息,并且把余下的非完整数据返回,其接口定义如下:

public interface imessagesplitter
    {
        void initialize(int maxbuffsize ,int headerlen ,int offsetlenfield ,lengthtypeinheader lentype) ;
        arraylist splitrequestmsgs(byte[] buff ,int validcount , out byte[] leftdata) ;//arraylist 中每条记录都是是byte[],表示一个完整的请求
    }
    //消息头中的长度是body长度还是总长度
    public enum lengthtypeinheader
    {
        totallen ,bodylen
    }

  其中,initialize方法中的参数都可以由idatastreamheader提供。leftdata是余下的非完整消息的数据。splitrequestmsgs方法返回的集合中是一条条完整的请求消息。

  (3)消息处理器工厂

  消息处理器工厂根据消息的类型(typekey、servicekey)创建对应的消息处理器来出来该消息,其接口定义如下:

    public interface irequestdealerfactory
    {
        irequestdealer createdealer(int requesttype ,int servertypekey)  ;//servertypekey 比如城市代号
        event cbackrequestrecieved requestrecieved ;
    }

  createdealer方法返回的irequestdealer就是消息处理器,每一个消息处理器用于处理某种特定类型(servicekey)的所有请求。通常,可以将消息处理器封装成插件dll,以实现功能服务的“热插拔”。

  (4)消息处理器

  消息处理器irequestdealer定义如下:

    public interface irequestdealer
    {       
        byte[]  dealrequestmessage(roundedrequestmsg reqmsg ) ;//同步回复

        event cbackrequestrecieved requestrecieved ;
    }
    public delegate void cbackrequestrecieved(roundedrequestmsg roundedmsg) ;
    /// <summary>
    /// roundedrequestmsg 对应于一条完整的请求
    /// </summary>
    public struct roundedrequestmsg
    {
        public int connectid ; //请求所对应的tcp连接
        public byte[] data ;
    }

  roundedrequestmsg.data是经消息分裂器分裂得到的一个完整的请求消息,一个字节不多、一个字节也不少。

  (5)itcpstreamdispatcherhook

  itcpstreamdispatcherhook是一个hook,它为用户提供了一个自定义的对请求/回复消息进行操作的插入点。itcpstreamdispatcherhook由tcpstreamdispatcher使用,用于对请求消息和回复消息进行截获,然后处理或转换这些消息,比如常用的处理/转换操作包括:加密/解密、消息验证等等。itcpstreamdispatcherhook定义如下:

    /// <summary>
    /// itcpstreamdispatcherhook 由tcpstreamdispatcher使用,用于对请求消息和回复消息进行截获,然后处理转换这些消息,
    /// 比如加密/解密。 
    /// </summary>
    public interface itcpstreamdispatcherhook
    {
        //转换消息
        byte[] capturerequestmsg(byte[] roundedmsg) ;
        byte[] capturerespondmsg(byte[] roundedmsg) ;

        //验证消息,以下验证的消息是还没有被捕获的消息
        bool verifyfirstmsgofuser(byte[] roundedmsg ,ref requestvalidation validation) ;
        bool verifyothermessage(byte[]   roundedmsg ,ref requestvalidation validation) ;
    }

  关于这个接口中各方法的含义可以在消息分派器的实现中更好的领会!

  3.消息分派器实现

  在前述的基本元素的基础上,实现消息分派器非常简单,我们来看其核心方法dealrequestmessage的实现源码:

      private imessagesplitter               curmsgsplitter = new messagespliter() ;
      private idatastreamhelper            curmsghelper ;  //必须设置
      private irequestdealerfactory       curdealerfactory ;  //必须设置
      private itcpstreamdispatcherhook tcpstreamdispatcherhook ;

       public arraylist dealrequestmessage(requestdata requestdata, out byte[] leftdata, ref requestvalidation validation)
        {
            //消息分裂
            arraylist respondlist = new arraylist() ;
            arraylist reqlist = this.curmsgsplitter.splitrequestmsgs(requestdata.buff ,requestdata.validcount ,out leftdata) ;
            if(reqlist == null)
            {
                return respondlist ;
            }               
            bool verified = true ;
            for(int i=0; i<reqlist.count ;i++)
            {       
                byte[] thedata = (byte[])reqlist[i] ;
                #region 验证消息               
                if(requestdata.isfirstmsg && (i == 0))
                {                       
                    verified = this.tcpstreamdispatcherhook.verifyfirstmsgofuser(thedata ,ref validation) ;                   
                }
                else
                {                           
                    verified = this.tcpstreamdispatcherhook.verifyothermessage(thedata ,ref validation ) ;                   
                }

                if(! verified)
                {
                    if(validation.gotocloseconnection)
                    {
                        return null ;
                    }

                    this.addrespondtolist(respondlist ,this.curmsghelper.getrespondwhenfailure(thedata ,servicefailuretype.invalidmessge)) ;
                    continue ;
                }
                #endregion
               
                //接插,捕获/转换请求消息
                byte[] reqdata = this.tcpstreamdispatcherhook.capturerequestmsg(thedata) ;
                #region 处理消息
                //处理消息
                idatastreamheader header = this.curmsghelper.parsemessageheader(reqdata ,0);
                irequestdealer dealer = this.curdealerfactory.createdealer(header.servicekey ,header.typekey) ;
                if(dealer == null)
                {
                    this.addrespondtolist(respondlist ,this.curmsghelper.getrespondwhenfailure(reqdata ,servicefailuretype.serviceisnotexit)) ;
                    continue ;
                }
                roundedrequestmsg roundreqmsg = new roundedrequestmsg();
                roundreqmsg.connectid = requestdata.connectid ;
                roundreqmsg.data = reqdata ;   
                try
                {
                    byte[] responddata = dealer.dealrequestmessage(roundreqmsg) ;
                   
                    if(responddata != null)
                    {
                        this.addrespondtolist(respondlist ,responddata) ;
                    }
                }
                catch(exception ee)
                {                   
                    this.addrespondtolist(respondlist , this.curmsghelper.getrespondwhenfailure(reqdata ,ee.message)) ;
                }   
                #endregion
            }

            return respondlist;
        }
        //将回复消息加密后放入list
        private void addrespondtolist(arraylist list ,byte[] theresponddata)
        {
            //接插,捕获/转换回复消息
            byte[] responddata = this.tcpstreamdispatcherhook.capturerespondmsg(theresponddata) ;
            list.add(responddata) ;
        }

  如果你是一直按顺序读下来的,理解上面的实现一定不成什么问题。到这里,tcp通信层的所有重要的设施基本都已介绍完毕,最后,给出了提示,即,在你的应用中,如何使用这个可复用的tcp通信层。步骤如下:

  (1)实现idatastreamhelper接口。

  (2)实现ireqeststreamdispatcher接口,如果采用的是tcp协议,则可直接使用参考实现tcpstreamdispatcher

  (3)实现各种请求处理器,这些处理器实现irequestdealer接口。

  (4)实现irequestdealerfactory接口。

  接下来,还有什么?其实,还有很多,都可以提高到框架的层次,以便复用。比如,前面我们处理消息都是基于流(byte[])的形式,在此基础上,我们可以更上一层,采用基于对象的形式――即,将请求消息和回复消息都封装成类,这就涉及了流的解析(流=>对象)和对象序列化(消息对象=>流)问题;另外,我们甚至可以将tcp用户管理纳入到框架的高度,以进行复用,比如,通常基于tcp服务的系统都需要管理在线的tcp用户,并记录tcp用户请求服务的具体信息、在线时间等,这些经过良好的分析概括都可以提高到复用的高度。以后有时间,我会将这样的经验和大家分享。

  最后,把enterpriseserverbase类库中的network命名空间中的源码和大家共享,希望对大家有所帮助!(另,该命名空间中已经包含了上述的基于对象的消息和tcp用户管理的可复用组件)。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表