资料总结 资料总结
首页
go
java
云原生
  • mysql
  • redis
  • MongoDB
  • 设计模式详解
  • 数据结构与算法
  • 前端
  • 项目
  • 理论基础
  • 运营
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

linghui Wu

一只努力学飞的鱼
首页
go
java
云原生
  • mysql
  • redis
  • MongoDB
  • 设计模式详解
  • 数据结构与算法
  • 前端
  • 项目
  • 理论基础
  • 运营
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • java-se

    • java集合
    • 计算机基础
    • 并发编程
    • java线程
    • java协程
    • synchronized
    • Unsafe&Atomic
    • AQS
    • Lock
    • JUC工具杂记
    • Queue
    • 线程池原理
      • 目的
      • 应用场景
      • 优势
        • ThreadPoolExecutor
      • API
      • 线程池状态
        • 5种状态
        • 状态切换流程图
      • 构造参数解释
      • 线程池监控
      • 基础内容
      • ThreadPoolExecutor源码解析
        • execute方法: 入口;提交优先级
        • addWorker方法 : 构建Worker并开启线程;记录线程信息
        • Worker类结构: 实现了AQS和Runnable,重点是run方法是调用runWorker
        • runwoker方法: 是否中断线程;task.run(); 执行扩展点;计数
        • getTask方法:
      • 参考资料
        • ScheduledThreadPoolExecutor
      • 基础内容
      • 源码解析
        • 类结构 : 继承了ThreadPoolExecutor
        • delayedExecute方法: 入口
      • 参考资料
    • Future
    • ForkJoin
    • BIO,NIO,AIO
  • jvm

  • mybatis

  • Netty

  • 爬虫 webmagic

  • spring

  • spring-cloud

  • 中间件

  • flowable

  • idea工具

  • maven

  • ms

  • java部署

  • 原生安卓

  • java
  • java-se
wulinghui
2022-02-11
目录

线程池原理

# 线程池

# 目的

优化频繁的创建和销毁线程代价太大

# 应用场景

  • 单个任务处理时间比较短
  • 需要处理的任务数量很大
  • 多个任务可重用,一般都满足(程序不会动态的变化)

# 优势

  • 重用存在的线程,减少线程创建,消亡的开销,提高性能
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。

# ThreadPoolExecutor

  • Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
  • Executor子接口ExecutorService定义了线程池的具体行为
  • ThreadPoolExecutor是ExecutorService的实现.

# API

  • execute(Runnable command):履行Ruannable类型的任务,
  • submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
  • shutdown():在完成已提交的任务后封闭办事,不再接管新任务,
  • shutdownNow():停止所有正在履行的任务并封闭办事。
  • isTerminated():测试是否所有任务都履行完毕了。
  • isShutdown():测试是否该ExecutorService已被关闭。

# 线程池状态

# 5种状态

  • RUNNING 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
  • SHUTDOWN 线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  • STOP 线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  • TIDYING 所有的任务已终止
  • TERMINATED 线程池彻底终止

# 状态切换流程图

# 构造参数解释

    • corePoolSize 核心线程数

      • 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
    • maximumPoolSize 允许的最大线程数

      • 如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务
    • keepAliveTime

      • 维护线程核心线程外的所允许的空闲时间,核心线程不会销毁
    • unit

      • keepAliveTime的单位
    • workQueue

      • 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口

      • JDK中提供了如下阻塞队列

        • ArrayBlockingQueue

          • 基于数组结构的有界阻塞队列
        • LinkedBlockingQuene

          • 基于链表(可看作无界)结构的阻塞队列
        • SynchronousQuene

          • 同步插入操作的,不存储元素的阻塞队列

            • 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
        • priorityBlockingQuene

          • (可排序)优先级的无界阻塞队列
    • threadFactory

      • 用来创建新线程
    • RejectedExecutionHandler

      • 拒绝策略处理者

      • 当阻塞队列满了,且没有空闲的工作线程超过了maximumPoolSize

      • 4种策略

        • AbortPolicy

          • 直接抛出异常,默认策略
        • CallerRunsPolicy

          • 用调用者所在的线程来执行任务
        • DiscardPolicy

          • 直接丢弃任务;
        • DiscardOldestPolicy

          • 丢弃阻塞队列中靠最前的任务,并执行当前任务;

