创建线程池的正确方式

composed 2020年01月13日 97次浏览

线程池的定义

管理一组工作线程。通过线程池复用线程有以下几点优点:

  • 减少资源创建 => 减少内存开销,创建线程占用内存
  • 降低系统开销 => 创建线程需要时间,会延迟处理的请求
  • 提高稳定稳定性 => 避免无限创建线程引起的OutOfMemoryError【简称OOM】

Executors创建线程池的方式

根据返回的对象类型创建线程池可以分为三类:

  • 创建返回ThreadPoolExecutor对象
  • 创建返回ScheduleThreadPoolExecutor对象
  • 创建返回ForkJoinPool对象

本文只讨论创建返回ThreadPoolExecutor对象

ThreadPoolExecutor对象

在介绍Executors创建线程池方法前先介绍一下ThreadPoolExecutor,因为这些创建线程池的静态方法都是返回ThreadPoolExecutor对象,和我们手动创建ThreadPoolExecutor对象的区别就是我们不需要自己传构造函数的参数。ThreadPoolExecutor的构造函数共有四个,但最终调用的都是同一个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

构造函数参数说明:

  • corePoolSize => 线程池里的线程数量,核心线程池大小

  • maximumPoolSize => 线程池最大数量

  • keepAliveTime => 当线程池线程数量大于corePoolSize时候,多出来的空闲线程,多长时间会被销毁。

  • unit => 时间单位

  • workQueue => 线程池所使用的缓冲队列

  • 我们可以选择如下几种:

    • ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
    • LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
    • SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
    • PriorityBlockingQueue:具有优先级别的阻塞队列。
  • threadFactory => 线程池创建线程使用的工厂

  • handler => 线程池对拒绝任务的处理策略 ,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。

    什么时候拒绝?当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务,该任务会交给RejectedExecutionHandler 处理。

      线程池提供了四种拒绝策略:

    1. AbortPolicy:直接抛出异常,默认策略;
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务;

线程池执行任务逻辑和线程池参数的关系

线程池

执行逻辑说明:

  • 判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务
  • 若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中
  • 若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务
  • 若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关

Executors创建线程池的弊端

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executor提供的四个静态方法创建线程池,但是阿里规约却并不建议使用它。

Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

第一种,newFixedThreadPool和newSingleThreadExecutor分别获得 FixedThreadPool 类型的线程池 和 SingleThreadExecutor 类型的线程池。 

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
  }
 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
 }

因为,创建了一个无界队列LinkedBlockingQueuesize,是一个最大值为Integer.MAX_VALUE的线程阻塞队列,当添加任务的速度大于线程池处理任务的速度,可能会在队列堆积大量的请求,消耗很大的内存,甚至导致OOM。

第二种,newCachedThreadPool 和 newScheduledThreadPool创建的分别是CachedThreadPool 类型和 ScheduledThreadPoolExecutorScheduledThreadPoolExecutor类型的线程池。

CachedThreadPool是一个会根据需要创建新线程的线程池 ,ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}

创建的线程池允许的最大线程数是Integer.MAX_VALUE,空闲线程存活时间为0,当添加任务的速度大于线程池处理任务的速度,可能会创建大量的线程,消耗资源,甚至导致OOM。

这两种都是有点极端的,稍微点进去看一下源码就能看出来。

阿里规约提倡手动创建线程池,而非Java内置的线程池,给出的正例如下:

正例1:

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

正例2:

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown

正例3:

<bean id="userThreadPool"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  <property name="corePoolSize" value="10" />
  <property name="maxPoolSize" value="100" />
  <property name="queueCapacity" value="2000" />

  <property name="threadFactory" value= threadFactory />
  <property name="rejectedExecutionHandler">
    <ref local="rejectedExecutionHandler" />
  </property>
</bean>
//in code
userThreadPool.execute(thread);

文章转载自:

https://juejin.im/post/5dc41c165188257bad4d9e69

https://www.cnblogs.com/ibigboy/p/11298004.html