Java 线程池 ThreadPoolExecutor 原理

为什么要使用线程池

平时讨论多线程处理,大佬们必定会说使用线程池,那为什么要使用线程池?其实,这个问题可以反过来思考一下,不使用线程池会怎么样?当需要多线程并发执行任务时,只能不断的通过new Thread创建线程,每创建一个线程都需要在堆上分配内存空间,同时需要分配虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间,当这个线程对象被可达性分析算法标记为不可用时被GC回收,这样频繁的创建和回收需要大量的额外开销。再者说,JVM的内存资源是有限的,如果系统中大量的创建线程对象,JVM很可能直接抛出OutOfMemoryError异常,还有大量的线程去竞争CPU会产生其他的性能开销,更多的线程反而会降低性能,所以必须要限制线程数。

既然不使用线程池有那么多问题,我们来看一下使用线程池有哪些好处:

  • 使用线程池可以复用池中的线程,不需要每次都创建新线程,减少创建和销毁线程的开销;
  • 同时,线程池具有队列缓冲策略、拒绝机制和动态管理线程个数,特定的线程池还具有定时执行、周期执行功能,比较重要的一点是线程池可实现线程环境的隔离,例如分别定义支付功能相关线程池和优惠券功能相关线程池,当其中一个运行有问题时不会影响另一个。


JDK默认的几种线程池

jdk提供了一个工具类( java . util . concurrent .Executors) 类构造几个类型的线程池

eclipse-javadoc:☂=debug_jdk8/E:\/java8\/jre\/lib\/endorsed\/rt_debug.jar

eclipse-javadoc:☂=debug_jdk8/E:\/java8\/jre\/lib\/endorsed\/rt_debug.jar

eclipse-javadoc:☂=debug_jdk8/E:\/java8\/jre\/lib\/endorsed\/rt_debug.jar

三个方法的源码

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

我们发现 都是 构造了 ThreadPoolExecutor 这个类,这个类就是我们真实的线程池实现类。


线程池几个参数大致意思

corePoolSize(int)

核心线程数量。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到任务队列当中。线程池将长期保证这些线程处于存活状态,即使线程已经处于闲置状态。除非配置了allowCoreThreadTimeOut=true,核心线程数的线程也将不再保证长期存活于线程池内,在空闲时间超过keepAliveTime后被销毁。

workQueue

阻塞队列,存放等待执行的任务,线程从workQueue中取任务,若无任务将阻塞等待。当线程池中线程数量达到corePoolSize后,就会把新任务放到该队列当中。JDK提供了四个可直接使用的队列实现,分别是:基于数组的有界队列ArrayBlockingQueue、基于链表的无界队列LinkedBlockingQueue、只有一个元素的同步队列SynchronousQueue、优先级队列PriorityBlockingQueue。在实际使用时一定要设置队列长度。

maximumPoolSize(int)

线程池内的最大线程数量,线程池内维护的线程不得超过该数量,大于核心线程数量小于最大线程数量的线程将在空闲时间超过keepAliveTime后被销毁。当阻塞队列存满后,将会创建新线程执行任务,线程的数量不会大于maximumPoolSize。

keepAliveTime(long)

线程存活时间,若线程数超过了corePoolSize,线程闲置时间超过了存活时间,该线程将被销毁。除非配置了allowCoreThreadTimeOut=true,核心线程数的线程也将不再保证长期存活于线程池内,在空闲时间超过keepAliveTime后被销毁。

TimeUnit unit

线程存活时间的单位,例如TimeUnit.SECONDS表示秒。

RejectedExecutionHandler

拒绝策略,当任务队列存满并且线程池个数达到maximunPoolSize后采取的策略。ThreadPoolExecutor中提供了四种拒绝策略,分别是:抛RejectedExecutionException异常的AbortPolicy(如果不指定的默认策略)、使用调用者所在线程来运行任务CallerRunsPolicy、丢弃一个等待执行的任务,然后尝试执行当前任务DiscardOldestPolicy、不动声色的丢弃并且不抛异常DiscardPolicy。项目中如果为了更多的用户体验,可以自定义拒绝策略。

threadFactory

创建线程的工厂,虽说JDK提供了线程工厂的默认实现DefaultThreadFactory,但还是建议自定义实现最好,这样可以自定义线程创建的过程,例如线程分组、自定义线程名称等。

ThreadPoolExecutor 源码精讲


ThreadPoolExecutor 成员变量

// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
// 工作队列
private final BlockingQueue<Runnable> workQueue;
private volatile long keepAliveTime;
// 创建实际线程 的工厂类 (定制线程名称,父线程等参数)
private volatile ThreadFactory threadFactory;
// 线程池没有空闲线程,队列也满了,此时有任务提交过来,该任务如何执行的策略
private volatile RejectedExecutionHandler handler;
// 存储要执行的worker任务
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程池状态 和 工作线程数存储
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl

