线程池的使用

使用ThreadPoolExecutor来创建一个线程池:

1
2
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
unit,workQueue, handler);

创建一个线程池时需要传入几个参数,各参数的作用如下:

  1. corePoolSize:线程池的基本大小

    提交一个任务到线程池时,如果当前执行的任务数小于线程池基本大小,线程池会新创建一个线程来执行任务。

  2. workQueue:工作任务队列

    当线程池的大小已经超过核心线程数后,新提交的任务保存至阻塞队列,阻塞队列的选择有以下几种:

    • ArrayBlockingQueue:基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序。
    • LinkedBlockingQueue:基于链表结构的阻塞队列,此队列按FIFO原则对元素进行排序。
    • SynchronousQueue:不存储元素的阻塞队列。每个插入操作必须等待一个移除操作,否则插入操作一直处于阻塞状态。
    • PriorityBlockingQueue:一个具有优先级的无界阻塞队列。
  3. maximumPoolSize:线程池最大数量

    如果提交的任务不能保存到队列中(队列已满),并且线程池的大小小于最大线程数,则线程池会再创建新的线程执行任务。

  4. keepAliveTime:线程活跃存活时间

    当线程池的工作线程空闲后,在活跃时间内没有执行任务,则对该线程进行回收

  5. unit:线程活动保持时间的单位

    通过TimeUnit类选择的单位有天、小时、分钟、毫秒、微秒和纳秒。

  6. handler:饱和策略

    当提交的任务不能保存到队列中且线程池的大小已经不小于最大线程数,那么说明该线程池处于饱和状态,此时需要采取一种饱和策略来处理新提交的任务。

    饱和策略的默认选择为AbortPolicy,表示当线程池饱和时抛出异常。线程池框架还提供了以下3种策略:

    • CallerRunsPolicy:直接在调用者线程中执行该任务。
    • DiscardOldestPolicy:执行队列的poll()方法,然后执行线程池的execute()方法再次提交到线程池。
    • DiscardPolicy:不进行任何处理,也就是将任务直接丢弃了。

    如果想要扩展饱和策略,可以实现RejectedExecutionHandler接口自定义饱和策略。

线程池的实现原理

当向线程池提交一个任务之后,线程池的简单处理流程如下:

  1. 如果线程池当前运行的线程没有超过核心线程数,则创建新线程来执行任务;否则将任务添加入队列中。
  2. 如果队列已满,则判断线程池当前运行的线程是否少于最大线程数,如果少于最大线程数,则创建新线程来执行任务。
  3. 如果创建新线程将使当前运行的线程超出最大线程数,将执行饱和策略。

线程池源码分析

使用线程池的execute()向线程池提交任务,先从execute()方法开始作为查看源码入口

execute()

execute()方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果线程池当前运行的线程少于核心线程数,则创建新线程来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池当前运行的线程不少于核心线程数,则将任务添加入队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查是否需要添加新的线程
if (! isRunning(recheck) && remove(command))
reject(command); //如果线程池处于非运行状态,执行饱和策略
//如果之前的线程已被销毁,新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列已满,则创建新的线程来处理任务
else if (!addWorker(command, false))
reject(command); //如果创建新线程失败了,执行饱和策略
}

addWorker()

当工作线程数小于核心线程数的时候,会调用 addWorker()方法创建一个工作线程,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //goto
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//判断线程池是否处于运行状态,处于SHUTDOWN状态时判断队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c); //获得工作线程数
//如果工作线程数大于核心线程数或大于最大线程数,则返回false表示不能再添加新线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas增加工作线程数,cas失败则重试
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新获取ctl的值
//如果线程状态发生变化,则重试
if (runStateOf(c) != rs)
continue retry;
}
}

