月泉的博客

线程池原理剖析

字数统计: 5.8k阅读时长: 23 min
2018/11/09 Share

食用该文章最好具有队列和AQS的知识,关于AQS可以从我以前书写的文章中找出

线程池是什么

线程大家都清楚是什么?那么线程池是什么?在使用线程的时候有没有考虑过,在平时使用的时候系统中只要你想随处都可以创建线程并且很难管控,基本上一个线程使用完就销毁掉了,要在使用便新建一个线程

直接使用线程的缺陷(针对线程池)

  • 线程数量无法限制,想创建多少个就多少个
  • 线程无法复用,创建启动和销毁线程是会带来一定的开销

线程池的出现主要就是解决2个问题,一个是限制线程的数量和线程复用,在这个扩展上面可以再自行扩展出监控等。

线程池的使用

Java的Executors工具类就提供了几种现成的创建线程池实例的方法()

  • newFixedThreadPool
  • newSingleThreadPool
  • newCachedThreadPool
  • newScheduledThreadPool

他们最终的放回值都是返回一个ExecutorService,但newScheduledThreadPool返回的是ScheduledExecutorService但该接口也是继承至ExecutorService接口。

newFixedThreadPool

该方法是创建一个固定线程数量的线程池,其核心线程数和最大线程数都是设的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class PrintTask implements Runnable{

private int sequence;
private long sleepMillis;

public PrintTask(int sequence, long sleepMillis) {
this.sequence = sequence;
this.sleepMillis = sleepMillis;
}

@Override
public void run() {
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " print: " + sequence);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 12; i++) {
executor.execute(new PrintTask(i+1, 3000L));
}
executor.shutdown();
System.out.println("Is shutdown : " + executor.isShutdown());
System.out.println("Is terminated : " + executor.isTerminated());
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(executor.isTerminated());
}

从示例代码中可以看出,首先我创建了一个固定大小的线程池,其固定数量为10,然后execute了12个任务,在执行的过程中当前10个任务没有执行完时,第11个和第12个任务会被阻塞到当有空余的线程数量时开始执行,从输出的结果上来看,第11个和第12个任务并没有在新创建线程来执行任务而是复用线程来执行,我全部execute掉了以后我尝试调用了shutdown方法来关闭线程池,当然调用了以后并没有真正的关闭线程池,它会等待线程池中所有的任务(包括阻塞队列中没有执行完的任务)都执行完在关闭,通过isShutdown可以获取到是否调用过shutdown方法,通过isTerminated获取线程池是否已经终止,也就是停止掉了,此时返回的是false因为线程池中的任务还没有执行完,当过了5秒以后再次执行isTerminated方法返回true因为线程池已经完全shutdown

newSingleThreadPool

newSingleThreadPool会创建只有一个线程的线程池,线程由于只有一个,所以execute的任务会一个一个的执行

1
2
3
4
5
6
7
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 12; i++) {
executor.execute(new PrintTask(i+1, 3000L));
}
executor.shutdown();
}

newCachedThreadPool

这个线程池创建时在你的感知上是没有数量限制,因为它的实现,给予的最大空闲数量为Integer.MAX_VALUE,在执行任务时,如果没有多余的空闲线程执行该任务,就会创建一个新的线程来执行这个任务

1
2
3
4
5
6
7
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 12; i++) {
executor.execute(new PrintTask(i+1, 3000L));
}
executor.shutdown();
}

newScheduledThreadPool

1
2
3
4
5
6
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
executor.schedule(new PrintTask(1, 3000L), 1L, TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new PrintTask(2, 2000L), 1, 1, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(new PrintTask(3,3000L), 1, 1, TimeUnit.SECONDS);
}

首先创建了一个核心线程数量10的线程池,其最大空闲数量背后的实现仍旧是Integer.MAXVALUE,然后调用了schedule该方法是首先是只调度一次任务,第一个参数为要调度的任务,第二参数为时间,第三个参数为时间单位,在接着使用了scheduleAtFixedRate这是相对频率重复调度任务的方法,第一个参数为要调度的任务,第二个参数为首次开始的延时时间,第三个参数为相对上一个任务开始的延迟执行时间,第四参数为时间单位,scheduleWithFixedDelay第一个参数为要调度的任务,第二个参数为首次延迟执行的时间,第三个参数为上一个任务执行结束后执行的间隔时间,第四个参数为时间单位

