ZooKeeper(3.4.5)

2019-11-14 23:07:34
Source: reposted
Contributed by: site user
ZooKeeper (3.4.5) - Listening for Events with Curator (2.7.0)

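ZooKeeper's native API supports event listening through registered Watchers, but a Watcher notification is one-shot: once a Watcher has fired, it has to be registered again before it can report the next change. Application code therefore ends up re-registering Watchers over and over, which is tedious. Curator introduces Cache recipes for listening to events on the ZooKeeper server. A Cache wraps ZooKeeper's event listening and handles the repeated registration automatically, which removes most of the boilerplate that the native API requires.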
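The difference between a one-shot watch and a Cache-style wrapper can be illustrated without a ZooKeeper server. The sketch below is a toy in-memory model, not the ZooKeeper or Curator API: `ToyNode`, `ToyNodeCache`, and `countWithoutReRegistering` are hypothetical names introduced only for this illustration. It assumes the simplified rule that a watch left by a read is consumed by the next change.

```java
import java.util.ArrayList;
import java.util.List;

public class OneShotWatchSketch {

    /** Toy stand-in for a znode (hypothetical): every watch fires at most once. */
    public static class ToyNode {
        private String data;
        private final List<Runnable> watches = new ArrayList<>();

        public ToyNode(String data) {
            this.data = data;
        }

        /** Reads the data and, if a watch is given, leaves it as a one-shot watch. */
        public String getData(Runnable watch) {
            if (watch != null) {
                watches.add(watch);
            }
            return data;
        }

        /** Changes the data and fires the pending watches, consuming them. */
        public void setData(String newData) {
            data = newData;
            List<Runnable> fired = new ArrayList<>(watches);
            watches.clear(); // one-shot: a fired watch is gone until re-registered
            for (Runnable watch : fired) {
                watch.run();
            }
        }
    }

    /** Registers a watch once, never renews it, and counts the notifications. */
    public static int countWithoutReRegistering(ToyNode node, String... updates) {
        int[] count = {0};
        node.getData(() -> count[0]++);
        for (String update : updates) {
            node.setData(update);
        }
        return count[0];
    }

    /** Cache-style wrapper (hypothetical): re-reads and re-registers on every notification. */
    public static class ToyNodeCache {
        private final ToyNode node;
        private final List<String> seen = new ArrayList<>();

        public ToyNodeCache(ToyNode node) {
            this.node = node;
        }

        public void start() {
            node.getData(this::onChange); // leave the first watch
        }

        private void onChange() {
            // A single read both fetches the new value and leaves a fresh watch.
            seen.add(node.getData(this::onChange));
        }

        public List<String> seen() {
            return seen;
        }
    }

    public static void main(String[] args) {
        ToyNode plain = new ToyNode("hello");
        System.out.println(countWithoutReRegistering(plain, "v1", "v2", "v3")); // prints 1

        ToyNode cached = new ToyNode("hello");
        ToyNodeCache cache = new ToyNodeCache(cached);
        cache.start();
        cached.setData("v1");
        cached.setData("v2");
        cached.setData("v3");
        System.out.println(cache.seen()); // prints [v1, v2, v3]
    }
}
```

In this model the plain watch reports only the first of three updates, while the wrapper reports all three because it renews its watch inside the callback. That is the chore a Cache takes over from application code.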

A simple example:

package com.huey.dream.demo;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Event listening with Curator
 * @author  huey
 * @version 1.0
 * @created 2015-3-2
 */
public class CuratorDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.1.109:2181")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
        client.start();

        // Note: this throws NodeExistsException if the node is left over from an earlier run.
        client.create()
            .creatingParentsIfNeeded()
            .forPath("/zk-huey/cnode", "hello".getBytes());

        // If an executor is passed in when a listener is registered,
        // the listener logic runs on this thread pool when an event fires.
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Listen for changes to the data of a single node.
        // The third argument (dataIsCompressed) is false: the data is not compressed.
        final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);
        // Register the listener before start() so that no early event is missed.
        nodeCache.getListenable().addListener(
            new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // getCurrentData() returns null once the node has been deleted.
                    ChildData current = nodeCache.getCurrentData();
                    if (current == null) {
                        System.out.println("Node has been deleted");
                    } else {
                        System.out.println("Node data is changed, new data: " +
                            new String(current.getData()));
                    }
                }
            },
            pool
        );
        // true: build the initial cache contents before start() returns.
        nodeCache.start(true);

        // Listen for changes to the children of a node.
        // The third argument (cacheData) is true: node contents are cached as well.
        final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);
        childrenCache.getListenable().addListener(
            new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                        throws Exception {
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            System.out.println("CHILD_ADDED: " + event.getData().getPath());
                            break;
                        case CHILD_REMOVED:
                            System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                            break;
                        case CHILD_UPDATED:
                            System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                            break;
                        default:
                            break;
                    }
                }
            },
            pool
        );
        childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

        client.setData().forPath("/zk-huey/cnode", "world".getBytes());

        // Give the listeners time to receive the events, then release all resources.
        Thread.sleep(10 * 1000);
        childrenCache.close();
        nodeCache.close();
        pool.shutdown();
        client.close();
    }
}
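The example passes a thread pool as the second argument when registering each listener, so the callbacks run on pool threads. The following sketch models that dispatch choice in plain Java. It is a toy registry, not Curator's implementation: `ToyListenable`, `threadThatRanListener`, `newListenerPool`, and the thread name `listener-pool` are all hypothetical names chosen for the illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ListenerDispatchSketch {

    /** Toy listener registry (hypothetical): each listener is stored with the executor that runs it. */
    public static class ToyListenable {
        private final Map<Runnable, Executor> listeners = new ConcurrentHashMap<>();

        /** No executor given: the listener runs on whichever thread fires the event. */
        public void addListener(Runnable listener) {
            listeners.put(listener, Runnable::run);
        }

        /** Executor given: the listener is handed to that executor instead. */
        public void addListener(Runnable listener, Executor executor) {
            listeners.put(listener, executor);
        }

        public void fire() {
            listeners.forEach((listener, executor) -> executor.execute(listener));
        }
    }

    /** Builds a small pool whose daemon threads carry a recognizable name. */
    public static ExecutorService newListenerPool() {
        return Executors.newFixedThreadPool(2, task -> {
            Thread thread = new Thread(task, "listener-pool");
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Fires one event and returns the name of the thread that ran the listener. */
    public static String threadThatRanListener(Executor executorOrNull) {
        ToyListenable listenable = new ToyListenable();
        CountDownLatch done = new CountDownLatch(1);
        String[] threadName = new String[1];
        Runnable listener = () -> {
            threadName[0] = Thread.currentThread().getName();
            done.countDown();
        };
        if (executorOrNull == null) {
            listenable.addListener(listener);
        } else {
            listenable.addListener(listener, executorOrNull);
        }
        listenable.fire();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the listener", e);
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        ExecutorService pool = newListenerPool();
        System.out.println(threadThatRanListener(null)); // prints main
        System.out.println(threadThatRanListener(pool)); // prints listener-pool
        pool.shutdown();
    }
}
```

Handing callbacks to a separate pool keeps slow listener logic from holding up the thread that delivers events. The trade-off is that callbacks from a multi-threaded pool may run concurrently, so any state shared between listeners needs to be thread-safe.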

