首页 > 开发 > 综合 > 正文

续实例解析SOCKET编程模型之异步通信篇(上)

2024-07-21 02:23:36
字体:
来源:转载
供稿:网友
.net 框架的 socket 类实际上是 winsock32 api 提供的套接字服务的托管代码版本。其中socket 类为网络通信提供了一套丰富的方法和属性,大多数情况下,socket 类方法只是将数据封送到它们的本机win32 副本中并处理任何必要的安全检查。socket 类允许使用 protocoltype 枚举中所列出的任何一种协议执行异步和同步数据传输。socket 类遵循异步方法的 .net framework 命名模式;例如,同步 receive 方法对应于异步 beginreceive 和 endreceive 方法。

事实上socket可以象流stream一样被视为一个应用程序端(客户端)和远程服务器端之间数据通道,通过这个通道来对数据进行读取(接收)和写入(发送)。

异步模式所提供的革新之一就是调用方确定特定调用是否应是异步的。对于被调用的对象,没有必要执行附加的编程来用于支持其客户端的异步行为;在该模式中异步委托提供此功能。

如果应用程序在执行期间只需要一个线程,请使用我在《实例解析socket编程模型》中介绍的方法,这些方法适用于单线程同步操作模式。同步操作模式对执行网络操作的函数(如 send 和 receive)的调用一直等到操作完成后才将控制返回给调用程序。

若要在执行过程中使用单独的线程处理通信,请使用下面的方法,这些方法适用于异步操作模式。异步操作模式对执行网络操作的函数的调用立即返回。

如果当前使用的是面向连接的协议(如 tcp),则可使用 socket、beginconnect 和 endconnect 方法来连接侦听主机。通过使用 beginsend 和 endsend 方法,或者使用 beginreceive 和 endreceive 方法,可以进行异步数据通信。可以使用 beginaccept 和 endaccept 处理传入的连接请求。
如果当前使用的是无连接协议(如 udp),则可以使用 beginsendto 和 endsendto 来发送数据报,而使用 beginreceivefrom 和 endreceivefrom 来接收数据报。
当数据发送和数据接收完成之后,可使用 shutdown 方法来禁用 socket。在调用 shutdown 之后,可调用 close 方法来释放与 socket 关联的所有资源。

socket 类允许使用 setsocketoption 方法来配置 socket。可使用 getsocketoption 方法来检索这些设置。

注意 如果编写较简单的应用程序,而且只需同步数据传输,则可以考虑使用 tcpclient、tcplistener 和 udpclient。这些类为 socket 通信提供了更简单、对用户更友好的接口。


从tcp/ip模型上来看, 像 system.net命名空间中的httpwebreqeust类和httpwebresponse类属于请求/响应层。tcpclient、tcplistener 和 udpclient这些类属于应用协议层,处中间。而socket类处于传输层,是最底层。当其上面的请求/响应层和应用协议层不能满足应用程序的特殊需要时,就需要使用传输层进行socket套接字编程。



异步服务器套接字使用 .net framework 异步编程模型处理网络服务请求。socket 类遵循标准 .net framework 异步命名模式;例如,同步 accept 方法对应异步 beginaccept 和 endaccept 方法。

异步服务器套接字需要一个开始接受网络连接请求的方法,一个处理连接请求并开始接收网络数据的回调方法以及一个结束接收数据的回调方法。本节将进一步讨论所有这些方法。

在下面的源码中,为开始接受网络连接请求,方法 startlistening 初始化 socket,然后使用 beginaccept 方法开始接受新连接。当套接字上接收到新连接请求时,将调用接受回调方法。它负责获取将处理连接的 socket 实例,并将 socket 提交给将处理请求的线程。接受回调方法实现 asynccallback 委托;它返回 void,并带一个 iasyncresult 类型的参数。下面的示例是接受回调方法的外壳程序: private void acceptcallback(iasyncresult ar){}

beginaccept 方法带两个参数:指向接受回调方法的 asynccallback 委托和一个用于将状态信息传递给回调方法的对象。在下面的示例中,侦听 socket 通过状态参数传递给回调方法。本示例创建一个 asynccallback 开始一个异步操作来接受一个传入的连接尝试 listeningsocket.beginaccept(new asynccallback(acceptcallback),listeningsocket);//