以上4种线程池的不同点及场景

以下都是我个人观点,若有异议可以邮箱交流 yuequan1997@gmail.com

newFixedThreadPool

特征

固定大小的线程数量,超出线程数量的任务会阻塞等待到有空闲的线程时执行且长度是原则上无限的(毕竟还受JVM参数或硬件内存大小等影响)

适用场景

可预计或不可预计的并发任务数量且时间是模糊的不确定时间的长短,可预计是指例如我可能每次并发添加10个任务,那么便提前预备好线程数量5个或线程数量10或者20个,以便”高效”的并发执行,不可预计任务数量是指根据当前资源的分配情况下来合理分配线程池中的线程数量,限定它最大并发处理任务的数量。

newSingleThreadPool

特征

固定大小为1的线程数量,重复的利用这1个线程去执行任务,且同时只能执行1个任务,在任务未执行完之前后续添加的任务都会被阻塞且长度是原则上无限的(毕竟还受JVM参数或硬件内存大小等影响)

使用场景

资源有限一次只执行一个任务(本来还想说执行有先后依赖顺序的,其实这个真的不推荐这样设计,所以忽略),其它任务加入队列挨个执行,重复利用一个线程节省线程创建和销毁的开销

newCachedThreadPool

特征

其最大线程数量为Integer.MAX_VALUE,在任务来时无空闲线程的时候则新开一个线程去执行它,若有空闲则使用空闲线程,每一个新开的线程,都有一个空闲存活60秒的时间,其长度原则上没有限制(毕竟还受JVM参数或硬件内存大小等影响)

使用场景

大部分任务执行时间比较短,而且频繁,使用该线程池即可复用线程又可在突增并发数时创建新的线程从而达到最大并发效率(并发数猛增的时候可能会出现意外,所以如果有这种场景扩展下线程池做个最大限制最为合理)

newScheduledThreadPool

特征

核心大小的线程数为传入的数量,其最大线程数量为Integer.MAX_VALUE,线程有10秒的空闲存活时间,该线程池最主要的特征其实就是定时调度一次或重复调度

使用场景

任务需要在相对时间频率下执行

线程池的实现

Executor

Executor 是最顶层的接口,它只定义了一个接口方法,用来执行任务

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService

ExecutorService接口继承至Executor接口,在它之上扩展了线程池中的生命周期的管理和异步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

接口方法大意如下:

  • shutdown

    尝试关闭线程池,首先会拒绝接收新的任务,其次会等待正在执行中的所有任务执行完(包括阻塞等待中的)

  • shutdownNow

    尝试关闭线程池,首先也会拒绝接收新的任务,对正在执行中的所有任务发出中断请求,同时抛弃队列中还在等待的任务。

  • isShutdown

    是否尝试关闭线程池,尝试过则返回true否则返回false

  • isTerminated

    线程池是否已经终止,用来判断shutdownshutdownNow是否已经完全关闭了线程池

  • awaitTermination

    根据传入的时间延迟获取线程池关闭的状态,这里需要注意的是是阻塞等待

  • submit

    提交任务至线程池中

  • invokeAll

    批量给定任务,返回执行完的所有任务

  • invokeAny

    批量给定任务,返回一个已执行完的任务

AbstractExecutorService

AbstractExecutorService实现至ExecutorService其主要实现了invokeAllinvokeAnysubmit及对于异步任务的cancel操作,这个类不是最主要关心的。

ThreadPoolExecutor

该类是AbstractExecutorService的子类,同时也实现了上述所有未实现的接口,简而言之它就是线程池概念抽象的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

private final BlockingQueue<Runnable> workQueue;

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<>();

private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

private volatile long keepAliveTime;

private volatile boolean allowCoreThreadTimeOut;

private volatile int corePoolSize;

private volatile int maximumPoolSize;

......................
}

