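This post records how ThreadPoolExecutor manages threads and tasks: when does it create a thread, and when does it destroy one?
There are two ways to submit a task to a ThreadPoolExecutor: execute and submit. As is well known, the difference is that submit returns a value. Its declared type is Future, and internally the task is wrapped in a RunnableFuture. The return value is not the concern here; all we need to know is that submit ends up calling execute.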
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
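To see the delegation from the outside, here is a minimal sketch that overrides execute in an anonymous subclass and counts the calls. The class name SubmitDelegationDemo and its run method are made up for illustration; only ThreadPoolExecutor, Future and RunnableFuture are JDK types.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
final class SubmitDelegationDemo {

    /** Returns {number of execute() calls, number of those that received a RunnableFuture}. */
    static int[] run() throws Exception {
        AtomicInteger executeCalls = new AtomicInteger();
        AtomicInteger wrappedCalls = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            public void execute(Runnable command) {
                executeCalls.incrementAndGet();
                if (command instanceof RunnableFuture) {
                    wrappedCalls.incrementAndGet(); // submit() wrapped the task before delegating
                }
                super.execute(command);
            }
        };
        try {
            Future<?> future = pool.submit(() -> { });
            future.get(); // wait for the task to finish
        } finally {
            pool.shutdown();
        }
        return new int[] { executeCalls.get(), wrappedCalls.get() };
    }

    public static void main(String[] args) throws Exception {
        int[] counts = run();
        System.out.println("execute calls: " + counts[0] + ", wrapped tasks: " + counts[1]);
    }
}
```

One submit call should show up as exactly one execute call whose argument is a RunnableFuture.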
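The logic of execute is the Java thread pool workflow everyone knows by heart (the code comments cover it, so I won't expand on it). The main question here is: what is ctl?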
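public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl packs the pool's run state and worker count into a single int
    int c = ctl.get();
    // Fewer workers than corePoolSize: create a new worker thread directly
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Core threads are all in use: put the task into the work queue first;
    // isRunning(c) just checks that the pool is still in the RUNNING state
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // If the pool was shut down in the meantime, remove the task and hand it to the rejection handler
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // The task stays queued; if no worker is alive (e.g. corePoolSize is 0, or the last
        // worker exited after the check above), start one with no first task to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full too: try to add a non-core worker to run the task;
    // if that also fails (pool saturated or shut down), call the rejection handler
    else if (!addWorker(command, false))
        reject(command);
}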
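The order "core thread, then queue, then non-core thread, then reject" can be observed with a deliberately tiny pool. The following is a sketch under assumed parameters (corePoolSize 1, maximumPoolSize 2, queue capacity 1, default AbortPolicy); ExecuteOrderDemo is a made-up name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class ExecuteOrderDemo {

    /** Returns {pool size after task 1, queue size after task 2, pool size after task 3, 1 if task 4 was rejected}. */
    static int[] run() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        int[] seen = new int[4];
        try {
            pool.execute(blocker);             // below corePoolSize: a core worker is created
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);             // core worker busy: the task is queued
            seen[1] = pool.getQueue().size();
            pool.execute(blocker);             // queue full: a non-core worker is created
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);         // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The blocking tasks are what make the sequence deterministic: every worker stays busy with its first task, so each later execute call has to take the next branch.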
ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
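ctl is an AtomicInteger. Its job is to track both the number of worker threads in the pool and the pool's current run state.
So how does Java make one number do two jobs? By splitting it into high and low bits: the high n bits of the binary value serve one purpose, and the low 32-n bits serve another.
Here the high 3 bits hold the pool's run state and the low 29 bits hold the worker count. Three bits are needed because the pool has 5 states, and 2 bits could only encode 4.
The code below implements this. The key point is that CAPACITY has its low 29 bits all set to 1 and serves as a bit mask.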
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS is 29; CAPACITY is the value whose low 29 bits are all 1
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// Inverting the mask gives high 3 bits of 1 and low 29 bits of 0; AND-ing it with ctl yields the run state
private static int runStateOf(int c) { return c & ~CAPACITY; }
// AND-ing ctl with the mask (low 29 bits of 1) yields the worker count
private static int workerCountOf(int c) { return c & CAPACITY; }
// Packs a run state and a worker count into one ctl value; the initial ctlOf(RUNNING, 0) means RUNNING with 0 workers
private static int ctlOf(int rs, int wc) { return rs | wc; }
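The packing arithmetic is easy to try in isolation. The sketch below copies the constants and the three helper functions shown above into a standalone class (CtlBitsDemo is a made-up name) so the values can be inspected directly.

```java
// Hypothetical demo class that mirrors the constants quoted from ThreadPoolExecutor.
final class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1;     // low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;           // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;           // high bits 000
    static final int STOP = 1 << COUNT_BITS;               // high bits 001
    static final int TIDYING = 2 << COUNT_BITS;            // high bits 010
    static final int TERMINATED = 3 << COUNT_BITS;         // high bits 011

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int ctl = ctlOf(RUNNING, 5);
        System.out.println("ctl bits      : " + Integer.toBinaryString(ctl));
        System.out.println("worker count  : " + workerCountOf(ctl));
        System.out.println("still RUNNING : " + (runStateOf(ctl) == RUNNING));
        // Adding 1 to ctl only touches the low bits, so the run state is preserved.
        System.out.println("after ctl + 1 : " + workerCountOf(ctl + 1));
    }
}
```

Because RUNNING is negative and the other states are non-negative and increasing, the states are ordered numerically, which is why the source can compare them with expressions such as rs >= SHUTDOWN.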
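Now a quick word on the 5 states; the Javadoc actually describes them quite clearly.
RUNNING: accepts new tasks and processes queued tasks
SHUTDOWN: accepts no new tasks but still processes tasks already accepted; reached by calling shutdown
STOP: accepts no new tasks, does not process queued tasks, and interrupts tasks in progress; reached by calling shutdownNow
TIDYING: the work queue is empty and the worker count is zero; the terminated() hook is about to run
TERMINATED: terminated() has completed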
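The difference between SHUTDOWN and STOP can be observed through the public API. The sketch below uses a single-thread pool with one running task and one queued task; LifecycleDemo and its method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
final class LifecycleDemo {

    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /** shutdown(): new tasks are rejected, the queued task still runs. Returns {rejected, queuedTaskRan}. */
    static boolean[] gracefulShutdown() throws InterruptedException {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean queuedTaskRan = new AtomicBoolean();
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> queuedTaskRan.set(true)); // waits in the queue
        pool.shutdown();                             // RUNNING -> SHUTDOWN
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { rejected, queuedTaskRan.get() };
    }

    /** shutdownNow(): the running task is interrupted and the queued task is handed back. */
    static int[] abruptShutdown() throws InterruptedException {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { });                      // waits in the queue
        started.await();
        List<Runnable> neverRun = pool.shutdownNow(); // RUNNING -> STOP
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] { neverRun.size(), interrupted.get() ? 1 : 0, terminated ? 1 : 0 };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(gracefulShutdown()));
        System.out.println(java.util.Arrays.toString(abruptShutdown()));
    }
}
```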
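Creating threads - addWorker
We now know that the pool counts its threads in the low 29 bits of ctl. But what does it use to actually operate on the threads?
The answer is the workers set, which manages Worker objects that each hold a Thread. addWorker contains the creation logic: increment ctl, create a Worker and add it to workers, and finally start the worker's thread.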
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // At SHUTDOWN or above the pool accepts no new tasks, so normally return false.
        // The single exception: the state is exactly SHUTDOWN, no first task is passed in,
        // and the queue is not empty, i.e. a worker is needed only to drain queued tasks.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            // Also return false once the worker count has reached its limit
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Execution gets here in two cases:
            // 1. the pool is RUNNING and below the applicable thread limit
            // 2. the pool is SHUTDOWN, no first task was passed in, and the work queue is not yet drained
            // If the CAS increment of ctl succeeds, the pool counts one more thread, although it has not been created yet
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    // Create the Worker, add it to workers, then start the worker's thread
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
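addWorker itself is private, but its effect is visible through public methods. In the JDK source, prestartAllCoreThreads calls addWorker(null, true) until it fails, so the sketch below uses it to create workers without submitting any task. PrestartDemo is a made-up name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class PrestartDemo {

    /** Returns {pool size before, threads started, pool size after, largest pool size}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            int before = pool.getPoolSize();              // workers are created lazily
            int started = pool.prestartAllCoreThreads();  // one worker per core slot, no first task
            int after = pool.getPoolSize();               // size of the workers set
            int largest = pool.getLargestPoolSize();      // the largestPoolSize field updated in addWorker
            return new int[] { before, started, after, largest };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The prestarted workers have no first task, so they go straight to getTask and block there until work arrives or the pool shuts down.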
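Running - runWorker
After worker.thread.start(), the thread calls the Worker's run method, which in turn calls runWorker. This is the logic each worker thread executes:
In essence it loops, pulling new tasks: if it gets one, it runs it; once getTask returns null there is nothing left for this worker to do, and the thread can be released.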
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // from here on the worker may be interrupted, even before it runs its first task
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            task.run();
            w.completedTasks++;
            w.unlock();
        }
        completedAbruptly = false;
    } finally {
        // Clean up the exiting worker
        processWorkerExit(w, completedAbruptly);
    }
}
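The loop in runWorker means one thread serves many tasks. A small sketch that records which thread ran each task makes this visible; with a single-thread pool every task should report the same thread name. WorkerReuseDemo is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class WorkerReuseDemo {

    /** Runs five tasks on a single-thread pool and returns the distinct thread names that ran them. */
    static Set<String> run() throws Exception {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                futures.add(pool.submit(() -> {
                    names.add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every task
            }
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("threads used: " + run());
    }
}
```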
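Removing threads - getTask & processWorkerExit
A thread is released once it can no longer get a task. By default only threads in excess of corePoolSize are reclaimed this way; while the count is at or below corePoolSize, a worker blocks in take().
Removal takes two steps: the ctl decrement happens in getTask, and removing the Worker from workers is handled in processWorkerExit.
The logic around the timed flag shows that any thread may be removed. Combined with the creation logic above, the pool keeps no marker that says whether thread A or thread B is core or non-core, neither on creation nor on removal. So which thread gets removed is not fixed in advance: whichever worker times out while the count exceeds corePoolSize is the one that exits.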
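private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // The pool is shutting down and there is nothing left to run: decrement ctl and return null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // If the previous poll already timed out and this worker is subject to culling, it should be released,
        // provided another worker remains or the queue is empty.
        // Other workers may be exiting at the same time, hence the CAS: only one succeeds per round,
        // and the losers go around the loop and re-evaluate, so the count stops shrinking at corePoolSize.
        // wc > maximumPoolSize can be true when setMaximumPoolSize lowered the maximum at runtime
        // while more workers than the new maximum were still alive.
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        // A timed worker waits at most keepAliveTime for a task (this is where the keep-alive time takes effect)
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // At or below corePoolSize (with core timeout disabled) the worker blocks here
                workQueue.take();
            if (r != null)
                return r;
            // Timed out without getting a task; the next iteration may retire this worker
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}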
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    // Remove the worker from workers so that it can be garbage collected
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // Every exiting worker tries to terminate the pool, in case it happens to be the last one
    tryTerminate();
    int c = ctl.get();
    // If the pool has not reached STOP, a replacement worker may be needed:
    // either this worker died abruptly (e.g. a task passed to execute threw an exception),
    // or fewer workers remain than the required minimum
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}
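The shrinking behaviour of getTask and processWorkerExit can be watched from outside by polling getPoolSize. The sketch below assumes a 50 ms keep-alive and a SynchronousQueue so that every task forces a new thread; ShrinkDemo and awaitPoolSize are made-up names, and the polling helper is only a convenience for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class ShrinkDemo {

    /** Returns {size after growing, size after keep-alive expiry, size after enabling core timeout}. */
    static int[] run() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blocker);           // no idle worker to hand off to: a thread per task
            }
            int grown = pool.getPoolSize();
            release.countDown();                 // all workers become idle
            int shrunk = awaitPoolSize(pool, 1); // surplus workers time out in poll()
            pool.allowCoreThreadTimeOut(true);   // now the last worker is timed as well
            int empty = awaitPoolSize(pool, 0);
            return new int[] { grown, shrunk, empty };
        } finally {
            pool.shutdown();
        }
    }

    /** Polls for up to 5 seconds until the pool reports the target size, then returns the size seen. */
    static int awaitPoolSize(ThreadPoolExecutor pool, int target) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() != target && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        return pool.getPoolSize();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

Which of the three threads survives the first shrink is not specified; the pool only guarantees how many remain.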
Exception handling in pool threads
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES,
        new LinkedBlockingQueue<>(32), new NamedThreadFactory("transfer", false),
        new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> {
    System.out.println("execute");
    throw new RuntimeException("test: exception thrown in a pool thread");
});
executor.submit(() -> {
    System.out.println("submit");
    throw new RuntimeException("test: exception thrown in a pool thread");
});
executor.shutdown();
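Testing with the code above, the task passed to execute prints the exception stack trace, but the task passed to submit prints nothing. Why?
runWorker does catch exceptions thrown by the task, but only to hand them to afterExecute before rethrowing them, so they still propagate out of the thread's run method.
In Thread, an uncaught exception is handled through the UncaughtExceptionHandler, which the JVM invokes as a callback. That is why an exception from a plain Runnable passed to execute always ends up being printed.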
// excerpt from runWorker
try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }
}
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}
// If no handler was ever customized, this ThreadGroup method is what finally runs
public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            // The stack trace is printed here
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}
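Since the exception from execute reaches the thread's UncaughtExceptionHandler, one way to intercept it is to install a handler in the ThreadFactory. The following is a sketch of that idea, not the only option; UncaughtHandlerDemo and the thread name "demo-worker" are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the JDK.
final class UncaughtHandlerDemo {

    /** Returns the message of the exception seen by the custom handler, or null if none arrived. */
    static String run() throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "demo-worker");
            // Replaces the default behaviour of printing the stack trace to System.err
            thread.setUncaughtExceptionHandler((t, error) -> {
                seen.set(error);
                handled.countDown();
            });
            return thread;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        try {
            pool.execute(() -> {
                throw new IllegalStateException("boom");
            });
            handled.await(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
        Throwable error = seen.get();
        return error == null ? null : error.getMessage();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handler saw: " + run());
    }
}
```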
submit is different: what it submits is really a FutureTask, whose run method contains the following logic:
try {
    result = c.call();
    ran = true;
} catch (Throwable ex) {
    result = null;
    ran = false;
    setException(ex);
}
// Stores the exception in outcome and sets the task's state to EXCEPTIONAL
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // Ends up in report
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    // Throws an ExecutionException that wraps the recorded exception
    throw new ExecutionException((Throwable)x);
}
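As you can see, FutureTask catches the exception internally and does not rethrow it, so nothing further up handles it. The exception recorded in outcome only surfaces when get is called.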
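Two ways to get at the swallowed exception can be sketched together: calling get on the returned Future, and overriding afterExecute to unwrap the Future, a pattern adapted from the ThreadPoolExecutor Javadoc. SubmitFailureDemo and the failures queue are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
final class SubmitFailureDemo {

    /** Returns {message obtained through Future.get(), message recorded by afterExecute}. */
    static String[] run() throws InterruptedException {
        Queue<Throwable> failures = new ConcurrentLinkedQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // For submit(), t is null because FutureTask caught the exception; dig it out of the Future.
                if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
                    try {
                        ((Future<?>) r).get();
                    } catch (CancellationException ce) {
                        t = ce;
                    } catch (ExecutionException ee) {
                        t = ee.getCause();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (t != null) {
                    failures.add(t);
                }
            }
        };
        Runnable failing = () -> {
            throw new IllegalStateException("boom");
        };
        Future<?> future = pool.submit(failing);
        String fromGet = null;
        try {
            future.get();
        } catch (ExecutionException e) {
            fromGet = e.getCause().getMessage(); // the original exception is the cause
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS); // afterExecute may finish after get() returns
        Throwable recorded = failures.peek();
        return new String[] { fromGet, recorded == null ? null : recorded.getMessage() };
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The hook is useful when the caller never calls get, since otherwise a failure from submit leaves no trace at all.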
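Also, because submit catches the exception early, the worker thread is not removed; with execute, the worker thread is removed and the pool tries to create a replacement.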
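This difference can be checked by counting how many threads the ThreadFactory is asked to create. The sketch below runs one failing task followed by one no-op task on a single-thread pool; WorkerReplacementDemo is a made-up name, and the empty handler is there only to keep the output quiet.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
final class WorkerReplacementDemo {

    /** Runs a failing task and then a no-op task; returns how many threads the factory created. */
    static int threadsCreated(boolean useSubmit) throws Exception {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            created.incrementAndGet();
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, error) -> { }); // suppress the stack trace
            return thread;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        Runnable failing = () -> {
            throw new IllegalStateException("boom");
        };
        try {
            if (useSubmit) {
                pool.submit(failing);  // exception is captured inside the FutureTask
            } else {
                pool.execute(failing); // exception escapes and kills the worker
            }
            pool.submit(() -> { }).get(); // wait until a later task has run
        } finally {
            pool.shutdown();
        }
        return created.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("execute: " + threadsCreated(false) + " thread(s)");
        System.out.println("submit : " + threadsCreated(true) + " thread(s)");
    }
}
```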