并发编程五ThreadPoolExecutor原理解析

概述

由于线程的创建跟销毁是比较消耗资源的,也是比较耗时的。可能为了程序的需要,我们会创建很多线程,所以很有必要对线程进行一个统一的管理,所以就出现了线程池。通过线程池,我们可以重复利用一些线程资源,同时可以统一管理应用内的线程,防止内存泄露。

运行机制

Executor
当我们创建一个任务之后,放进线程池之后,线程池会做如下判断

  • 1.判断核心线程池里的线程是否都在执行任务:否的话则将新任务放入线程池中进行执行,否则进行下一步。
  • 2.判断缓存队列是否未满:是的话,将新任务放入缓存队列,否则进行下一步
  • 3.判断线程池的线程是否都处于工作状态:是的话就就执行线程抛弃策略,否则就执行当前任务

继承关系

ScheduledThreadPoolExecutor

Executor 接口定义了线程池最基本的方法,提交Runnable 任务

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService 扩充了提交任务的类型,并且定义了线程池关闭任务的方法。
ExecutorService

AbstractExecutorService 是抽象类,主要是对ExecutorService 的一些具体实现
ThreadPoolExecutor 是最核心的一个类,下面会具体分析其源码。
ScheduledThreadPoolExecutor则是在 在 ThreadPoolExecutor 的基础上增加了时间调度的功能

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
 //32-3=29,线程数量所占位数
private static final int COUNT_BITS = Integer.SIZE – 3;
//低29位表示最大线程数,2的29次幂-1
private static final int CAPACITY = (1 << COUNT_BITS) – 1;
//线程池自身的状态

//符号位101
private static final int RUNNING = -1 << COUNT_BITS;
//高3位000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高3位001
private static final int STOP = 1 << COUNT_BITS;
//高3位010
private static final int TIDYING = 2 << COUNT_BITS;
//高3位011
private static final int TERMINATED = 3 << COUNT_BITS;
//缓存队列,等待中的线程任务队列
private final BlockingQueue<Runnable> workQueue;
//线程池中工作的线程集合
private final HashSet<Worker> workers = new HashSet<>();
//最大线程数
private int largestPoolSize;
//完成任务的线程数量
private long completedTaskCount;
//创建线程池的工厂类
private volatile ThreadFactory threadFactory;
//线程池丢弃策略
private volatile RejectedExecutionHandler handler;
//在等待执行任务的线程的最大等待时间
private volatile long keepAliveTime;
//核心线程数
private volatile int corePoolSize;
//线程池最大可容纳的线程数
private volatile int maximumPoolSize;
//默认的线程丢弃策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
//int 型变量,低3位表示线程池状态,剩余的位数表示最大线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

构造方法

constructors
其实前面的三个构造方法最终都调用了最后一个构造方法,所以就来看看最后一个构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}

参数比较多,下面来解释一下

  • corePoolSize核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过- corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:keepAliveTime的时间单位,有如下几种取值
时间单位 解释
TimeUnit.DAYS
TimeUnit.HOURS 小时
TimeUnit.MINUTES 分钟
TimeUnit.SECONDS
TimeUnit.MILLISECONDS 毫秒
TimeUnit.MILLISECONDS 微妙
TimeUnit.NANOSECONDS 纳秒
  • workQueue : 一个阻塞队列,用来存储等待执行的任务,参考下图

BlockingQueue

阻塞队列,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。

  • ArrayBlockingQueue(有界队列): FIFO 队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小

  • LinkedBlockingQueue(无界队列):FIFO 队列,大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。

  • PriorityBlockingQueue:优先级队列, 类似于LinkedBlockingQueue,但队列中元素非 FIFO, 依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序

  • SynchronousQueue(直接提交策略): 交替队列,队列中操作时必须是先放进去,接着取出来,交替着去处理元素的添加和移除

threadFactory::创建线程池的工厂

RejectedExecutionHandler: 线程丢弃策略,常见的有如下几种

丢弃策略 解释
DiscardPolicy 丢弃任务,但是不抛出异常
CallerRunsPolicy 由调用线程处理该任务
AbortPolicy 丢弃任务并抛出RejectedExecutionException
DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

execute方法

提交Runnable任务

execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
//为空的话,抛出空指针异常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断加入当前任务后,线程数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//如果小于核心线程数,则将立即执行当前任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//核心线程数满了之后,先判断线程池的状态,然后判断缓存队列是否还能进行缓存
if (isRunning(c) && workQueue.offer(command)) {
//如果两者都满足条件,则将线程加入缓存队列
int recheck = ctl.get();
//如果当前任务没有在运行已经被移除,执行线程丢弃策略
if (!isRunning(recheck) && remove(command))
reject(command);
// 正在运行的线程数如果是0,则直接运行当前线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//加入当前任务失败,则执行丢弃策略
else if (!addWorker(command, false))
reject(command);
}