以上代码片段是该类定义的所有的实例变量及常量,以下是对这些实例变量及常量的一个简要解释

  • ctl

    该变量是一个复合含义的变量,其本身可以看作是一个Integer变量,该变量的高3位代表线程池的状态,那么后29位(从低位往高位数)代表该线程池数量

  • COUNT_BITS

    数量的位数

  • COUNT_MASK

    数量位数的掩码

  • RUNNING

    表示运行中的状态标识

  • SHUTDOWN

    表示关闭中的状态标识

  • STOP

    表示已停止的状态标识

  • TIDYING

    表示当前所有任务已经终止,任务数量为0时的状态标识

  • TERMINATED

    表示线程池已经完全终止(关闭),关于线程池的关闭状态

    线程池状态变换

  • workQueue

    用来保存等待任务执行的阻塞队列

  • mainLock

    可重入锁,方法里面会大量使用,很多变量的操作都需要使用该锁

  • workers

    该集合中包含了所有在工作的线程

  • termination

    锁条件队列,主要用于awaitTermination

  • largestPoolSize

    记录线程池最大工作线程的数量(可能是个历史值)

  • completedTaskCount

    完成任务的计时器,仅在中止工作任务时更新

  • threadFactory

    用于创建线程的工厂

  • handler

    饱和策略的回调,当队列已满且线程个数达到maximumPoolSize时采取的策略

    有以下几种策略

    • AbortPolicy

    • CallerRunsPolicy

    • DiscardOldestPolicy

    • DiscardPolicy

      分别为:抛出异常、使用调用者当前的线程来执行任务、调用队列的poll丢弃一个任务,执行当前任务、默默丢弃该任务。

  • keepAliveTime

    空闲存活时间,如果线程池中的线程数量比核心线程数量还要多时,并且多出的这些线程都是闲置状态,该变量则是这些闲置状态的线程的存活时间啊

  • allowCoreThreadTimeOut

    默认为false,即时是空闲核心线程也会处于活动状态,如果设为true那么核心线程也会遵循keepAliveTime的时间来做闲置处理

  • corePoolSize

    线程池核心线程数量

  • maximumPoolSize

    线程池最大线程数量

在大致清楚基础的变量后,我们从入口execute开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null) // 1
throw new NullPointerException();
int c = ctl.get(); // 2
if (workerCountOf(c) < corePoolSize) { // 3
if (addWorker(command, true)) // 3.1
return;
c = ctl.get(); // 3.2
}
if (isRunning(c) && workQueue.offer(command)) { // 4
int recheck = ctl.get(); // 5
if (! isRunning(recheck) && remove(command)) //6
reject(command);
else if (workerCountOf(recheck) == 0) // 7
addWorker(null, false);
}
else if (!addWorker(command, false)) //8
reject(command);
}

接下来按照注释的序号对其一一解释

  1. 提交的任务如果是个空的则抛出NullPointerException

  2. 获取复合变量(记录了线程池状态和当前线程池线程数量)

  3. 判断当前线程池的线程数量是否在限定的核心线程数量的访问楼内

    1. 如果在那么就直接调用addWorker添加一个核心线程,然后return
    2. 添加失败,重新获取复合变量
  4. 判断线程池是否是运行状态并且添加至阻塞等待队列中

  5. 重新获取状态(可能添加的过程中关闭过线程池之类的并发操作)

  6. 判断线程池是否是运行状态,如果不是将添加的任务删除并采取拒绝措施

  7. 判断线程池中的工作线程数量是否为0,如果为空则添加一个工作线程

  8. 队列添加失败,尝试调用addWorker以非核心线程的方式添加一条非核心线程执行,失败则采用饱和策略拒绝该任务

    从上面的源码可以看到,如果核心线程数量未达到限定范围则会优先创建核心线程来执行该任务,否则将其加入阻塞等待队列中,如果添加至阻塞等待队列中失败后,则尝试创建一个非核心线程来执行该任务如果失败则采用饱和策略,该方法大量都与addWorker方法相关。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 1
int c = ctl.get(); // 2
int rs = runStateOf(c); // 3
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())) // 4
return false;