AtomicInteger 类型。高3位存储线程池状态,低29位存储当前线程数量。workerCountOf(c) 返回当前线程数量。runStateOf(c) 返回当前线程池状态。 线程池有如下状态:

  • RUNNING:接收新任务,处理队列任务。
  • SHUTDOWN:不接收新任务,但处理队列任务。
  • STOP:不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
  • TIDYING:所有任务都被终结,有效线程为0。会触发terminated()方法。
  • TERMINATED:当terminated()方法执行结束。


ThreadPoolExecutor 的构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造函数很简单,就是把线程池的配置信息赋值,没有实际开线程预热。配置参数的意思见上面成员变量,详解会面会讲。


execute(Runnable command) 提交任务

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 可以不理解这个,就是为了高效 取线程状态 和 工作线程数
        int c = ctl.get();
        // runStateOf获取ctl高三位,也就是线程池的状态。workerCountOf获取ctl低29位,也就是线程池中线程数
        // 当前正在运行的工作线程数 小于了 核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // addWorker ,执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 大于等于了 核心线程数 put到任务队列里面
        if (isRunning(c) && workQueue.offer(command)) {
            // 放入队列成功
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 线程池 没在正常状态了,拒绝任务
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //队列满了,在进行一次addWorker ,执行任务
        else if (!addWorker(command, false))
            // addWorker 执行任务不成功(超过了最大线程数),走拒绝任务策略
            reject(command);
    }

提交三步曲

  • 当前运行中的线程数(工作线程数) < 核心线程数时,调用addWorker执行任务。
  • 当前运行中的线程数(工作线程数) >= 核心线程数时,暂时先放到一个阻塞队列里面,等待有空闲的线程了,就取这个阻塞队列去执行。
  • 如果上面阻塞队列存储满了,在调用一次addWorker去执行任务,如果addWorker不成功时,就会拒绝这个任务。

重点就落在了 addWorker上

     private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 拿到 线程池的状态
            int rs = runStateOf(c);
            // 如果线程池状态已经是SHUTDOWN
            if (rs >= SHUTDOWN &&
               // 任务为null,队列不为空,线程池状态也是SHUTDOWN
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                 // 直接返回false
                return false;

            for (;;) {
                // 当前正在运行的线程数(工作线程数)
                int wc = workerCountOf(c);
                if (wc >= 536870911 ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //  工作线程数<corePoolSize时,core为true
                    // 工作线程数>=corePoolSize时,core为false , 大于最大线程数直接返回
                    return false;
                //  //使用 CAS 将线程数量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               //修改不成功说明线程数量有变化
               //重新判断线程池状态,有变化时跳到外层循环重新获取线程池状态
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // Worker 就是任务的包装,而且继承AQS,带有锁性质
            w = new Worker(firstTask);
           // ..... 为了一步一步搞清,先来看下 Worker 的数据结构
             
        } finally {
        }
        return workerStarted;
    }

Worker 数据结构

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
     // 真正执行任务的线程
     final Thread thread;
     // 要执行的任务
     Runnable firstTask;
     Worker(Runnable firstTask) {
            setState(-1); //AQS 的 state设置为-1
            this.firstTask = firstTask;
            // new出线程,并没有start
            this.thread = getThreadFactory().newThread(this);
     }
}

然后接着上面addWorker方法

        try {
            w = new Worker(firstTask);
            //取得worker的线程
            final Thread t = w.thread;
            if (t != null) {
                // 这里要加锁,保证下面操作为线程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 获取当前线程池状态
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN || // 可以接受任务状态
                        (rs == SHUTDOWN && firstTask == null)) { //SHUTDOWN为可以处理阻塞队列里的任务
                        if (t.isAlive()) // 线程已经运行了
                            throw new IllegalThreadStateException();
                       // 将Worker加入到set集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 当前任务数 大于了 largestPoolSize 
                            largestPoolSize = s;
                       //添加成功的标识
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                   //添加成功,开启线程,就会执行Worker的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                 //没有走到 线程的start,则从 workers集合 中移除这个worker
                addWorkerFailed(w);
        }

首先构造了一个Worker 对象,然后加锁,添加到全局的set集合中。然后调用了Worker里面的Thread的start方法,启动线程,之后就会执行Worker的run方法。

Worker的run方法

public void run() {
        //this为 当前worker对象
        runWorker(this);
}   
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果task不为空直接进入while,否则从阻塞队列里面获取task任务
            while (task != null || (task = getTask()) != null) {
                w.lock(); // 只对当前task加锁,保证锁粒度很细 ,并发高
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 线程池状态停止,中断线程
                    wt.interrupt();
                try {
                   //空实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //调用 客户端真正的业务方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);  //空实现
                    }
                } finally {
                   // task置为空,然后下次while循环就从阻塞队列里面取
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //收尾工作, 移除workers集合,更新统计等
            processWorkerExit(w, completedAbruptly);
        }
    }

