首页 > 学院 > 开发设计 > 正文

SignalR循序渐进(三)简易的集群通讯组件

2019-11-17 03:05:01
字体:
来源:转载
供稿:网友

SignalR循序渐进(三)简易的集群通讯组件

上一篇演示了泛型Hub的实现,微软于6月17日更新了SignalR 2.1.0,然后自带了泛型Hub,于是就不需要自己去实现了…(微软你为啥不早一个月自带啊…)。不过没关系,SignalR出彩之处不在泛型Hub,本篇为各位观众带来了基于SignalR的简易集群通讯组件Demo,可用于分布式定时任务。

说到集群,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制于成员数量,Cluster用数量堆高可用性,HPC太复杂。本着SignalR的双向异步通讯的特点,其实是可以用来玩弹性计算的。初始状态由一台计算任务分发节点,一台监控以及一台计算节点构成。随着任务分发队列中的任务数越来越多,一台执行节点无法及时消耗待执行任务,达到某个阈值的时候,动态的加入一个计算节点来增加计算吞吐量。同样的,当队列中的任务基本处于很低的数量的时候,自动移除一个计算节点来减少资源消耗。当然,如果是大型的计算量之下,分发节点,队列都应该是集群的,还要考虑各种计算节点故障之类的问题,这不在本篇考虑的范畴内,本篇以初始状态模型来一步步实现简易集群通讯组件。

好,废话不说了,正篇开始。

任务分发节点

image

任务分发节点只有一个公开的行为,就是接受计算节点任务执行完成的消息。

下面是实现。

/// <summary>    /// 集群交换器    /// </summary>    public class ClusterHub : Hub<IClusterClient>    {        /// <summary>        ///         /// </summary>        static ClusterHub()        {            aliveDictionary = new ConcurrentDictionary<string, Guid>();        }                /// <summary>        ///         /// </summary>        /// <param name="dispatcher"></param>        public ClusterHub(IDispatcher dispatcher)        {            this.dispatcher = dispatcher;            db = OdbFactory.Open(localDbFileName);        }        /// <summary>        /// 本地数据库文件名        /// </summary>        const string localDbFileName = "ClusterStorage.dll";        /// <summary>        /// 监视器连接Id        /// </summary>        static string monitorConnectionId;        /// <summary>        /// 调度器        /// </summary>        IDispatcher dispatcher;        /// <summary>        /// 在线词典        /// </summary>        static ConcurrentDictionary<string, Guid> aliveDictionary;        /// <summary>        ///         /// </summary>        static IOdb db;        /// <summary>        /// 完成任务        /// </summary>        /// <param name="jobResult"></param>        public void Finished(Contracts.Messages.JobResultDto jobResult)        {            lock (db)            {                var members = db.AsQueryable<MemberDo>();                var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id));                if (member != null)                {                    member.UpdateStatisticsInfo(jobResult.PRocessedTime);                    db.Store(member);                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))                    {                        Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime });                    }                }            }            Clients.Caller.RunJob(dispatcher.GetJobId());        }        /// <summary>        /// 加入        /// </summary>        void Join()        {            object ip = string.Empty;            var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor";            Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip);            lock (db)            {                var members = db.AsQueryable<MemberDo>();                var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor);                if (member != null)                {                    member.MemberStatusType = MemberStatusTypeEnum.Connectioned;                }                else                {                    member = new MemberDo(ip.ToString(), isMonitor);                    if (isMonitor)                    {                        monitorConnectionId = Context.ConnectionId;                    }                }                db.Store(member);                aliveDictionary.TryAdd(Context.ConnectionId, member.Id);                if (!isMonitor)                {                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))                    {                        Clients.Client(monitorConnectionId).MemberJoin(member.Id);                    }                    Clients.Caller.GetId(member.Id.ToString());                    Clients.Caller.RunJob(dispatcher.GetJobId());                }            }        }        /// <summary>        /// 离开        /// </summary>        void Leave()        {            var id = Guid.Empty;            aliveDictionary.TryRemove(Context.ConnectionId, out id);            lock (db)            {                var members = db.AsQueryable<MemberDo>();                var member = members.SingleOrDefault(m => m.Id == id);                if (member != null)                {                    member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned;                    db.Store(member);                    if (member.IsMonitor)                    {                        monitorConnectionId = string.Empty;                    }                    else if (!string.IsNullOrWhiteSpace(monitorConnectionId))                    {                        Clients.Client(monitorConnectionId).MemberLeave(id);                    }                }            }        }        public override Task OnConnected()        {            Console.WriteLine(Context.ConnectionId+":Connected");            Join();            return base.OnConnected();        }        public override Task OnDisconnected()        {            Console.WriteLine(Context.ConnectionId + ":Disconnected");            Leave();            return base.OnDisconnected();        }        public override Task OnReconnected()        {            Console.WriteLine(Context.ConnectionId + ":Reconnected");            return base.OnReconnected();        }    }

ClusterHub承载着2种客户端角色的交互,计算节点和监控。

这边采用了一个轻量级的基于C#开发的无引擎对象数据库来存储客户端信息。

先说重载的部分:

OnConnected - 当有客户端连接的时候,执行Join方法。

OnDisconnected - 当有客户端离线的时候,执行Leave方法。

然后是私有方法:

Join - 根据QueryString来区分客户端类型是计算节点还是监视器,如果是计算节点,就直接通知监视器有成员加入,然后通过IDispatcher来获取任务Id,通知计算节点开始执行任务。

Leave - 计算节点离线的时候通知监视器。

公开方法:

Finished - 计算节点完成任务后就调用该方法,Hub将计算的一些统计信息更新到本地存储,同时通知监视器更新计算结果。

私有变量:

IDispatcher– 任务调度器接口,由外部组件来负责具体的实现。

计算节点

image

计算节点有两个行为:

GetId - 获取节点身份。

RunJob - 执行任务。

/// <summary>    /// 集群客户端    /// </summary>    public class ClusterClient    {        /// <summary>        ///         /// </summary>        /// <param name="jobProvider"></param>        public ClusterClient(IJobProvider jobProvider)        {            this.jobProvider = jobProvider;            url = ConfigurationManager.AppSettings["HubAddress"];            var queryStrings = new Dictionary<string, string>();            queryStrings.Add("ClientRole", "Normal");            connection = new HubConnection(url, queryStrings);            hubProxy = connection.CreateHubProxy(typeof(IClusterHub).GetCustomAttributes(typeof(DescriptionAttribute), false).OfType<DescriptionAttribute>().First().Description);            InitClientEvents();            connection.Start().Wait();        }        string url;        HubConnection connection;        IHubProxy hubProxy;        IJobProvider jobProvider;        string id;        /// <summary>        ///         /// </summary>        void InitClientEvents()        {            hubProxy.On("GetId", (id) => GetId(id));            hubProxy.On("RunJob", (jobId) => Ru
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表