# 线程池监控

    • long getTaskCount() //线程池已执行与未执行的任务总数
    • long getCompletedTaskCount() //已完成的任务数
    • int getPoolSize() //线程池当前的线程数
    • int getActiveCount() //线程池中正在执行任务的线程数量

# 基础内容

  • 线程池的处理流程和原理 和 拒绝策略 (opens new window)

  • 线程池中的提交优先级和执行优先级_缘丶沐逸尘的博客-CSDN博客_提交优先级和执行优先级 (opens new window)

    提交优先级 核心线程 > 工作队列 > 非核心线程 执行优先级 核心线程 > 非核心线程 > 工作队列

  • 线程池中execute和submit的区别 - 简书 (jianshu.com) (opens new window)

public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) {
            throw new NullPointerException();
        } else {
            // 区别1: 包装了任务
            RunnableFuture<T> ftask = this.newTaskFor(task, result);
            this.execute(ftask);
            // 区别2: 有返回值
            return ftask;
        }
    }
1
2
3
4
5
6
7
8
9
10
11

submit()可以进行Exception处理;可以接受的任务类型不同;submit()有返回值,而execute()没有

# ThreadPoolExecutor源码解析

# execute方法: 入口;提交优先级

int c = ctl.get();
1、判断当前的线程数是否小于corePoolSize如果是,
使用入参任务通过addWord方法创建一个新的线程,
如果能完成新线程创建exexute方法结束,成功提交任务;
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
2、在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,
再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了)
非运行状态下当然是需要reject;
然后再判断当前`所有`的线程数量是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);  // 拒绝策略
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false); 判断当前工作线程池数是否为0  
        如果是创建一个null任务,任务在堵塞队列存在了就会从队列中取出 这样做的意义是
        保证线程池在running状态必须有一个任务在执行
        
        
        
}
3、如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,
则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject;
从上面新增任务的execute方法也可以看出,拒绝策略不仅仅是在饱和状态下使用,
在线程池进入到关闭阶段同样需要使用到;
上面的几行代码还不能完全清楚这个新增任务的过程,
else if (!addWorker(command, false))
    reject(command); // 拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  • 提交优先级: 先核心线程,再放队列交给,再交给非核心线程
  • 拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;

# addWorker方法 : 构建Worker并开启线程;记录线程信息

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: goto写法 用于重试
    for (;;) { 
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
               线程状态非运行并且非shutdown状态任务为空,队列非空就不能新增线程了
               
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                当前现场大于等于最大值 
                等于核心线程数 非核心大于等于线程池数 说明达到了阈值 
                最大线程数 就不新增线程
                return false;
            if (compareAndIncrementWorkerCount(c))  // ctl+1 工作线程总数量+1 如果成功
            就跳出死循环。
            cas操作 如果为true 新增成功 退出
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry; 进来的状态和此时的状态发生改变 重头开始 重试 
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 上面主要是对ctl工作现场+1

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); 内部类 封装了线程和任务 通过threadfactory创建线程
        
        final Thread t = w.thread; 毎一个worker就是一个线程数
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                重新获取当前线程池的状态
                int rs = runStateOf(ctl.get());
                 小于shutdown就是running状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                        SHUTDOWN 和firstTask 为空是从队列中处理任务 那就可以放到集合中
                       线程还没start是正常的,如果是alive就直接异常
                     if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s; 记录目前的最大线程数
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();  //启动线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);//失败回退 从wokers移除w 线程数减1 尝试结束线程池
    }
    return workerStarted;
}   
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

真正做事的线程是Worker里面包装的.

# Worker类结构: 实现了AQS和Runnable,重点是run方法是调用runWorker

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    正在运行woker线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    传入的任务
    Runnable firstTask;
    /** Per-thread task counter */
    完成的任务数 监控用
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // 禁止线程中断 
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 调用线程池的工厂创建包装的线程.
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
}    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

扩展点: 利用ThreadFactory,可以包装run的方法.做一些控制。

