首页 > 开发 > 综合 > 正文

封装stream,在读写stream时提供事件通知

2024-07-21 02:30:10
字体:
来源:转载
供稿:网友

前阵子的工作涉及一些网络编程,使用了面向流的方式做传输数据。在代码过程中,遇到一个新需求就是要统计流量。其实最简单的办法就时在读写流的地方增加代码,把功能增加上去就可以。但是我觉得那样对我原理的代码框架影响较大,基于尽量不影响原来的代码的考虑,我想到了decorator设计模式。

先把代码贴出来,在做解释吧:

   

以下为引用的内容:
 public class eventstream : stream
    {
        public event eventhandler<fstreamdataeventargs> onbeforeread;
        public event eventhandler<fstreamdataeventargs> onbeforewrite;

        private stream stream;
        public eventstream(stream stream)
        {
            if (stream == null) throw new argumentnullexception("eventstream");
            this.stream = stream;
        }

        [ ==== stream members ==== ]#region [ ==== stream members ==== ]
        public override bool canread
        {
            get { return stream.canread; }
        }

        public override bool canseek
        {
            get { return stream.canseek; }
        }

        public override bool canwrite
        {
            get { return stream.canwrite; }
        }

        public override void flush()
        {
            stream.flush();
        }

        public override long length
        {
            get { return stream.length; }
        }

        public override long position
        {
            get
            {
                return stream.position;
            }
            set
            {
                stream.position = value;
            }
        }

        public override int read(byte[] buffer, int offset, int count)
        {
            int readsize = stream.read(buffer, offset, count);
            if (onbeforeread != null)
                onbeforeread(this, new fstreamdataeventargs(buffer, offset, readsize));
            return readsize;
        }

        public override long seek(long offset, seekorigin origin)
        {
            return stream.seek(offset, origin);
        }

        public override void setlength(long value)
        {
            stream.setlength(value);
        }

        public override void write(byte[] buffer, int offset, int count)
        {
            if (onbeforewrite != null)
                onbeforewrite(this, new fstreamdataeventargs(buffer, offset, count));
            stream.write(buffer, offset, count);
        }

        public override iasyncresult beginread(byte[] buffer, int offset, int count,
            asynccallback callback, object state)
        {
            internalasyncstate mystate = new internalasyncstate(
                              new fstreamdataeventargs(buffer, offset, count), state);
            asynccallback mycallback = new asynccallback(
                              new internalcallback(onbeforeread, callback).callback);
            return new eventstreamasyncresult(
                                 stream.beginread(buffer, offset, count, mycallback, mystate));
        }

        public override int endread(iasyncresult asyncresult)
        {
            eventstreamasyncresult esar = asyncresult as eventstreamasyncresult;
            if (esar != null)
                return stream.endread(esar.internalasyncresult);
            else
                return stream.endread(asyncresult);
        }

        public override iasyncresult beginwrite(byte[] buffer, int offset, int count, asynccallback callback, object state)
        {
            internalasyncstate mystate = new internalasyncstate(
                              new fstreamdataeventargs(buffer, offset, count), state);
            asynccallback mycallback = new asynccallback(
                              new internalcallback(onbeforewrite, callback).callback);
            return new eventstreamasyncresult(
                              stream.beginwrite(buffer, offset, count, mycallback, mystate));
        }

        public override void endwrite(iasyncresult asyncresult)
        {
            stream.endwrite(asyncresult);
        }

        #endregion

        private class internalcallback
        {
            private asynccallback callback;
            private eventhandler<fstreamdataeventargs> internalhandler;

            public internalcallback(eventhandler<fstreamdataeventargs> internalhandler, asynccallback callback)
            {
                this.internalhandler = internalhandler;
                this.callback = callback;
            }

            internal void callback(iasyncresult asyncresult)
            {
                internalasyncstate mystate = asyncresult.asyncstate as internalasyncstate;
                if (internalhandler != null && mystate != null)
                    internalhandler(this, mystate.streamdataeventargs);
                callback(new eventstreamasyncresult(asyncresult));
            }
        }

        private class internalasyncstate
        {
            object state;
            fstreamdataeventargs streamdataeventargs;

            public object state
            {
                get { return state; }
            }

            public fstreamdataeventargs streamdataeventargs
            {
                get { return streamdataeventargs; }
            }

            public internalasyncstate(fstreamdataeventargs streamdataeventargs, object state)
            {
                this.streamdataeventargs = streamdataeventargs;
                this.state = state;
            }
        }

        private class eventstreamasyncresult : iasyncresult
        {
            iasyncresult ar;

            public eventstreamasyncresult(iasyncresult ar)
            {
                if (ar == null) throw new argumentnullexception("eventstreamasyncresult");
                this.ar = ar;
            }
            iasyncresult members#region iasyncresult members

            public object asyncstate
            {
                get
                {
                    internalasyncstate mystate = ar.asyncstate as internalasyncstate;
                    if (mystate != null)
                        return mystate.state;
                    else
                        return ar.asyncstate;
                }
            }

            internal iasyncresult internalasyncresult
            {
                get { return ar; }
            }

            public system.threading.waithandle asyncwaithandle
            {
                get { return ar.asyncwaithandle; }
            }

            public bool completedsynchronously
            {
                get { return ar.completedsynchronously; }
            }

            public bool iscompleted
            {
                get { return ar.iscompleted; }
            }

            #endregion
        }
    }

    public class fstreamdataeventargs : eventargs
    {
        private byte[] buffer;
        private int offset;
        private int count;

        public fstreamdataeventargs(byte[] buffer, int offset, int count)
        {
            if(buffer == null) throw new argumentnullexception("fstreamdataeventargs");
            if(offset + count>buffer.length) throw new argumentoutofrangeexception("fstreamdataeventargs");

            this.buffer = buffer;
            this.offset = offset;
            this.count = count;
        }

        /**//// <summary>
        /// 数据缓存
        /// </summary>
        public byte[] buffer
        {
          get { return buffer; }
        }

        /**//// <summary>
        /// 数据开始位置
        /// </summary>
        public int offset
        {
          get { return offset; }
        }

        /**//// <summary>
        /// 数据长度
        /// </summary>
        public int count
        {
          get { return count; }
        }
    }

