首页 > 编程 > C# > 正文

C#中一个高性能异步socket封装库的实现思路分享

2020-01-24 00:24:31
字体:
来源:转载
供稿:网友

前言

socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多种方法,其中效率最高的就是异步通讯。

异步通讯实际是利用windows完成端口(IOCP)来处理的,关于完成端口实现原理,大家可以参考网上文章。

我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!

异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑”。如果没有经验,很难完成。

我搜集了大量资料,完成了对异步socket的封装。此库已稳定高效地运行了几个月。

纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。

在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。

为了使大家对通讯效率有初步了解,先看测试图。

主机配置情况

百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。

这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。

库的结构图

目标

既可作为服务端(监听)也可以作为客户端(主动连接)使用。

可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。

高可用性。将复杂的底层处理封装,对外接口非常友好。

高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。

实现思路

网络处理逻辑可以分为以下几个部分:

网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。

主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。

Socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了粘包,提高发送性能;接收时,一次投递1K的数据。

组包处理 一般数据包都有包长度指示;比如 报头的前两个字节表示长度,根据这个值就可以组成一个完整的包。

NetListener 监听

using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
    /// <summary>
    /// Listens on a single TCP port and turns every accepted connection into an
    /// <see cref="AsyncSocketClient"/> that is handed to <see cref="OnAcceptSocket"/>.
    /// Accepts are posted asynchronously; a dedicated thread drains the queue of
    /// accepted clients and re-posts accepts when few are outstanding.
    /// </summary>
    class NetListener
    {
        private Socket listenSocket;

        /// <summary>Port and tag this listener serves; must be set before <see cref="StartListen"/>.</summary>
        public ListenParam _listenParam { get; set; }

        /// <summary>Raised on the listener's worker thread for every accepted client.</summary>
        public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;

        // Read by the worker thread, written by StartListen: volatile so the loop sees the write.
        volatile bool start;

        NetServer _netServer;

        public NetListener(NetServer netServer)
        {
            _netServer = netServer;
        }

        /// <summary>Number of accept operations currently posted and not yet completed.</summary>
        public int _acceptAsyncCount = 0;

        // Signalled whenever an accept completes or a new client is queued.
        AutoResetEvent _acceptEvent = new AutoResetEvent(false);

        // Accepted clients waiting to be dispatched by the worker thread.
        ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();

        /// <summary>
        /// Binds to all local addresses on the configured port, starts the worker thread
        /// and posts the first accept.
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> if binding/listening failed (the error is logged).</returns>
        public bool StartListen()
        {
            try
            {
                start = true;
                IPEndPoint listenPoint = new IPEndPoint(IPAddress.Any, _listenParam._port);
                listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(listenPoint);
                listenSocket.Listen(200);

                Thread thread1 = new Thread(new ThreadStart(NetProcess));
                thread1.Start();

                StartAccept();
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("**监听异常!{0}", ex.Message));
                return false;
            }
        }

        /// <summary>Worker loop: dispatch queued clients, then wait for a signal (or 10 s).</summary>
        private void NetProcess()
        {
            while (start)
            {
                DealNewAccept();
                _acceptEvent.WaitOne(1000 * 10);
            }
        }

        /// <summary>
        /// Tops up the outstanding accepts when ten or fewer are pending and dispatches
        /// every client currently queued.
        /// </summary>
        private void DealNewAccept()
        {
            try
            {
                if (_acceptAsyncCount <= 10)
                {
                    StartAccept();
                }

                while (true)
                {
                    AsyncSocketClient client = _newSocketClientList.GetObj();
                    if (client == null)
                    {
                        break;
                    }

                    DealNewAccept(client);
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("DealNewAccept 异常 {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        /// <summary>Applies the server's per-client send buffer size and raises <see cref="OnAcceptSocket"/>.</summary>
        private void DealNewAccept(AsyncSocketClient client)
        {
            client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
            OnAcceptSocket?.Invoke(_listenParam, client);
        }

        /// <summary>Completion callback for accepts that finished asynchronously.</summary>
        private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                Interlocked.Decrement(ref _acceptAsyncCount);
                _acceptEvent.Set();
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                ProcessAccept(acceptEventArgs);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        /// <summary>
        /// Posts one asynchronous accept on the listening socket.
        /// </summary>
        /// <returns>Always <c>true</c>; failures surface as exceptions from <c>AcceptAsync</c>.</returns>
        public bool StartAccept()
        {
            SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
            acceptEventArgs.Completed += AcceptEventArg_Completed;

            // Count the operation BEFORE posting it: the completion callback can run on
            // another thread before AcceptAsync returns and would otherwise decrement first.
            Interlocked.Increment(ref _acceptAsyncCount);

            bool willRaiseEvent;
            try
            {
                willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
            }
            catch
            {
                // Nothing was posted: undo the bookkeeping and release the args, then let
                // the caller's handler log the failure as before.
                Interlocked.Decrement(ref _acceptAsyncCount);
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                acceptEventArgs.Dispose();
                throw;
            }

            if (!willRaiseEvent)
            {
                // Completed synchronously: the Completed event will not fire.
                Interlocked.Decrement(ref _acceptAsyncCount);
                _acceptEvent.Set();
                acceptEventArgs.Completed -= AcceptEventArg_Completed;
                ProcessAccept(acceptEventArgs);
            }
            return true;
        }

        /// <summary>
        /// Wraps a successfully accepted socket in an <see cref="AsyncSocketClient"/> and
        /// queues it for the worker thread. The event args are always disposed.
        /// </summary>
        private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
        {
            try
            {
                using (acceptEventArgs)
                {
                    Socket accepted = acceptEventArgs.AcceptSocket;

                    if (acceptEventArgs.SocketError != SocketError.Success)
                    {
                        // A failed accept must not be turned into a client; release any
                        // socket that was attached to the operation.
                        if (accepted != null)
                        {
                            accepted.Close();
                        }
                        return;
                    }

                    if (accepted != null)
                    {
                        AsyncSocketClient client = new AsyncSocketClient(accepted);
                        client.CreateClientInfo(this);

                        _newSocketClientList.PutObj(client);
                        _acceptEvent.Set();
                    }
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
            }
        }
    }
}

NetConnectManage连接处理

using System;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
    /// <summary>
    /// Establishes outgoing TCP connections, either asynchronously or synchronously,
    /// and reports the outcome through <see cref="OnSocketConnectEvent"/>.
    /// </summary>
    class NetConnectManage
    {
        /// <summary>
        /// Raised once per connection attempt. On success the client argument wraps the
        /// connected socket; on an asynchronous failure the client is <c>null</c> and the
        /// event parameter carries a <c>null</c> socket.
        /// </summary>
        public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

        /// <summary>
        /// Starts an asynchronous connect to <paramref name="peerIp"/>:<paramref name="peerPort"/>.
        /// </summary>
        /// <param name="peerIp">Remote IP address in textual form.</param>
        /// <param name="peerPort">Remote TCP port.</param>
        /// <param name="tag">Opaque value stored on the resulting client info.</param>
        /// <returns><c>true</c> if the connect was posted (or completed inline); <c>false</c> if it could not be started.</returns>
        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            Socket socket = null;
            SocketAsyncEventArgs socketEventArgs = null;
            bool posted = false;
            try
            {
                socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                socketEventArgs = new SocketAsyncEventArgs();
                socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                socketEventArgs.Completed += SocketConnect_Completed;

                SocketClientInfo clientInfo = new SocketClientInfo();
                clientInfo.PeerIp = peerIp;
                clientInfo.PeerPort = peerPort;
                clientInfo.Tag = tag;
                socketEventArgs.UserToken = clientInfo;

                bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
                posted = true;
                if (!willRaiseEvent)
                {
                    // Completed synchronously: the Completed event will not fire.
                    FinishConnect(socket, socketEventArgs);
                }
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log("ConnectAsyn", ex);

                // If the operation was never posted nobody else will release these.
                if (!posted)
                {
                    if (socketEventArgs != null)
                    {
                        socketEventArgs.Completed -= SocketConnect_Completed;
                        socketEventArgs.Dispose();
                    }
                    if (socket != null)
                    {
                        socket.Close();
                    }
                }
                return false;
            }
        }

        /// <summary>Completion callback for connects that finished asynchronously.</summary>
        private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
        {
            try
            {
                // NOTE(review): sender is expected to be the Socket the operation was
                // started on; a null result simply skips the close-on-failure step.
                FinishConnect(sender as Socket, socketEventArgs);
            }
            catch (Exception ex)
            {
                // Runs on an I/O completion thread: log instead of letting it escape.
                NetLogger.Log("SocketConnect_Completed", ex);
            }
        }

        /// <summary>
        /// Reports the result of a finished connect, closes the socket when the attempt
        /// failed, and always releases the event args.
        /// </summary>
        private void FinishConnect(Socket socket, SocketAsyncEventArgs socketEventArgs)
        {
            bool failed = socketEventArgs.SocketError != SocketError.Success;
            try
            {
                ProcessConnect(socketEventArgs);
            }
            finally
            {
                if (failed && socket != null)
                {
                    // A failed attempt leaves an unconnected socket nobody else owns.
                    socket.Close();
                }
                socketEventArgs.Completed -= SocketConnect_Completed;
                socketEventArgs.Dispose();
            }
        }

        /// <summary>Raises <see cref="OnSocketConnectEvent"/> for a finished asynchronous connect.</summary>
        private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
        {
            SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
            if (socketEventArgs.SocketError == SocketError.Success)
            {
                DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
            }
            else
            {
                SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
                socketParam.ClientInfo = clientInfo;
                OnSocketConnectEvent?.Invoke(socketParam, null);
            }
        }

        /// <summary>Wraps a connected socket in a client and raises the connect event.</summary>
        void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
        {
            clientInfo.SetClientInfo(socket);

            AsyncSocketClient client = new AsyncSocketClient(socket);
            client.SetClientInfo(clientInfo);

            // Raise the connect event.
            SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
            socketParam.ClientInfo = clientInfo;
            OnSocketConnectEvent?.Invoke(socketParam, client);
        }

        /// <summary>
        /// Connects synchronously to <paramref name="peerIp"/>:<paramref name="peerPort"/>.
        /// </summary>
        /// <param name="peerIp">Remote IP address in textual form.</param>
        /// <param name="peerPort">Remote TCP port.</param>
        /// <param name="tag">Opaque value stored on the resulting client info.</param>
        /// <param name="socket">The connected socket on success; otherwise <c>null</c>.</param>
        /// <returns><c>true</c> if the connection was established; otherwise <c>false</c> (the error is logged).</returns>
        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            socket = null;
            Socket socketTmp = null;
            bool ownershipTransferred = false;
            try
            {
                socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

                SocketClientInfo clientInfo = new SocketClientInfo();
                clientInfo.PeerIp = peerIp;
                clientInfo.PeerPort = peerPort;
                clientInfo.Tag = tag;

                EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                socketTmp.Connect(remoteEP);
                if (!socketTmp.Connected)
                {
                    return false;
                }

                // From here on the socket belongs to the client created for it.
                ownershipTransferred = true;
                DealConnectSocket(socketTmp, clientInfo);
                socket = socketTmp;
                return true;
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("连接对方:({0}:{1})出错!", peerIp, peerPort), ex);
                return false;
            }
            finally
            {
                // The connect itself failed: release the handle instead of leaking it.
                if (!ownershipTransferred && socketTmp != null)
                {
                    socketTmp.Close();
                }
            }
        }
    }
}

