首页 > 开发 > Java > 正文


2024-07-13 10:11:40




分布式队列是一种通用的数据结构。为了在 ZooKeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node)。各个分布式客户端通过调用 create() 函数向队列中放入数据:调用 create() 时,节点路径名以 "qn-" 结尾,并设置顺序(sequence)节点标志。由于设置了顺序标志,新节点的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",其中 X 是唯一的自增序号。需要从队列中获取/移除数据的客户端首先调用 getChildren() 函数:如果有数据,则从序号最小的节点中取数据(取出后可以删除该节点,也可以不删除);如果没有数据,则在队列节点(queue node)上将 watch 设置为 true,等待 watch 被触发后再处理序号最小的节点。





package org.massive.common;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by Massive on 2016/12/18.
 *
 * <p>Static factory that hands out {@link ZooKeeper} handles which have already
 * reached the {@code SyncConnected} state.
 */
public class ZooKeeperClient {

    private static String connectionString = "localhost:2181";

    private static int sessionTimeout = 10000;

    /**
     * Opens a new ZooKeeper session and blocks until it is connected.
     *
     * <p>Issuing get/create/exists calls before the connection is established
     * fails with {@code KeeperErrorCode = ConnectionLoss}, so the handle is only
     * returned once the {@code SyncConnected} event has been observed.
     *
     * @return a connected ZooKeeper handle; the caller is responsible for closing it
     * @throws IOException if the handle cannot be created, or if the connection is
     *         not established within {@code sessionTimeout} milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        // Read the timeout once so the session and the wait use the same value
        // even if setSessionTimeout() is called concurrently.
        final int timeout = sessionTimeout;
        final CountDownLatch connectedSignal = new CountDownLatch(1);

        ZooKeeper zk = new ZooKeeper(connectionString, timeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Only the connected state releases the waiting caller; every
                // other state (including Expired) is deliberately ignored here.
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });

        boolean connected;
        try {
            connected = connectedSignal.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Do not leak the half-open session when the wait is aborted.
            zk.close();
            throw e;
        }

        if (!connected) {
            // await() returned false: the latch was never counted down, so the
            // handle is not usable yet. Release it and report the failure
            // instead of returning an unconnected instance.
            zk.close();
            throw new IOException("Timed out after " + timeout
                    + " ms waiting for ZooKeeper connection to " + connectionString);
        }
        return zk;
    }

    /**
     * @return the session timeout, in milliseconds, used for new connections
     */
    public static int getSessionTimeout() {
        return sessionTimeout;
    }

    /**
     * @param sessionTimeout the session timeout, in milliseconds, to use for
     *        connections created by subsequent {@link #getInstance()} calls
     */
    public static void setSessionTimeout(int sessionTimeout) {
        ZooKeeperClient.sessionTimeout = sessionTimeout;
    }
}


package org.massive.queue;

import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Created by Allen on 2016/12/22.
 *
 * <p>A FIFO queue stored in ZooKeeper. Every message is a persistent sequential
 * child of {@code /QUEUE/<queueName>}; consumers take the child with the smallest
 * sequence number and delete it.
 */
public class ZooKeeperQueue {

    private ZooKeeper zk;
    private int sessionTimeout;

    private static final byte[] ROOT_QUEUE_DATA = {0x12, 0x34};
    private static final String QUEUE_ROOT = "/QUEUE";

    private String queueName;
    private String queuePath;

    /** Monitor used to park consumers until the child watch fires. */
    private final Object mutex = new Object();

    /**
     * Connects to ZooKeeper and makes sure the queue's parent nodes exist.
     *
     * @param queueName name of the queue; messages live under {@code /QUEUE/<queueName>}
     * @throws IOException if the ZooKeeper connection cannot be obtained
     * @throws KeeperException declared for callers; not raised by the visible code path
     * @throws InterruptedException if interrupted while connecting
     */
    public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
        this.queueName = queueName;
        this.queuePath = QUEUE_ROOT + "/" + queueName;
        this.zk = ZooKeeperClient.getInstance();
        this.sessionTimeout = zk.getSessionTimeout();

