我理解的线程池

一:线程池是什么

本质上是一种池化技术,将创建好的线程放入线程池中,有任务时,直接提交给已经存在的线程,避免频繁创建线程和消亡线程对系统性能的损耗。从某种意义上来说,是空间换时间来提升性能的一种方式。

二:线程池的工作原理

核心参数

private volatile ThreadFactory threadFactory;

/**

  • Handler called when saturated or shutdown in execute.
    */
    private volatile RejectedExecutionHandler handler;

/**

  • Timeout in nanoseconds for idle threads waiting for work.
  • Threads use this timeout when there are more than corePoolSize
  • present or if allowCoreThreadTimeOut. Otherwise they wait
  • forever for new work.
    */
    private volatile long keepAliveTime;

/**

  • If false (default), core threads stay alive even when idle.
  • If true, core threads use keepAliveTime to time out waiting
  • for work.
    */
    private volatile boolean allowCoreThreadTimeOut;

/**

  • Core pool size is the minimum number of workers to keep alive
  • (and not allow to time out etc) unless allowCoreThreadTimeOut
  • is set, in which case the minimum is zero.
  • Since the worker count is actually stored in COUNT_BITS bits,
  • the effective limit is {@code corePoolSize & COUNT_MASK}.
    */
    private volatile int corePoolSize;

/**

  • Maximum pool size.
  • Since the worker count is actually stored in COUNT_BITS bits,
  • the effective limit is {@code maximumPoolSize & COUNT_MASK}.
    */
    private volatile int maximumPoolSize;

/**

  • The default rejected execution handler.
    */
    private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
参数 描述
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 空闲线程存活时间
unit 时间单位
workQueue 工作队列
threadFactory 线程工厂
handler 拒绝策略

工作原理

一图胜千言,通过下面的图就能清晰的理解其工作原理。

有几点需要注意:

1、并不是线程池创建后,立马就创建核心线程,而是要等任务到来后,以懒汉式的方式进行创建

2、任务拒绝对应具体的拒绝策略,如果线程池不是运行状态,那么如果想让任务还是要被执行,那么可以使用当前线程去执行

3、当队列中的任务都执行完毕之后,非核心线程消亡,仅存活核心线程,优先消亡等待时间更长的线程

三:如何创建线程池

1、使用默认提供的线程池

FixThreadPool

public class Executors {
/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue.  At any point, at most
 * {@code nThreads} threads will be active processing tasks.
 * If additional tasks are submitted when all threads are active,
 * they will wait in the queue until a thread is available.
 * If any thread terminates due to a failure during execution
 * prior to shutdown, a new one will take its place if needed to
 * execute subsequent tasks.  The threads in the pool will exist
 * until it is explicitly {@link ExecutorService#shutdown shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 * @throws IllegalArgumentException if {@code nThreads <= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<runnable>());
}

/**
 * Creates a thread pool that reuses a fixed number of threads
 * operating off a shared unbounded queue, using the provided
 * ThreadFactory to create new threads when needed.  At any point,
 * at most {@code nThreads} threads will be active processing
 * tasks.  If additional tasks are submitted when all threads are
 * active, they will wait in the queue until a thread is
 * available.  If any thread terminates due to a failure during
 * execution prior to shutdown, a new one will take its place if
 * needed to execute subsequent tasks.  The threads in the pool will
 * exist until it is explicitly {@link ExecutorService#shutdown
 * shutdown}.
 *
 * @param nThreads the number of threads in the pool
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 * @throws IllegalArgumentException if {@code nThreads &lt;= 0}
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory{
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<runnable>(),
                                  threadFactory);
}

}

通过上面的代码,可以发现:

1、核心线程数corePoolSize和最大线程数maximumPoolSize是相等的

2、工作队列workQueue采用的LinkedBlockingQueue这种无界队列,链表的长度设置为Integer.MAX_VALUE=2147483647

3、线程空闲存活时间keepAliveTime为0,因为没有空闲时间,全是核心线程,也就不存在线程消亡问题

SingleThreadPool

public class Executors {
/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<runnable>()));
}

/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue, and uses the provided ThreadFactory to
 * create a new thread when needed. Unlike the otherwise
 * equivalent {@code newFixedThreadPool(1, threadFactory)} the
 * returned executor is guaranteed not to be reconfigurable to use
 * additional threads.
 *
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created single-threaded Executor
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<runnable>(),
                                threadFactory));
}

}

通过上面的代码,可以发现:

1、是一种特殊的FixThreadPool

2、核心线程数corePoolSize和最大线程数maximumPoolSize是相等的,都为1

3、工作队列workQueue也采用的LinkedBlockingQueue这种无界队列

CachedThreadPool

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}

/**

  • Creates a thread pool that creates new threads as needed, but
  • will reuse previously constructed threads when they are
  • available, and uses the provided
  • ThreadFactory to create new threads when needed.
  • @param threadFactory the factory to use when creating new threads
  • @return the newly created thread pool
  • @throws NullPointerException if threadFactory is null
    */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue(),
    threadFactory);
    }