for (;;) { // 5
int wc = workerCountOf(c); // 6
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 7
return false;
if (compareAndIncrementWorkerCount(c)) // 8
break retry;
c = ctl.get(); // 9
if (runStateOf(c) != rs) // 10
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 11
final Thread t = w.thread; // 12
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 13
mainLock.lock(); // 14
try {
int rs = runStateOf(ctl.get()); // 15

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 16
if (t.isAlive()) // 17
throw new IllegalThreadStateException();
workers.add(w); // 18
int s = workers.size(); // 19
if (s > largestPoolSize) // 20
largestPoolSize = s;
workerAdded = true; // 21
}
} finally {
mainLock.unlock(); // 22
}
if (workerAdded) { // 23
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 24
addWorkerFailed(w);
}
return workerStarted; // 25
}

以下是对上述代码的分析

  1. 开始自旋
  2. 获取复合状态
  3. 拿到当前线程池运行状态
  4. 判断在必要时检查队列是否为空
    1. 线程池处于SHUTDOWN时并且有第一个任务时
    2. 当前线程池为STOPTIDYINGTERMINATED
    3. 当前线程池为SHUTDOWN时且任务队列为空时
    4. 以上三种情况都会返回false
  5. 开启第二轮自旋(其实第一轮自旋就只是检测运行状态)
  6. 获取线程数量
  7. 判断当前线程数量是否超出了最大容量限制或判断当前线程数量是否大于核心线程数或者最大线程数,具体判断是判断核心线程数还是最大线程数取决于调用时传入的core是否为true,如果超出了直接返回false
  8. CAS增加当前线程数量,更改成功结束自旋
  9. 重新获取复合状态
  10. 判断当前线程池状态是否还是运行中,如果不是则跳过第一层自旋的第一次自旋开始第二次
  11. 创建工作线程
  12. 获取工作者中的线程对象
  13. 拿到锁变量
  14. 尝试获取锁(在操作队列时采取同步措施)
  15. 获取当前线程池运行状态
  16. 判断线程池是否在运行状态,否则判断是否是SHUTDOWN状态且传入的任务为空(有时是只启动一个工作线程)
  17. 判断线程是否是alive状态
  18. 添加工作线程队列
  19. 取当前工作线程队列的数量
  20. 判断是否大于最大线程数量,如果大于则赋值给它
  21. 设置当前工作者加入线程队列的已添加的标识为true
  22. 释放锁
  23. 判断当前工作线程是否已经加入工作线程队列,如果以加入则启动该工作线程,并设置启动标识为true
  24. 判断工作线程启动标识如果为false则调用addWorkerFailed
  25. 返回启动标识来决定是否添加成功

步骤还挺多的,简单的总结一下,首先自旋的去增加工作者线程的数量,然后创建工作者(工作线程),然后涉及到队列的操作获取到锁然后添加到工作线程队列设置标识,如果未添加到线程队列中,该工作线程也不会启动,如果添加了那么启动该工作线程,然后设置启动标识,最后返回启动标识

1
2
3
4
5
6
7
8
9
10
11
12
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}

addWorkerFailed实际上就是从工作线程队列中移除当前添加失败的工作线程,然后对工作线程数量-1(其实这一步可以综合的说成是回滚操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}

设置复合状态为SHUTDOWN,对所有工作线程发出中断请求,调用onShutdown,在该类中没有对该方法做任何操作,该方法是留给子类做扩展用的,类似于hook函数,最后释放锁,然后调用tryTerminate尝试将状态改为TERMINATED

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

将状态设为STOP,中断所有工作线程,丢弃所有等待中的队列,最后释放锁,然后调用tryTerminate尝试将状态改为TERMINATED

Worker

从上述的源码中可以看出,所有的任务都是放在一个一个的worker中执行的,那么Worker究竟是个啥?

1
2
3
4
5
6
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
.......
}

它继承至AQS,实现了Runnable接口,其利用AQS实现了一把简单的锁,它是ThreadPoolExecutor的一个内部类

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

其仅有一个带参的构造函数,其构造函数首先是将状态设为-1,其利用AQS的状态大意是

  • -1 还未准备好,禁止中断
  • 0 unlock
  • 1 lock

不熟悉AQS的可以自行在我博客中找关于AQS的文章

设置完状态后利用线程工厂创建了一个线程(注意参数,将自身实例传了进去)

看一个线程实例看什么?很简单看run就完事了

1
2
3
public void run() {
runWorker(this);
}