run方法是一个while循环,循环里面执行真正的客户端业务方法,然后调用getTask从阻塞队列里面取任务。

getTask

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //当前运行的线程数
            int wc = workerCountOf(c);
            // allowCoreThreadTimeOut 默认为false
            //如果将 allowCoreThreadTimeOut 设置为 true, 核心线程数也会走timed 为true的逻辑,会销毁
            // 当前运行的线程数>核心线程数 ,需要开非核心的线程了
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
               //队列没数据了,且已经过了 keepAliveTime时间了,没有任务执行了,
                // 就返回退出for 销毁这个线程
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
              //workQueue.poll 队列没数据会阻塞等待 keepAliveTime时间,然后返回,返回之后就会销毁线程
                // workQueue.take() 会一直阻塞,直到队列中有数据就返回。
                 Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

首先getTask是在一个无限的for循环里面的, 判断 当前运行的线程数<=核心线程数 ,直接调用阻塞队列的take方法,无限阻塞下去,直到队列中有数据放入,唤醒退出for循环。

如果 当前运行的线程数> 核心线程数 ,如果队列没数据,会阻塞 keepAliveTime 时间才返回,继续for循环时,如果队列为空就退出for,销毁该阻塞在队列上的线程。


线程池流程总结

当任务提交时,如果当前正在运行的线程数 小于 核心线程数 时, 构造一个worker,并且调用worker的start开启一个线程。 线程的run方法为一个while循环。循环里面调用getTask从全局阻塞队列中取出数据,取出就立即执行,然后阻塞到getTask方法的队列上。

当任务提交时,如果 当前正在运行的线程数 大于等于 核心线程数 且 队列没有满,放入到全局阻塞队列里面,此时上面getTask 阻塞的线程将唤醒执行业务任务。

当任务提交时,如果 当前正在运行的线程数 大于等于 核心线程数 且 队列满了,会继续判断当前正在运行的线程数 的是否超过最大线程数,如果超过了,走拒绝任务策略,否则构造一个worker ,调用worker的start开启一个线程,执行第一步的while循环getTask的操作。

getTask操作

getTask时,如果当前运行线程数在核心线程数内,线程不紧张时,就会无限阻塞到队列上,直到队列有数据放入就返回执行,如果当前运行线程数大于了核心线程数,线程开的较多了。就会阻塞keepAliveTime时间,期间如果队列还没数据来,就会销毁这个worker的线程,让其回收。

KeepAliveTime的作用?

keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。该参数只在线程数大于 corePoolSize 时才有用, 超过这个时间的空闲线程将被终止;但是如果设置 allowCoreThreadTimeOut 为true,则核心线程数到了keepAliveTime时间没任务执行的话也会被销毁掉。 如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

线程池的拒绝策略?

先看下线程池里面是怎么调用拒绝策略接口的

// 拒绝策略接口
public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 线程池全局变量: 拒绝策略接口
private volatile RejectedExecutionHandler handler;
// 线程池拒绝方法
final void reject(Runnable command) {
    //直接回调 拒绝策略接口
     handler.rejectedExecution(command, this);
}

线程池拒绝时,直接就回调RejectedExecutionHandler接口,下面看下JDK的几种拒绝策略

  • CallerRunsPolicy(调用者运行策略)
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
      
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

功能 :当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。

使用场景 :一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

2. AbortPolicy(中止策略)

    public static class AbortPolicy implements RejectedExecutionHandler {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

功能: 当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程。

使用场景: 这个就没有特殊的场景了,但是一点要正确处理抛出的异常。

3. DiscardOldestPolicy(弃老策略)

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
               // 把队列老的数据 弹出 丢弃
                e.getQueue().poll();
               // 执行这个新的提交的任务
                e.execute(r);
            }
        }
    }

功能: 如果线程池未关闭,就弹出队列头部的元素,然后尝试执行。

使用场景: 这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。

4. DiscardPolicy(丢弃策略)

    public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

功能: 直接静悄悄的丢弃这个任务,不触发任何动作 。

使用场景: 如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了。


