# JAVA线程池
JAVA 线程池 2019-03-29# 线程池概述
java.util.concurrent.Executors
提供了一个 java.util.concurrent.Executor
接口的实现用于创建线程池
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
# 实现原理
java
线程池的实现原理很简单,说白了就是一个线程集合workerSet
和一个阻塞队列workQueue
。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue
中。workerSet
中的线程会不断的从workQueue
中获取线程然后执行。当workQueue
中没有任务的时候,worker
就会阻塞,直到队列中有任务了就取出来继续执行。
# 线程池的作用
线程池作用就是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
# 线程池的优点
- 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 便于统一管理、分配、调优和监控。
Java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行、被哪个线程执行,以及什么时候执行。
# Executor框架接口
Executor
框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。
- J.U.C中有三个Executor接口:
- Executor:一个运行新任务的简单接口;
- ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;
- ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。
# 主要参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize
: 核心线程数量,即规定线程池有几个线程(worker)在运行。- 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
- 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
- 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
maximumPoolSize
: 最大线程数量。当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。keepAliveTime
: 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime。unit
: 生存时间对于的单位workQueue
: 等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列。主要有以下几种处理方式:- 直接切换:这种方式常用的队列是SynchronousQueue,但现在还没有研究过该队列,这里暂时还没法介绍;
- 使用无界队列:一般使用基于链表的阻塞队列LinkedBlockingQueue。如果使用这种方式,那么线程池中能够创建的最大线程数就是corePoolSize,而maximumPoolSize就不会起作用了(后面也会说到)。当线程池中所有的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
- 使用有界队列:一般使用ArrayBlockingQueue。使用该方式可以将线程池的最大线程数量限制为maximumPoolSize,这样能够降低资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,因为线程池和队列的容量都是有限的值,所以要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,并且还要尽可能的降低线程池对资源的消耗,就需要合理的设置这两个数量。
threadFactory
: 它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。handler
: 它是RejectedExecutionHandler类型的变量,表示线程池的拒绝策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
**用户通过submit
提交一个任务。线程池会执行如下流程: **
- 判断当前运行的
worker
数量是否超过corePoolSize
,如果不超过corePoolSize
。就创建一个worker
直接执行该任务。—— 线程池最开始是没有worker
在运行的 。 - 如果正在运行的worker数量超过或者等于
corePoolSize
,那么就将该任务加入到workQueue
队列中去。 - 如果
workQueue
队列满了,也就是offer
方法返回false的话,就检查当前运行的worker
数量是否小于maximumPoolSize
,如果小于就创建一个worker
直接执行该任务。 - 如果当前运行的
worker
数量是否大于等于maximumPoolSize
,那么就执行RejectedExecutionHandler
来拒绝这个任务的提交。
# 线程池常见的5种状态
RUNNING
:能接受新提交的任务,并且也能处理阻塞队列中的任务;SHUTDOWN
:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用 shutdown()方法进入该状态);STOP
:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;TIDYING
:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用
terminated() 方法进入TERMINATED 状态。TERMINATED
:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
# 源码解析
# ThreadPoolExecutor
- ThreadPoolExecutor中的关键属性
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
# 提交任务
- 提交任务时处理过程相关源码
public void execute(Runnable command) {
if (command == )
throw new NullPointerException();
int c = ctl.get();
//workerCountOf(c)会获取当前正在运行的worker数量
if (workerCountOf(c) < corePoolSize) {
//如果workerCount小于corePoolSize,就创建一个worker然后直接执行该任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//isRunning(c)是判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务
//后面将任务加入到队列中
if (isRunning(c) && workQueue.offer(command)) {
//如果添加到队列成功了,会再检查一次线程池的状态
int recheck = ctl.get();
//如果线程池关闭了,就将刚才添加的任务从队列中移除
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(, false);
}
//如果加入队列失败,就尝试直接创建worker来执行任务
else if (!addWorker(command, false))
//如果创建worker失败,就执行拒绝策略
reject(command);
}
# addWorker
- 添加worker的方法addWorker源码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//使用自旋+cas失败重试来保证线程竞争问题
for (;;) {
//先获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池是关闭的,或者workQueue队列非空,就直接返回false,不做任何处理
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//根据入参core 来判断可以创建的worker数量是否达到上限,如果达到上限了就拒绝创建worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//没有的话就尝试修改ctl添加workerCount的值。这里用了cas操作,如果失败了下一个循环会继续重试,直到设置成功
if (compareAndIncrementWorkerCount(c))
//如果设置成功了就跳出外层的那个for循环
break retry;
//重读一次ctl,判断如果线程池的状态改变了,会再重新循环一次
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = ;
try {
final ReentrantLock mainLock = this.mainLock;
//创建一个worker,将提交上来的任务直接交给worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != ) {
//加锁,防止竞争
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
//还是判断线程池的状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == )) {
//如果worker的线程已经启动了,会抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
//添加新建的worker到线程池中
workers.add(w);
int s = workers.size();
//更新历史worker数量的最大值
if (s > largestPoolSize)
largestPoolSize = s;
//设置新增标志位
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker是新增的,就启动该线程
if (workerAdded) {
t.start();
//成功启动了线程,设置对应的标志位
workerStarted = true;
}
}
} finally {
//如果启动失败了,会触发执行相应的方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
# Worker
Worker
是ThreadPoolExecutor
内部定义的一个内部类。Worker
的继承关系如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
它实现了Runnable
接口,所以可以拿来当线程用。同时它还继承了AbstractQueuedSynchronizer
同步器类,主要用来实现一个不可重入的锁。
- 构造方法
//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
this.thread = getThreadFactory().newThread(this);
}
- run方法
public void run() {
//这里调用了ThreadPoolExecutor的runWorker方法
runWorker(this);
}
- ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = ;
//执行unlock方法,允许其他线程来中断自己
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果前面的firstTask有值,就直接执行这个任务
//如果没有具体的任务,就执行getTask()方法从队列中获取任务
//这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环
while (task != || (task = getTask()) != ) {
//执行任务前先锁住,这里主要的作用就是给shutdown方法判断worker是否在执行中的
//shutdown方法里面会尝试给这个线程加锁,如果这个线程在执行,就不会中断它
w.lock();
//判断线程池状态,如果线程池被强制关闭了,就马上退出
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务前调用。预留的方法,可扩展
beforeExecute(wt, task);
Throwable thrown = ;
try {
//真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后调用。预留的方法,可扩展
afterExecute(task, thrown);
}
} finally {
task = ;
//记录完成的任务数量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- getTask()方法
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池已经关闭了,就直接返回null,
//如果这里返回null,调用的那个worker就会跳出while循环,然后执行完销毁线程
//SHUTDOWN状态表示执行了shutdown()方法
//STOP表示执行了shutdownNow()方法
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return ;
}
//获取当前正在运行中的worker数量
int wc = workerCountOf(c);
// 如果设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了
//其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回null
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果上一次循环从队列获取到的未null,这时候timedOut就会为true了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//通过cas来设置WorkerCount,如果多个线程竞争,只有一个可以设置成功
//最后如果没设置成功,就进入下一次循环,说不定下一次worker的数量就没有超过corePoolSize了,也就不用销毁worker了
if (compareAndDecrementWorkerCount(c))
return ;
continue;
}
try {
//如果要设置超时时间,就设置一下咯
//过了这个keepAliveTime时间还没有任务进队列就会返回null,那worker就会销毁
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != )
return r;
//如果r为null,就设置timedOut为true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 添加Callable任务的实现源码
public <T> Future<T> submit(Callable<T> task) {
if (task == ) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
要添加一个有返回值的任务的实现也很简单。其实就是对任务做了一层封装,将其封装成Future
,然后提交给线程池执行,最后返回这个Future
。
这里的 newTaskFor(task)
方法会将其封装成一个FutureTask
类。
外部的线程拿到这个Future
,执行get()
方法的时候,如果任务本身没有执行完,执行线程就会被阻塞,直到任务执行完。
- FutureTask的get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
//判断状态,如果任务还没执行完,就进入休眠,等待唤醒
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//返回值
return report(s);
}
FutureTask
中通过一个state
状态来判断任务是否完成。当run
方法执行完后,会将state
状态置为完成,同时唤醒所有正在等待的线程。
- FutureTask的run方法
public void run() {
//判断线程的状态
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != && state == NEW) {
V result;
boolean ran;
try {
//执行call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = ;
ran = false;
setException(ex);
}
if (ran)
//这个方法里面会设置返回内容,并且唤醒所以等待中的线程
set(result);
}
} finally {
runner = ;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- shutdown方法
shutdown
方法会将线程池的状态设置为SHUTDOWN
,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
//设置线程池状态
advanceRunState(SHUTDOWN);
//尝试中断worker
interruptIdleWorkers();
//预留方法,留给子类实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的worker
for (Worker w : workers) {
Thread t = w.thread;
//先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
//注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
//它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
- shutdownNow方法
shutdownNow
做的比较绝,它先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker
,然后清空任务队列。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//检测权限
advanceRunState(STOP);
//中断所有的worker
interruptWorkers();
//清空任务队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有worker,然后调用中断方法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
# 常见的4种线程池
# newCachedThreadPool
newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println(index);
}
});
}
}
}
# newFixedThreadPool
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
# newScheduledThreadPool
newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS); //表示延迟1秒后每3秒执行一次。
}
}
# newSingleThreadExecutor
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}