boolean workerStarted = false; //工作线程启动标识
boolean workerAdded = false; //工作线程添加成功标识
Worker w = null;
try {
w = new Worker(firstTask); //构建一个Worker,传入Runnable对象
final Thread t = w.thread; //从worker中取出线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //使用重入锁,避免并发问题
try {
int rs = runStateOf(ctl.get());
//只有线程池是运行状态或是(SHUTDOWN且firstTask为空),才添加到workers中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//判断线程是否活跃
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w); //将新创建的Worker添加到workers中
int s = workers.size(); //集合中的工作线程数
//如果集合中的工作线程数大于最大线程数
if (s > largestPoolSize)
largestPoolSize = s; //更新线程池出现过的最大线程数
workerAdded = true; //工作线程添加成功标识
}
} finally {
mainLock.unlock(); //释放锁
}
//如果worker添加成功则启动线程
if (workerAdded) {
t.start();
workerStarted = true; //工作线程启动标识
}
}
} finally {
if (! workerStarted)
//如果添加失败,递减实际工作线程数,因为方法开始时增加了工作线程数
addWorkerFailed(w);
}
return workerStarted; //返回结果
}

简单来讲就是使用cas操作将工作线程数递增,然后新建一个线程并启动。

Worker类

addWorker()方法中构建了一个Worker类,并且把firstTask 封装到 worker 中,Worker类的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;

/** 这就是实际运行的工作线程 */
final Thread thread;
/** 这是需要执行的任务 */
Runnable firstTask;
/** 线程完成的任务数 */
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // 初始状态-1,防止runWorker()前中断
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this);
}

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

从源码中可以看到,worker中保存了一个实际执行任务的线程thread,以及初始化时要被执行的任务firstTask;最终执行任务时,实际是调用了runWorker()方法。

worker类继承了AbstractQueuedSynchronizer(AQS),使用AQS实现了独占锁的功能,但是它的tryAcquire()方法是不允许重入的,它的独占锁作用如下:

  • 当前线程拥有独占锁,那就说明当前线程正在执行任务,当前线程不该被中断
  • 如果当前线程没有拥有独占锁,说明当前线程处于空闲状态,可以对该线程进行中断操作

addWorkerFailed()

在addWorker()方法中,如果添加 Worker失败则调用addWorkerFailed()方法做失败后的处理,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

如果添加 Worker失败后,判断Worker是否已经构造好了,如果Worker不为空则从workers中删除,且原子递减核心线程数的数量,之后执行tryTerminate()方法尝试结束线程池。

runWorker()

调用Worker类的run()方法时,实际会调用runWorker()方法执行任务,该方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //表示允许中断
boolean completedAbruptly = true;
try {
//循环执行任务,如果task为空,调用getTask()方法获取task
while (task != null || (task = getTask()) != null) {
w.lock(); //加锁使得shutdown()方法不会终止正在执行任务的worker
// 线程池为stop时不接受新任务,不执行任务队列的任务,且中断正在执行的任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //可以继承ThreadpoolExecutor重写
Throwable thrown = null;
try {
task.run(); //执行任务的run()方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //可以继承ThreadpoolExecutor重写
}
} finally {
task = null; //将完成的任务置空以便下一次循环
w.completedTasks++; //增加完成任务数
w.unlock(); //释放锁
}
}
completedAbruptly = false;
} finally {
//删除worker,及判断释放补充新的worker
processWorkerExit(w, completedAbruptly);
}
}

runWorker()方法主要处理了以下两件事:

  1. 循环执行task任务,如果task为空则调用getTask()方法获取task任务
  2. 如果getTask()方法取得的任务依然位空,则runWorker()方法执行完毕

getTask()

在runWorker()方法中,worker 线程调用getTask()方法从阻塞队列中获取需要执行的任务,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
boolean timedOut = false; //判断是否超时

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//判断线程池在shutdown状态时队列是否为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; //线程池在shutdown状态时队列为空或线程池为stop状态
}

int wc = workerCountOf(c);

//如果线程池中的线程数量大于核心线程数量,则需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//如果timed为true,表示需要进行超时控制
//通过阻塞队列的poll()方法实现超时控制,参数则为keepaliveTime
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//如果取到的任务为空,则表示超时,否则返回取到的任务
if (r != null)
return r;
timedOut = true; //超时回收标记
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

processWorkerExit()

runWorker()方法中的的 while 循环执行完毕以后,在 finally 块中会调用 processWorkerExit()方法来销毁工作线

程。processWorkerExit()方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //判断是否调整工作线程数
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; //结束
}
addWorker(null, false);
}
}

到此为止,从执行execute()方法提交任务开始,创建worke线程到执行任务以及最后到销毁线程的全部过程就结束了。