        // Make sure both the queue root /QUEUE and this queue's own node exist.
        ensureExists(QUEUE_ROOT);
        ensureExists(queuePath);
    }

    /**
     * Blocks until a message is available, removes it from the queue and returns it.
     *
     * <p>Candidates are tried in ascending path order. A candidate that has
     * vanished by the time it is read or deleted was taken by another consumer
     * and is skipped, so only the consumer whose delete succeeds returns the message.
     *
     * @return the payload of the consumed message
     * @throws InterruptedException if interrupted while waiting or talking to ZooKeeper
     * @throws KeeperException on ZooKeeper errors other than a vanished candidate node
     * @throws UnsupportedEncodingException if UTF-8 is unavailable when logging the payload
     */
    public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
        while (true) {
            synchronized (mutex) {
                // The watch is registered while holding the mutex, so the
                // watcher's notify() cannot run before we reach wait().
                List<String> nodes = zk.getChildren(queuePath, new ProduceWatcher());

                // No message nodes yet: wait for the child watch to fire.
                if (nodes == null || nodes.isEmpty()) {
                    mutex.wait();
                    continue;
                }

                SortedSet<String> sortedNodes = new TreeSet<String>();
                for (String node : nodes) {
                    sortedNodes.add(queuePath + "/" + node);
                }

                // Consume the message with the smallest sequence number.
                for (String candidate : sortedNodes) {
                    byte[] returnVal;
                    try {
                        returnVal = zk.getData(candidate, false, null);
                        zk.delete(candidate, -1);
                    } catch (KeeperException.NoNodeException ignored) {
                        // Another consumer removed this node after our
                        // getChildren() snapshot; move on to the next one.
                        continue;
                    }
                    System.out.print(Thread.currentThread().getName() + " ");
                    System.out.print("consume a message from queue:" + candidate);
                    System.out.println(", message data is: " + new String(returnVal, "UTF-8"));
                    return returnVal;
                }
                // Every candidate was taken by someone else: re-read the children.
            }
        }
    }

    /** Child watcher that wakes one waiting consumer whenever it is triggered. */
    class ProduceWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            // Wake one waiting thread; it re-reads the children itself.
            synchronized (mutex) {
                mutex.notify();
            }
        }
    }

    /**
     * Appends a message to the queue as a persistent sequential node.
     *
     * @param data the message payload
     * @throws KeeperException if the node cannot be created
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     * @throws UnsupportedEncodingException if UTF-8 is unavailable when logging the payload
     */
    public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {
        // Make sure this queue's node exists, e.g. /QUEUE/queueName
        ensureExists(queuePath);

        String node = zk.create(queuePath + "/", data,
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);

        System.out.print(Thread.currentThread().getName() + " ");
        System.out.print("produce a message to queue:" + node);
        System.out.println(" , message data is: " + new String(data, "UTF-8"));
    }

    /**
     * Creates {@code path} as a persistent node if it does not exist yet.
     *
     * <p>Best effort: failures are printed rather than propagated.
     *
     * @param path absolute znode path to create when missing
     */
    public void ensureExists(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (stat == null) {
                zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException.NodeExistsException ignored) {
            // Someone created the node between exists() and create(); the
            // node is there, which is all this method is asked to guarantee.
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of losing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Demo: ten consumer threads each take one message while a single
     * producer thread publishes ten messages at random intervals.
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        String queueName = "test";
        final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);

        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        queue.consume();
                        System.out.println("--------------------------------------------------------");
                        System.out.println();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
                        // Encode explicitly as UTF-8 to match the decoding
                        // done in produce() and consume().
                        queue.produce(("massive" + i).getBytes("UTF-8"));
                    } catch (InterruptedException e) {
                        // Stop producing once interrupted rather than
                        // failing again on every remaining iteration.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "Produce-thread").start();
    }
}



Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0 Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1 Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2 Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3 Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4 Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5 Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6 Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7 Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8 
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8 -------------------------------------------------------- Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9 Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9 




发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表