首页 > 开发 > 综合 > 正文

C# 中基于消息通信的异步完成端口 TCP/IP 组件实现(源代码)

2024-07-21 02:19:06
字体:
来源:转载
供稿:网友
源代码:

客户端:

using System;
using System.Collections;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace mykj
{
?///
?/// mytcpipclient 提供在net tcp_ip 协议上基于消息的客户端
?///
?public class mytcpipclient : system.componentmodel.component
?{
??private int buffersize=2048;
??private string tcpipserverip="127.0.0.1";
??private int tcpipserverport=11000;
??private socket clientsocket=null;
??private manualresetevent connectdone = new manualresetevent(false);
??private manualresetevent senddone = new manualresetevent(false);
??
??private void connectcallback(iasyncresult ar)
??{
???try
???{
????socket client = (socket) ar.asyncstate;
????client.endconnect(ar);
????
???}
???catch (exception e)
???{
????onerrorevent(new erroreventargs(e));
???}
???finally
???{
????connectdone.set();
???}
??}
??private void sendcallback(iasyncresult ar)
??{
???try
???{
????socket client = (socket) ar.asyncstate;
????int bytessent = client.endsend(ar);
????//console.writeline(bytessent);
???}
???catch (exception e)
???{
????onerrorevent(new erroreventargs(e));
???}
???finally
???{
????senddone.set();
???}
??}
??private void receivecallback(iasyncresult ar)
??{
???socket handler=null;
???try
???{
????lock(ar)
????{
?????stateobject state = (stateobject) ar.asyncstate;
?????handler = state.worksocket;
?????
?????int bytesread = handler.endreceive(ar);
?????
?????if (bytesread > 0)
?????{
??????int readpiont=0;?
??????while(readpiont??????{?
???????if(state.cortrol==0 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<24)&0xff000000;
????????state.packsize=bi1;
????????readpiont++;
????????state.cortrol=1;
???????}
??????
???????if(state.cortrol==1 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<16)&0x00ff0000;
????????state.packsize=state.packsize+bi1;
????????readpiont++;
????????state.cortrol=2;
???????}
??????
???????if(state.cortrol==2 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<8)&0x0000ff00;
????????state.packsize=state.packsize+bi1;
????????readpiont++;
????????state.cortrol=3;
???????}
???????
???????if(state.cortrol==3 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=bi1&0xff;
????????state.packsize=state.packsize+bi1-4;
????????readpiont++;
????????state.cortrol=4;
???????}
???????
???????if(state.cortrol==4 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<24)&0xff000000;
????????state.residualsize=bi1;
????????readpiont++;
????????state.cortrol=5;
????????state.packsize-=1;
???????}
???????
???????if(state.cortrol==5 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<16)&0x00ff0000;
????????state.residualsize=state.residualsize+bi1;
????????readpiont++;
????????state.cortrol=6;
????????state.packsize-=1;
???????}
???????
???????if(state.cortrol==6 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<8)&0x0000ff00;
????????state.residualsize=state.residualsize+bi1;
????????readpiont++;
????????state.cortrol=7;
????????state.packsize-=1;
???????}
???????if(state.cortrol==7 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=bi1&0xff;
????????state.residualsize=state.residualsize+bi1;
????????state.datastream.setlength(0);
????????state.datastream.position=0;
????????
????????readpiont++;
????????state.cortrol=8;
????????state.packsize-=1;
???????}
???????
???????if(state.cortrol==8 && readpiont???????{
????????int bi1=bytesread-readpiont;
????????int bi2=(int)(state.residualsize-state.datastream.length);
????????if(bi1>=bi2)
????????{
?????????state.datastream.write(state.buffer,readpiont,bi2);
?????????readpiont+=bi2;
?????????oninceptevent(new incepteventargs(state.datastream,handler));
?????????state.cortrol=9;
?????????state.packsize-=bi2;
?????????}
????????else
????????{
?????????state.datastream.write(state.buffer,readpiont,bi1);
?????????readpiont+=bi1;
?????????state.packsize-=bi1;
????????}
???????}
???????if(state.cortrol==9 && readpiont???????{
????????int bi1=bytesread-readpiont;
????????if(bi1????????{
?????????state.packsize=state.packsize-bi1;
?????????readpiont+=bi1;
????????}?
????????else
????????{
?????????state.cortrol=0;
?????????readpiont+=(int)state.packsize;
????????}
???????}
??????}
?????}
?????else
?????{
??????throw(new exception("读入的数据小于1bit"));
?????}
?????if(handler.connected==true)
?????{
??????handler.beginreceive(state.buffer,0,buffersize,0,
???????new asynccallback(receivecallback), state);
?????}
????}
???}
???catch (exception e)
???{
????onerrorevent(new erroreventargs(e));
????
???}
??}
??
??///
??/// 连接服务器
??///
??public void conn()
??{
???try
???{
????clientsocket=new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp);?
????ipaddress ipaddress = ipaddress.parse(tcpipserverip);
????ipendpoint remoteep = new ipendpoint(ipaddress, tcpipserverport);
????connectdone.reset();
????clientsocket.beginconnect(remoteep,new asynccallback(connectcallback),clientsocket);
????connectdone.waitone();
????stateobject state = new stateobject(buffersize,clientsocket);
????clientsocket.beginreceive(state.buffer,0,buffersize,0,
?????new asynccallback(receivecallback), state);?
???}
???catch(exception e)
???{
????onerrorevent(new erroreventargs(e));
???}
???
??}
??///
??/// 断开连接
??///
??public void close()
??{
???try
???{
????clientsocket.shutdown(socketshutdown.both);
????clientsocket.close();
???}
???catch(exception e)
???{
????onerrorevent(new erroreventargs(e));
???}
???
??}
??///
??/// 发送一个流数据
??///
??/// 数据流
??public void send(stream astream)
??{
???try
???{
????if(clientsocket.connected==false)
????{
?????throw(new exception("没有连接服务器不可以发送信息!"));
????}
????astream.position=0;
????byte[] bytedata=new byte[buffersize];
????int bi1=(int)((astream.length+8)/buffersize);
????int bi2=(int)astream.length;
????if(((astream.length+8)%buffersize)>0)
????{
?????bi1=bi1+1;
????}
????bi1=bi1*buffersize;
????
????bytedata[0]=system.convert.tobyte(bi1>>24);
????bytedata[1]=system.convert.tobyte((bi1&0x00ff0000)>>16);
????bytedata[2]=system.convert.tobyte((bi1&0x0000ff00)>>8);
????bytedata[3]=system.convert.tobyte((bi1&0x000000ff));
????
????bytedata[4]=system.convert.tobyte(bi2>>24);
????bytedata[5]=system.convert.tobyte((bi2&0x00ff0000)>>16);
????bytedata[6]=system.convert.tobyte((bi2&0x0000ff00)>>8);
????bytedata[7]=system.convert.tobyte((bi2&0x000000ff));
????
????int n = astream.read(bytedata, 8, bytedata.length-8);
????
????while (n>0)
????{
?????clientsocket.beginsend(bytedata, 0, bytedata.length, 0,?new asynccallback(sendcallback), clientsocket);
?????senddone.waitone();
?????bytedata=new byte[buffersize];
?????n = astream.read(bytedata,0,bytedata.length);
????}
???}
???catch (exception e)
???{
????onerrorevent(new erroreventargs(e));
???}
??}
??
??///
??/// 构造
??///
??/// 父控件
??public mytcpipclient(system.componentmodel.icontainer container)
??{
???container.add(this);
???initializecomponent();

???//
???// todo: 在 initializecomponent 调用后添加任何构造函数代码
???//
??}
??///
??/// 构造
??///
??public mytcpipclient()
??{
???initializecomponent();

???//
???// todo: 在 initializecomponent 调用后添加任何构造函数代码
???//
??}