AsyncSocketClient socket收发处理

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
    /// <summary>
    /// Per-connection send/receive engine built on <see cref="SocketAsyncEventArgs"/>.
    /// It deals only with raw bytes: received chunks are reported through
    /// <see cref="OnReadData"/>, queued outgoing buffers are coalesced and sent by
    /// <see cref="SendNextData"/>.
    /// </summary>
    public class AsyncSocketClient
    {
        /// <summary>Size of one receive buffer and maximum size of one queued send chunk, in bytes.</summary>
        public static int IocpReadLen = 1024;

        public readonly Socket ConnectSocket;

        protected SocketAsyncEventArgs m_receiveEventArgs;
        public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
        protected byte[] m_asyncReceiveBuffer;

        protected SocketAsyncEventArgs m_sendEventArgs;
        public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
        protected byte[] m_asyncSendBuffer;

        /// <summary>Raised with a copy of every received chunk.</summary>
        public event Action<AsyncSocketClient, byte[]> OnReadData;

        /// <summary>Raised after a send finishes, with the number of bytes reported for it.</summary>
        public event Action<AsyncSocketClient, int> OnSendData;

        /// <summary>Raised once, the first time a read or send error is detected.</summary>
        public event Action<AsyncSocketClient> OnSocketClose;

        // Diagnostic counters: instances constructed / finalized.
        public static int createCount = 0;
        public static int releaseCount = 0;

        // Private gate instead of lock(this): outside code cannot take this lock.
        private readonly object _gate = new object();

        // Chunk size captured at construction so that later changes to the static
        // IocpReadLen cannot make queued chunks overrun this instance's send buffer.
        private readonly int _chunkLen;

        ~AsyncSocketClient()
        {
            // Interlocked rather than a lock: finalizers should not block.
            Interlocked.Increment(ref releaseCount);
        }

        /// <summary>Creates the receive and send contexts for an already connected socket.</summary>
        public AsyncSocketClient(Socket socket)
        {
            Interlocked.Increment(ref createCount);

            ConnectSocket = socket;
            _chunkLen = IocpReadLen;

            m_receiveEventArgs = new SocketAsyncEventArgs();
            m_asyncReceiveBuffer = new byte[_chunkLen];
            m_receiveEventArgs.AcceptSocket = ConnectSocket;
            m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

            m_sendEventArgs = new SocketAsyncEventArgs();
            // Twice the chunk size: GetSendData may append one more full chunk after
            // the buffer already holds up to _chunkLen bytes.
            m_asyncSendBuffer = new byte[_chunkLen * 2];
            m_sendEventArgs.AcceptSocket = ConnectSocket;
            m_sendEventArgs.Completed += SendEventArgs_Completed;
        }

        SocketClientInfo _clientInfo;

        public SocketClientInfo ClientInfo
        {
            get
            {
                return _clientInfo;
            }
        }

        /// <summary>
        /// Fills <see cref="ClientInfo"/> for a socket accepted by <paramref name="netListener"/>.
        /// Failures are logged and leave the info partially populated.
        /// </summary>
        internal void CreateClientInfo(NetListener netListener)
        {
            _clientInfo = new SocketClientInfo();
            try
            {
                _clientInfo.Tag = netListener._listenParam._tag;

                IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
                Debug.Assert(netListener._listenParam._port == ip.Port);
                _clientInfo.LocalIp = ip.Address.ToString();
                _clientInfo.LocalPort = netListener._listenParam._port;

                ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
                _clientInfo.PeerIp = ip.Address.ToString();
                _clientInfo.PeerPort = ip.Port;
            }
            catch (Exception ex)
            {
                NetLogger.Log("CreateClientInfo", ex);
            }
        }

        internal void SetClientInfo(SocketClientInfo clientInfo)
        {
            _clientInfo = clientInfo;
        }

        #region read process

        bool _inReadPending = false;
        bool _socketError = false;

        /// <summary>
        /// Posts the next receive unless one is already pending.
        /// </summary>
        /// <returns>
        /// <c>HaveRead</c> if data was received inline (call again), <c>InAsyn</c> if a
        /// receive is pending, <c>ReadError</c> if the connection is or became unusable.
        /// </returns>
        public EN_SocketReadResult ReadNextData()
        {
            lock (_gate)
            {
                if (_socketError)
                {
                    return EN_SocketReadResult.ReadError;
                }
                if (_inReadPending)
                {
                    return EN_SocketReadResult.InAsyn;
                }
                if (!ConnectSocket.Connected)
                {
                    OnReadError();
                    return EN_SocketReadResult.ReadError;
                }

                try
                {
                    m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
                    _inReadPending = true;
                    bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); // post the receive
                    if (willRaiseEvent)
                    {
                        return EN_SocketReadResult.InAsyn;
                    }

                    // Completed synchronously: the Completed event will not fire.
                    _inReadPending = false;
                    ProcessReceive();
                    if (_socketError)
                    {
                        OnReadError();
                        return EN_SocketReadResult.ReadError;
                    }
                    return EN_SocketReadResult.HaveRead;
                }
                catch (Exception ex)
                {
                    NetLogger.Log("ReadNextData", ex);
                    _inReadPending = false;
                    OnReadError();
                    return EN_SocketReadResult.ReadError;
                }
            }
        }

        /// <summary>
        /// Handles a finished receive: copies the bytes out and raises <see cref="OnReadData"/>,
        /// or treats zero bytes / a socket error as a closed connection.
        /// </summary>
        private void ProcessReceive()
        {
            if (ReceiveEventArgs.BytesTransferred > 0
                && ReceiveEventArgs.SocketError == SocketError.Success)
            {
                int offset = ReceiveEventArgs.Offset;
                int count = ReceiveEventArgs.BytesTransferred;

                // Copy: the receive buffer is reused by the next receive.
                byte[] readData = new byte[count];
                Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

                _inReadPending = false;
                if (!_socketError)
                {
                    OnReadData?.Invoke(this, readData);
                }
            }
            else
            {
                _inReadPending = false;
                OnReadError();
            }
        }

        /// <summary>Completion callback for receives that finished asynchronously.</summary>
        private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            lock (_gate)
            {
                _inReadPending = false;
                ProcessReceive();
                if (_socketError)
                {
                    OnReadError();
                }
            }
        }

        /// <summary>Marks the connection failed (raising <see cref="OnSocketClose"/> once) and closes it.</summary>
        private void OnReadError()
        {
            lock (_gate)
            {
                if (_socketError == false)
                {
                    _socketError = true;
                    OnSocketClose?.Invoke(this);
                }
                CloseClient();
            }
        }

        #endregion

        #region send process

        int _sendBufferByteCount = 102400;

        /// <summary>
        /// Upper bound, in bytes, on queued outgoing data; values below 1024 are raised to 1024.
        /// </summary>
        public int SendBufferByteCount
        {
            get
            {
                return _sendBufferByteCount;
            }
            set
            {
                _sendBufferByteCount = value < 1024 ? 1024 : value;
            }
        }

        SendBufferPool _sendDataPool = new SendBufferPool();

        /// <summary>
        /// Queues <paramref name="data"/> for sending, splitting it into chunks no larger
        /// than the chunk size this instance was created with.
        /// </summary>
        /// <returns>
        /// <c>no_client</c> if the connection has failed, <c>buffer_overflow</c> if the
        /// queue already holds <see cref="SendBufferByteCount"/> bytes or more, otherwise <c>ok</c>.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        internal EN_SendDataResult PutSendData(byte[] data)
        {
            if (_socketError)
            {
                return EN_SendDataResult.no_client;
            }

            // Interlocked.Read: a plain 64-bit read can tear on 32-bit processes.
            if (Interlocked.Read(ref _sendDataPool._bufferByteCount) >= _sendBufferByteCount)
            {
                return EN_SendDataResult.buffer_overflow;
            }

            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            if (data.Length <= _chunkLen)
            {
                _sendDataPool.PutObj(data);
            }
            else
            {
                foreach (byte[] item in SplitData(data, _chunkLen))
                {
                    _sendDataPool.PutObj(item);
                }
            }
            return EN_SendDataResult.ok;
        }

        bool _inSendPending = false;

        /// <summary>
        /// Sends the next batch of queued data unless a send is already pending.
        /// </summary>
        /// <returns>
        /// <c>HaveSend</c> if a batch was sent inline (call again), <c>InAsyn</c> if a send
        /// is pending, <c>NoSendData</c> if nothing is queued, <c>SendError</c> on failure.
        /// </returns>
        public EN_SocketSendResult SendNextData()
        {
            lock (_gate)
            {
                if (_socketError)
                {
                    return EN_SocketSendResult.SendError;
                }
                if (_inSendPending)
                {
                    return EN_SocketSendResult.InAsyn;
                }

                int sendByteCount = GetSendData();
                if (sendByteCount == 0)
                {
                    return EN_SocketSendResult.NoSendData;
                }

                // Check up front to avoid the cost of an exception on a dead socket.
                if (!ConnectSocket.Connected)
                {
                    OnSendError();
                    return EN_SocketSendResult.SendError;
                }

                try
                {
                    m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
                    _inSendPending = true;
                    bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
                    if (willRaiseEvent)
                    {
                        return EN_SocketSendResult.InAsyn;
                    }

                    // Completed synchronously: the Completed event will not fire.
                    _inSendPending = false;
                    ProcessSend(m_sendEventArgs);
                    if (_socketError)
                    {
                        OnSendError();
                        return EN_SocketSendResult.SendError;
                    }

                    OnSendData?.Invoke(this, sendByteCount);
                    // Caller should continue with the next batch.
                    return EN_SocketSendResult.HaveSend;
                }
                catch (Exception ex)
                {
                    NetLogger.Log("SendNextData", ex);
                    _inSendPending = false;
                    OnSendError();
                    return EN_SocketSendResult.SendError;
                }
            }
        }

        /// <summary>Completion callback for sends that finished asynchronously.</summary>
        private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
        {
            lock (_gate)
            {
                try
                {
                    _inSendPending = false;
                    ProcessSend(m_sendEventArgs);

                    int sendCount = 0;
                    if (sendEventArgs.SocketError == SocketError.Success)
                    {
                        sendCount = sendEventArgs.BytesTransferred;
                    }
                    OnSendData?.Invoke(this, sendCount);

                    if (_socketError)
                    {
                        OnSendError();
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log("SendEventArgs_Completed", ex);
                }
            }
        }

        /// <summary>Returns whether the send succeeded; a failure closes the connection.</summary>
        private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
        {
            if (sendEventArgs.SocketError == SocketError.Success)
            {
                return true;
            }

            OnSendError();
            return false;
        }

        /// <summary>
        /// Coalesces queued chunks into the send buffer until it holds more than one
        /// chunk's worth of bytes or the queue is empty.
        /// </summary>
        /// <returns>Number of bytes placed in the send buffer.</returns>
        private int GetSendData()
        {
            int dataLen = 0;
            while (true)
            {
                byte[] data = _sendDataPool.GetObj();
                if (data == null)
                {
                    return dataLen;
                }

                Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
                dataLen += data.Length;
                if (dataLen > _chunkLen)
                {
                    break;
                }
            }
            return dataLen;
        }

        /// <summary>Marks the connection failed (raising <see cref="OnSocketClose"/> once) and closes it.</summary>
        private void OnSendError()
        {
            lock (_gate)
            {
                if (_socketError == false)
                {
                    _socketError = true;
                    OnSocketClose?.Invoke(this);
                }
                CloseClient();
            }
        }

        #endregion

        /// <summary>Closes the underlying socket; errors are logged, not thrown.</summary>
        internal void CloseSocket()
        {
            try
            {
                ConnectSocket.Close();
            }
            catch (Exception ex)
            {
                NetLogger.Log("CloseSocket", ex);
            }
        }

        // Diagnostic counters: send / receive contexts disposed.
        public static int closeSendCount = 0;
        public static int closeReadCount = 0;

        bool _disposeSend = false;

        /// <summary>
        /// Disposes the send context once. Skipped while a send is in flight; the send
        /// completion path calls back into <see cref="CloseClient"/> afterwards.
        /// </summary>
        void CloseSend()
        {
            if (!_disposeSend && !_inSendPending)
            {
                Interlocked.Increment(ref closeSendCount);
                _disposeSend = true;
                m_sendEventArgs.SetBuffer(null, 0, 0);
                m_sendEventArgs.Completed -= SendEventArgs_Completed;
                m_sendEventArgs.Dispose();
            }
        }

        bool _disposeRead = false;

        /// <summary>
        /// Disposes the receive context once. Skipped while a receive is in flight; the
        /// receive completion path calls back into <see cref="CloseClient"/> afterwards.
        /// </summary>
        void CloseRead()
        {
            if (!_disposeRead && !_inReadPending)
            {
                Interlocked.Increment(ref closeReadCount);
                _disposeRead = true;
                m_receiveEventArgs.SetBuffer(null, 0, 0);
                m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
                m_receiveEventArgs.Dispose();
            }
        }

        /// <summary>Releases both I/O contexts (when idle) and closes the socket; errors are logged.</summary>
        private void CloseClient()
        {
            try
            {
                CloseSend();
                CloseRead();
                ConnectSocket.Close();
            }
            catch (Exception ex)
            {
                NetLogger.Log("CloseClient", ex);
            }
        }

        /// <summary>Splits <paramref name="data"/> into consecutive copies of at most <paramref name="maxLen"/> bytes.</summary>
        private List<byte[]> SplitData(byte[] data, int maxLen)
        {
            List<byte[]> items = new List<byte[]>();
            int start = 0;
            while (true)
            {
                int itemLen = Math.Min(maxLen, data.Length - start);
                if (itemLen == 0)
                {
                    break;
                }

                byte[] item = new byte[itemLen];
                Array.Copy(data, start, item, 0, itemLen);
                items.Add(item);
                start += itemLen;
            }
            return items;
        }
    }

    /// <summary>Outcome of <see cref="AsyncSocketClient.ReadNextData"/>.</summary>
    public enum EN_SocketReadResult
    {
        InAsyn,
        HaveRead,
        ReadError
    }

    /// <summary>Outcome of <see cref="AsyncSocketClient.SendNextData"/>.</summary>
    public enum EN_SocketSendResult
    {
        InAsyn,
        HaveSend,
        NoSendData,
        SendError
    }

    /// <summary>Queue of outgoing buffers that also tracks the total number of queued bytes.</summary>
    class SendBufferPool
    {
        ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

        /// <summary>Total bytes currently queued; updated atomically.</summary>
        public Int64 _bufferByteCount = 0;

        /// <summary>Queues a buffer; returns whether the underlying pool accepted it.</summary>
        public bool PutObj(byte[] obj)
        {
            if (!_bufferPool.PutObj(obj))
            {
                return false;
            }

            Interlocked.Add(ref _bufferByteCount, obj.Length);
            return true;
        }

        /// <summary>Takes the next buffer from the underlying pool; a <c>null</c> result is passed through.</summary>
        public byte[] GetObj()
        {
            byte[] result = _bufferPool.GetObj();
            if (result != null)
            {
                Interlocked.Add(ref _bufferByteCount, -result.Length);
            }
            return result;
        }
    }
}

