ThreadPoolExecutor分析

 

本文记录一下ThreadPollExecutor如何管理线程和任务,何时创建线程?何时销毁线程?

threadPoolExecutor提交任务时有两种方式,一种是execute,一种是submit,众所周知,其区别是后者有返回值,其类型是RunnableFuture。但在这里不关心返回值,只需要知道submit最终执行的也是execute就可以。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

execute方法的逻辑其实就是大家倒背如流的java线程池的工作逻辑(看代码注释就行,不展开了),这里主要关注ctl是个什么东西?

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //
        int c = ctl.get();
        //当前活跃线程数量小于核心线程数,直接创建新线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //达到核心线程数量之后,先将任务放入工作队列,isRunning(c)是表示线程池还是运行状态就行
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果线程池关闭了,移除并直接交给拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //这里其实没理解
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //工作队列也满了,创建工作线程去执行,工作线程满了就去调用拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是一个原子数值类型的实例,其工作是用来计算线程池活跃线程数量和记录当前线程池状态

那java是怎么用一个数值干两个活的呢?利用的就是高低位的思想,将数值对应的二进制位高n位用作一个功能,低32-n位用作另一个功能。

在这里,高3位用来表示线程池的状态,低29位是线程池的数量。之所以需要3位来表示状态,是由于线程池有5个状态。

下面的代码对应的逻辑,其中需要理解的是CAPACITY是低29位为1的二进制位,其作为一个用于比较的基准值。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))

//COUNT_BITS是29,CAPACITY是低29位全为1的二进制表示
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl

//将基准值取反就是高3位为1,低29位为0,再和ctl去与,获取到的就是状态值
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//当前值与上低29位为1的基准值,获取到的就是线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }

//重新初始化ctl值,默认初始化RUNING和0表示状态为运行,0个线程
private static int ctlOf(int rs, int wc) { return rs | wc; }

然后简单说一下5个状态,其实doc上写的蛮清晰的。

RUNNING表示可以接受并处理新任务

SHUTDOWN表示不接受新任务,但可以处理已经接受的任务,通过调用shutdown到该状态

STOP表示不接受新任务,不处理排队的任务,同时中断正在处理的任务,shutdownNow到该状态

TIDYING表示工作队列清空了,且线程池也为清空了,准备执行terminated

TERMINATED表示terminated执行完了

创建-addWorker

现在我们知道了,线程池对于线程的计数是通过ctl的低29位来搞的,那么是通过什么来真正操作线程的呢?

其实是通过workers集合,来管理持有Thread对象的一个个Worker类,addWorker方法就是创建的逻辑:主要逻辑是ctl+1,然后创建worker类并添加到workers,最终启动worker线程

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池状态如果SHUTDOWN及以上是无法添加新任务的,因此要return false
            if (rs >= SHUTDOWN &&
                //不理解:以下三个满足任意一个为假就总的为真:rs>shutdown || task不为空 || workQueue是空
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //工作线程达到上限也要返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //能执行到这里有几种情况:
                //1.线程池是running状态且没有达到最大线程数量
                //2.线程池是shutdown但是工作队列没有清空
                //如果ctl++成功,表明线程池多了一个线程,当然此时还没有创建
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        //创建worker,并添加到workers,然后启动worker线程
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

运行- runWoker

worker.thread.start之后,调用的是run 方法,最终调用的是runWorker方法,这个是当前线程的执行逻辑:

本质上就是一直循环拉取新的任务,能拉取到就执行,拉取不到了就表明任务全部执行完了,当前线程就可以释放了。

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 如果还没有开始执行任务,可以中断
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                task.run();
                w.completedTasks++;
                w.unlock();
            }
            completedAbruptly = false;
        } finally {
            //停止任务
            processWorkerExit(w, completedAbruptly);
        }
    }

删除线程-getTask&processWorkerExit

线程的释放是在获取不到任务之后,此时只有工作线程会进行回收,核心线程会阻塞在take方法

删除分为两步:其中ctl-1操作是在getTask进行的,worker的移除是在processWorkerExit处理的。

timed字段的逻辑表明任意线程都有可能被删,结合之前创造线程时的逻辑,其实线程池里并没有明确的标识区分线程A和线程B是核心还是非核心,创造和删除的时候都是这样,因此,删除的时候是完全随机的。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 当没有任务的时候,ctl要减1,并返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //在上一次从队列取任务已经超时的情况下,此时线程依然有存活或者队列刚空,这个线程都应该被释放
            //但是由于其他线程也可能释放,因此这里是cas,只有一个在当前循环会成功,失败的会从外层循环再次尝试
            //直到一直减到核心线程数量为止
            //这里的wc > maximumPoolSize不理解,线程数量怎么会超呢??
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            //工作线程在获取任务时会等待一定的时间(这是线程存活时间的体现)
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   //当达到核心线程数量之下时会在这里阻塞
                     workQueue.take();
                if (r != null)
                    return r;
                //超时没有获取到,下次循环就可以不再试了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
        //从workers中移除线程,这样线程就可以被垃圾回收了
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //每个线程结束的时候,都要去尝试关闭线程池,万一你正好是最后一个线程呢
        tryTerminate();

        int c = ctl.get();
        //如果线程池还是存活,且任务还没有处理完
        //比如execute抛异常的情况,会在这里新建线程
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

多线程异常捕获问题

        ThreadPoolExecutor executor=new ThreadPoolExecutor(1,8,1, TimeUnit.MINUTES,new LinkedBlockingQueue<>(32),new NamedThreadFactory("transfer",false),new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.execute(()->{
            System.out.println("execute");
            throw new RuntimeException("测试多线程抛异常");
        });
        executor.submit(()->{
            System.out.println("submit");
            throw new RuntimeException("测试多线程抛异常");
        });
        executor.shutdown();

通过上述代码测试,execute方法是可以打印异常堆栈的,但是submit方法提交的任务就没有打印,这是为啥呢?

在runworker方法中,是有尝试catch异常的,

在Thread中,异常是通过UncaughtExceptionHandler来进行处理的,这个类是通过jvm回调的,因此普通的execute提交的runnable,最终都会打印

try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                }

    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }
//假如完全没有自定义过,最终会执行到ThreadGroup的这个方法
    public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                //会在这里打印
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

但是submit不同,submit本质上是提交了一个FutureTask类型的任务,其run方法的逻辑如下

try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
    //放入outcome中,并设置task的state为EXCEPTIONAL
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        //会调到repost
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        //抛出一个ExecutionException异常
        throw new ExecutionException((Throwable)x);
    }

可以看到,其在内部捕获了异常并没有向上抛出,因此,上层不会处理。只能通过get方法触发获取outcome里记录的异常

另外,由于submit方式提前捕获了异常,因此该线程并不会被删除,而execute方式则会将该线程删除,并尝试新建线程

参考:

面试官:线程池中多余的线程是如何回收的?-腾讯云开发者社区-腾讯云

Licensed under CC BY-NC-SA 4.0
最后更新于 2024-10-18