通过上面的代码,可以发现:

1、核心线程数corePoolSize为0,最大线程数maximumPoolSize为Integer.MAX_VALUE

2、工作队列workQueue采用的SynchronousQueue这种同步队列,支持公平或者非公平的从队列中取任务

ScedureThreadPool

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**

  • Creates a thread pool that can schedule commands to run after a
  • given delay, or to execute periodically.
  • @param corePoolSize the number of threads to keep in the pool,
  • even if they are idle
  • @param threadFactory the factory to use when the executor
  • creates a new thread
  • @return the newly created scheduled thread pool
  • @throws IllegalArgumentException if {@code corePoolSize < 0}
  • @throws NullPointerException if threadFactory is null
    */
    public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

/**

  • Creates a new {@code ScheduledThreadPoolExecutor} with the
  • given core pool size.
  • @param corePoolSize the number of threads to keep in the pool, even
  •    if they are idle, unless {@code allowCoreThreadTimeOut} is set
    
  • @throws IllegalArgumentException if {@code corePoolSize < 0}
    */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
    DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
    new DelayedWorkQueue());
    }

通过上面的代码,可以发现:

1、核心线程数corePoolSize可以自定义,最大线程数maximumPoolSize为Integer.MAX_VALUE

2、工作队列workQueue采用的DelayedWorkQueue这种延时队列

弊端

不知道通过上面几种系统自带的线程池,有没有发现一些共同的弊端。

FixThreadPool 和 SingleThreadPool的workQueue使用的是无界的LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,有导致系统OOM的风险

CachedThreadPool:使用的是同步队列SynchronousQueue,不持有任务,收到任务后就立马交给线程去执行。但是允许创建的非核心线程数量为Integer.MAX_VALUE,那么极端情况下就有两个弊端

1、线程频繁的创建和消亡,系统开销大

2、大量的线程被创建,有导致系统OOM的风险

ScedureThreadPool使用的无界的延迟阻塞队列DelayedWorkQueue,非核心线程数量为Integer.MAX_VALUE,有导致系统OOM的风险

另外赋一张阻塞队列的图,加深下了解。

2、自定义线程

基于系统自带的线程池有导致系统OOM的弊端,程序中建议自定义线程池。

规划

那么在自定义线程池的时候,有几点建议:

1、核心原则是使用有界队列,控制线程数量

2、线程数量也需要考虑业务是属于CPU密集型还是IO密集型。如果是IO密集型的话,可以稍微加大线程数,避免CPU长时间等待;如果CPU密集型的话,就默认设置成系统的核数就好。如果想调整到最佳状态,还需压测及观测性能

3、不要所有业务都使用同一个线程池,最好根据业务隔离,避免线程饥饿问题

4、给各自的线程池设置有意义的名字,方便区分

5、根据业务特性,选取合适的工作队列,如果业务需要快速响应,那么也可以酌情选用同步队列

6、根据业务特性,设置合适的拒绝策略(不处理、立即处理、延时处理、记日志......)

实施

使用ThreadPoolExcutor来自定义线程池。

ThreadPoolExecutor pool 
    // 核心线程数和最大线程,默认都设置成系统的核数
    = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,
       Runtime.getRuntime().availableProcessors() + 1,
       15,
       TimeUnit.SECONDS,
       // 使用了有界的ArrayBlockingQueue队列,同时设置非公平访问
       new ArrayBlockingQueue<>(512, false),
       // 线程工厂使用google guava的ThreadFactoryBuilder
       new ThreadFactoryBuilder().setNameFormat("业务1").setDaemon(true).build(),
       // 自定义拒绝策略
       new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
             // 拒绝执行暂停30秒再尝试提交
             try {
                synchronized (lock) {
                   lock.wait(30000);
                }
             } catch (InterruptedException e) {
                e.printStackTrace();
             }
             pool.submit(r);
          }
       });

四:如何使用和监控线程池

// 将任务提交给线程池
pool.execute(new Runnable() {
    @Override
    public void run() {
       System.out.println("这是一个线程");
    }
});

使用的建议:

1、尽量避免将耗时任务提交到线程池

2、尽量使用全局的线程池,如果一定要在局部方法里面定义线程池,使用完毕后,记得要执行shutdown方法

如何去监控线程池

方法 描述 监控方面
getActiveCount() 获取正在工作的线程数 监控线程的变化
getPoolSize() 获取当前存在的线程数 监控线程的变化
getLargestPoolSize() 获取历史最大的线程数 监控线程的变化
getTaskCount() 获取计划执行的任务总数 监控任务的变化
getCompletedTaskCount() 获取已完成的任务数 监控任务的变化
getQueue() 获取任务队列 监控任务的变化

另外,也可以集成美团的Hippo4j。 https://hippo4j.cn/zh/


这是一个从 https://juejin.cn/post/7368759192255692863 下的原始话题分离的讨论话题