NetServer 聚合其他类

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
    /// <summary>
    /// Facade that ties listeners, outgoing connections, per-socket clients and packet
    /// assembly together. Socket events are queued and delivered on a dedicated thread
    /// through <see cref="OnSocketPacketEvent"/>; reads and sends are driven by two
    /// further worker threads.
    /// </summary>
    public class NetServer
    {
        /// <summary>Receives every socket event (accept, connect, read, close).</summary>
        public Action<SocketEventParam> OnSocketPacketEvent;

        /// <summary>Send buffer size applied to each accepted connection, in bytes.</summary>
        public int SendBufferBytePerClient { get; set; } = 1024 * 100;

        // Read by the worker threads, written by StartListen.
        volatile bool _serverStart = false;

        List<NetListener> _listListener = new List<NetListener>();

        // Assembles complete packets from the received byte stream.
        ClientPacketManage _clientPacketManage;

        long _sendByteCount;
        long _readByteCount;

        /// <summary>Total bytes reported as sent. Accessed atomically.</summary>
        public Int64 SendByteCount
        {
            get { return Interlocked.Read(ref _sendByteCount); }
            set { Interlocked.Exchange(ref _sendByteCount, value); }
        }

        /// <summary>Total bytes received. Accessed atomically.</summary>
        public Int64 ReadByteCount
        {
            get { return Interlocked.Read(ref _readByteCount); }
            set { Interlocked.Exchange(ref _readByteCount, value); }
        }

        List<ListenParam> _listListenPort = new List<ListenParam>();

        /// <summary>Registers a port to listen on; takes effect when <see cref="StartListen"/> is called.</summary>
        public void AddListenPort(int port, object tag)
        {
            _listListenPort.Add(new ListenParam(port, tag));
        }

        /// <summary>
        /// Starts the worker threads and a listener for every registered port.
        /// </summary>
        /// <param name="listenFault">Ports that could not be listened on.</param>
        /// <returns><c>true</c> if every registered port is now listening.</returns>
        public bool StartListen(out List<int> listenFault)
        {
            _serverStart = true;

            _clientPacketManage = new ClientPacketManage(this);
            _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

            _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

            _listListener.Clear();

            Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
            thread1.Start();

            Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
            thread2.Start();

            Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
            thread3.Start();

            listenFault = new List<int>();
            foreach (ListenParam param in _listListenPort)
            {
                NetListener listener = new NetListener(this);
                listener._listenParam = param;
                listener.OnAcceptSocket += Listener_OnAcceptSocket;
                if (!listener.StartListen())
                {
                    listenFault.Add(param._port);
                }
                else
                {
                    _listListener.Add(listener);
                    NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
                }
            }

            return listenFault.Count == 0;
        }

        /// <summary>Forwards an assembled packet event to <see cref="OnSocketPacketEvent"/>.</summary>
        public void PutClientPacket(SocketEventParam param)
        {
            OnSocketPacketEvent?.Invoke(param);
        }

        int _packetMinLen;
        int _packetMaxLen;

        /// <summary>Minimum packet length; 0 means raw byte-stream mode.</summary>
        public int PacketMinLen
        {
            get { return _packetMinLen; }
        }

        /// <summary>Maximum packet length.</summary>
        public int PacketMaxLen
        {
            get { return _packetMaxLen; }
        }

        /// <summary>
        /// Sets the minimum and maximum packet length.
        /// When <paramref name="minLen"/> is 0, received data is delivered as a raw byte stream.
        /// </summary>
        /// <param name="minLen">Minimum packet length (non-negative).</param>
        /// <param name="maxLen">Maximum packet length (greater than <paramref name="minLen"/>).</param>
        public void SetPacketParam(int minLen, int maxLen)
        {
            Debug.Assert(minLen >= 0);
            Debug.Assert(maxLen > minLen);
            _packetMinLen = minLen;
            _packetMaxLen = maxLen;
        }

        /// <summary>Returns the total length of the packet that starts at <c>offset</c> in <c>data</c>.</summary>
        public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);

        public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

        ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();

        /// <summary>Worker loop that delivers queued socket events.</summary>
        private void NetPacketProcess()
        {
            while (_serverStart)
            {
                try
                {
                    DealEventPool();
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace));
                }
                _socketEventPool.WaitOne(1000);
            }
        }

        Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();

        /// <summary>Number of sockets currently registered.</summary>
        public int ClientCount
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Count;
                }
            }
        }

        /// <summary>Snapshot of the sockets currently registered.</summary>
        public List<Socket> ClientList
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Keys.ToList();
                }
            }
        }

        /// <summary>
        /// Drains the event queue: unregisters closed sockets, then either forwards the
        /// event directly (byte-stream mode) or feeds it to the packet assembler.
        /// </summary>
        private void DealEventPool()
        {
            while (true)
            {
                SocketEventParam param = _socketEventPool.GetObj();
                if (param == null)
                {
                    return;
                }

                if (param.SocketEvent == EN_SocketEvent.close)
                {
                    lock (_clientGroup)
                    {
                        _clientGroup.Remove(param.Socket);
                    }
                }

                if (_packetMinLen == 0) // byte-stream mode
                {
                    OnSocketPacketEvent?.Invoke(param);
                }
                else
                {
                    // Packet mode: assemble complete packets first.
                    _clientPacketManage.PutSocketParam(param);
                }
            }
        }

        /// <summary>Handles the result of an outgoing connection attempt.</summary>
        private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
        {
            try
            {
                // A null socket or client means the connection attempt failed.
                bool connected = param.Socket != null && client != null;
                if (connected)
                {
                    lock (_clientGroup)
                    {
                        bool remove = _clientGroup.Remove(client.ConnectSocket);
                        Debug.Assert(!remove);
                        _clientGroup.Add(client.ConnectSocket, client);
                    }

                    client.OnSocketClose += Client_OnSocketClose;
                    client.OnReadData += Client_OnReadData;
                    client.OnSendData += Client_OnSendData;
                }

                // Queue the connect event BEFORE requesting the first read so that a
                // fast read cannot be delivered ahead of the connect notification.
                _socketEventPool.PutObj(param);

                if (connected)
                {
                    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("SocketConnectEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        /// <summary>Closes the socket of a client whose stream produced an invalid packet length.</summary>
        internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
        {
            try
            {
                lock (_clientGroup)
                {
                    AsyncSocketClient client;
                    if (!_clientGroup.TryGetValue(socket, out client))
                    {
                        Debug.Assert(false);
                        return;
                    }

                    NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen));
                    client.CloseSocket();
                }
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("OnRcvPacketLenError 异常 {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        #region listen port

        /// <summary>Registers a freshly accepted client and starts reading from it.</summary>
        private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
        {
            try
            {
                lock (_clientGroup)
                {
                    bool remove = _clientGroup.Remove(client.ConnectSocket);
                    Debug.Assert(!remove);
                    _clientGroup.Add(client.ConnectSocket, client);
                }

                client.OnSocketClose += Client_OnSocketClose;
                client.OnReadData += Client_OnReadData;
                client.OnSendData += Client_OnSendData;

                // Queue the accept event BEFORE requesting the first read so that a
                // fast read cannot be delivered ahead of the accept notification.
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                _socketEventPool.PutObj(param);

                _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Listener_OnAcceptSocket 异常 {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();

        /// <summary>Worker loop that services send requests.</summary>
        private void NetSendProcess()
        {
            // Same run condition as NetPacketProcess.
            while (_serverStart)
            {
                DealSendEvent();
                _listSendEvent.WaitOne(1000);
            }
        }

        ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();

        /// <summary>Worker loop that services read requests.</summary>
        private void NetReadProcess()
        {
            // Same run condition as NetPacketProcess.
            while (_serverStart)
            {
                DealReadEvent();
                _listReadEvent.WaitOne(1000);
            }
        }

        /// <summary>Drains the send queue, sending on each client for as long as sends complete inline.</summary>
        private void DealSendEvent()
        {
            while (true)
            {
                SocketEventDeal item = _listSendEvent.GetObj();
                if (item == null)
                {
                    break;
                }

                switch (item.SocketEvent)
                {
                    case EN_SocketDealEvent.send:
                        while (item.Client.SendNextData() == EN_SocketSendResult.HaveSend)
                        {
                            // keep sending while batches complete inline
                        }
                        break;

                    case EN_SocketDealEvent.read:
                        Debug.Assert(false);
                        break;
                }
            }
        }

        /// <summary>Drains the read queue, reading on each client for as long as reads complete inline.</summary>
        private void DealReadEvent()
        {
            while (true)
            {
                SocketEventDeal item = _listReadEvent.GetObj();
                if (item == null)
                {
                    break;
                }

                switch (item.SocketEvent)
                {
                    case EN_SocketDealEvent.read:
                        while (item.Client.ReadNextData() == EN_SocketReadResult.HaveRead)
                        {
                            // keep reading while data arrives inline
                        }
                        break;

                    case EN_SocketDealEvent.send:
                        Debug.Assert(false);
                        break;
                }
            }
        }

        /// <summary>Queues the next read for the client and publishes the received bytes as a read event.</summary>
        private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
        {
            // Request the next read.
            _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

            try
            {
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                param.Data = readData;
                _socketEventPool.PutObj(param);

                Interlocked.Add(ref _readByteCount, readData.Length);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Client_OnReadData 异常 {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        #endregion

        /// <summary>Queues the next send for the client and accumulates the sent byte count.</summary>
        private void Client_OnSendData(AsyncSocketClient client, int sendCount)
        {
            // Request the next send.
            _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));

            Interlocked.Add(ref _sendByteCount, sendCount);
        }

        /// <summary>Publishes a close event for the client's socket.</summary>
        private void Client_OnSocketClose(AsyncSocketClient client)
        {
            try
            {
                SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
                param.ClientInfo = client.ClientInfo;
                _socketEventPool.PutObj(param);
            }
            catch (Exception ex)
            {
                NetLogger.Log(string.Format("Client_OnSocketClose 异常 {0}***{1}", ex.Message, ex.StackTrace));
            }
        }

        /// <summary>
        /// Places data in the send buffer of the client that owns <paramref name="socket"/>.
        /// </summary>
        /// <param name="socket">Socket of a registered client.</param>
        /// <param name="data">Bytes to send.</param>
        /// <returns><c>no_client</c> if the socket is null or unknown; otherwise the result of queueing the data.</returns>
        public EN_SendDataResult SendData(Socket socket, byte[] data)
        {
            if (socket == null)
            {
                return EN_SendDataResult.no_client;
            }

            // The lock is held across queueing so that chunks of concurrent sends to
            // the same socket are not interleaved.
            lock (_clientGroup)
            {
                AsyncSocketClient client;
                if (!_clientGroup.TryGetValue(socket, out client))
                {
                    return EN_SendDataResult.no_client;
                }

                EN_SendDataResult result = client.PutSendData(data);
                if (result == EN_SendDataResult.ok)
                {
                    // Request a send.
                    _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
                }
                return result;
            }
        }

        /// <summary>
        /// Sets the send buffer size of one connection.
        /// </summary>
        /// <param name="socket">Socket of a registered client.</param>
        /// <param name="byteCount">New buffer size in bytes.</param>
        /// <returns><c>false</c> if the socket is not registered.</returns>
        public bool SetClientSendBuffer(Socket socket, int byteCount)
        {
            lock (_clientGroup)
            {
                AsyncSocketClient client;
                if (!_clientGroup.TryGetValue(socket, out client))
                {
                    return false;
                }

                client.SendBufferByteCount = byteCount;
                return true;
            }
        }

        #region connect process

        NetConnectManage _netConnectManage = new NetConnectManage();

        /// <summary>
        /// Connects to a remote endpoint asynchronously.
        /// </summary>
        /// <param name="peerIp">Remote IP address.</param>
        /// <param name="peerPort">Remote port.</param>
        /// <param name="tag">Opaque value attached to the connection.</param>
        /// <returns>Whether the connect could be started.</returns>
        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
        }

        /// <summary>
        /// Connects to a remote endpoint synchronously.
        /// </summary>
        /// <param name="peerIp">Remote IP address.</param>
        /// <param name="peerPort">Remote port.</param>
        /// <param name="tag">Opaque value attached to the connection.</param>
        /// <param name="socket">The connected socket on success.</param>
        /// <returns>Whether the connection was established.</returns>
        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
        }

        #endregion
    }

    /// <summary>Kind of work queued for a client on the read/send worker threads.</summary>
    enum EN_SocketDealEvent
    {
        read,
        send,
    }

    /// <summary>A queued request to read from or send on a particular client.</summary>
    class SocketEventDeal
    {
        public AsyncSocketClient Client { get; set; }

        public EN_SocketDealEvent SocketEvent { get; set; }

        public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
        {
            Client = client;
            SocketEvent = socketEvent;
        }
    }
}

库的使用

使用起来非常简单,示例如下

using IocpCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Windows;

namespace WarningClient
{
    /// <summary>
    /// Example application wrapper around <see cref="NetServer"/>: configures the packet
    /// mode, starts listening, tracks connected sockets and logs connection events.
    /// </summary>
    public class SocketServer
    {
        /// <summary>Invoked for every socket event before it is handled here.</summary>
        public Action<SocketEventParam> OnSocketEvent;

        /// <summary>Total bytes sent, or 0 before <see cref="Init"/>.</summary>
        public Int64 SendByteCount
        {
            get
            {
                if (_netServer == null)
                {
                    return 0;
                }
                return _netServer.SendByteCount;
            }
        }

        /// <summary>Total bytes received, or 0 before <see cref="Init"/>.</summary>
        public Int64 ReadByteCount
        {
            get
            {
                if (_netServer == null)
                {
                    return 0;
                }
                return _netServer.ReadByteCount;
            }
        }

        NetServer _netServer;

        EN_PacketType _packetType = EN_PacketType.byteStream;

        /// <summary>
        /// Selects byte-stream or packet mode. Takes effect immediately if the server
        /// exists, otherwise when <see cref="Init"/> runs.
        /// </summary>
        public void SetPacktType(EN_PacketType packetType)
        {
            _packetType = packetType;
            if (_netServer == null)
            {
                return;
            }

            if (packetType == EN_PacketType.byteStream)
            {
                _netServer.SetPacketParam(0, 1024);
            }
            else
            {
                _netServer.SetPacketParam(9, 1024);
            }
        }

        /// <summary>
        /// Creates the server, wires the callbacks and starts listening on the given ports.
        /// </summary>
        /// <returns><c>true</c> if every port is listening.</returns>
        public bool Init(List<int> listenPort)
        {
            NetLogger.OnLogEvent += NetLogger_OnLogEvent;

            _netServer = new NetServer();
            SetPacktType(_packetType);
            _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
            _netServer.OnSocketPacketEvent += SocketPacketDeal;

            foreach (int n in listenPort)
            {
                // The port number doubles as the listener's tag.
                _netServer.AddListenPort(n, n);
            }

            List<int> listenFault;
            bool start = _netServer.StartListen(out listenFault);
            return start;
        }

        /// <summary>Returns the total length of the packet starting at <paramref name="offset"/>.</summary>
        int GetPacketTotalLen(byte[] data, int offset)
        {
            if (MainWindow._packetType == EN_PacketType.znss)
            {
                return GetPacketZnss(data, offset);
            }
            return GetPacketAnzhiyuan(data, offset);
        }

        /// <summary>Packet length = byte at index 5 of the packet + 6.</summary>
        int GetPacketAnzhiyuan(byte[] data, int offset)
        {
            int n = data[offset + 5] + 6;
            return n;
        }

        /// <summary>Packet length = byte at index 4 of the packet + 5.</summary>
        int GetPacketZnss(byte[] data, int offset)
        {
            // Index relative to the packet start; reading data[4] ignored the offset
            // and returned the first packet's length for every later packet.
            int packetLen = (int)(data[offset + 4]) + 5;
            return packetLen;
        }

        /// <summary>Starts an asynchronous connect; returns <c>false</c> before <see cref="Init"/>.</summary>
        public bool ConnectAsyn(string peerIp, int peerPort, object tag)
        {
            if (_netServer == null)
            {
                return false;
            }
            return _netServer.ConnectAsyn(peerIp, peerPort, tag);
        }

        /// <summary>Connects synchronously; returns <c>false</c> with a null socket before <see cref="Init"/>.</summary>
        public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
        {
            if (_netServer == null)
            {
                socket = null;
                return false;
            }
            return _netServer.Connect(peerIp, peerPort, tag, out socket);
        }

        private void NetLogger_OnLogEvent(string message)
        {
            AppLog.Log(message);
        }

        Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

        /// <summary>Number of sockets tracked by this wrapper.</summary>
        public int ClientCount
        {
            get
            {
                lock (_clientGroup)
                {
                    return _clientGroup.Count;
                }
            }
        }

        /// <summary>Sockets registered with the server, or an empty list before <see cref="Init"/>.</summary>
        public List<Socket> ClientList
        {
            get
            {
                if (_netServer != null)
                {
                    return _netServer.ClientList;
                }
                return new List<Socket>();
            }
        }

        /// <summary>Tracks a socket, replacing any previous entry for it.</summary>
        void AddClient(SocketEventParam socketParam)
        {
            lock (_clientGroup)
            {
                _clientGroup[socketParam.Socket] = socketParam;
            }
        }

        /// <summary>Stops tracking a socket.</summary>
        void RemoveClient(SocketEventParam socketParam)
        {
            lock (_clientGroup)
            {
                _clientGroup.Remove(socketParam.Socket);
            }
        }

        ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

        /// <summary>Read events retained for display.</summary>
        public ObjectPool<SocketEventParam> ReadDataPool
        {
            get
            {
                return _readDataPool;
            }
        }

        /// <summary>Central handler for socket events coming from the server.</summary>
        private void SocketPacketDeal(SocketEventParam socketParam)
        {
            OnSocketEvent?.Invoke(socketParam);

            if (socketParam.SocketEvent == EN_SocketEvent.read)
            {
                if (MainWindow._isShowReadPacket)
                {
                    _readDataPool.PutObj(socketParam);
                }
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.accept)
            {
                AddClient(socketParam);
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                AppLog.Log(string.Format("客户端链接!本地端口:{0},对端:{1}",
                    socketParam.ClientInfo.LocalPort, peerIp));
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.connect)
            {
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                if (socketParam.Socket != null)
                {
                    AddClient(socketParam);
                    AppLog.Log(string.Format("连接对端成功!本地端口:{0},对端:{1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
                else
                {
                    AppLog.Log(string.Format("连接对端失败!本地端口:{0},对端:{1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
            }
            else if (socketParam.SocketEvent == EN_SocketEvent.close)
            {
                MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
                RemoveClient(socketParam);
                string peerIp = socketParam.ClientInfo.PeerIpPort;
                AppLog.Log(string.Format("客户端断开!本地端口:{0},对端:{1},",
                    socketParam.ClientInfo.LocalPort, peerIp));
            }
        }

        /// <summary>
        /// Queues data for sending on <paramref name="socket"/>.
        /// </summary>
        /// <returns><c>no_client</c> if the socket is null or the server is not initialised; otherwise the server's result.</returns>
        public EN_SendDataResult SendData(Socket socket, byte[] data)
        {
            if (socket == null)
            {
                MessageBox.Show("还没连接!");
                return EN_SendDataResult.no_client;
            }
            if (_netServer == null)
            {
                return EN_SendDataResult.no_client;
            }
            return _netServer.SendData(socket, data);
        }

        /// <summary>Queues the same data on every tracked socket.</summary>
        internal void SendToAll(byte[] data)
        {
            lock (_clientGroup)
            {
                foreach (Socket socket in _clientGroup.Keys)
                {
                    SendData(socket, data);
                }
            }
        }
    }
}

以上这篇C#中一个高性能异步socket封装库的实现思路分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表