从源码中可以看出,将行为委托给了runWorker并将自身实例传递了过去,runWorker是在ThreadPoolExecutor中定义的,其实这里主要是做的职责分离(不理解这个做法也无所谓完全无伤大雅)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 1
Runnable task = w.firstTask; // 2
w.firstTask = null; // 3
w.unlock(); // 4
boolean completedAbruptly = true; // 5
try {
while (task != null || (task = getTask()) != null) { // 6
w.lock(); // 7
if ((runStateAtLeast(ctl.get(), STOP) || // 8
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 9
try {
task.run(); // 10
afterExecute(task, null); // 11
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock(); // 12
}
}
completedAbruptly = false; // 13
} finally {
processWorkerExit(w, completedAbruptly); // 14
}
}
  1. 获取当前线程
  2. 拿到工作线程中的任务
  3. 将工作线程对象中的任务清空
  4. unlock设置锁状态,这里主要还是设置状态为可中断
  5. 设置一个标识,我习惯将这个表示称为“猝死”标识,它主要是标识这个线程是不是正常执行完,是不是意外中断了或者是执行
  6. 自旋判断当前任务是否为空,如果为空,则调用getTask拿去一个任务
  7. 获取锁
  8. 判断状态是否是STOP或者被中断了,如果是则发出中断请求
  9. 方法执行之前的一些Hook函数,说白了该函数啥都没干,留给子类扩展
  10. 运行任务里面的逻辑
  11. 方法执行之后的一些Hook函数
  12. 解锁
  13. 设置“猝死”标识为false没有被猝死233333
  14. 调用processWorkerExit故名思意,其实其背后就是去删除了workers中的当前退出工作线程对象和修改数量

getTask方法中除了从阻塞队列中拿去一个任务以外,还有一个作用就是维持当前线程活下去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private Runnable getTask() {
boolean timedOut = false; // 1

for (;;) {
int c = ctl.get();

if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 2

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) { // 3
if (compareAndDecrementWorkerCount(c)) // 4
return null;
continue; // 5
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 6
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  1. 超时标志
  2. 定时标志,首先判断是否允许核心线程超时(默认false)然后判断当前线程池线程数量是否大于核心线程数
  3. 判断当前线程池数是否超过了最大线程数 || 当前线程是否是定时并且已经超时 并且 线程数大于1 或 任务队列为空
  4. 线程数减1,返回null
  5. 跳过本轮自旋
  6. 从队列中拿取任务

其实所谓的核心线程就是保持它启动后保证在核心线程数内的线程不会挂掉一直在自旋,但如果是设置了allowCoreThreadTimeOut标志为true的话那么就意义不大了

Executors

以上线程池分析完了,该工具类是官方提供给我们创建线程池的一些工具集,接着可以看下常用的创建方式的源码

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

创建最大线程数和核心线程数都为传入的参数的大小,空闲存活时间为0,其使用的队列是无界链表阻塞队列

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

创建一个核心线程数为0,但最大线程数为Integer.MAX_VALUE的线程池,其线程空闲存活时间为60秒,使用同步队列

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

创建一个核心线程数为1,最大线程数为1的线程池,其线程空闲存活时间为1秒,使用阻塞无界链表队列

原文作者:yuequan

原文链接:http://www.lunaspring.com/2018/11/09/juc_thread_pool/

发表日期:November 9th 2018, 8:39:00 pm

更新日期:June 20th 2019, 4:08:32 pm

版权声明:© 月泉 - 邓亮泉 版权所有

CATALOG
  1. 1. 线程池是什么
  2. 2. 线程池的使用
    1. 2.1. newFixedThreadPool
    2. 2.2. newSingleThreadPool
    3. 2.3. newCachedThreadPool
    4. 2.4. newScheduledThreadPool
    5. 2.5. 以上4种线程池的不同点及场景
      1. 2.5.1. newFixedThreadPool
      2. 2.5.2. newSingleThreadPool
      3. 2.5.3. newCachedThreadPool
      4. 2.5.4. newScheduledThreadPool
  3. 3. 线程池的实现
    1. 3.1. Executor
    2. 3.2. ExecutorService
    3. 3.3. AbstractExecutorService
    4. 3.4. ThreadPoolExecutor
    5. 3.5. Worker
    6. 3.6. Executors