FastSocket这个东西上次我已经说过,它使用简单,功能强大,扩展灵活,目前在新浪的生产环境中已经被广泛使用,所以它的性能,安全等各方面我们绝对可以信赖,今天我们来说一个话题,和上一讲有关,这次我们制作一个基于FastSocket的传输协议,它的意义重大,当fastSocket提供的协议不能满足项目要求时,我们就必须硬着头皮去自己写了,还好,fastsocket为我们铺好了路,我们只要按着这条路走下去,就可以了。
首先,如果要想扩展一个自己的协议,要对%20client和server端分别进行开发,下面我们来看一下client的开发
我们要添加的类有三个文件组成,分别是DSSBinaryPRotocol,DSSBinaryResponse和一个使用这个协议的客户端入口DSSBinarySocketClient
DSSBinaryProtocol
/// <summary> /// 异步二进制协议 /// 协议格式 /// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer] /// 其中参数TableName和VersonNumber长度为40,不够自动在左侧补空格 /// </summary> public sealed class DSSBinaryProtocol : iprotocol<DSSBinaryResponse> { #region IProtocol Members /// <summary> /// find response /// </summary> /// <param name="connection"></param> /// <param name="buffer"></param> /// <param name="readlength"></param> /// <returns></returns> /// <exception cref="BadProtocolException">bad async binary protocl</exception> public DSSBinaryResponse FindResponse(IConnection connection, ArraySegment<byte> buffer, out int readlength) { if (buffer.Count < 4) { readlength = 0; return null; } //获取message length var messageLength = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset); if (messageLength < 7) throw new BadProtocolException("bad async binary protocl"); readlength = messageLength + 4; if (buffer.Count < readlength) { readlength = 0; return null; } var seqID = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset + 4); var projectID = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 8); var flagLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 10); var versonLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 12); var strName = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14, flagLength); var versonNumber = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14 + flagLength, versonLength); var dataLength = messageLength - 10 - flagLength - versonLength; byte[] data = null; if (dataLength > 0) { data = new byte[dataLength]; Buffer.BlockCopy(buffer.Array, buffer.Offset + 14 + flagLength + versonLength, data, 0, dataLength); } return new DSSBinaryResponse(seqID, projectID, strName, versonNumber, data); } #endregion }View Code
DSSBinaryResponse
/// <summary> /// 数据同步系统DSS使用的Socket协议,我们称为DSSBinary协议 /// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer] /// </summary> public class DSSBinaryResponse : IResponse { /// <summary> /// 流水ID /// </summary> public int SeqID { get; private set; } /// <summary> /// 项目类型编号 /// </summary> public short ProjectID { get; set; } /// <summary> /// 本次传输的版本号,所有客户端唯一[项目名称(4字节)+guid(36字节)] /// </summary> public string VersonNumber { get; private set; } /// <summary> /// 命令名称 /// </summary> public string Flag { get; private set; } /// <summary> /// 要操作的表对象,以字节数组形式进行传输 /// </summary> public readonly byte[] Buffer = null; public DSSBinaryResponse(int seqID, short projectID, string flag, string versonNumber, byte[] buffer) { this.SeqID = seqID; this.ProjectID = projectID; this.VersonNumber = versonNumber; this.Flag = flag; this.Buffer = buffer; } }View Code
DSSBinarySocketClient
/// <summary> /// 异步socket客户端 /// </summary> public class DSSBinarySocketClient : PooledSocketClient<DSSBinaryResponse> { #region Constructors /// <summary> /// new /// </summary> public DSSBinarySocketClient() : base(new DSSBinaryProtocol()) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, 3000, 3000) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> /// <param name="millisecondsSendTimeout"></param> /// <param name="millisecondsReceiveTimeout"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize, int millisecondsSendTimeout, int millisecondsReceiveTimeout) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, millisecondsSendTimeout, millisecondsReceiveTimeout) { } #endregion #region Public Methods public Task<TResult> Send<TResult>(string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { return this.Send(null, cmdName, projectID, versonNumber, payload, funcResultFactory, asyncState); } public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory"); var seqID = base.NextRequestSeqID(); var cmdLength = cmdName.Length; var versonNumberLength = versonNumber.Length; var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + versonNumberLength + 10; var sendBuffer = new byte[messageLength + 4];
新闻热点
疑难解答