ThreadPoolExecutor
使用线程池的目的是:
- 降低资源消耗:避免重复的创建和销毁线程造成的消耗。
- 提高响应速度:当任务到达后,任务可以不需要等到线程创建就能立即执行。
- 提供对线程的可管理性:提供对线程的统一分配、调优和监控。
核心参数
7大核心参数
:核心线程池大小、最大线程池大小、空闲线程等待工作的超时时间、时间单元、线程工厂、阻塞队列
4大拒绝策略
:拒绝任务并抛出异常(AbortPolicy,默认)、拒绝任务不抛出异常(DiscardPolicy)、和最早的任务竞争(DiscardOldestPolicy)、由当前线程执行任务(CallerRunsPolicy)。
Worker表示工作中的任务,继承于AQS模版
, Worker初始化的时候会自己创建线程工厂。
线程池的状态
:RUNNING表示可以正常接受任务并处理阻塞队列中任务、SHUTDOWN不能接受新的任务但是可以处理阻塞队列里的任务、STOP不能接受任务也不能处理阻塞队列里的任务、TIDYING所有任务都终止、TERMINATED什么都不做
public class ThreadPoolExecutor extends AbstractExecutorService {
// 四大拒绝策略
public static class CallerRunsPolicy implements RejectedExecutionHandler{}
public static class AbortPolicy implements RejectedExecutionHandler{}
public static class DiscardPolicy implements RejectedExecutionHandler{}
public static class DiscardOldestPolicy implements RejectedExecutionHandler{}
// worker 线程的运行任务
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{}
// 锁,在工作时需要加锁,因为workers是HashSet类型的,是线程不安全的
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池所有工作线程的集合,只在有 mainLock锁才能访问
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
private final BlockingQueue<Runnable> workQueue;// 阻塞队列
private long completedTaskCount; // 完成任务的数量
private volatile ThreadFactory threadFactory; // 线程工厂,创建线程
private volatile RejectedExecutionHandler handler;//拒绝策略
private volatile long keepAliveTime;// 空闲线程等待工作的超时失活时间,
private volatile boolean allowCoreThreadTimeOut;// 是否允许核心线程 失活
private volatile int corePoolSize; // 核心线程数
private volatile int maximumPoolSize; // 最大线程数
// ctl表示线程池控制状态,第一个参数表示运行状态、第二个是workerCount(有效线程数量)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池的运行状态
private static final int RUNNING = -1 << COUNT_BITS;// RUNNING表示可以正常接受任务并处理阻塞队列中任务
private static final int SHUTDOWN = 0 << COUNT_BITS;// SHUTDOWN不能接受新的任务但是可以处理阻塞队列里的任务
private static final int STOP = 1 << COUNT_BITS;// STOP不能接受任务也不能处理阻塞队列里的任务
private static final int TIDYING = 2 << COUNT_BITS;// TIDYING所有任务都终止
private static final int TERMINATED = 3 << COUNT_BITS;// TERMINATED什么都不做
}
execute方法
- 如果工作线程少于corePoolSize线程,
则使用addWorker 创建线程并执行任务
, - 如果工作线程大于核心线程 且 阻塞队列 未满,则把新的任务放入阻塞队列。
- 如果阻塞队列已满,并且工作线程大于核心线程,小于最大线程数据,则创建线程执行任务
- 如果阻塞队列已满,并且工作线程大于等于最大线程,则执行拒绝策略。
addWorker流程
在线程池中创建一个新的线程并执行,firstTask: 新增线程执行的任务,null表示不执行
core: true表示使用核心线程池大小,false 表示使用最大线程池大小
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();// clt记录着runState和workerCount
//如果工作线程少于corePoolSize线程, 创建线程并执行任务,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get(); // // 如果失败则重新获取 runState和 workerCount
}
// 如果工作线程大于核心线程 且 阻塞队列 未满,则把新的任务放入阻塞队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//第一个参数为null,表示在线程池中创建一个线程,但不去启动
// 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
addWorker(null, false);
}
// 如果阻塞队列已满,并且工作线程大于核心线程,小于最大线程数据,则创建线程执行任务
else if (!addWorker(command, false))
reject(command); //如果阻塞队列已满,并且工作线程大于最大线程,则执行拒绝策略。
}
// 在线程池中创建一个新的线程并执行
// firstTask: 新增线程执行的任务,null表示不执行
// core: true表示使用核心线程池大小,false 表示使用最大线程池大小
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务)并且
// 并且 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空. 退出
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 工作线程数大于容量,且大于核心或者最大线程,则退出
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加线程数 成功则退出
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// 失败则自旋保证成功
}
}
// 成功添加线程数量后
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建工作线程
final Thread t = w.thread;
if (t != null) {
// 当前线程获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 往工作线程池里面添加 线程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; // 添加线程成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 添加线程成功 则启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
为什么需要持有mainLock?
因为workers是HashSet类型的,不能保证线程安全。w = new Worker(firstTask)相当于创建里一个线程,因为Worker继承于AQS 并实现了Runnable接口。
为什么不用ReentrantLock
因为AQS的tryAcquire方法它是不允许重入的,而ReentrantLock是可重入锁。对于线程来说,在执行过程是不允许其他锁可重入进来的,只需要执行 或者空闲两种状态。
run方法
runWorker在while循环中,不断地通过getTask()方法从workerQueue中获取任务,
- 如果线程池正在停止,则中断线程。
- 否则调用task.run()执行任务;如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 当前线程
Runnable task = w.firstTask; // 当前任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 任务不为空,或者从阻塞队列里面拿到任务不为空getTask()
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池是大于等于stop状态或者线程是没有中断的 则线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 执行run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; // 执行完任务为null,解锁
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法中,为什么要加锁?
shutdown方法与getTask方法存在竞态条件为
shutdown
获取当前mainLock锁,然后设置线程运行状态为SHUTDOWN
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
设置为stop状态
submit
submit方法来自ThreadPoolExecutor的父类AbstractExecutorService,
对于传入的runnable接口会封装成FutureTask类,result为null,或者传入的结果,然后再执行execute方法。
对于传入的Callable接口直接封装成FutureTask。
FutureTask请看