线程池

2023/03/17
Note

本篇基于Java11

1. 构造器

线程池的构造器很多,我们直接看参数最多的那一个:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
java
  • corePoolSize:线程池保存在线程池中的核心线程数,当线程数量超过该值后,将会回收多余的线程.
  • maximumPoolSize:线程池最大的线程数
  • keepAliveTime:当线程数大于corePoolSize时,多余的线程多久后被清除
  • unitkeepAliveTime的时间单位
  • workQueue:保存任务的工作队列
  • threadFactory:要创建新线程时使用的工厂类
  • handler:当任务由于超出线程池容量而被阻拦时将会执行此拦截器

线程池的基本工作流程如下:

  1. 调用execute执行一个任务
  2. 若工作线程数小于corePoolSize,则创建一个新的工作线程去执行这个任务
  3. 若工作线程数已经大于等于corePoolSize,则将任务添加到工作队列中,工作线程执行完后会自动去执行队列中的任务
  4. 若队列已满,则尝试增加工作线程去执行任务
  5. 若工作线程数超过maximumPoolSize,则执行拒绝策略

1.1 拒绝策略

ThreadPoolExecutor里有如下4个默认拒绝策略(RejectedExecutionHandler)。

  • CallerRunsPolicy:如果线程池没有被关闭,则由当前线程执行(即提交任务的那个线程)
  • AbortPolicy:直接抛出一个RejectedExecutionException
  • DiscardPolicy:直接丢弃,并且没有任何提示
  • DiscardOldestPolicy:丢弃任务队列中最早加入的一个任务,然后执行当前任务

其中AbortPolicy为构造器中缺省时的默认值。

1.2 工作队列

  • ArrayBlockingQueue:一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的无界阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

2. ctl

ctl是一个表示线程当前状态的原子整型:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
java

它的低29位为当前正在运行的线程数,高3位表示线程池的状态:

状态源码状态位说明
RUNNING-1 << COUNT_BITS111线程池正在正常运行
SHUTDOWN0 << COUNT_BITS000线程池准备关闭,此时拒绝新的任务,在所有任务执行完后进入TIDYING状态
STOP1 << COUNT_BITS001线程池准备关闭,此时会中断所有正在运行的线程,不再接收新任务,也不会执行已经在队列里的任务,工作线程数为0时进入TIDYING状态
TIDYING2 << COUNT_BITS010所有任务已经终止,进行整理状态
TERMINATED3 << COUNT_BITS011线程池彻底终止运行

COUNT_BITS = Integer.SIZE - 3;

关闭线程池可以通过shutdown()或者shutdownNow()来分别进入SHUTDOWNSTOP状态

3. 执行任务

3.1 addWorker

private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 检查ctl是否大于等于SHUTDOWN(常量,为0),即判断是否进入了关闭状态 if (runStateAtLeast(c, SHUTDOWN) // 检查ctl是否大于等于STOP(常量,1<<29),大于等于STOP时时不再接收新任务 && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) // 表示添加失败 return false; for (;;) { // 判断当前正在工作的线程是否超出限制 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 将ctl自增,如果成功则结束最外层循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 原子自增失败,判断是否进入关闭状态 if (runStateAtLeast(c, SHUTDOWN)) // 回到外层循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 这个Thread是根据ThreadFactory获取的 final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); if (isRunning(c) || // 判断当前状态是否为SHUTDOWN,并且任务为空 (runStateLessThan(c, STOP) && firstTask == null)) { if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 存到工作线程Set集合里 workers.add(w); workerAdded = true; int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动任务 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
java

3.2 execute

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // ctl是一个原子整型,前3位为运行状态,后29位为运行中的线程数 int c = ctl.get(); // workerCountOf就是获取后29位 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 走到这里,说明addWorker执行失败了,或者工作线程数大于等于了corePoolSize // 如果线程池还在运行,将任务添加到队列里 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 重新判断线程数是否停止了,如果是则移除这个任务 if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 再次尝试添加 else if (!addWorker(command, false)) reject(command); }
java

3.3 线程池是怎么运行的

其实把上面的代码看完可能还是不清楚线程池是怎么运作的,没事,还记得addWorker里调用了线程的start方法吗,我们来看一下:

Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; ... t.start();
java

来看一下worker类:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker. */ public void run() { runWorker(this); } }
java