线程池的几种阻塞队列

  • ArrayBlockingQueue :规定大小的BlockingQueue,其构造必须指定大小。其所含的对象是FIFO顺序排序的。
  • LinkedBlockingQueue :大小不固定的BlockingQueue,若其构造时指定大小,生成的BlockingQueue有大小限制,不指定大小,其大小有Integer.MAX_VALUE来决定。其所含的对象是FIFO顺序排序的。
  • PriorityBlockingQueue :类似于LinkedBlockingQueue,但是其所含对象的排序不是FIFO,而是依据对象的自然顺序或者构造函数的Comparator决定。
  • SynchronizedQueue :特殊的BlockingQueue,对其的操作必须是放和取交替完成。


线程池参数调优

线程池的线程数量设置过多会导致线程竞争激烈,如果线程数量设置过少的话,还会导致系统无法充分利用计算机资源。一般多线程执行的任务类型可以分为 CPU 密集型和 I/O 密集型,根据不同的任务类型,我们计算线程数的方法也不一样。

CPU 密集型

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

CPU 密集型 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

I/O 密集型 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。


如何让线程池启动后就开启线程?

线程池里面提供了 prestartAllCoreThreads方法进行预热 核心线程

    public int prestartAllCoreThreads() {
        int n = 0;
        // addWorker会开启线程直到 核心线程数满了
        while (addWorker(null, true))
            ++n;
        return n;
    }

核心线程数会被回收吗?需要什么设置?

核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用 allowCoreThreadTimeOut(true) 这个方法

    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

allowCoreThreadTimeOut 默认为false, 设置为true的话,getTask取队列任务的时候用的是poll+指定keepAliveTime 时间,keepAliveTime 时间到了后还没任务就回收线程。


动态修改线程池配置

  • 核心线程数的修改 setCorePoolSize
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        // 将核心线程数 修改
        this.corePoolSize = corePoolSize;
        // 当前运行的线程数 > 核心线程数
        if (workerCountOf(ctl.get()) > corePoolSize)
            // 大于了核心线程 ,打断正在阻塞到队列上的线程,也就是回收线程
            interruptIdleWorkers();
        else if (delta > 0) {
            // 是扩大核心线程
            int k = Math.min(delta, workQueue.size());// 扩大的线程数
            while (k-- > 0 && addWorker(null, true)) { // 进行预热扩大的核心线程
                if (workQueue.isEmpty()) // 队列是空的,代表没有任务执行,就不要要预热核心线程
                    break;
            }
        }
    }

2. 最大线程数的修改 setMaximumPoolSize

    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        // 修改配置
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
           // 大于了最大线程数,打断正在阻塞到队列上的线程,也就是回收线程
            interruptIdleWorkers();
    }

3. 修改拒绝策略 setRejectedExecutionHandler

    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
           // 直接修改拒绝策略接口
        this.handler = handler;
    }

4. 修改非核心线程保活时间 setKeepAliveTime

    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
        // 时间修改的变小了 , 打断正在阻塞到队列上的线程,也就是回收线程
            interruptIdleWorkers();
    }


线程池监控

// 当前核心线程数
int s1 = threadPoolExecutor.getCorePoolSize();

// 当前最大线程数
int s2 = threadPoolExecutor.getMaximumPoolSize();

// 当前活动线程数
int s3 = threadPoolExecutor.getActiveCount();

// 历史完成的任务数
long s4 = threadPoolExecutor.getCompletedTaskCount();

// 获取队列的剩余 大小
int s5 = threadPoolExecutor.getQueue().remainingCapacity();

// 队列的 总大小
int s6 = threadPoolExecutor.getQueue().size();

线程池OOM

比如有这么一个程序

	public static void main(String[] args) throws IOException, InterruptedException {
		ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(1);
		for (int i = 0; i < 10000000; i++) {
			threadPoolExecutor.execute(() -> {
				try {
					Thread.sleep(500000);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
	}

运行后,会报出

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
	at debug_jdk8.Main.main(Main.java:21)

利用VisualVM 打开 dump 堆

发现有很多 LinkedBlockingQueue的内部类Node 对象 , 这个对象就是 LinkedBlockingQueue队列的元素,说明我们线程池队列已经塞满了任务直到OOM了,打开线程池LinkedBlockingQueue的构造:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
 ////////////构造了一个 Integer.MAX_VALUE 长度的 LinkedBlockingQueue队列
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

问题就出在了 构造了 Integer.MAX_VALUE 长度的 LinkedBlockingQueue队列。

线上线程池遇到CountDownLatch卡死案例

罗政:CountDownLatch 遇到 线程池 卡死


强烈推荐一个 进阶 JAVA架构师 的博客

https://pic2.zhimg.com/v2-1e8deea0c94dab83067a8eca4007734d_ipico.jpg

支付宝打赏 微信打赏

如果文章对您有帮助,您可以鼓励一下作者