# runwoker方法: 是否中断线程;task.run(); 执行扩展点;计数

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();//获取当前线程
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts 把state从-1改为0 意思是可以允许中断
    boolean completedAbruptly = true;
    try { 
        //task不为空 或者 (可以说再去)阻塞队列中拿到了任务
        //执行优先级的体现
        //退出这个循环的条件是 task为空且 getTask()也为空
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 自锁...保证线程安全
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            如果当前线程池状态等于stop 就中断
            //Thread.interrupted() 中断标志,线程中自己控制中断标记。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); //还是会继续往下走
            try {
                beforeExecute(wt, task); // 这里可以报错退出循环,控制权限校验. 但是不会进afterExecute扩展点..
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null; 这设置为空 等下次循环就会从队列里面获取
                w.completedTasks++; 完成任务数+1
                w.unlock();
            }
        }
        // 循环正常结束
        completedAbruptly = false;
    } finally { 
        // 退出相关操作
        processWorkerExit(w, completedAbruptly); // private 不可扩展.
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
  • 扩展点: beforeExecute 、afterExecute 方法重写
  • 当线程池stop时,可以控制worker里面的Thread里面的中断操作,还是会执行task.run(); 可以再task里面做中断判断操作。
  • 执行优先级的体现 : 如果worker里面有task就先执行,如果没有再从队列里面获得。 这也代表着: 如果一开始队列就慢了的话,核心和非核心的线程获得到的任务先执行,之后才执行队列里面的任务.

# getTask方法:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);//获取线程池运行状态
		
        // 线程池 shuitdown 的操作。。
        // shuitdown 并且 stop以上或者队列为空, 那就工作现场-1 同时返回为null 
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
             重新获取工作线程数
        int wc = workerCountOf(c);
        timed是标志超时销毁
        allowCoreThreadTimeOut  true 核心线程池也是可以销毁的
        // 非核心线程标志
        // 判断当前线程数小于核心线程数,就认为是核心线程的操作..
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 允许核心线程超时,或者是非核心线程
        
        // 2种情况都返回为null
        // 1. wc 大于了 规定的线程 => 多线程操作线程池的情况,且并发高的情况
        // 2. 非核心线程标志 & 已经poll超时了一次 &  不止自己或者队列为空.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 一定要csa成功才为null..
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 核心线程就一直阻塞等任务,非核心线程超时就跳出去.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false; // 队列被中单...
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
  • allowCoreThreadTimeOut 标记为true时注意以下的注意情况: 所有都是poll操作...

  • 注意: 核心线程是通过数量对比确定的,可能核心线程是会变化的。一般情况并发不高的是不会变化的。

  • timed && timedOut 代表 至少执行一次poll操作。

# 参考资料

线程池是如何保证核心线程不死亡,核心线程和非核心线程的区别? (opens new window)

有关线程池的10个问题 (opens new window)

阿里为何不推荐使用Executors来创建线程池 (opens new window)

newCachedThreadPool 没有限制最大值线程数cpu100%, 其他的用LinkedBlockingQueue没有限制队列数量OOM。

Java中的线程池的集中管理和监控 (opens new window)

线程池源码分析之Worker线程管理 (opens new window) 捕获Java线程池执行任务抛出的异常 futrue获取结果异常情况 (opens new window)

  1. Thread.setDefaultUncaughtExceptionHandler
  2. ThreadPoolExecutor.afterExecute 来覆盖操作.
  3. Future.get 也可以获得.

# ScheduledThreadPoolExecutor

# 基础内容

  1. 常用的api (opens new window)

schedule:延迟多长时间之后只执行一次; scheduledAtFixedRate固定:延迟指定时间后执行一次,之后按照固定的时长周期执行; scheduledWithFixedDelay非固定:延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;

  1. 编码注意事项: 在自己编写的run方法中,一定要加try-catch 不然报错定时他会自动中断且不会打印任何日志..... 写过这个bug.

# 源码解析

# 类结构 : 继承了ThreadPoolExecutor

  • public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService

  • 采用ScheduledThreadPoolExecutor.DelayedWorkQueue 队列.

  • 使用ScheduledFutureTask 做任务包装.

# delayedExecute方法: 入口

有看再仔细看看,这里用的比较少.

# 参考资料

ScheduledThreadPoolExecutor使用

深入理解Java线程池:ScheduledThreadPoolExecutor (opens new window)

编辑 (opens new window)
上次更新: 2023/01/24, 15:21:15
Queue
Future

← Queue Future→

最近更新
01
架构升级踩坑之路
02-27
02
总结
02-27
03
语法学习
02-27
更多文章>
| Copyright © 2021-2025 Wu lingui |
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式