线程池原理
# 线程池
# 目的
优化频繁的创建和销毁线程代价太大
# 应用场景
- 单个任务处理时间比较短
- 需要处理的任务数量很大
- 多个任务可重用,一般都满足(程序不会动态的变化)
# 优势
- 重用存在的线程,减少线程创建,消亡的开销,提高性能
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。
# ThreadPoolExecutor
- Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
- Executor子接口ExecutorService定义了线程池的具体行为
- ThreadPoolExecutor是ExecutorService的实现.
# API
- execute(Runnable command):履行Ruannable类型的任务,
- submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
- shutdown():在完成已提交的任务后封闭办事,不再接管新任务,
- shutdownNow():停止所有正在履行的任务并封闭办事。
- isTerminated():测试是否所有任务都履行完毕了。
- isShutdown():测试是否该ExecutorService已被关闭。
# 线程池状态
# 5种状态
- RUNNING 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
- SHUTDOWN 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
- STOP 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
- TIDYING 所有的任务已终止
- TERMINATED 线程池彻底终止
# 状态切换流程图
# 构造参数解释
corePoolSize 核心线程数
- 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize 允许的最大线程数
- 如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务
keepAliveTime
- 维护线程核心线程外的所允许的空闲时间,核心线程不会销毁
unit
- keepAliveTime的单位
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口
JDK中提供了如下阻塞队列
ArrayBlockingQueue
- 基于数组结构的有界阻塞队列
LinkedBlockingQuene
- 基于链表(可看作无界)结构的阻塞队列
SynchronousQuene
同步插入操作的,不存储元素的阻塞队列
- 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
priorityBlockingQuene
- (可排序)优先级的无界阻塞队列
threadFactory
- 用来创建新线程
RejectedExecutionHandler
拒绝策略处理者
当阻塞队列满了,且没有空闲的工作线程超过了maximumPoolSize
4种策略
AbortPolicy
- 直接抛出异常,默认策略
CallerRunsPolicy
- 用调用者所在的线程来执行任务
DiscardPolicy
- 直接丢弃任务;
DiscardOldestPolicy
- 丢弃阻塞队列中靠最前的任务,并执行当前任务;
# 线程池监控
- long getTaskCount() //线程池已执行与未执行的任务总数
- long getCompletedTaskCount() //已完成的任务数
- int getPoolSize() //线程池当前的线程数
- int getActiveCount() //线程池中正在执行任务的线程数量
# 基础内容
线程池中的提交优先级和执行优先级_缘丶沐逸尘的博客-CSDN博客_提交优先级和执行优先级 (opens new window)
提交优先级 核心线程 > 工作队列 > 非核心线程 执行优先级 核心线程 > 非核心线程 > 工作队列
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) {
throw new NullPointerException();
} else {
// 区别1: 包装了任务
RunnableFuture<T> ftask = this.newTaskFor(task, result);
this.execute(ftask);
// 区别2: 有返回值
return ftask;
}
}
2
3
4
5
6
7
8
9
10
11
submit()可以进行Exception处理;可以接受的任务类型不同;submit()有返回值,而execute()没有
# ThreadPoolExecutor源码解析
# execute方法: 入口;提交优先级
int c = ctl.get();
1、判断当前的线程数是否小于corePoolSize如果是,
使用入参任务通过addWord方法创建一个新的线程,
如果能完成新线程创建exexute方法结束,成功提交任务;
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
2、在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,
再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了)
非运行状态下当然是需要reject;
然后再判断当前`所有`的线程数量是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command); // 拒绝策略
else if (workerCountOf(recheck) == 0)
addWorker(null, false); 判断当前工作线程池数是否为0
如果是创建一个null任务,任务在堵塞队列存在了就会从队列中取出 这样做的意义是
保证线程池在running状态必须有一个任务在执行
}
3、如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,
则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject;
从上面新增任务的execute方法也可以看出,拒绝策略不仅仅是在饱和状态下使用,
在线程池进入到关闭阶段同样需要使用到;
上面的几行代码还不能完全清楚这个新增任务的过程,
else if (!addWorker(command, false))
reject(command); // 拒绝策略
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
- 提交优先级: 先核心线程,再放队列交给,再交给非核心线程
- 拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;
# addWorker方法 : 构建Worker并开启线程;记录线程信息
private boolean addWorker(Runnable firstTask, boolean core) {
retry: goto写法 用于重试
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
线程状态非运行并且非shutdown状态任务为空,队列非空就不能新增线程了
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
当前现场大于等于最大值
等于核心线程数 非核心大于等于线程池数 说明达到了阈值
最大线程数 就不新增线程
return false;
if (compareAndIncrementWorkerCount(c)) // ctl+1 工作线程总数量+1 如果成功
就跳出死循环。
cas操作 如果为true 新增成功 退出
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry; 进来的状态和此时的状态发生改变 重头开始 重试
// else CAS failed due to workerCount change; retry inner loop
}
}
// 上面主要是对ctl工作现场+1
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); 内部类 封装了线程和任务 通过threadfactory创建线程
final Thread t = w.thread; 毎一个worker就是一个线程数
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
重新获取当前线程池的状态
int rs = runStateOf(ctl.get());
小于shutdown就是running状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
SHUTDOWN 和firstTask 为空是从队列中处理任务 那就可以放到集合中
线程还没start是正常的,如果是alive就直接异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; 记录目前的最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//失败回退 从wokers移除w 线程数减1 尝试结束线程池
}
return workerStarted;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
真正做事的线程是Worker里面包装的.
# Worker类结构: 实现了AQS和Runnable,重点是run方法是调用runWorker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
正在运行woker线程
final Thread thread;
/** Initial task to run. Possibly null. */
传入的任务
Runnable firstTask;
/** Per-thread task counter */
完成的任务数 监控用
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 禁止线程中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 调用线程池的工厂创建包装的线程.
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
扩展点: 利用ThreadFactory,可以包装run的方法.做一些控制。
# runwoker方法: 是否中断线程;task.run(); 执行扩展点;计数
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//获取当前线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts 把state从-1改为0 意思是可以允许中断
boolean completedAbruptly = true;
try {
//task不为空 或者 (可以说再去)阻塞队列中拿到了任务
//执行优先级的体现
//退出这个循环的条件是 task为空且 getTask()也为空
while (task != null || (task = getTask()) != null) {
w.lock(); // 自锁...保证线程安全
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
如果当前线程池状态等于stop 就中断
//Thread.interrupted() 中断标志,线程中自己控制中断标记。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //还是会继续往下走
try {
beforeExecute(wt, task); // 这里可以报错退出循环,控制权限校验. 但是不会进afterExecute扩展点..
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; 这设置为空 等下次循环就会从队列里面获取
w.completedTasks++; 完成任务数+1
w.unlock();
}
}
// 循环正常结束
completedAbruptly = false;
} finally {
// 退出相关操作
processWorkerExit(w, completedAbruptly); // private 不可扩展.
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
- 扩展点: beforeExecute 、afterExecute 方法重写
- 当线程池stop时,可以控制worker里面的Thread里面的中断操作,还是会执行task.run(); 可以再task里面做中断判断操作。
- 执行优先级的体现 : 如果worker里面有task就先执行,如果没有再从队列里面获得。 这也代表着: 如果一开始队列就慢了的话,核心和非核心的线程获得到的任务先执行,之后才执行队列里面的任务.
# getTask方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取线程池运行状态
// 线程池 shuitdown 的操作。。
// shuitdown 并且 stop以上或者队列为空, 那就工作现场-1 同时返回为null
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
重新获取工作线程数
int wc = workerCountOf(c);
timed是标志超时销毁
allowCoreThreadTimeOut true 核心线程池也是可以销毁的
// 非核心线程标志
// 判断当前线程数小于核心线程数,就认为是核心线程的操作..
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 允许核心线程超时,或者是非核心线程
// 2种情况都返回为null
// 1. wc 大于了 规定的线程 => 多线程操作线程池的情况,且并发高的情况
// 2. 非核心线程标志 & 已经poll超时了一次 & 不止自己或者队列为空.
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 一定要csa成功才为null..
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 核心线程就一直阻塞等任务,非核心线程超时就跳出去.
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; // 队列被中单...
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
allowCoreThreadTimeOut 标记为true时注意以下的注意情况: 所有都是poll操作...
注意: 核心线程是通过数量对比确定的,可能核心线程是会变化的。一般情况并发不高的是不会变化的。
timed && timedOut 代表 至少执行一次poll操作。
# 参考资料
线程池是如何保证核心线程不死亡,核心线程和非核心线程的区别? (opens new window)
有关线程池的10个问题 (opens new window)
阿里为何不推荐使用Executors来创建线程池 (opens new window)
newCachedThreadPool 没有限制最大值线程数cpu100%, 其他的用LinkedBlockingQueue没有限制队列数量OOM。
Java中的线程池的集中管理和监控 (opens new window)
线程池源码分析之Worker线程管理 (opens new window) 捕获Java线程池执行任务抛出的异常 futrue获取结果异常情况 (opens new window)
- Thread.setDefaultUncaughtExceptionHandler
- ThreadPoolExecutor.afterExecute 来覆盖操作.
- Future.get 也可以获得.
# ScheduledThreadPoolExecutor
# 基础内容
schedule:延迟多长时间之后只执行一次; scheduledAtFixedRate固定:延迟指定时间后执行一次,之后按照固定的时长周期执行; scheduledWithFixedDelay非固定:延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;
- 编码注意事项: 在自己编写的run方法中,一定要加try-catch 不然报错定时他会自动中断且不会打印任何日志..... 写过这个bug.
# 源码解析
# 类结构 : 继承了ThreadPoolExecutor
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService采用ScheduledThreadPoolExecutor.DelayedWorkQueue 队列.
使用ScheduledFutureTask 做任务包装.
# delayedExecute方法: 入口
有看再仔细看看,这里用的比较少.