不断的学习,我们才能不断的前进
一个好的程序员是那种过单行线马路都要往两边看的人

ThreadPoolExecutor

使用线程池的目的是:

  • 降低资源消耗:避免重复的创建和销毁线程造成的消耗。
  • 提高响应速度:当任务到达后,任务可以不需要等到线程创建就能立即执行。
  • 提供对线程的可管理性:提供对线程的统一分配、调优和监控。

核心参数

7大核心参数:核心线程池大小、最大线程池大小、空闲线程等待工作的超时时间、时间单元、线程工厂、阻塞队列
4大拒绝策略:拒绝任务并抛出异常(AbortPolicy,默认)、拒绝任务不抛出异常(DiscardPolicy)、和最早的任务竞争(DiscardOldestPolicy)、由当前线程执行任务(CallerRunsPolicy)。
Worker表示工作中的任务,继承于AQS模版, Worker初始化的时候会自己创建线程工厂。
线程池的状态:RUNNING表示可以正常接受任务并处理阻塞队列中任务、SHUTDOWN不能接受新的任务但是可以处理阻塞队列里的任务、STOP不能接受任务也不能处理阻塞队列里的任务、TIDYING所有任务都终止、TERMINATED什么都不做

xian-cheng-chi-zhuang-tai

    public class ThreadPoolExecutor extends AbstractExecutorService {
        // 四大拒绝策略 
        public static class CallerRunsPolicy implements RejectedExecutionHandler{}
        public static class AbortPolicy implements RejectedExecutionHandler{}
        public static class DiscardPolicy implements RejectedExecutionHandler{}
        public static class DiscardOldestPolicy implements RejectedExecutionHandler{}
        
        // worker 线程的运行任务 
        private final class Worker extends AbstractQueuedSynchronizer implements Runnable{}
        
        // 锁,在工作时需要加锁,因为workers是HashSet类型的,是线程不安全的
        private final ReentrantLock mainLock = new ReentrantLock();
        // 线程池所有工作线程的集合,只在有 mainLock锁才能访问
        private final HashSet<Worker> workers = new HashSet<Worker>();
        private final Condition termination = mainLock.newCondition();
        
        private final BlockingQueue<Runnable> workQueue;// 阻塞队列
        private long completedTaskCount; // 完成任务的数量
        private volatile ThreadFactory threadFactory; // 线程工厂,创建线程
        private volatile RejectedExecutionHandler handler;//拒绝策略
        private volatile long keepAliveTime;// 空闲线程等待工作的超时失活时间,
        private volatile boolean allowCoreThreadTimeOut;// 是否允许核心线程 失活
        private volatile int corePoolSize; // 核心线程数
        private volatile int maximumPoolSize; // 最大线程数
        
        // ctl表示线程池控制状态,第一个参数表示运行状态、第二个是workerCount(有效线程数量)
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        
        // 线程池的运行状态
        private static final int RUNNING    = -1 << COUNT_BITS;// RUNNING表示可以正常接受任务并处理阻塞队列中任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;// SHUTDOWN不能接受新的任务但是可以处理阻塞队列里的任务
        private static final int STOP       =  1 << COUNT_BITS;// STOP不能接受任务也不能处理阻塞队列里的任务
        private static final int TIDYING    =  2 << COUNT_BITS;// TIDYING所有任务都终止
        private static final int TERMINATED =  3 << COUNT_BITS;// TERMINATED什么都不做
    }

execute方法

  1. 如果工作线程少于corePoolSize线程,则使用addWorker 创建线程并执行任务
  2. 如果工作线程大于核心线程 且 阻塞队列 未满,则把新的任务放入阻塞队列。
  3. 如果阻塞队列已满,并且工作线程大于核心线程,小于最大线程数据,则创建线程执行任务
  4. 如果阻塞队列已满,并且工作线程大于等于最大线程,则执行拒绝策略。

addWorker流程
在线程池中创建一个新的线程并执行,firstTask: 新增线程执行的任务,null表示不执行
core: true表示使用核心线程池大小,false 表示使用最大线程池大小

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();// clt记录着runState和workerCount
        
        //如果工作线程少于corePoolSize线程, 创建线程并执行任务,
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get(); // // 如果失败则重新获取 runState和 workerCount
        }
        
        // 如果工作线程大于核心线程 且 阻塞队列 未满,则把新的任务放入阻塞队列。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
            //第一个参数为null,表示在线程池中创建一个线程,但不去启动
            // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
                addWorker(null, false);
        }
        // 如果阻塞队列已满,并且工作线程大于核心线程,小于最大线程数据,则创建线程执行任务
        else if (!addWorker(command, false)) 
            reject(command); //如果阻塞队列已满,并且工作线程大于最大线程,则执行拒绝策略。
    }
    
    // 在线程池中创建一个新的线程并执行
    // firstTask: 新增线程执行的任务,null表示不执行
    // core: true表示使用核心线程池大小,false 表示使用最大线程池大小
     private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务)并且
            // 并且 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空. 退出
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 工作线程数大于容量,且大于核心或者最大线程,则退出
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 增加线程数 成功则退出
                if (compareAndIncrementWorkerCount(c))
                    break retry; 
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // 失败则自旋保证成功
            }
        }
        // 成功添加线程数量后

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // 创建工作线程
            final Thread t = w.thread;
            if (t != null) {
                // 当前线程获取锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 往工作线程池里面添加 线程
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true; // 添加线程成功
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // 添加线程成功 则启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

为什么需要持有mainLock?
因为workers是HashSet类型的,不能保证线程安全。w = new Worker(firstTask)相当于创建里一个线程,因为Worker继承于AQS 并实现了Runnable接口。
为什么不用ReentrantLock
因为AQS的tryAcquire方法它是不允许重入的,而ReentrantLock是可重入锁。对于线程来说,在执行过程是不允许其他锁可重入进来的,只需要执行 或者空闲两种状态。

run方法

runWorker在while循环中,不断地通过getTask()方法从workerQueue中获取任务,

  1. 如果线程池正在停止,则中断线程。
  2. 否则调用task.run()执行任务;如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);
    public void run() {
            runWorker(this);
    }
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread(); // 当前线程
        Runnable task = w.firstTask; // 当前任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
             // 任务不为空,或者从阻塞队列里面拿到任务不为空getTask()
            while (task != null || (task = getTask()) != null) {
                w.lock();
               
                // 如果线程池是大于等于stop状态或者线程是没有中断的 则线程中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // 执行run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; // 执行完任务为null,解锁
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法中,为什么要加锁?
shutdown方法与getTask方法存在竞态条件为

shutdown

获取当前mainLock锁,然后设置线程运行状态为SHUTDOWN

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow

设置为stop状态

submit

submit方法来自ThreadPoolExecutor的父类AbstractExecutorService,
对于传入的runnable接口会封装成FutureTask类,result为null,或者传入的结果,然后再执行execute方法。
对于传入的Callable接口直接封装成FutureTask。
FutureTask请看


目录