异步套接字使用系统线程池中的线程处理传入的连接。一个线程负责接受连接,另一线程用于处理每个传入的连接,还有一个线程负责接收连接数据。这些线程可以是同一个线程,具体取决于线程池所分配的线程。system.threading.manualresetevent 类挂起主线程的执行并在执行可以继续时发出信号。

接受回调方法(即前例中的 acceptcallback)负责向主应用程序发出信号,让它继续执行处理、建立与客户端的连接并开始异步读取客户端数据。下面的示例是 acceptcallback 方法实现的第一部分。该方法的此节向主应用程序线程发出信号,让它继续处理并建立与客户端的连接。

开始从客户端套接字接收数据的 acceptcallback 方法的此节首先初始化 stateobject 类的一个实例,然后调用 beginreceive 方法以开始从客户端套接字异步读取数据。需要为异步套接字服务器实现的 final 方法是返回客户端发送的数据的读取回调方法。与接受回调方法一样,读取回调方法也是一个 asynccallback 委托。该方法将来自客户端套接字的一个或多个字节读入数据缓冲区,然后再次调用 beginreceive 方法,直到客户端发送的数据完成为止。

创建线程时,将使用采用 threadstart 委托作为其唯一参数的构造函数创建 thread 类的新实例。但线程在调用 start 方法前不会开始执行。调用 start 后,将从由 threadstart 委托引用的方法的第一行开始执行。如下例所示: thread thread=new thread(new threadstart(threadproc));

thread.start();



在以下的源码中我们使用了manualresetevent 来允许线程通过发信号互相通信。通常,此通信涉及一个线程在其他线程进行之前必须完成的任务。

当线程开始一个活动(此活动必须在其他线程进行之前完成)时,它调用 reset 将 manualresetevent 设置为非终止状态。此线程可被视为控制 manualresetevent。调用 manualresetevent 上的 waitone 的线程将阻塞,并等待信号。当控制线程完成活动时,它调用 set 以发出等待线程可以继续进行的信号。并释放所有等待线程。

一旦它被终止,manualresetevent 将保持终止状态,直到它被手动重置。即对 waitone 的调用将立即返回。

可以通过将布尔值传递给构造函数来控制 manualresetevent 的初始状态,如果初始状态处于终止状态,为 true;否则为 false。



//以下是异步聊天服务器端详细实现方法