??#region component designer generated code
??///
??/// 设计器支持所需的方法 - 不要使用代码编辑器修改
??/// 此方法的内容。
??///
??private void initializecomponent()
??{

??}
??#endregion

??///
??/// 要连接的服务器ip地址
??///
??public string tcpipserverip
??{
???get
???{
????return tcpipserverip;
???}
???set
???{
????tcpipserverip=value;
???}
??}

??///
??/// 要连接的服务器所使用的端口
??///
??public int tcpipserverport
??{
???get
???{
????return tcpipserverport;
???}
???set
???{
????tcpipserverport=value;
???}
??}

??///
??/// 缓冲器大小
??///
??public int buffersize
??{
???get
???{
????return buffersize;
???}
???set
???{
????buffersize=value;
???}
??}
??
??///
??/// 连接的活动状态
??///
??public bool activ
??{
???get
???{
????if(clientsocket==null)
????{
?????return false;
????}
????return clientsocket.connected;
???}
??}
??///
??/// 接收到数据引发的事件
??///
??public event inceptevent incept;
??///
??/// 引发接收数据事件
??///
??/// 接收数据
??protected virtual void oninceptevent(incepteventargs e)
??{
???if (incept != null)
???{
????incept(this, e);
???}
??}
??///
??/// 发生错误引发的事件
??///
??public event errorevent error;
??///
??/// 引发错误事件
??///
??/// 错误数据
??protected virtual void onerrorevent(erroreventargs e)
??{
???if (error != null)
???{
????error(this, e);
???}
??}
??
?}
?
?///
?/// 接收数据事件
?///
?public class incepteventargs : eventargs
?{?
??private readonly stream datastream;
??private readonly socket clientsocket;
??///
??/// 构造
??///
??/// 接收到的数据
??/// 接收的插座
??public incepteventargs(stream astream,socket clientsocket)
??{
???datastream=astream;
???clientsocket=clientsocket;
??}
??///
??/// 接受的数据流
??///
??public stream astream
??{????
???get { return datastream;}?????
??}
??///
??/// 接收的插座
??///
??public socket clientsocket
??{????
???get { return clientsocket;}?????
??}
?}?
?///
?/// 定义接收委托
?///
?public delegate void inceptevent(object sender, incepteventargs e);
?///
?/// 错处事件
?///
?public class erroreventargs : eventargs
?{?
??private readonly exception error;
??///
??/// 构造
??///
??/// 错误信息对象
??public erroreventargs(exception error)
??{
???error=error;
??}
??///
??/// 错误信息对象
??///
??public exception error
??{????
???get { return error;}?????
??}
?}
?///
?/// 错误委托
?///
?public delegate void errorevent(object sender, erroreventargs e);
?
?
?
}


