线程池的使用 使用ThreadPoolExecutor来创建一个线程池:
1 2 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,workQueue, handler);
创建一个线程池时需要传入几个参数,各参数的作用如下:
corePoolSize:线程池的基本大小
提交一个任务到线程池时,如果当前执行的任务数小于线程池基本大小,线程池会新创建一个线程来执行任务。
workQueue:工作任务队列
当线程池的大小已经超过核心线程数后,新提交的任务保存至阻塞队列,阻塞队列的选择有以下几种:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序。
LinkedBlockingQueue:基于链表结构的阻塞队列,此队列按FIFO原则对元素进行排序。
SynchronousQueue:不存储元素的阻塞队列。每个插入操作必须等待一个移除操作,否则插入操作一直处于阻塞状态。
PriorityBlockingQueue:一个具有优先级的无界阻塞队列。
maximumPoolSize:线程池最大数量
如果提交的任务不能保存到队列中(队列已满),并且线程池的大小小于最大线程数,则线程池会再创建新的线程执行任务。
keepAliveTime:线程活跃存活时间
当线程池的工作线程空闲后,在活跃时间内没有执行任务,则对该线程进行回收
unit:线程活动保持时间的单位
通过TimeUnit类选择的单位有天、小时、分钟、毫秒、微秒和纳秒。
handler:饱和策略
当提交的任务不能保存到队列中且线程池的大小已经不小于最大线程数,那么说明该线程池处于饱和状态,此时需要采取一种饱和策略来处理新提交的任务。
饱和策略的默认选择为AbortPolicy,表示当线程池饱和时抛出异常。线程池框架还提供了以下3种策略:
CallerRunsPolicy:直接在调用者线程中执行该任务。
DiscardOldestPolicy:执行队列的poll()方法,然后执行线程池的execute()方法再次提交到线程池。
DiscardPolicy:不进行任何处理,也就是将任务直接丢弃了。
如果想要扩展饱和策略,可以实现RejectedExecutionHandler接口自定义饱和策略。
线程池的实现原理 当向线程池提交一个任务之后,线程池的简单处理流程如下:
如果线程池当前运行的线程没有超过核心线程数,则创建新线程来执行任务;否则将任务添加入队列中。
如果队列已满,则判断线程池当前运行的线程是否少于最大线程数,如果少于最大线程数,则创建新线程来执行任务。
如果创建新线程将使当前运行的线程超出最大线程数,将执行饱和策略。
线程池源码分析 使用线程池的execute()向线程池提交任务,先从execute()方法开始作为查看源码入口
execute() execute()方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
addWorker() 当工作线程数小于核心线程数的时候,会调用 addWorker()方法创建一个工作线程,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
简单来讲就是使用cas操作将工作线程数递增,然后新建一个线程并启动。
Worker类 addWorker()方法中构建了一个Worker类,并且把firstTask 封装到 worker 中,Worker类的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } public void run () { runWorker(this ); } protected boolean isHeldExclusively () { return getState() != 0 ; } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } void interruptIfStarted () { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
从源码中可以看到,worker中保存了一个实际执行任务的线程thread,以及初始化时要被执行的任务firstTask;最终执行任务时,实际是调用了runWorker()方法。
worker类继承了AbstractQueuedSynchronizer(AQS),使用AQS实现了独占锁的功能,但是它的tryAcquire()方法是不允许重入的,它的独占锁作用如下:
当前线程拥有独占锁,那就说明当前线程正在执行任务,当前线程不该被中断
如果当前线程没有拥有独占锁,说明当前线程处于空闲状态,可以对该线程进行中断操作
addWorkerFailed() 在addWorker()方法中,如果添加 Worker失败则调用addWorkerFailed()方法做失败后的处理,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
如果添加 Worker失败后,判断Worker是否已经构造好了,如果Worker不为空则从workers中删除,且原子递减核心线程数的数量,之后执行tryTerminate()方法尝试结束线程池。
runWorker() 调用Worker类的run()方法时,实际会调用runWorker()方法执行任务,该方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker()方法主要处理了以下两件事:
循环执行task任务,如果task为空则调用getTask()方法获取task任务
如果getTask()方法取得的任务依然位空,则runWorker()方法执行完毕
getTask() 在runWorker()方法中,worker 线程调用getTask()方法从阻塞队列中获取需要执行的任务,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
processWorkerExit() runWorker()方法中的的 while 循环执行完毕以后,在 finally 块中会调用 processWorkerExit()方法来销毁工作线
程。processWorkerExit()方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
到此为止,从执行execute()方法提交任务开始,创建worke线程到执行任务以及最后到销毁线程的全部过程就结束了。