public static void main(String[] args) throws InterruptedException{ final Basket basket = new Basket(); // 定义一个 producer Runnable producer = new Runnable() { public void run() { try { basket.produce(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }; // 定义一个 consumer Runnable consumer = new Runnable() { public void run() { try { basket.consume(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }; // 各产生 10 个 consumer 和 producer ExecutorService service = Executors.newCachedThreadPool(); for(int i=0; i < 10; i++) service.submit(consumer); Thread.sleep(2000); for(int i=0; i<10; i++) service.submit(producer); service.shutdown(); } }
5: Synchronizer :同步装置 Java 5.0 里新加了 4 个协调线程间进程的同步装置,它们分别是 Semaphore, CountDownLatch, CyclicBarrier 和 Exchanger. Semaphore: 用来管理一个资源池的工具, Semaphore 可以看成是个通行证,线程要想从资源池拿到资源必须先拿到通行证, Semaphore 提供的通行证数量和资源池的大小一致。如果线程暂时拿不到通行证,线程就会被阻断进入等待状态。以下是一个例子: public class Pool { ArrayList pool = null; Semaphore pass = null; public Pool(int size){ // 初始化资源池 pool = new ArrayList(); for(int i=0; i pool.add("Resource "+i); } //Semaphore 的大小和资源池的大小一致 pass = new Semaphore(size); } public String get() throws InterruptedException{ // 获取通行证 , 只有得到通行证后才能得到资源 pass.acquire(); return getResource(); } public void put(String resource){ // 归还通行证,并归还资源 pass.release(); releaseResource(resource); } private synchronized String getResource() { String result = pool.get(0); pool.remove(0); System.out.println("Give out "+result); return result; } private synchronized void releaseResource(String resource) { System.out.println("return "+resource); pool.add(resource); } }
SemaphoreTest: public class SemaphoreTest { public static void main(String[] args){ final Pool aPool = new Pool(2); Runnable worker = new Runnable() { public void run() { String resource = null; try { // 取得 resource resource = aPool.get(); } catch (InterruptedException ex) { ex.printStackTrace(); } // 用 resource 做工作 System.out.println("I worked on "+resource); // 归还 resource aPool.put(resource); } }; ExecutorService service = Executors.newCachedThreadPool(); for(int i=0; i<20; i++){ service.submit(worker); } service.shutdown(); } }
CountDownLatch: CountDownLatch 是个计数器,它有一个初始数,等待这个计数器的线程必须等到计数器倒数到零时才可继续。比如说一个 Server 启动时需要初始化 4 个部件, Server 可以同时启动 4 个线程去初始化这 4 个部件,然后调用 CountDownLatch(4).await() 阻断进入等待,每个线程完成任务后会调用一次 CountDownLatch.countDown() 来倒计数 , 当 4 个线程都结束时 CountDownLatch 的计数就会降低为 0 ,此时 Server 就会被唤醒继续下一步操作。 CountDownLatch 的方法主要有: await() :使调用此方法的线程阻断进入等待 countDown(): 倒计数,将计数值减 1 getCount(): 得到当前的计数值 CountDownLatch 的例子:一个 server 调了三个 ComponentThread 分别去启动三个组件,然后 server 等到组件都启动了再继续。 public class Server { public static void main(String[] args) throws InterruptedException{ System.out.println("Server is starting."); // 初始化一个初始值为 3 的 CountDownLatch CountDownLatch latch = new CountDownLatch(3); // 起 3 个线程分别去启动 3 个组件 ExecutorService service = Executors.newCachedThreadPool(); service.submit(new ComponentThread(latch, 1)); service.submit(new ComponentThread(latch, 2)); service.submit(new ComponentThread(latch, 3)); service.shutdown(); // 进入等待状态 latch.await(); // 当所需的三个组件都完成时, Server 就可继续了 System.out.println("Server is up!"); } }
public class ComponentThread implements Runnable{ CountDownLatch latch; int ID; /** Creates a new instance of ComponentThread */ public ComponentThread(CountDownLatch latch, int ID) { this.latch = latch; this.ID = ID; } public void run() { System.out.println("Component "+ID + " initialized!"); // 将计数减一 latch.countDown(); } }
运行结果: Server is starting. Component 1 initialized! Component 3 initialized! Component 2 initialized! Server is up!
CyclicBarrier: CyclicBarrier 类似于 CountDownLatch 也是个计数器,不同的是 CyclicBarrier 数的是调用了 CyclicBarrier.await() 进入等待的线程数,当线程数达到了 CyclicBarrier 初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。 CyclicBarrier 就象它名字的意思一样,可看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。 CyclicBarrier 初始时还可带一个 Runnable 的参数,此 Runnable 任务在 CyclicBarrier 的数目达到后,所有其它线程被唤醒前被执行。 CyclicBarrier 提供以下几个方法: await() :进入等待 getParties() :返回此 barrier 需要的线程数 reset() :将此 barrier 重置 以下是使用 CyclicBarrier 的一个例子:两个线程分别在一个数组里放一个数,当这两个线程都结束后,主线程算出数组里的数的和(这个例子比较无聊,我没有想到更合适的例子) public class MainThread { public static void main(String[] args) throws InterruptedException, BrokenBarrierException, TimeoutException{ final int[] array = new int[2]; CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {// 在所有线程都到达 Barrier 时执行 public void run() { System.out.println("Total is:"+(array[0]+array[1])); } }); // 启动线程 new Thread(new ComponentThread(barrier, array, 0)).start(); new Thread(new ComponentThread(barrier, array, 1)).start(); } }
public class ComponentThread implements Runnable{ CyclicBarrier barrier; int ID; int[] array; public ComponentThread(CyclicBarrier barrier, int[] array, int ID) { this.barrier = barrier; this.ID = ID; this.array = array; } public void run() { try { array[ID] = new Random().nextInt(); System.out.println(ID+ " generates:"+array[ID]); // 该线程完成了任务等在 Barrier 处 barrier.await(); } catch (BrokenBarrierException ex) { ex.printStackTrace(); } catch (InterruptedException ex) { ex.printStackTrace(); } } }