整理者:郑昀@ultrapower
我们采用的是i/o complete port(以下简称iocp)处理机制。
简单的讲,当服务应用程序初始化时,它应该先创建一个i/o cp。我们在请求到来后,将得到的数据打包用postqueuedcompletionstatus发送到iocp中。这时需要创建一些个线程(7个线程/每cpu,再多就没有意义了)来处理发送到iocp端口的消息。实现步骤大致如下:
1 先在主线程中调用createiocompletionport创建iocp。
createiocompletionport的前三个参数只在把设备同complete port相关联时才有用。
此时我们只需传递invalid_handle_value,null和0即可。
第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的cpu数目。
2 我们的threadfun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。
在循环中,调用getqueuedcompletionstatus,这样就把当前线程的id放入一个等待线程队列中,i/o cp内核对象就总能知道哪个线程在等待处理完成的i/o请求。
如果在idle_thread_timeout规定的时间内i/o cp上还没有出现一个completion packet,则转入下一次循环。在这里我们设置的idle_thread_timeout为1秒。
当端口的i/o完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的i/o项中的信息: 传输的字节数、完成键和overlapped结构的地址。
在我们的程序中可以用智能指针或者bstr或者int来接受这个overlapped结构的地址的值,从而得到消息;然后在这个线程中处理消息。
getqueuedcompletionstatus的第一个参数hcompletionport指出了要监视哪一个端口,这里我们传送先前从createiocompletionport返回的端口句柄。
需要注意的是:
第一, 线程池的数目是有限制的,和cpu数目有关系。
第二, iocp是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用cpu资源,直到被内核唤醒;
第三, 最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。
测试代码:
using system;
using system.threading; // included for the thread.sleep call
using continuum.threading;
using system.runtime.interopservices;
namespace iocpdemo
{
//=============================================================================
/**//// <summary> sample class for the threading class </summary>
public class utilthreadingsample
{
//*****************************************************************************
/**//// <summary> test method </summary>
static void main()
{
// create the mssql iocp thread pool
iocpthreadpool pthreadpool = new iocpthreadpool(0, 10, 20, new iocpthreadpool.user_function(iocpthreadfunction));
//for(int i =1;i<10000;i++)
{
pthreadpool.postevent(1234);
}
thread.sleep(100);
pthreadpool.dispose();
}
//********************************************************************
/**//// <summary> function to be called by the iocp thread pool. called when
/// a command is posted for processing by the socketmanager </summary>
/// <param name="ivalue"> the value provided by the thread posting the event </param>
static public void iocpthreadfunction(int ivalue)
{
try
{
console.writeline("value: {0}", ivalue.tostring());
thread.sleep(3000);
}
catch (exception pexception)
{
console.writeline(pexception.message);
}
}
}
}
类代码:
using system;
using system.threading;
using system.runtime.interopservices;
namespace iocpthreading
{
[structlayout(layoutkind.sequential, charset=charset.auto)]
public sealed class iocpthreadpool
{
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads);
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern boolean closehandle(uint32 hobject);
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped);
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds);
private const uint32 invalid_handle_value = 0xffffffff;
private const uint32 inifinite = 0xffffffff;
private const int32 shutdown_iocpthread = 0x7fffffff;
public delegate void user_function(int ivalue);
private uint32 m_hhandle;
private uint32 gethandle { get { return m_hhandle; } set { m_hhandle = value; } }
private int32 m_uimaxconcurrency;
private int32 getmaxconcurrency { get { return m_uimaxconcurrency; } set { m_uimaxconcurrency = value; } }
private int32 m_iminthreadsinpool;
private int32 getminthreadsinpool { get { return m_iminthreadsinpool; } set { m_iminthreadsinpool = value; } }
private int32 m_imaxthreadsinpool;
private int32 getmaxthreadsinpool { get { return m_imaxthreadsinpool; } set { m_imaxthreadsinpool = value; } }
private object m_pcriticalsection;
private object getcriticalsection { get { return m_pcriticalsection; } set { m_pcriticalsection = value; } }
private user_function m_pfnuserfunction;
private user_function getuserfunction { get { return m_pfnuserfunction; } set { m_pfnuserfunction = value; } }
private boolean m_bdisposeflag;
/**//// <summary> simtype: flag to indicate if the class is disposing </summary>
private boolean isdisposed { get { return m_bdisposeflag; } set { m_bdisposeflag = value; } }
private int32 m_icurthreadsinpool;
/**//// <summary> simtype: the current number of threads in the thread pool </summary>
public int32 getcurthreadsinpool { get { return m_icurthreadsinpool; } set { m_icurthreadsinpool = value; } }
/**//// <summary> simtype: increment current number of threads in the thread pool </summary>
private int32 inccurthreadsinpool() { return interlocked.increment(ref m_icurthreadsinpool); }
/**//// <summary> simtype: decrement current number of threads in the thread pool </summary>
private int32 deccurthreadsinpool() { return interlocked.decrement(ref m_icurthreadsinpool); }
private int32 m_iactthreadsinpool;
/**//// <summary> simtype: the current number of active threads in the thread pool </summary>
public int32 getactthreadsinpool { get { return m_iactthreadsinpool; } set { m_iactthreadsinpool = value; } }
/**//// <summary> simtype: increment current number of active threads in the thread pool </summary>
private int32 incactthreadsinpool() { return interlocked.increment(ref m_iactthreadsinpool); }
/**//// <summary> simtype: decrement current number of active threads in the thread pool </summary>
private int32 decactthreadsinpool() { return interlocked.decrement(ref m_iactthreadsinpool); }
private int32 m_icurworkinpool;
/**//// <summary> simtype: the current number of work posted in the thread pool </summary>
public int32 getcurworkinpool { get { return m_icurworkinpool; } set { m_icurworkinpool = value; } }
/**//// <summary> simtype: increment current number of work posted in the thread pool </summary>
private int32 inccurworkinpool() { return interlocked.increment(ref m_icurworkinpool); }
/**//// <summary> simtype: decrement current number of work posted in the thread pool </summary>
private int32 deccurworkinpool() { return interlocked.decrement(ref m_icurworkinpool); }
public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction)
{
try
{
// set initial class state
getmaxconcurrency = imaxconcurrency;
getminthreadsinpool = iminthreadsinpool;
getmaxthreadsinpool = imaxthreadsinpool;
getuserfunction = pfnuserfunction;
// init the thread counters
getcurthreadsinpool = 0;
getactthreadsinpool = 0;
getcurworkinpool = 0;
// initialize the monitor object
getcriticalsection = new object();
// set the disposing flag to false
isdisposed = false;
unsafe
{
// create an io completion port for thread pool use
gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency);
}
// test to make sure the io completion port was created
if (gethandle == 0)
throw new exception("unable to create io completion port");
// allocate and start the minimum number of threads specified
int32 istartingcount = getcurthreadsinpool;
threadstart tsthread = new threadstart(iocpfunction);
for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread)
{
// create a thread and start it
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
catch
{
throw new exception("unhandled exception");
}
}
~iocpthreadpool()
{
if (!isdisposed)
dispose();
}
public void dispose()
{
try
{
// flag that we are disposing this object
isdisposed = true;
// get the current number of threads in the pool
int32 icurthreadsinpool = getcurthreadsinpool;
// shutdown all thread in the pool
for (int32 ithread = 0; ithread < icurthreadsinpool; ++ithread)
{
unsafe
{
bool bret = postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null);
}
}
// wait here until all the threads are gone
while (getcurthreadsinpool != 0) thread.sleep(100);
unsafe
{
// close the iocp handle
closehandle(gethandle);
}
}
catch
{
}
}
private void iocpfunction()
{
uint32 uinumberofbytes;
int32 ivalue;
try
{
while (true)
{
unsafe
{
system.threading.nativeoverlapped* pov;
// wait for an event
getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ivalue, &pov, inifinite);
}
// decrement the number of events in queue
deccurworkinpool();
// was this thread told to shutdown
if (ivalue == shutdown_iocpthread)
break;
// increment the number of active threads
incactthreadsinpool();
try
{
// call the user function
getuserfunction(ivalue);
}
catch(exception ex)
{
throw ex;
}
// get a lock
monitor.enter(getcriticalsection);
try
{
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool)
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool)
{
if (isdisposed == false)
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
}
catch
{
}
// relase the lock
monitor.exit(getcriticalsection);
// increment the number of active threads
decactthreadsinpool();
}
}
catch(exception ex)
{
string str=ex.message;
}
// decrement the thread pool count
deccurthreadsinpool();
}
//public void postevent(int32 ivalue
public void postevent(int ivalue)
{
try
{
// only add work if we are not disposing
if (isdisposed == false)
{
unsafe
{
// post an event into the iocp thread pool
postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null);
}
// increment the number of item of work
inccurworkinpool();
// get a lock
monitor.enter(getcriticalsection);
try
{
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool)
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool)
{
if (isdisposed == false)
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
}
catch
{
}
// release the lock
monitor.exit(getcriticalsection);
}
}
catch (exception e)
{
throw e;
}
catch
{
throw new exception("unhandled exception");
}
}
public void postevent()
{
try
{
// only add work if we are not disposing
if (isdisposed == false)
{
unsafe
{
// post an event into the iocp thread pool
postqueuedcompletionstatus(gethandle, 0, null, null);
}
// increment the number of item of work
inccurworkinpool();
// get a lock
monitor.enter(getcriticalsection);
try
{
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool)
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool)
{
if (isdisposed == false)
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
}
catch
{
}
// release the lock
monitor.exit(getcriticalsection);
}
}
catch
{
throw new exception("unhandled exception");
}
}
}
}
,欢迎访问网页设计爱好者web开发。
新闻热点
疑难解答