首页 > 学院 > 开发设计 > 正文

【Mrpc】Demo2 基于Zookeeper的服务器感知及负载均衡

2019-11-14 09:14:48
字体:
来源:转载
供稿:网友

由于服务器经常会维护,把服务实现服务端的地址直接写在客户端(包括配置文件里)都是不太好的。比较好的办法是服务端启动的时候把自己的地址注册到ZooKeeper集群中,然后客户端启动时即可获得所有服务器列表,并选择一个合适的服务器为自己提供服务。

这样解决了服务端地址的获取问题,同时也解决了服务端的负载均衡问题。

来看一下相应的代码。项目源码已经上传到http://download.csdn.net/detail/mrbcy/9747568

package tech.mrbcy.mrpc.demo.demo2;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;public class ServerAddrHelper { public static final String DEFAULT_GROUP_NAME = "/MrpcServers"; PRivate String connString; private String groupName; private ZooKeeper zk; private ServerChangeListener listener; private static int sessionTimeout = 2000; public ServerAddrHelper(String connString){ this.connString = connString; this.groupName = DEFAULT_GROUP_NAME; } /** * * @param connString zk连接字符串 * @param groupName 父节点路径,位于/下,需要带/ 示例值:"/MrpcServers" */ public ServerAddrHelper(String connString, String groupName){ this.connString = connString; if(!groupName.startsWith("/")){ groupName = "/" + groupName; } this.groupName = groupName; } /** * 向ZooKeeper集群注册服务器 * @param registPath 服务器节点路径,示例值"server" * @param address 服务器地址及端口号,用于客户端连接 * @throws Exception 连接服务器失败 */ public void registServer(String registPath, String address) throws Exception{ zk = new ZooKeeper(connString,sessionTimeout,null); // 判断父目录是否存在,不存在则创建 Stat groupStat = zk.exists(groupName, false); if(groupStat == null){ zk.create(groupName, "Mrpc server list".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注册服务器 if(!registPath.startsWith("/")){ registPath = "/" + registPath; } String registAddr = zk.create(groupName+registPath, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Server is starting, reg addr:" + registAddr); } /** * 发现服务器 * @param listener 监听器,如果不是null,等到服务器列表发生变化时,监听器会收到通知 * @return * @throws Exception */ public List<String> discoverServers(ServerChangeListener listener) throws Exception{ this.listener = listener; zk = new ZooKeeper(connString,sessionTimeout,new Watcher(){ public void process(WatchedEvent event) { if(event.getType() == EventType.NodeChildrenChanged){ // 服务器列表发生变化 try { List<String> servers = getServerList(); if(ServerAddrHelper.this.listener != null){ ServerAddrHelper.this.listener.onChange(servers); } } catch (Exception e) { e.printStackTrace(); } } } }); return getServerList(); } private List<String> getServerList() throws Exception { zk.getChildren(groupName, true); List<String> children = zk.getChildren(groupName, true); List<String> servers = new ArrayList<String>(); for(String child : children) { byte[] data = zk.getData(groupName+"/"+child, null, null); servers.add(new String(data)); } return servers; }}

一般来讲registServer由服务端框架调用,向zk集群注册自己。discoverServer用客户端框架调用。取得当前可用的服务器列表。并且提供了服务器列表发生变化时的监听接口。

另外还实现了一个负载均衡器,使用最简单的随机选择算法来挑选服务器。后续还可以进行改进,比如根据服务端当前的负载来选择服务器。代码如下:

package tech.mrbcy.mrpc.demo.demo2;import java.net.InetSocketAddress;import java.util.List;import java.util.Random;public class ServerLoadBalancer { /** * 选择一个服务器 * @param servers 服务器列表 示例值:133.122.5.88:8888 或 anode2:5884 * @return 连接服务器地址 */ public static InetSocketAddress chooseServer(List<String> servers){ if(servers == null || servers.size() == 0){ return null; } // 随机选择一个服务器 String serverAddr = servers.get(0); if(servers.size() > 1){ int index = new Random().nextInt(servers.size()); serverAddr = servers.get(index); } String[] addrAndPort = serverAddr.split(":"); if(addrAndPort.length != 2){ throw new RuntimeException("不合法的server地址:" + serverAddr); } return new InetSocketAddress(addrAndPort[0], Integer.parseInt(addrAndPort[1])); }}

最后给出一个测试类的代码。其他的详细信息请看上传的项目代码吧。

package tech.mrbcy.mrpc.demo.demo2;import java.net.InetSocketAddress;import java.util.List;import org.junit.Test;public class MockClient { private InetSocketAddress serverAddress; @Test public void testClient(){ ServerAddrHelper serverHelper = new ServerAddrHelper("amaster:2181,anode1:2181,anode2:2181"); ServerAddrHelper helper = new ServerAddrHelper("amaster:2181,anode1:2181,anode2:2181"); try { serverHelper.registServer("ServiceImplServer", "localhost:10000"); List<String> serverList = helper.discoverServers(new ServerChangeListener() { public void onChange(List<String> servers) { System.out.println("服务器列表发生变化,当前服务器列表为:"); System.out.println(servers); changeToServer(servers); } }); System.out.println(serverList); if(serverList == null || serverList.size() == 0){ System.out.println("没有可用的服务器"); } changeToServer(serverList); Thread.sleep(1000); serverHelper.registServer("ServiceImplServer", "localhost:10001"); Thread.sleep(1000); serverHelper.registServer("ServiceImplServer", "localhost:10002"); // 这期间可以手动删除已经连接的服务器,测试服务器断线的情况 Thread.sleep(50000); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void changeToServer(List<String> servers) { if(servers == null || servers.size() == 0){ return; } // 未指定服务器地址或原服务器已失效,迁移到新的服务器 boolean valid = false; if(servers.size() > 0 && serverAddress != null){ for(String server:servers){ if(server.equals(serverAddress.getHostString() + ":" + serverAddress.getPort())){ valid = true; break; } } } if(serverAddress == null || !valid){ serverAddress = ServerLoadBalancer.chooseServer(servers); System.out.println("未指定服务器地址或原服务器已失效,迁移到新的服务器:" + serverAddress.getHostString() + ":" + serverAddress.getPort()); } }}

输出结果如下:

Server is starting, reg addr:/MrpcServers/ServiceImplServer0000000030[localhost:10000]未指定服务器地址或原服务器已失效,迁移到新的服务器:localhost:10000Server is starting, reg addr:/MrpcServers/ServiceImplServer0000000031服务器列表发生变化,当前服务器列表为:[localhost:10001, localhost:10000]Server is starting, reg addr:/MrpcServers/ServiceImplServer0000000032服务器列表发生变化,当前服务器列表为:[localhost:10001, localhost:10002, localhost:10000]服务器列表发生变化,当前服务器列表为:[localhost:10001, localhost:10002]未指定服务器地址或原服务器已失效,迁移到新的服务器:localhost:10002
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表