服务器端:

using System;
using System.Collections;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace mykj
{
?///
?/// mytcpipclient 提供在net tcp_ip 协议上基于消息的服务端
?///
?public class mytcpipserver : system.componentmodel.component
?{
??private int buffersize=2048;
??private string tcpipserverip="";
??private int tcpipserverport=11000;
??private socket listener=null;
??private manualresetevent alldone = new manualresetevent(false);
??private manualresetevent senddone = new manualresetevent(false);
??private thread thread=null;
??
??private void startlistening()
??{
???try
???{
????listener = new socket(addressfamily.internetwork,
?????sockettype.stream, protocoltype.tcp);
?????
????ipaddress ipaddress;
????if(tcpipserverip.trim()=="")
????{
?????ipaddress=ipaddress.any;?
????}
????else
????{
?????ipaddress=ipaddress.parse(tcpipserverip);
????}
????ipendpoint localendpoint = new ipendpoint(ipaddress, tcpipserverport);
????
????listener.bind(localendpoint);
????listener.listen(10);
????while (true)
????{
?????alldone.reset();
?????listener.beginaccept(new asynccallback(acceptcallback),listener);
?????alldone.waitone();
????}
???}
???catch (exception e)
???{
????onerrorserverevent(new errorservereventargs(e,listener));?
???}
??}
??
??private void readcallback(iasyncresult ar)
??{
???socket handler=null;
???try
???{
????lock(ar)
????{
?????stateobject state = (stateobject) ar.asyncstate;
?????handler = state.worksocket;
?????
?????int bytesread = handler.endreceive(ar);
?????
?????if (bytesread > 0)
?????{
??????int readpiont=0;?
??????while(readpiont??????{?
???????if(state.cortrol==0 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<24)&0xff000000;
????????state.packsize=bi1;
????????readpiont++;
????????state.cortrol=1;
???????}
??????
???????if(state.cortrol==1 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<16)&0x00ff0000;
????????state.packsize=state.packsize+bi1;
????????readpiont++;
????????state.cortrol=2;
???????}
??????
???????if(state.cortrol==2 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<8)&0x0000ff00;
????????state.packsize=state.packsize+bi1;
????????readpiont++;
????????state.cortrol=3;
???????}
???????
???????if(state.cortrol==3 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=bi1&0xff;
????????state.packsize=state.packsize+bi1-4;
????????readpiont++;
????????state.cortrol=4;
???????}
???????
???????if(state.cortrol==4 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<24)&0xff000000;
????????state.residualsize=bi1;
????????readpiont++;
????????state.cortrol=5;
????????state.packsize-=1;
???????}
???????
???????if(state.cortrol==5 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<16)&0x00ff0000;
????????state.residualsize=state.residualsize+bi1;
????????readpiont++;
????????state.cortrol=6;
????????state.packsize-=1;
???????}
???????
???????if(state.cortrol==6 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=(bi1<<8)&0x0000ff00;
????????state.residualsize=state.residualsize+bi1;
????????readpiont++;
????????state.cortrol=7;
????????state.packsize-=1;
???????}
???????if(state.cortrol==7 && readpiont???????{
????????long bi1=state.buffer[readpiont];
????????bi1=bi1&0xff;
????????state.residualsize=state.residualsize+bi1;
????????state.datastream.setlength(0);
????????state.datastream.position=0;
????????
????????readpiont++;
????????state.cortrol=8;
????????state.packsize-=1;
???????}
???????
???????if(state.cortrol==8 && readpiont???????{
????????int bi1=bytesread-readpiont;
????????int bi2=(int)(state.residualsize-state.datastream.length);
????????if(bi1>=bi2)
????????{
?????????state.datastream.write(state.buffer,readpiont,bi2);
?????????readpiont+=bi2;
?????????oninceptserverevent(new inceptservereventargs(state.datastream,state.worksocket,this));
?????????state.cortrol=9;
?????????state.packsize-=bi2;
?????????
?????????
????????}
????????else
????????{
?????????state.datastream.write(state.buffer,readpiont,bi1);
?????????readpiont+=bi1;
?????????state.packsize-=bi1;
????????}
???????}
???????if(state.cortrol==9 && readpiont???????{
????????int bi1=bytesread-readpiont;
????????if(bi1????????{
?????????state.packsize=state.packsize-bi1;
?????????readpiont+=bi1;
????????}?
????????else
????????{
?????????state.cortrol=0;
?????????readpiont+=(int)state.packsize;
????????}
???????}
??????}
??????if(handler.connected==true)
??????{
???????handler.beginreceive(state.buffer,0,buffersize,0,
????????new asynccallback(readcallback), state);
??????}
?????}
?????else
?????{
??????handler.shutdown(socketshutdown.both);
??????handler.close();
??????//throw(new exception("读入的数据小于1bit"));
?????}
????}
???}
???catch (exception e)
???{
????onerrorserverevent(new errorservereventargs(e,handler));
????
???}
??}
??
??private void sendcallback(iasyncresult ar)
??{
???socket client = (socket) ar.asyncstate;
???try
???{
????int bytessent = client.endsend(ar);
???}
???catch (exception e)
???{
????onerrorserverevent(new errorservereventargs(e,client));
???}
???finally
???{
????senddone.set();
???}
??}
??
??private void acceptcallback(iasyncresult ar)
??{
???socket handler=null;
???try
???{
????socket listener = (socket) ar.asyncstate;
????handler= listener.endaccept(ar);
????stateobject state = new stateobject(buffersize,handler);
????state.worksocket = handler;
????handler.beginreceive(state.buffer,0,buffersize,0,
?????new asynccallback(readcallback), state);
???}
???catch (exception e)
???{
????onerrorserverevent(new errorservereventargs(e,handler));
???}
???finally
???{
????alldone.set();
???}
??}
??
??///
??/// 析构
??///
??/// 不知道
??protected override void dispose(bool disposing)
??{
???abort();
??}
??
??///
??/// 引发接收事件
??///
??/// 数据
??protected virtual void oninceptserverevent(inceptservereventargs e)
??{
???if (inceptserver != null)
???{
????inceptserver(this, e);
???}
??}
??///
??/// 引发错误事件
??///
??/// 数据
??protected virtual void onerrorserverevent(errorservereventargs e)
??{
???if (errorserver != null)
???{
????errorserver(this, e);
???}
??}
??
??///
??/// 开始监听访问
??///
??public void listening()
??{
???//startlistening();
???thread=new thread(new threadstart(startlistening));
???thread.name="mytcpipserver.listening";
???thread.start();
??}
??///
??/// 异常中止服务
??///
??public void abort()
??{
???if(thread!=null)
???{
????thread.abort();
????listener.close();
???}
??}
??
??///
??///构造
??///
??/// 父控件
??public mytcpipserver(system.componentmodel.icontainer container)
??{
???container.add(this);
???initializecomponent();

???//
???// todo: 在 initializecomponent 调用后添加任何构造函数代码
???//
??}

??///
??/// 构造
??///
??public mytcpipserver()
??{
???initializecomponent();

???//
???// todo: 在 initializecomponent 调用后添加任何构造函数代码
???//
??}

??#region component designer generated code
??///
??/// 设计器支持所需的方法 - 不要使用代码编辑器修改
??/// 此方法的内容。
??///
??private void initializecomponent()
??{

??}
??#endregion

??///
??/// 要连接的服务器ip地址
??///
??public string tcpipserverip
??{
???get
???{
????return tcpipserverip;
???}
???set
???{
????tcpipserverip=value;
???}
??}

??///
??/// 要连接的服务器所使用的端口
??///
??public int tcpipserverport
??{
???get
???{
????return tcpipserverport;
???}
???set
???{
????tcpipserverport=value;
???}
??}

??///
??/// 缓冲器大小
??///
??public int buffersize
??{
???get
???{
????return buffersize;
???}
???set
???{
????buffersize=value;
???}
??}
??
??///
??/// 连接的活动状态
??///
??public bool activ
??{
???get
???{
????return listener.connected;
???}
???//set
???//{
???//?activ=value;
???//}
??}

??///
??/// 发送一个流数据
??///
??public void send(socket clientsocket,stream astream)
??{
???try
???{
????if(clientsocket.connected==false)
????{
?????throw(new exception("没有连接客户端不可以发送信息!"));
????}
????astream.position=0;
????byte[] bytedata=new byte[buffersize];
????int bi1=(int)((astream.length+8)/buffersize);
????int bi2=(int)astream.length;
????if(((astream.length+8)%buffersize)>0)
????{
?????bi1=bi1+1;
????}
????bi1=bi1*buffersize;
????
????bytedata[0]=system.convert.tobyte(bi1>>24);
????bytedata[1]=system.convert.tobyte((bi1&0x00ff0000)>>16);
????bytedata[2]=system.convert.tobyte((bi1&0x0000ff00)>>8);
????bytedata[3]=system.convert.tobyte((bi1&0x000000ff));
????
????bytedata[4]=system.convert.tobyte(bi2>>24);
????bytedata[5]=system.convert.tobyte((bi2&0x00ff0000)>>16);
????bytedata[6]=system.convert.tobyte((bi2&0x0000ff00)>>8);
????bytedata[7]=system.convert.tobyte((bi2&0x000000ff));
????
????int n = astream.read(bytedata, 8, bytedata.length-8);
????
????while (n>0)
????{
?????clientsocket.beginsend(bytedata, 0, bytedata.length, 0,?new asynccallback(sendcallback), clientsocket);
?????senddone.waitone();
?????bytedata=new byte[buffersize];
?????n = astream.read(bytedata,0,bytedata.length);
????}
???}
???catch (exception e)
???{
????onerrorserverevent(new errorservereventargs(e,clientsocket));
???}
??}
??
??///
??/// 接收到数据事件
??///
??public event inceptserverevent inceptserver;
??///
??/// 发生错误事件
??///
??public event errorserverevent errorserver;
?}
?///
?/// 状态对象
?///
?public class stateobject
?{
??///
??/// 构造
??///
??/// 缓存
??/// 工作的插座
??public stateobject(int buffersize,socket worksocket)
??{
???buffer = new byte[buffersize];
???worksocket=worksocket;
??}
??///
??/// 缓存
??///
??public byte[] buffer = null;
??///
??/// 工作插座
??///
??public socket worksocket = null;?????????????
??///
??/// 数据流
??///
??public stream datastream=new memorystream();
??///
??/// 剩余大小
??///
??public long residualsize=0;
??///
??/// 数据包大小
??///
??public long packsize=0;
??///
??/// 计数器
??///
??public int cortrol=0;
?}
?
?///
?/// 接收事件
?///
?public class inceptservereventargs : eventargs
?{?
??private readonly stream datastream;
??private readonly socket serversocket;
??private readonly mytcpipserver tcpipserver;
??///
??/// 构造
??///
??/// 数据
??/// 工作插座
??/// 提供服务的tcp/ip对象
??public inceptservereventargs(stream astream,socket serversocket,mytcpipserver tcpipserver)
??{
???datastream=astream;
???serversocket=serversocket;
???tcpipserver=tcpipserver;
??}
??? ///
??? /// 数据
??? ///
??public stream astream
??{????
???get { return datastream;}?????
??}
??///
??/// 工作插座
??///
??public socket serversocket
??{????
???get { return serversocket;}?????
??}
??///
??/// 提供tcp/ip服务的服务器对象.
??///
??public mytcpipserver tcpipserver
??{????
???get { return tcpipserver;}?????
??}
?}
?///
?/// 接收数据委托
?///
? public delegate void inceptserverevent(object sender, inceptservereventargs e);
?///
?/// 错误事件委托
?///
?public class errorservereventargs : eventargs
?{?
??private readonly exception error;
??private readonly socket serversocket;
??///
??/// 构造
??///
??/// 数据
??/// 问题插座
??public errorservereventargs(exception error,socket serversocket)
??{
???error=error;
???serversocket=serversocket;
??}
??? ///
??? /// 数据
??? ///
??public exception error
??{????
???get { return error;}?????
??}
??///
??/// 问题插座
??///
??public socket serversocket
??{????
???get { return serversocket;}?????
??}
?}
?///
?///错误事件委托
?///
? public delegate void errorserverevent(object sender, errorservereventargs e);
}

完成端口是目前 Windows NT 平台上最好的一种通信方式:在大吞吐量、大量连接的场景下,它的效率和性能是其它方式无法比拟的。由于 C# 的 Socket 在内部使用的正是完成端口,程序实现起来异常简单,请大家体会。


?

?




发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表