Helios是一套高性能的Socket通信中间件,使用C#编写。Helios的开发受到Netty的启发,使用非阻塞的事件驱动模型架构来实现高并发高吞吐量。Helios为我们大大的简化了Socket编程,它已经为我们处理好了高并发情况下的解包,粘包,buffer管理等等。
GitHub:https://github.com/helios-io/helios/
为避免误会特别提示:helios不是本人作品,小弟还在努力的路上。
Takes the complexity out of socket PRogramming with intelligent I/O, concurrency, buffer management, and pipelining APIs.
使用socket编程不再复杂。提供智能的I/O,并发,buffer管理,管道形式的API。
Helios is Reactive - it uses a event-driven architecture to simplify development and build responsive systems that scale.
Helios是反应式的,它使用事件驱动的架构来简化开发和构建易伸缩的系统。
Performance is a cross-cutting concern we factor in at every level in the design of the framework in order to eliminate overhead for your apps and clients.
这个系统在开发和设计的时候都充分考虑到了性能,构建你的app和client的时候请消除这方面的顾虑。
Helios powers the clustering and remoting capbilities built into Akka.NET and more.
Akka.net的集群,远程功能构建在Helios之上。
要用来演示Socket通信那么最好的示例无非就是聊天程序了。
整个解决方案包含3个项目:
这个项目里是一些公共的类型,新建完之后使用nuget添加helios的库
Message 类:所有发送的消息都是通过Message包装的,每一个消息都有一个Command跟Content来构成。
public class Message { public Command Command { get; set; } public string Content { get; set; } }
Command枚举:用来描述消息的命令
public enum Command { Join, Send, }
MessageConverter静态类:这个类用来转换Message对象为Byte[],或者把Byte[]转换成Message对象。Message对象在通过Helios传输的时候需要先转成Byte[],所以我们需要自己定义包的格式。我们用Byte[]的前四位来存放Command,Content转成Byte后从第5位开始存放。
public class MessageConverter { public static Message ToMessage(NetworkData data) { try { var commandData = data.Buffer.Take(4).ToArray(); var contentData = data.Buffer.Skip(4).Take(data.Buffer.Length - 4).ToArray(); var command = BitConverter.ToInt32(commandData,0); var content = Encoding.UTF8.GetString(contentData); return new Message() { Command = (Command)command, Content = content }; } catch (Exception exc) { Console.WriteLine("Cant convert NetworkData to Message : {0}", exc.Message); } return null; } public static byte[] ToBytes(Message message) { try { var commandBytes = BitConverter.GetBytes((int)message.Command); var messageBytes = Encoding.UTF8.GetBytes(message.Content); var bytes = new byte[commandBytes.Length + messageBytes.Length]; commandBytes.CopyTo(bytes, 0); messageBytes.CopyTo(bytes, commandBytes.Length); return bytes; } catch (Exception exc) { Console.WriteLine("Cant convert message to bytes : {0}", exc.Message); } return null; } }
不用说也知道,这是聊天室的服务端,负责连接用户及转发消息。
internal class Program { private static readonly ConcurrentDictionary<string, IConnection> Clients = new ConcurrentDictionary<string, IConnection>(); private static void Main(string[] args) { var host = IPAddress.Any; var port = 9991; Console.Title = "Server"; Console.WriteLine("Starting server on {0}:{1}", host, port); var serverFactory = new ServerBootstrap() .SetTransport(TransportType.Tcp) .Build(); var server = serverFactory.NewReactor(NodeBuilder.BuildNode().Host(host).WithPort(port)); server.OnConnection += (address, connection) => { Console.WriteLine("Connected: {0}", address); connection.BeginReceive(Receive); }; server.OnDisconnection += (reason, address) => Console.WriteLine("Disconnected: {0}; Reason: {1}", address.RemoteHost, reason.Type); server.Start(); Console.WriteLine("Running, press any key to exit"); Console.ReadKey(); } /// <summary> /// 处理接受到的消息 /// </summary> /// <param name="data"></param> /// <param name="channel"></param> public static void Receive(NetworkData data, IConnection channel) { var message = MessageConverter.ToMessage(data); switch (message.Command) { case Command.Join: JoinGroup(message.Content, channel); break; case Command.Send: Broadcast(message.Content); break; } } public static void JoinGroup(string clientName, IConnection channel) { if (Clients.TryAdd(clientName, channel)) { Broadcast(string.Format("{0} join group successful .", clientName)); } else { var errMsg = new Message() { Command = Command.Send, Content = "client name is used." }; SendMessage(channel, errMsg); } } /// <summary> /// 广播消息 /// </summary> /// <param name="clientMessage"></param> public static void Broadcast(string clientMessage) { Console.WriteLine(clientMessage); var clientName = clientMessage.Split(':')[0]; var message = new Message { Command = Command.Send, Content = clientMessage }; foreach (var client in Clients) { if (client.Key != clientName) { SendMessage(client.Value, message); } } } public static void SendMessage(IConnection connection, Message message) { var messageBytes = MessageConverter.ToBytes(message); connection.Send(new NetworkData { Buffer = messageBytes, Length = messageBytes.Length }); } }
聊天服务的客户端
internal class Program { public static IConnection Client; public static string ClientName; private static void Main(string[] args) { var host = IPAddress.Loopback; var port = 9991; var connectionFactory = new ClientBootstrap() .SetTransport(TransportType.Tcp).Build(); //New一个Client Client = connectionFactory.NewConnection(Node.Empty(), NodeBuilder.BuildNode().Host(host).WithPort(port)); Client.OnConnection += (address, connection) => { Console.WriteLine("Connect server successful."); connection.BeginReceive(Received); }; Client.OnDisconnection += (address, reason) => Console.WriteLine("Disconnected."); Console.WriteLine("Input ClientName "); ClientName = Console.ReadLine(); Console.Title = string.Format("Client {0}", ClientName); //建立连接 Client.Open(); //加入聊天组 Join(); //等待输入 WaitInput(); } public static void WaitInput() { while (true) { var input = Console.ReadLine(); if (!string.IsNullOrEmpty(input)) { var message = MakeSendMessage(input); SendMessage(Client, message); } } } /// <summary> /// Jion chat group /// </summary> public static void Join() { var message = MakeJoinMessage(); SendMessage(Client,message); } /// <summary> /// 处理接受到的消息 /// </summary> /// <param name="data"></param> /// <param name="responseChannel"></param> public static void Received(NetworkData data, IConnection responseChannel) { var message = MessageConverter.ToMessage(data); if (message.Command == Command.Send) { Console.WriteLine(message.Content); } } /// <summary> /// 构造聊天消息 /// </summary> /// <param name="input"></param> /// <returns></returns> public static Message MakeSendMessage(string input) { return new Message { Command = Command.Send, Content = string.Format("{0}:{1}", ClientName, input) }; } /// <summary> /// 构造加入组的消息 /// </summary> /// <returns></returns> public static Message MakeJoinMessage() { var message = new Message(); message.Command = Command.Join; message.Content = ClientName; return message; } public static void SendMessage(IConnection connection, Message message) { var messageBytes = MessageConverter.ToBytes(message); connection.Send(new NetworkData { Buffer = messageBytes, Length = messageBytes.Length }); } }
这样一个简单的聊天室程序就完成了。
helios 1.0的异步编程模型是基于APM的,从helios 2.0开始会改成SocketAsyncEventArgs方式来实现异步。SocketAsyncEventArgs底层封装了IOCP,IOCP是Windows server上Socket通讯性能最高的技术,使用了IOCP的helios 2.0势必具有更高的性能,所以对于helios 2.0还是非常期待的。
示例下载:http://files.VEVb.com/files/kklldog/HeliosChat.7z
新闻热点
疑难解答