addWorker方法

有两个参数,一个是firstTask,表示加入的Runnable任务,一个是core,表示是否添加到核心线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
    private boolean addWorker(Runnable firstTask, boolean core) {
retry://定义了循环的名称,便于后面直接中断循环
for (;;) {
int c = ctl.get();//获取状态与数量的标志位
int rs = runStateOf(c);//判断线程状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//线程池处于关闭状态,firstTask为null,或者缓存队列为空,返回false
return false;
//死循环
for (;;) {
int wc = workerCountOf(c);//获取线程池数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//比对核心线程数与最大线程数
return false;
if (compareAndIncrementWorkerCount(c))
//添加线程成功,中断循环
break retry;
c = ctl.get(); //重新获取线程状态与数量的标志位
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个心的worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//给当前线程上锁
mainLock.lock();
try {
//获取当前线程池状态跟线程数的标记为
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将worker添加到缓存队列中去
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//添加成功,改变标记位
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//添加成功之后,开启线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//添加失败,调用addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}

submit

FutureTask

继承关系

先看一下FutureTask的继承关系
FutureTask

Runnable
很常见的接口,定义了run方法

1
2
3
public interface Runnable {
public abstract void run();
}

Future
带有返回值的泛型接口

1
2
3
4
5
6
7
8
9
public interface Future<V> {
boolean isCancelled();//任务是否取消
boolean isDone();//任务是否完成
//同步方法,任务执行的返回值
V get() throws InterruptedException, ExecutionException;
//timeout后获取等待结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture
继承自Runnable,Future的泛型返回接口

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

Callable
//带有返回值的Runnable额接口

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

构造方法

Callable构造方法

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}

Runnable+Result构造方法

1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}

可以看到不管是Callable还是Runnable构造方法,最后都是使用Callable来进行构造的,之所以这么做,是因为FutureTask需要返回值

提交任务

提交Runnable任务

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

提交callable任务+返回值

1
2
3
4
5
6
7
8
9
10
11

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

提交callable任务

1
2
3
4
5
6
7
8
9
public <T> Future<T> submit(Callable<T> task) {
//首先判断是否为空
if (task == null) throw new NullPointerException();
//将Callable转换成Future
RunnableFuture<T> ftask = newTaskFor(task);
//执行execute方法
execute(ftask);//最后依然会调用execute Runnable方法
return ftask;
}

不管是提交什么样task,最后都会被包装成Runnable方法来执行,还是会调用Executor的execute方法。

tryTerminate

这个方法是当线程池关闭的时候会调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final void tryTerminate() {
//开启死循环
for (;;) {
//获取线程池状态跟数量的标志位
int c = ctl.get();
//判断三个条件
//1.线程是否在运行
//2.线程池状态小于TIDYING,TERMINATED
//3.线程池已经关闭并且队列为空
满足上面的任意一个条件就会直接返回,很好理解
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
// 如果线程数不为0,才有资格去终止
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//CAS设置状态成功,调用terminated,默认空实现
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

advanceRunState();

更改当前线程池的状态

1
2
3
4
5
6
7
8
9
10
private void advanceRunState(int targetState) {
for (;;) {
//获取当前线程的状态及数量的标志位
int c = ctl.get();
//更改线程状态
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}

shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
//锁住线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
advanceRunState(SHUTDOWN);
// 中断等待中的线程
interruptIdleWorkers();
onShutdown();
} finally {
//释放锁
mainLock.unlock();
}
tryTerminate();
}

shutdownNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//传入Stop状态,其余跟shutdown保持一致
advanceRunState(STOP);
//中断所有线程
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

shutdownNow() 和 shutdown()的大体流程相似,差别是:

  • 1、advanceRunState传入的是Stop
  • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
  • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

Executors

Executors是Java提供的一个线程池的帮助类,可以帮助我们快速的处理线程池。

构造线程池

由于Java的线程池的构造方法比较复杂,所以Java又提供了Executors这个辅助类,帮助我们更快速地创建ThreadPoolExecutor,可以帮助我们创建4种类型的ThreadPool
Executors_create

  • 单线程异步队列:Executors.newSingleThreadExecutor(),创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行>所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池>保证所有任务的执行顺序按照任务的提交顺序执行。

  • 周期性调度:Executors.newFixedThreadPool(),创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

  • 可缓存的线程:Executors.newCachedThreadPool(int size),创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

  • 多线程周期性调度:Executors.newScheduledThreadPool(1),创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

Callable转换

executors_change

参考资料

https://www.cnblogs.com/trust-freedom/p/6693601.html
https://zhuanlan.zhihu.com/p/27232156