刚开始以为很简单,事实上写下来还挺多行代码的,decorator模式嘛,当然先继承stream,把stream本来该做的事情先完成了。这个很简单类里面包含一个内部的stream,stream该有的接口都由它来完成了。接下来就是增加两个事件,分别是onbeforeread、onbeforewrite。名字里面都有before,其实我考虑到数据流都会通过这两个事件开放出来,你想做加密什么的都可以,当然也包括我想要的统计数据流量。

接下来就是在读写流的时候触发这两个事件就可以了。看看同步的read、write方法,简单的调用就可以了。
关键的地方就在于异步的读写。

我们先看看一般stream的异步调用代码是怎么样的:

以下为引用的内容:

stream.beginread(buffer, 0, byte2read, new asynccallback(endreadcallback), state);

private void endreadcallback(iasyncresult asyncresult)
{
    object state = asyncresult.asyncstate;
    nreadsize = stream.endread(asyncresult);
            //
}

在不更改这个“client”代码的情况下,要怎么样在stream那边知道这里的确实读了多少数据呢?

显然在调用beginread的时候是不知道,那就只能对这个asynccallback做手脚了。可以预想到framework内部会在完成了read的操作之后会调用asynccallback委托来通知结果。于是我就传一个我定义好的asynccallback委托给beginread。当然还要把“client”提供的asynccallback给包装起来,在做完我的事情(事件通知)之后,还是要把“client”要我办的事情给也给办了(调用"client"的asynccallback委托来通知结果)。

这就在实现了“在客户代码与framework之间插一脚”。

再来看看我是怎么做到事件通知的。首先要把我要的数据给传过去,于是有了internalasyncstate,这里面要有我触发事件需要的事件参数,还应该要包括用户可能传入的state。具体大家看看internalasyncstate的实现。

最后多考虑了一点就是,假如“client”代码不是像我写的那样,而是不断的通过检查stream.beginread 方法返回的iasyncresult的iscompleted属性来确定是否read完成的话,那我的代码就有问题了,我返回的iasyncresult根本就不是原理的iasyncresult了。eventstreamasyncresult类就是为这个而写的。
下面是使用的代码:

以下为引用的内容:
public void getresponsestream()
{
        eventstream es = new eventstream(tcpclient.netstream);
        es.onbeforeread += new eventhandler<fstreamdataeventargs>(eventstream_onbeforeread);
        es.onbeforewrite += new eventhandler<fstreamdataeventargs>(eventstream_onbeforewrite);
        return es;
}

回头看看代码,其实都在用decorator模式的思想,把原来的framework中的类都给包装起来,并在完成原来的功能之余另外加了自己的功能。

文笔一般,希望能对你有帮助。

注册会员,创建你的web开发资料库,
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表