using system;
using system.drawing;
using system.collections;
using system.componentmodel;
using system.windows.forms;
using system.data;
using system.net;
using system.net.sockets;
using system.threading;
using system.text;
namespace 聊天_socket
{
/// <summary>
/// form1 的摘要说明。
/// </summary>
public class form1 : system.windows.forms.form
{
private system.windows.forms.statusbar statusbar1;
private system.windows.forms.label label1;
private system.windows.forms.label label2;
private system.windows.forms.label label3;
private system.windows.forms.label label4;
private system.windows.forms.richtextbox rtbreceive;
private system.windows.forms.richtextbox rtbsend;
private system.windows.forms.textbox txtserver;
private system.windows.forms.textbox txtport;
private system.windows.forms.button btnlisten;
private system.windows.forms.button btnsend;
private system.windows.forms.button btnstop;
private ipaddress hostipaddress=ipaddress.parse("127.0.0.1");
private ipendpoint server;
private socket listeningsocket;
private socket handler;
private socket mysocket;
private static manualresetevent done=new manualresetevent(false);
private const int buffersize=256;
private byte[] buffer=new byte[buffersize];
string port;
/// <summary>
/// 必需的设计器变量。
/// </summary>
private system.componentmodel.container components = null;

public form1()
{
//
// windows 窗体设计器支持所必需的
//
initializecomponent();

//
// todo: 在 initializecomponent 调用后添加任何构造函数代码
//
}

/// <summary>
/// 清理所有正在使用的资源。
/// </summary>
protected override void dispose( bool disposing )
{
if( disposing )
{
if (components != null)
{
components.dispose();
}
}
base.dispose( disposing );
}

#region windows 窗体设计器生成的代码
/// <summary>
/// 设计器支持所需的方法 - 不要使用代码编辑器修改
/// 此方法的内容。
/// </summary>
private void initializecomponent()
{
this.rtbreceive = new system.windows.forms.richtextbox();
this.rtbsend = new system.windows.forms.richtextbox();
this.txtserver = new system.windows.forms.textbox();
this.txtport = new system.windows.forms.textbox();
this.statusbar1 = new system.windows.forms.statusbar();
this.btnlisten = new system.windows.forms.button();
this.btnsend = new system.windows.forms.button();
this.btnstop = new system.windows.forms.button();
this.label1 = new system.windows.forms.label();
this.label2 = new system.windows.forms.label();
this.label3 = new system.windows.forms.label();
this.label4 = new system.windows.forms.label();
this.suspendlayout();
//
// rtbreceive
//
this.rtbreceive.location = new system.drawing.point(80, 56);
this.rtbreceive.name = "rtbreceive";
this.rtbreceive.size = new system.drawing.size(264, 96);
this.rtbreceive.tabindex = 0;
this.rtbreceive.text = "";
//
// rtbsend
//
this.rtbsend.location = new system.drawing.point(80, 152);
this.rtbsend.name = "rtbsend";
this.rtbsend.size = new system.drawing.size(264, 96);
this.rtbsend.tabindex = 1;
this.rtbsend.text = "";
//
// txtserver
//
this.txtserver.location = new system.drawing.point(72, 16);
this.txtserver.name = "txtserver";
this.txtserver.tabindex = 2;
this.txtserver.text = "127.0.0.1";
//
// txtport
//
this.txtport.location = new system.drawing.point(288, 16);
this.txtport.name = "txtport";
this.txtport.size = new system.drawing.size(48, 21);
this.txtport.tabindex = 3;
this.txtport.text = "19811";
//
// statusbar1
//
this.statusbar1.location = new system.drawing.point(0, 287);
this.statusbar1.name = "statusbar1";
this.statusbar1.showpanels = true;
this.statusbar1.size = new system.drawing.size(360, 22);
this.statusbar1.tabindex = 4;
this.statusbar1.text = "statusbar1";
//
// btnlisten
//
this.btnlisten.location = new system.drawing.point(32, 256);
this.btnlisten.name = "btnlisten";
this.btnlisten.tabindex = 5;
this.btnlisten.text = "开始监听";
this.btnlisten.click += new system.eventhandler(this.btnlisten_click);
//
// btnsend
//
this.btnsend.location = new system.drawing.point(144, 256);
this.btnsend.name = "btnsend";
this.btnsend.tabindex = 6;
this.btnsend.text = "发送信息";
this.btnsend.click += new system.eventhandler(this.btnsend_click);
//
// btnstop
//
this.btnstop.location = new system.drawing.point(256, 256);
this.btnstop.name = "btnstop";
this.btnstop.tabindex = 7;
this.btnstop.text = "停止监听";
this.btnstop.click += new system.eventhandler(this.btnstop_click);
//
// label1
//
this.label1.location = new system.drawing.point(16, 16);
this.label1.name = "label1";
this.label1.size = new system.drawing.size(56, 23);
this.label1.tabindex = 8;
this.label1.text = "服务器:";
//
// label2
//
this.label2.location = new system.drawing.point(216, 16);
this.label2.name = "label2";
this.label2.size = new system.drawing.size(64, 23);
this.label2.tabindex = 9;
this.label2.text = "监听端口:";
//
// label3
//
this.label3.location = new system.drawing.point(16, 64);
this.label3.name = "label3";
this.label3.size = new system.drawing.size(64, 23);
this.label3.tabindex = 10;
this.label3.text = "接收信息:";
//
// label4
//
this.label4.location = new system.drawing.point(16, 152);
this.label4.name = "label4";
this.label4.size = new system.drawing.size(64, 23);
this.label4.tabindex = 11;
this.label4.text = "发送信息:";
//
// form1
//
this.autoscalebasesize = new system.drawing.size(6, 14);
this.clientsize = new system.drawing.size(360, 309);
this.controls.add(this.label4);
this.controls.add(this.label3);
this.controls.add(this.label2);
this.controls.add(this.label1);
this.controls.add(this.btnstop);
this.controls.add(this.btnsend);
this.controls.add(this.btnlisten);
this.controls.add(this.statusbar1);
this.controls.add(this.txtport);
this.controls.add(this.txtserver);
this.controls.add(this.rtbsend);
this.controls.add(this.rtbreceive);
this.name = "form1";
this.text = "聊天程序-服务器";
this.topmost = true;
this.closing += new system.componentmodel.canceleventhandler(this.form1_closing);
this.resumelayout(false);

}
#endregion

/// <summary>
/// 应用程序的主入口点。
/// </summary>
[stathread]
static void main()
{
application.run(new form1());
}

private void btnlisten_click(object sender, system.eventargs e)
{
try
{
hostipaddress=ipaddress.parse(txtserver.text);
port=txtport.text;
}
catch{messagebox.show("请输入正确的ip地址格式");}
try
{ //通过组合服务的主机 ip 地址和端口号,ipendpoint 类形成到服务的连接点。
server=new ipendpoint(hostipaddress,int32.parse(port));
// create a socket object to establish a connection with the server
listeningsocket=new socket(addressfamily.internetwork,sockettype.stream,protocoltype.tcp);
listeningsocket.bind(server); //绑定该主机端口
listeningsocket.listen(50); //监听端口,等待客户端连接请求。50是队列中最多可容纳的等待接受的传入连接数
statusbar1.text="主机"+txtserver.text+"端口"+txtport.text+"开始监听.....";
//accept 以同步方式从侦听套接字的连接请求队列中提取第一个挂起的连接请求,然后创建并返回新的 socket。
//mysocket=listeningsocket.accept();
//一个进程可以创建一个或多个线程以执行与该进程关联的部分程序代码。使用 threadstart 委托指定由线程执行的程序代码。
thread thread=new thread(new threadstart(threadproc));
thread.start();
}
catch(exception ee){statusbar1.text=ee.message;}
}
private void threadproc()
{
//if(mysocket.connected)
//{
//statusbar1.text="与客户建立连接.";
while(true)
{
/*byte[] byterecv=new byte[256];
mysocket.receive(byterecv,byterecv.length,0);
string strrecv=encoding.bigendianunicode.getstring(byterecv);
rtbreceive.appendtext(strrecv+"/r/n");*/
done.reset(); //将状态设为非终止
listeningsocket.beginaccept(new asynccallback(acceptcallback),listeningsocket);//开始一个异步操作来接受一个传入的连接尝试
done.waitone(); //阻塞当前线程,直到当前线程收到信号。
}
//}
}
private void acceptcallback(iasyncresult ar)//ar表示异步操作的状态。
{
done.set();//设为终止
mysocket=(socket)ar.asyncstate; //获取状态
handler=mysocket.endaccept(ar); //异步接受传入的连接尝试,并创建新的 socket 来处理远程主机通信,获取结果
try
{
byte[] bytedata=encoding.bigendianunicode.getbytes("准备完毕,可以通话"+"/r/n");
//调用sendcallback异步发送数据,
handler.beginsend(bytedata,0,bytedata.length,0,new asynccallback(sendcallback),handler);
}
catch(exception ee){messagebox.show(ee.message);}
thread thread=new thread(new threadstart(threadrev));
thread.start();
}

private void sendcallback(iasyncresult ar)
{
try
{
handler=(socket)ar.asyncstate; //获取状态
int bytessent=handler.endsend(ar);//结束挂起的异步发送,返回向 socket 发送的字节数
}
catch{}
}
private void threadrev()
{
handler.beginreceive(buffer,0,buffersize,0,new asynccallback(readcallback),handler);
}

private void readcallback(iasyncresult ar)
{
int bytesread=handler.endreceive(ar); //结束挂起的异步读取,返回接收到的字节数。
stringbuilder sb=new stringbuilder(); //接收数据的可变字符字符串,在通过追加、移除、替换或插入字符而创建它后可以对它进行修改。
sb.append(encoding.bigendianunicode.getstring(buffer,0,bytesread));//追加字符串
string content=sb.tostring(); //转换为字符串
sb.remove(0,content.length); //清除sb内容
rtbreceive.appendtext(content+"/r/n");
handler.beginreceive(buffer,0,buffersize,0,new asynccallback(readcallback),handler);
}
private void btnstop_click(object sender, system.eventargs e)
{
try
{
listeningsocket.close();
statusbar1.text="主机"+txtserver.text+"端口"+txtport.text+"监听停止";
}
catch{messagebox.show("监听尚未开始,关闭无效");}
}

private void btnsend_click(object sender, system.eventargs e)
{
try
{
string strsend ="server--->"+rtbsend.text+"/r/n";
byte[] bytesend = encoding.bigendianunicode.getbytes(strsend);
handler.beginsend(bytesend,0,bytesend.length,0,new asynccallback(sendcallback),handler);
}
catch{messagebox.show("连接尚未建立,无法发送.");}
}

private void form1_closing(object sender, system.componentmodel.canceleventargs e)
{
try
{
listeningsocket.close();//在窗口关闭之前关闭scoket连接并释放所有关联的资源。
}
catch{}
}
}
}





发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表