线程池能够复用线程,减少线程创建,销毁,恢复等状态切换的开销,提高程序的性能。一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行的任务的队列。
ThreadPoolExecutor类中定义了一些与线程状态与活动线程数相关的一些变量,如下:
[java] view plain copyPRint?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 将整型的24位分为高3位和低29位,高3位表示线程池的状态,低29位表示活动的线程数 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 29位能表示的最大二进制整数,也就是活动线程数 // 高3位数值代表的线程池状态 private static final int RUNNING = -1 << COUNT_BITS; // running 线程池能接受新任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // shutdown 线程池不再接受新任务 private static final int STOP = 1 << COUNT_BITS; // stop 线程池不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务 private static final int TIDYING = 2 << COUNT_BITS; // tidying 线程池所有任务均已终止 private static final int TERMINATED = 3 << COUNT_BITS; // terminated terminated()方法执行结束private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 将整型的24位分为高3位和低29位,高3位表示线程池的状态,低29位表示活动的线程数 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 29位能表示的最大二进制整数,也就是活动线程数 // 高3位数值代表的线程池状态 private static final int RUNNING = -1 << COUNT_BITS; // running 线程池能接受新任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // shutdown 线程池不再接受新任务 private static final int STOP = 1 << COUNT_BITS; // stop 线程池不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务 private static final int TIDYING = 2 << COUNT_BITS; // tidying 线程池所有任务均已终止 private static final int TERMINATED = 3 << COUNT_BITS; // terminated terminated()方法执行结束由如上可知:
ctl是一个AtomicInteger类型的原子对象。ctl记录了"线程池中的任务数量"和"线程池状态"2个信息。ctl共包括32位。其中,高3位表示"线程池状态",低29位表示"线程池中的任务数量"。
RUNNING -- 对应的高3位值是111SHUTDOWN -- 对应的高3位值是000STOP -- 对应的高3位值是001TIDYING -- 对应的高3位值是010TERMINATED -- 对应的高3位值是011线程池各个状态之间的切换如下图所示:线程池各个状态间的转换的详细解释如下所示。
1> RUNNING(111) -> SHUTDOWN(000) : 调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的
2> (RUNNING(111) or SHUTDOWN(000)) -> STOP(001) 调用了shutdownNow方法
3> SHUTDOWN(000) -> TIDYING(010) : 当队列和线程池均为空的时候
4> STOP(001) -> TIDYING(010) : 当线程池为空的时候
5> TIDYING(010) -> TERMINATED(011) : terminated()方法调用完毕
说明:扩号后的3位数字表示ctl的高3位二进制值,并不关注低29位二进制的值
还有一些对常量的操作方法,只说明部分,其他的有兴趣自己可以去查看,如下:
[java] view plain copyprint?private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到线程运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 得到活动线程数 private static int ctlOf(int rs, int wc) { return rs | wc; } // 得到两者表示的值private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到线程运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 得到活动线程数 private static int ctlOf(int rs, int wc) { return rs | wc; } // 得到两者表示的值来看一下ThreadPoolExecutor()中最主要的一个构造函数,如下:
[java] view plain copyprint?public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}调用Executors方法中的几个方法,如newCachedThreadPool()、newFixedThreadPool()时,都会间接调用上面的构造方法来初始化所有的线程池相关变量。
1、创建线程池并执行任务
有了Executor对象后,就可以调用execute()方法执行任务了。方法的源代码如下:
[java] view plain copyprint?public void execute(Runnable command) { if (command == null) // 任务为null,则抛出异常 throw new NullPointerException(); int c = ctl.get(); // 取出记录着runState和workerCount 的 ctl的当前值 /* * 通过workerCountOf方法从ctl所表示的int值中提取出低29位的值,也就是当前活动的线程数。如果当前 * 活动的线程数少于corePoolSize,则通过addWorker(command, true)新建一个线程,并将任务(command) * 添加到该线程中 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker()返回值表示: * 1、true 表示需要检测当前运行的线程是否小于corePoolSize * 2、false 表示需要检测当前运行的线程数量是否小于maxPoolSize */ if (addWorker(command, true)) return; // 新线程创建成功,终止该方法的执行 c = ctl.get(); // 任务添加到线程失败,取出记录着runState和workerCount 的 ctl的当前值 } /* * 方法解释: * isRunning(c) 当前线程池是否处于运行状态。源代码是通过判断c < SHUTDOWN 来确定返回值。由于RUNNING才会接收新任务,且只有这个值-1才小于SHUTDOWN * workQueue.offer(command) 任务添加到缓冲队列 */ if (isRunning(c) && workQueue.offer(command)) {// 当前线程处于运行状态且成功添加到缓冲队列 int recheck = ctl.get(); /* * 如果 线程池已经处于非运行状态,则从缓冲队列中移除任务然后采用线程池指定的策略拒绝任务 * 如果 线程池中任务数量为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null */ if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 得到活动线程数为0 addWorker(null, false); } /* * 当不满足以下两个条件时执行如下代码: * 1. 当前线程池并不处于Running状态 * 2. 当前线程池处于Running状态,但是缓冲队列已经满了 */ else if (!addWorker(command, false)) reject(command); // 采用线程池指定的策略拒绝任务 }public void execute(Runnable command) { if (command == null) // 任务为null,则抛出异常 throw new NullPointerException(); int c = ctl.get(); // 取出记录着runState和workerCount 的 ctl的当前值 /* * 通过workerCountOf方法从ctl所表示的int值中提取出低29位的值,也就是当前活动的线程数。如果当前 * 活动的线程数少于corePoolSize,则通过addWorker(command, true)新建一个线程,并将任务(command) * 添加到该线程中 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker()返回值表示: * 1、true 表示需要检测当前运行的线程是否小于corePoolSize * 2、false 表示需要检测当前运行的线程数量是否小于maxPoolSize */ if (addWorker(command, true)) return; // 新线程创建成功,终止该方法的执行 c = ctl.get(); // 任务添加到线程失败,取出记录着runState和workerCount 的 ctl的当前值 } /* * 方法解释: * isRunning(c) 当前线程池是否处于运行状态。源代码是通过判断c < SHUTDOWN 来确定返回值。由于RUNNING才会接收新任务,且只有这个值-1才小于SHUTDOWN * workQueue.offer(command) 任务添加到缓冲队列 */ if (isRunning(c) && workQueue.offer(command)) {// 当前线程处于运行状态且成功添加到缓冲队列 int recheck = ctl.get(); /* * 如果 线程池已经处于非运行状态,则从缓冲队列中移除任务然后采用线程池指定的策略拒绝任务 * 如果 线程池中任务数量为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null */ if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 得到活动线程数为0 addWorker(null, false); } /* * 当不满足以下两个条件时执行如下代码: * 1. 当前线程池并不处于Running状态 * 2. 当前线程池处于Running状态,但是缓冲队列已经满了 */ else if (!addWorker(command, false)) reject(command); // 采用线程池指定的策略拒绝任务 }当前活动的线程小于corePoolSize了,那么等于和大于corePoolSize怎么处理呢?
1> 当前活动的线程数量 >= corePoolSize 的时候,都是优先添加到队列中,直到队列满了才会去创建新的线程,在这里第20行的if语句已经体现出来了。这里利用了&&的特性,只有当第一个条件会真时才会去判断第二个条件,第一个条件是isRunning(),判断线程池是否处于RUNNING状态,因为只有在这个状态下才会接受新任务,否则就拒绝,如果正处于RUNNING状态,那么就加入队列,如果加入失败可能就是队列已经满了,这时候直接执行第29行。
2> 在execute()方法中,当 当前活动的线程数量 < corePoolSize 时,会执行addWorker()方法,关于addWorker(),它是用来直接新建线程用的,之所以叫addWorker而不是addThread是因为在线程池中,所有的线程都用一个Worker对象包装着,来看一下这个方法:
[java] view plain copyprint?/** * 创建并执行新线程 * @param firstTack 用于指定新增的线程执行的第一个任务 * * @param core true表示在新增线程时会判断当前活动线程数是否少于corePoolSize, * false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize * * @return 是否成功新增一个线程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取记录着runState和workCount的int变量的当前值 int rs = runStateOf(c); // 获取当前线程池运行的状态 /* 这个条件代表着以下几个情景,就直接返回false说明线程创建失败: 1.rs > SHUTDOWN; 此时不再接收新任务,且所有的任务已经执行完毕 2.rs = SHUTDOWN; 此时不再接收新任务,但是会执行队列中的任务,在后买年的或语句中,第一个不成立,firstTask != null成立 3.rs = SHUTDOWN;此时不再接收新任务,fistTask == null,任务队列workQueue已经空了 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取当前活动的线程数 int wc = workerCountOf(c); //先判断当前活动的线程数是否大于最大值,如果超过了就直接返回false说明线程创建失败 //如果没有超过再根据core的值再进行以下判断 /* 1.core为true,则判断当前活动的线程数是否大于corePoolSize 2.core为false,则判断当前活动线程数是否大于maximumPoolSize */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //比较当前值是否和c相同,如果相同,则改为c+1,并且跳出大循环,直接执行Worker进行线程创建 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 获取ctl的当前值 if (runStateOf(c) != rs) //检查下当前线程池的状态是否已经发生改变 continue retry; //如果已经改变了,则进行外层retry大循环,否则只进行内层的循环 // else CAS failed due to workerCount change; retry inner loop } } //下面这里就是开始创建新的线程了 //Worker的也是Runnable的实现类 Worker w = new Worker(firstTask); //因为不可以直接在Worker的构造方法中进行线程创建 //所以要把它的引用赋给t方便后面进行线程创建 Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //再次取出ctl的当前值,用于进行状态的检查,防止线程池的已经状态改变了 int c = ctl.get(); int rs = runStateOf(c); //将if语句中的条件转换为一个等价实现 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null)) //有个t == null是因为如果使用的是默认的ThreadFactory的话,那么它的newThread()可能会返回null /* 1. 如果t == null, 则减少一个线程数,如果线程池处于的状态 > SHUTDOWN,则尝试终止线程池 2. 如果t != null,且rs == SHUTDOWN,则不再接收新任务,若firstTask != null,则此时也是返回false,创建线程失败 3. 如果t != null, 且rs > SHUTDOWN,同样不再接受新任务,此时也是返回false,创建线程失败 */ if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); //减少一个活动的当前线程数 tryTerminate(); //尝试终止线程池 return false; //返回线程创建失败 } workers.add(w); //将创建的线程添加到workers容器中 int s = workers.size(); //获取当前线程活动的数量 if (s > largestPoolSize) //判断当前线程活动的数量是否超过线程池最大的线程数量 largestPoolSize = s; //当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了 } finally { mainLock.unlock(); } t.start(); //开启线程 //若start后,状态又变成了SHUTDOWN状态(如调用了shutdownNow方法)且新建的线程没有被中断过, //就要中断该线程(shutdownNow方法要求中断正在执行的线程), //shutdownNow方法本身也会去中断存储在workers中的所有线程 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }/** * 创建并执行新线程 * @param firstTack 用于指定新增的线程执行的第一个任务 * * @param core true表示在新增线程时会判断当前活动线程数是否少于corePoolSize, * false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize * * @return 是否成功新增一个线程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取记录着runState和workCount的int变量的当前值 int rs = runStateOf(c); // 获取当前线程池运行的状态 /* 这个条件代表着以下几个情景,就直接返回false说明线程创建失败: 1.rs > SHUTDOWN; 此时不再接收新任务,且所有的任务已经执行完毕 2.rs = SHUTDOWN; 此时不再接收新任务,但是会执行队列中的任务,在后买年的或语句中,第一个不成立,firstTask != null成立 3.rs = SHUTDOWN;此时不再接收新任务,fistTask == null,任务队列workQueue已经空了 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取当前活动的线程数 int wc = workerCountOf(c); //先判断当前活动的线程数是否大于最大值,如果超过了就直接返回false说明线程创建失败 //如果没有超过再根据core的值再进行以下判断 /* 1.core为true,则判断当前活动的线程数是否大于corePoolSize 2.core为false,则判断当前活动线程数是否大于maximumPoolSize */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //比较当前值是否和c相同,如果相同,则改为c+1,并且跳出大循环,直接执行Worker进行线程创建 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 获取ctl的当前值 if (runStateOf(c) != rs) //检查下当前线程池的状态是否已经发生改变 continue retry; //如果已经改变了,则进行外层retry大循环,否则只进行内层的循环 // else CAS failed due to workerCount change; retry inner loop } } //下面这里就是开始创建新的线程了 //Worker的也是Runnable的实现类 Worker w = new Worker(firstTask); //因为不可以直接在Worker的构造方法中进行线程创建 //所以要把它的引用赋给t方便后面进行线程创建 Thread t = w.thread; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //再次取出ctl的当前值,用于进行状态的检查,防止线程池的已经状态改变了 int c = ctl.get(); int rs = runStateOf(c); //将if语句中的条件转换为一个等价实现 :t == null || (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null)) //有个t == null是因为如果使用的是默认的ThreadFactory的话,那么它的newThread()可能会返回null /* 1. 如果t == null, 则减少一个线程数,如果线程池处于的状态 > SHUTDOWN,则尝试终止线程池 2. 如果t != null,且rs == SHUTDOWN,则不再接收新任务,若firstTask != null,则此时也是返回false,创建线程失败 3. 如果t != null, 且rs > SHUTDOWN,同样不再接受新任务,此时也是返回false,创建线程失败 */ if (t == null || (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null))) { decrementWorkerCount(); //减少一个活动的当前线程数 tryTerminate(); //尝试终止线程池 return false; //返回线程创建失败 } workers.add(w); //将创建的线程添加到workers容器中 int s = workers.size(); //获取当前线程活动的数量 if (s > largestPoolSize) //判断当前线程活动的数量是否超过线程池最大的线程数量 largestPoolSize = s; //当池中的工作线程创新高时,会将这个数记录到largestPoolSize字段中。然后就可以启动这个线程t了 } finally { mainLock.unlock(); } t.start(); //开启线程 //若start后,状态又变成了SHUTDOWN状态(如调用了shutdownNow方法)且新建的线程没有被中断过, //就要中断该线程(shutdownNow方法要求中断正在执行的线程), //shutdownNow方法本身也会去中断存储在workers中的所有线程 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) t.interrupt(); return true; }那么在创建线程的时候,线程执行的是什么的呢?
我们前面提到Worker继承的其实也是Runnable,它在创建线程的时候是以自身作为任务传进先创建的线程中的,这段比较简单,我就不一一注释了,只是给出源代码给大家看吧。
Worker(Runnable firstTask) { this.firstTask = firstTask; //this指的是worker对象本身 this.thread = getThreadFactory().newThread(this); }它以自身的对象作为线程任务传进去,那么它的run方法又是怎样的呢?
public void run() { runWorker(this); }竟然只有一句话调用runWorker()方法,这个可是重头戏,我们来看看,究竟运行的是什么。
[java] view plain copyprint?/** * 执行Worker中的任务,它的执行流程是这样的: * 若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行, * 直到getTask()返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。 * @param w woker */ final void runWorker(Worker w) { Runnable task = w.firstTask; //将当前Worker中的任务取出来交给task,并释放掉w.firstTask占用的内存 w.firstTask = null; //用于判断线程是否由于异常终止,如果不是异常终止,在后面将会将该变量的值改为false //该变量的值在processWorkerExit()会使用来判断线程是否由于异常终止 boolean completedAbruptly = true; try { //执行任务,直到getTask()返回的值为null,在此处就相当于复用了线程,让线程执行了多个任务 while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun();//对线程池状态进行一次判断,后面我们会讲解一下该方法 try { beforeExecute(w.thread, task); //在任务执行前需要做的逻辑方法,该方面可以由用户进行重写自定义 Throwable thrown = null; try { task.run(); //开始执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //在任务执行后需要做的逻辑方法,该方面可以由用户进行重写自定义 } } finally { task = null; w.completedTasks++; //增加该线程完成的任务 w.unlock(); } } completedAbruptly = false; //线程不是异常终止 } finally { processWorkerExit(w, completedAbruptly); //结束该线程 } }/** * 执行Worker中的任务,它的执行流程是这样的: * 若存在第一个任务,则先执行第一个任务,否则,从队列中拿任务,不断的执行, * 直到getTask()返回null或执行任务出错(中断或任务本身抛出异常),就退出while循环。 * @param w woker */ final void runWorker(Worker w) { Runnable task = w.firstTask; //将当前Worker中的任务取出来交给task,并释放掉w.firstTask占用的内存 w.firstTask = null; //用于判断线程是否由于异常终止,如果不是异常终止,在后面将会将该变量的值改为false //该变量的值在processWorkerExit()会使用来判断线程是否由于异常终止 boolean completedAbruptly = true; try { //执行任务,直到getTask()返回的值为null,在此处就相当于复用了线程,让线程执行了多个任务 while (task != null || (task = getTask()) != null) { w.lock(); clearInterruptsForTaskRun();//对线程池状态进行一次判断,后面我们会讲解一下该方法 try { beforeExecute(w.thread, task); //在任务执行前需要做的逻辑方法,该方面可以由用户进行重写自定义 Throwable thrown = null; try { task.run(); //开始执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); //在任务执行后需要做的逻辑方法,该方面可以由用户进行重写自定义 } } finally { task = null; w.completedTasks++; //增加该线程完成的任务 w.unlock(); } } completedAbruptly = false; //线程不是异常终止 } finally { processWorkerExit(w, completedAbruptly); //结束该线程 } }下面就是线程在执行任务之前对线程池状态的一次判断:[java] view plain copyprint?/** * 对线程的结束做一些清理和数据同步 * @param w 封装线程的Worker * @param completedAbruptly 表示该线程是否结束于异常 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程是结束于异常 //如果不是结束于异常,那么它降在runWorker方法的while循环中的getTask()方法中已经减一了 if (completedAbruptly) decrementWorkerCount(); //此时将线程数量减一 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //统计总共完成的任务数 workers.remove(w); //将该线程数从workers容器中移除 } finally { mainLock.unlock(); } tryTerminate(); //尝试终止线程池 int c = ctl.get(); //接下来的这个if块要做的事儿了。当池的状态还是RUNNING, //又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了, //也就是最后的addWorker操作 if (runStateLessThan(c, STOP)) { //如果当前运行状态为RUNNING,SHUTDOWN if (!completedAbruptly) { //如果线程不是结束于异常 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //是否允许线程超时结束 if (min == 0 && ! workQueue.isEmpty()) //如果允许把那个且队列不为空 min = 1; //至少要保留一个线程来完成任务 //如果当前活动的线程数大于等于最小的值 // 1.不允许核心线程超时结束,则必须要使得活动线程数超过corePoolSize数才可以 // 2. 允许核心线程超时结束,但是队列中有任务,必须留至少一个线程 if (workerCountOf(c) >= min) return; // replacement not needed } //直接加个线程 addWorker(null, false); } }/** * 对线程的结束做一些清理和数据同步 * @param w 封装线程的Worker * @param completedAbruptly 表示该线程是否结束于异常 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程是结束于异常 //如果不是结束于异常,那么它降在runWorker方法的while循环中的getTask()方法中已经减一了 if (completedAbruptly) decrementWorkerCount(); //此时将线程数量减一 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //统计总共完成的任务数 workers.remove(w); //将该线程数从workers容器中移除 } finally { mainLock.unlock(); } tryTerminate(); //尝试终止线程池 int c = ctl.get(); //接下来的这个if块要做的事儿了。当池的状态还是RUNNING, //又要分两种情况,一种是异常结束,一种是正常结束。异常结束比较好弄,直接加个线程替换死掉的线程就好了, //也就是最后的addWorker操作 if (runStateLessThan(c, STOP)) { //如果当前运行状态为RUNNING,SHUTDOWN if (!completedAbruptly) { //如果线程不是结束于异常 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //是否允许线程超时结束 if (min == 0 && ! workQueue.isEmpty()) //如果允许把那个且队列不为空 min = 1; //至少要保留一个线程来完成任务 //如果当前活动的线程数大于等于最小的值 // 1.不允许核心线程超时结束,则必须要使得活动线程数超过corePoolSize数才可以 // 2. 允许核心线程超时结束,但是队列中有任务,必须留至少一个线程 if (workerCountOf(c) >= min) return; // replacement not needed } //直接加个线程 addWorker(null, false); } }前面我们的方法遇见过很多次tryTerminate()方法,到底他是怎样尝试结束线程池的呢?[java] view plain copyprint?/** * 执行该方法,根据线程池状态进行 判断是否结束线程池 */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || //线程池正在运行中,自然不能结束线程池啦 runStateAtLeast(c, TIDYING) || //如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池出于SHUTDOWN状态,但是任务队列不为空,自然不能结束线程池啦 return; if (workerCountOf(c) != 0) { // Eligible to terminate /* 调用这个方法的目的是将shutdown信号传播给其它线程。 调用shutdown方法的时候会去中断所有空闲线程,如果这时候池中所有的线程都正在执行任务, 那么就不会有线程被中断,调用shutdown方法只是设置了线程池的状态为SHUTDOWN, 在取任务(getTask,后面会细说)的时候,假如很多线程都发现队列里还有任务(没有使用锁,存在竞态条件), 然后都去调用take,如果任务数小于池中的线程数,那么必然有方法调用take后会一直等待(shutdown的时候这些线程正在执行任务, 所以没能调用它的interrupt,其中断状态没有被设置),那么在没有任务且线程池的状态为SHUTDWON的时候, 这些等待中的空闲线程就需要被终止iinterruptIdleWorkers(ONLY_ONE)回去中断一个线程,让其从take中退出, 然后这个线程也进入同样的逻辑,去终止一个其它空闲线程,直到池中的活动线程数为0。 */ interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* 当状态为SHUTDOWN,且活动线程数为0的时候,就可以进入TIDYING状态了, 进入TIDYING状态就可以执行方法terminated(), 该方法执行结束就进入了TERMINATED状态(参考前文中各状态的含义以及可能的状态转变) */ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); //执行该方法,结束线程池 } finally { ctl.set(ctlOf(TERMINATED, 0)); /* 当线程池shutdown后,外部可能还有很多线程在等待线程池真正结束, 即调用了awaitTermination方法,该方法中,外部线程就是在termination上await的, 所以,线程池关闭之前要唤醒这些等待的线程,告诉它们线程池关闭结束了。 */ termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }/** * 执行该方法,根据线程池状态进行 判断是否结束线程池 */ final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || //线程池正在运行中,自然不能结束线程池啦 runStateAtLeast(c, TIDYING) || //如果状态为TIDYING或TERMINATED,池中的活动线程数已经是0,自然也不需要做什么操作了 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池出于SHUTDOWN状态,但是任务队列不为空,自然不能结束线程池啦 return; if (workerCountOf(c) != 0) { // Eligible to terminate /* 调用这个方法的目的是将shutdown信号传播给其它线程。 调用shutdown方法的时候会去中断所有空闲线程,如果这时候池中所有的线程都正在执行任务, 那么就不会有线程被中断,调用shutdown方法只是设置了线程池的状态为SHUTDOWN, 在取任务(getTask,后面会细说)的时候,假如很多线程都发现队列里还有任务(没有使用锁,存在竞态条件), 然后都去调用take,如果任务数小于池中的线程数,那么必然有方法调用take后会一直等待(shutdown的时候这些线程正在执行任务, 所以没能调用它的interrupt,其中断状态没有被设置),那么在没有任务且线程池的状态为SHUTDWON的时候, 这些等待中的空闲线程就需要被终止iinterruptIdleWorkers(ONLY_ONE)回去中断一个线程,让其从take中退出, 然后这个线程也进入同样的逻辑,去终止一个其它空闲线程,直到池中的活动线程数为0。 */ interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /* 当状态为SHUTDOWN,且活动线程数为0的时候,就可以进入TIDYING状态了, 进入TIDYING状态就可以执行方法terminated(), 该方法执行结束就进入了TERMINATED状态(参考前文中各状态的含义以及可能的状态转变) */ if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); //执行该方法,结束线程池 } finally { ctl.set(ctlOf(TERMINATED, 0)); /* 当线程池shutdown后,外部可能还有很多线程在等待线程池真正结束, 即调用了awaitTermination方法,该方法中,外部线程就是在termination上await的, 所以,线程池关闭之前要唤醒这些等待的线程,告诉它们线程池关闭结束了。 */ termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }2、关闭线程池关闭时使用shutdown()方法,源码如下:[java] view plain copyprint?public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownaccess(); // 检查终止线程池的线程是否有权限。 advanceRunState(SHUTDOWN);// 设置线程池的状态为关闭状态。 interruptIdleWorkers(); // 中断线程池中空闲的线程 onShutdown(); // 钩子函数,在ThreadPoolExecutor中没有任何动作 } finally { mainLock.unlock(); } tryTerminate(); // 尝试终止线程池 }public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 检查终止线程池的线程是否有权限。 advanceRunState(SHUTDOWN);// 设置线程池的状态为关闭状态。 interruptIdleWorkers(); // 中断线程池中空闲的线程 onShutdown(); // 钩子函数,在ThreadPoolExecutor中没有任何动作 } finally { mainLock.unlock(); } tryTerminate(); // 尝试终止线程池 }转载自http://blog.csdn.net/mazhimazh/
新闻热点
疑难解答