可以发现线程调用start方法后实则运行的是runWorker,在看runWorker前先来看一下getTask方法:

先来看它的注释:

Performs blocking or timed wait for a task, depending on current configuration settings, or returns null if this worker must exit because of any of: 1. There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize). 2. The pool is stopped. 3. The pool is shutdown and the queue is empty. 4. This worker timed out waiting for a task, and timed-out workers are subject to termination (that is, allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, and if the queue is non-empty, this worker is not the last thread in the pool.

大致意思是:执行阻塞或定时等待任务,当出现如下情况时返回null:

  • 工作线程数超过maximumPoolSize
  • 线程池关闭(调用shutdownNow方法)
  • 线程池关闭,并且等待队列为空(调用shutdown方法)
  • 该Worker超时等待一个任务
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); // Check if queue empty only if necessary. // &&优先级高于|| if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // 允许常驻线程超时 || 线程数量是否大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 线程池线程数量超过最大值 || 当前线程开启计时并且超时 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 删除worker if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 这里都是进行阻塞,直到workQueue弹出一个元素 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 标记超时 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
java

然后再来看runWorker

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 在这里执行任务或者获取任务,当没有新任务时这个worker就会被删掉 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 上面的注释意思是:如果线程池被关闭(shutdownNow),确保线程被中断,否则则确保没有被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 这一步交给了子类实现 beforeExecute(wt, task); try { // 执行任务 task.run(); // 这一步同样交给了子类实现 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
java

3.4 为什么Worker要继承AQS

runWorker里我们可以发现,我们是通过worker来加了锁的,这里为什么要加锁呢?我们一个worker不是只有一个线程吗?

首先观察一下Worker的实现,可以发现它不是可重入锁,这个可以去类比ReentrantLock,然后我们还知道,我们调用shutdown时,并不会停掉还在运行中的线程,停掉的是那些正在等待任务的线程,那么我们怎么判断一个Worker正处于那种状态呢?

我们先从shutdown入手:

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 中断闲置的worker interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // 看到这个tryLock没,如果成功拿到锁了,说明当前线程没有执行任务 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } // Worker public boolean tryLock() { return tryAcquire(1); } // Worker protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
java

在回到Worker,我们再看一眼构造器:

Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker 禁止中断直到worker启动,即线程启动 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
java

所以说Worker的初始State为-1,用来禁止线程中断,在调用runWorker后,在最外部有一个unlock解锁操作用于将Worker状态设置为0

4. mainLock

mainLock的注释如下:

Lock held on access to workers set and related bookkeeping. While we could use a concurrent set of some sort, it turns out to be generally preferable to use a lock. Among the reasons is that this serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, especially during shutdown. Otherwise exiting threads would concurrently interrupt those that have not yet interrupted. It also simplifies some of the associated statistics bookkeeping of largestPoolSize etc. We also hold mainLock on shutdown and shutdownNow, for the sake of ensuring workers set is stable while separately checking permission to interrupt and actually interrupting.

大致意思是:在访问Worker和一些其它计数变量时加锁。虽然可以用并发的集合来处理,但实际上使用锁更好。原因之一是这些序列化的闲置线程将会避免一些无意义的中断风暴,特别是在关闭期间,另外在退出线程时将同时中断尚未中断的线程((没读懂这句话,机翻的)。加锁还可以简化变量的相关记录。另外为了保证Worker的安全,应在退出时进行加锁。(翻译的稀碎,有能力建议自己翻译)

4.1 processWorkerExit

顾名思义,这个方法在Worker退出时会被调用

private void processWorkerExit(Worker w, boolean completedAbruptly) { // 判断是否由于用户异常导致退出 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 移除worker workers.remove(w); } finally { mainLock.unlock(); } // 尝试将线程池从stop或shutdown转换为terminate状态 tryTerminate(); int c = ctl.get(); // 判断是否不处于STOP状态 if (runStateLessThan(c, STOP)) { // 如果不是由于用户异常导致 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // 确保worker的数量最多有corePoolSize,超过了则不再新建 if (workerCountOf(c) >= min) return; // replacement not needed } // 添加一个新Worker addWorker(null, false); } }
java

5. 其它

ThreadPoolExecutor一同作为线程池的还有ForkJoinPool

Java多线程之ThreadPoolExecutor和ForkJoinPool的用法 - 掘金 (juejin.cn)