首页 > 开发 > 综合 > 正文

[C#]I/O完成端口的类定义和测试实例

2024-07-21 02:17:43
字体:
来源:转载
供稿:网友
从william kennedy那里整理过来的,不同之处在于他自己定义了一个overlapped,而我们这里直接使用
system.threading.nativeoverlapped.附一段我以前的win32下的iocp文档,如果您了解iocp也可以直接跳过看后面的c#测试示范:

 

整理者:郑昀@ultrapower

 

我们采用的是i/o complete port(以下简称iocp)处理机制。

简单的讲,当服务应用程序初始化时,它应该先创建一个i/o cp。我们在请求到来后,将得到的数据打包用postqueuedcompletionstatus发送到iocp中。这时需要创建一些个线程(7个线程/每cpu,再多就没有意义了)来处理发送到iocp端口的消息。实现步骤大致如下:

1     先在主线程中调用createiocompletionport创建iocp。

createiocompletionport的前三个参数只在把设备同complete port相关联时才有用。

此时我们只需传递invalid_handle_value,null和0即可。

第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的cpu数目。

2     我们的threadfun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。

在循环中,调用getqueuedcompletionstatus,这样就把当前线程的id放入一个等待线程队列中,i/o cp内核对象就总能知道哪个线程在等待处理完成的i/o请求。

如果在idle_thread_timeout规定的时间内i/o cp上还没有出现一个completion packet,则转入下一次循环。在这里我们设置的idle_thread_timeout为1秒。

 

当端口的i/o完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的i/o项中的信息:       传输的字节数、完成键和overlapped结构的地址。

 

在我们的程序中可以用智能指针或者bstr或者int来接受这个overlapped结构的地址的值,从而得到消息;然后在这个线程中处理消息。

getqueuedcompletionstatus的第一个参数hcompletionport指出了要监视哪一个端口,这里我们传送先前从createiocompletionport返回的端口句柄。

 

需要注意的是:

第一,   线程池的数目是有限制的,和cpu数目有关系。

第二,   iocp是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用cpu资源,直到被内核唤醒;

第三,   最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。

 
测试代码:

using system;
using system.threading;  // included for the thread.sleep call
using continuum.threading;
using system.runtime.interopservices;

namespace iocpdemo
{
    //=============================================================================
    /**//// <summary> sample class for the threading class </summary>
    public class utilthreadingsample
    {
        //*****************************************************************************   
        /**//// <summary> test method </summary>
        static void main()
        {
            // create the mssql iocp thread pool
            iocpthreadpool pthreadpool = new iocpthreadpool(0, 10, 20, new iocpthreadpool.user_function(iocpthreadfunction));
      
            //for(int i =1;i<10000;i++)
            {
                pthreadpool.postevent(1234);
            }
      
            thread.sleep(100);
      
            pthreadpool.dispose();
        }
    
        //********************************************************************
        /**//// <summary> function to be called by the iocp thread pool.  called when
        ///           a command is posted for processing by the socketmanager </summary>
        /// <param name="ivalue"> the value provided by the thread posting the event </param>
        static public void iocpthreadfunction(int ivalue)
        {
            try
            {
                console.writeline("value: {0}", ivalue.tostring());
                thread.sleep(3000);
            }
      
            catch (exception pexception)
            {
                console.writeline(pexception.message);
            }
        }
    }

}


类代码:
using system;
using system.threading;
using system.runtime.interopservices;

namespace iocpthreading
{
    [structlayout(layoutkind.sequential, charset=charset.auto)]

    public sealed class iocpthreadpool
    {
        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads);

        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern boolean closehandle(uint32 hobject);

        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped);

        [dllimport("kernel32", charset=charset.auto)]
        private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds);

        private const uint32 invalid_handle_value = 0xffffffff;
        private const uint32 inifinite = 0xffffffff;
        private const int32 shutdown_iocpthread = 0x7fffffff;
        public delegate void user_function(int ivalue);
        private uint32 m_hhandle;
        private uint32 gethandle { get { return m_hhandle; } set { m_hhandle = value; } }

        private int32 m_uimaxconcurrency;

        private int32 getmaxconcurrency { get { return m_uimaxconcurrency; } set { m_uimaxconcurrency = value; } }


        private int32 m_iminthreadsinpool;

        private int32 getminthreadsinpool { get { return m_iminthreadsinpool; } set { m_iminthreadsinpool = value; } }

        private int32 m_imaxthreadsinpool;

        private int32 getmaxthreadsinpool { get { return m_imaxthreadsinpool; } set { m_imaxthreadsinpool = value; } }


        private object m_pcriticalsection;

        private object getcriticalsection { get { return m_pcriticalsection; } set { m_pcriticalsection = value; } }


        private user_function m_pfnuserfunction;

        private user_function getuserfunction { get { return m_pfnuserfunction; } set { m_pfnuserfunction = value; } }


        private boolean m_bdisposeflag;

        /**//// <summary> simtype: flag to indicate if the class is disposing </summary>

        private boolean isdisposed { get { return m_bdisposeflag; } set { m_bdisposeflag = value; } }

        private int32 m_icurthreadsinpool;

        /**//// <summary> simtype: the current number of threads in the thread pool </summary>

        public int32 getcurthreadsinpool { get { return m_icurthreadsinpool; } set { m_icurthreadsinpool = value; } }

        /**//// <summary> simtype: increment current number of threads in the thread pool </summary>

        private int32 inccurthreadsinpool() { return interlocked.increment(ref m_icurthreadsinpool); }

        /**//// <summary> simtype: decrement current number of threads in the thread pool </summary>

        private int32 deccurthreadsinpool() { return interlocked.decrement(ref m_icurthreadsinpool); }


        private int32 m_iactthreadsinpool;

        /**//// <summary> simtype: the current number of active threads in the thread pool </summary>

        public int32 getactthreadsinpool { get { return m_iactthreadsinpool; } set { m_iactthreadsinpool = value; } }

        /**//// <summary> simtype: increment current number of active threads in the thread pool </summary>

        private int32 incactthreadsinpool() { return interlocked.increment(ref m_iactthreadsinpool); }

        /**//// <summary> simtype: decrement current number of active threads in the thread pool </summary>

        private int32 decactthreadsinpool() { return interlocked.decrement(ref m_iactthreadsinpool); }


        private int32 m_icurworkinpool;

        /**//// <summary> simtype: the current number of work posted in the thread pool </summary>

        public int32 getcurworkinpool { get { return m_icurworkinpool; } set { m_icurworkinpool = value; } }

        /**//// <summary> simtype: increment current number of work posted in the thread pool </summary>

        private int32 inccurworkinpool() { return interlocked.increment(ref m_icurworkinpool); }

        /**//// <summary> simtype: decrement current number of work posted in the thread pool </summary>

        private int32 deccurworkinpool() { return interlocked.decrement(ref m_icurworkinpool); }

        public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction)
        {
            try
            {
                // set initial class state

                getmaxconcurrency   = imaxconcurrency;

                getminthreadsinpool = iminthreadsinpool;

                getmaxthreadsinpool = imaxthreadsinpool;

                getuserfunction     = pfnuserfunction;


                // init the thread counters

                getcurthreadsinpool = 0;

                getactthreadsinpool = 0;

                getcurworkinpool    = 0;


                // initialize the monitor object

                getcriticalsection = new object();


                // set the disposing flag to false

                isdisposed = false;


                unsafe
                {

                    // create an io completion port for thread pool use
                    gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency);

                }


                // test to make sure the io completion port was created

                if (gethandle == 0)

                    throw new exception("unable to create io completion port");


                // allocate and start the minimum number of threads specified

                int32 istartingcount = getcurthreadsinpool;

        

                threadstart tsthread = new threadstart(iocpfunction);

                for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread)
                {

                    // create a thread and start it

                    thread ththread = new thread(tsthread);

                    ththread.name = "iocp " + ththread.gethashcode();

                    ththread.start();


                    // increment the thread pool count

                    inccurthreadsinpool();

                }

            }


            catch
            {

                throw new exception("unhandled exception");

            }

        }

        ~iocpthreadpool()
        {

            if (!isdisposed)

                dispose();

        }

        public void dispose()
        {

            try
            {

                // flag that we are disposing this object

                isdisposed = true;


                // get the current number of threads in the pool

                int32 icurthreadsinpool = getcurthreadsinpool;


                // shutdown all thread in the pool

                for (int32 ithread = 0; ithread < icurthreadsinpool; ++ithread)
                {
                    unsafe
                    {

                        bool bret = postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null);

                    }

                }


                // wait here until all the threads are gone

                while (getcurthreadsinpool != 0) thread.sleep(100);


                unsafe
                {

                    // close the iocp handle
                    closehandle(gethandle);

                }

            }

            catch
            {

            }

        }
        private void iocpfunction()
        {
            uint32 uinumberofbytes;

            int32  ivalue;

            try
            {
                while (true)
                {

                    unsafe
                    {

                        system.threading.nativeoverlapped* pov;


                        // wait for an event

                        getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ivalue, &pov, inifinite);
                    }

                    // decrement the number of events in queue

                    deccurworkinpool();


                    // was this thread told to shutdown

                    if (ivalue == shutdown_iocpthread)

                        break;


                    // increment the number of active threads

                    incactthreadsinpool();


                    try
                    {
                        // call the user function
                        getuserfunction(ivalue);

                    }

                    catch(exception ex)
                    {
                        throw ex;
                    }


                    // get a lock

                    monitor.enter(getcriticalsection);


                    try
                    {

                        // if we have less than max threads currently in the pool

                        if (getcurthreadsinpool < getmaxthreadsinpool)
                        {

                            // should we add a new thread to the pool

                            if (getactthreadsinpool == getcurthreadsinpool)
                            {

                                if (isdisposed == false)
                                {

                                    // create a thread and start it

                                    threadstart tsthread = new threadstart(iocpfunction);

                                    thread ththread = new thread(tsthread);

                                    ththread.name = "iocp " + ththread.gethashcode();

                                    ththread.start();


                                    // increment the thread pool count

                                    inccurthreadsinpool();

                                }

                            }

                        }

                    }

                    catch
                    {

                    }


                    // relase the lock

                    monitor.exit(getcriticalsection);


                    // increment the number of active threads

                    decactthreadsinpool();

                }

            }


            catch(exception ex)
            {
                string str=ex.message;

            }


            // decrement the thread pool count

            deccurthreadsinpool();

        }

        //public void postevent(int32 ivalue
        public void postevent(int ivalue)
        {

            try
            {

                // only add work if we are not disposing

                if (isdisposed == false)
                {

                    unsafe
                    {

                        // post an event into the iocp thread pool

                        postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null);

                    }


                    // increment the number of item of work

                    inccurworkinpool();


                    // get a lock

                    monitor.enter(getcriticalsection);


                    try
                    {

                        // if we have less than max threads currently in the pool

                        if (getcurthreadsinpool < getmaxthreadsinpool)
                        {

                            // should we add a new thread to the pool

                            if (getactthreadsinpool == getcurthreadsinpool)
                            {

                                if (isdisposed == false)
                                {

                                    // create a thread and start it

                                    threadstart tsthread = new threadstart(iocpfunction);

                                    thread ththread = new thread(tsthread);

                                    ththread.name = "iocp " + ththread.gethashcode();

                                    ththread.start();


                                    // increment the thread pool count

                                    inccurthreadsinpool();

                                }

                            }

                        }

                    }


                    catch
                    {

                    }


                    // release the lock

                    monitor.exit(getcriticalsection);

                }

            }


            catch (exception e)
            {

                throw e;

            }


            catch
            {

                throw new exception("unhandled exception");

            }

        }  

        public void postevent()
        {

            try
            {

                // only add work if we are not disposing

                if (isdisposed == false)
                {

                    unsafe
                    {

                        // post an event into the iocp thread pool

                        postqueuedcompletionstatus(gethandle, 0, null, null);

                    }


                    // increment the number of item of work

                    inccurworkinpool();


                    // get a lock

                    monitor.enter(getcriticalsection);


                    try

                    {

                        // if we have less than max threads currently in the pool

                        if (getcurthreadsinpool < getmaxthreadsinpool)

                        {

                            // should we add a new thread to the pool

                            if (getactthreadsinpool == getcurthreadsinpool)

                            {

                                if (isdisposed == false)

                                {

                                    // create a thread and start it

                                    threadstart tsthread = new threadstart(iocpfunction);

                                    thread ththread = new thread(tsthread);

                                    ththread.name = "iocp " + ththread.gethashcode();

                                    ththread.start();


                                    // increment the thread pool count

                                    inccurthreadsinpool();

                                }

                            }

                        }

                    }


                    catch

                    {

                    }


                    // release the lock

                    monitor.exit(getcriticalsection);

                }

            }

            catch

            {

                throw new exception("unhandled exception");

            }

        }

    }

}
,欢迎访问网页设计爱好者web开发。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表