线程池的定义
管理一组工作线程。通过线程池复用线程有以下几点优点:
- 减少资源创建 => 减少内存开销,创建线程占用内存
- 降低系统开销 => 创建线程需要时间,会延迟处理的请求
- 提高稳定稳定性 => 避免无限创建线程引起的
OutOfMemoryError
【简称OOM】
Executors创建线程池的方式
根据返回的对象类型创建线程池可以分为三类:
- 创建返回ThreadPoolExecutor对象
- 创建返回ScheduleThreadPoolExecutor对象
- 创建返回ForkJoinPool对象
本文只讨论创建返回ThreadPoolExecutor
对象
ThreadPoolExecutor对象
在介绍Executors
创建线程池方法前先介绍一下ThreadPoolExecutor
,因为这些创建线程池的静态方法都是返回ThreadPoolExecutor
对象,和我们手动创建ThreadPoolExecutor
对象的区别就是我们不需要自己传构造函数的参数。ThreadPoolExecutor
的构造函数共有四个,但最终调用的都是同一个:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
构造函数参数说明:
-
corePoolSize => 线程池里的线程数量,核心线程池大小
-
maximumPoolSize => 线程池最大数量
-
keepAliveTime => 当线程池线程数量大于corePoolSize时候,多出来的空闲线程,多长时间会被销毁。
-
unit => 时间单位
-
workQueue => 线程池所使用的缓冲队列
-
我们可以选择如下几种:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
- LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
- SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
- PriorityBlockingQueue:具有优先级别的阻塞队列。
-
threadFactory => 线程池创建线程使用的工厂
-
handler => 线程池对拒绝任务的处理策略 ,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
什么时候拒绝?当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务,该任务会交给RejectedExecutionHandler 处理。
线程池提供了四种拒绝策略:
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
线程池执行任务逻辑和线程池参数的关系
执行逻辑说明:
- 判断核心线程数是否已满,核心线程数大小和
corePoolSize
参数有关,未满则创建线程执行任务 - 若核心线程池已满,判断队列是否满,队列是否满和
workQueue
参数有关,若未满则加入队列中 - 若队列已满,判断线程池是否已满,线程池是否已满和
maximumPoolSize
参数有关,若未满创建线程执行任务 - 若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和
handler
参数有关
Executors创建线程池的弊端
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executor提供的四个静态方法创建线程池,但是阿里规约却并不建议使用它。
Executors各个方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
第一种,newFixedThreadPool和newSingleThreadExecutor分别获得 FixedThreadPool 类型的线程池 和 SingleThreadExecutor 类型的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
因为,创建了一个无界队列LinkedBlockingQueuesize,是一个最大值为Integer.MAX_VALUE的线程阻塞队列,当添加任务的速度大于线程池处理任务的速度,可能会在队列堆积大量的请求,消耗很大的内存,甚至导致OOM。
第二种,newCachedThreadPool 和 newScheduledThreadPool创建的分别是CachedThreadPool 类型和 ScheduledThreadPoolExecutorScheduledThreadPoolExecutor类型的线程池。
CachedThreadPool是一个会根据需要创建新线程的线程池 ,ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
创建的线程池允许的最大线程数是Integer.MAX_VALUE,空闲线程存活时间为0,当添加任务的速度大于线程池处理任务的速度,可能会创建大量的线程,消耗资源,甚至导致OOM。
这两种都是有点极端的,稍微点进去看一下源码就能看出来。
阿里规约提倡手动创建线程池,而非Java内置的线程池,给出的正例如下:
正例1:
//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
正例2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown
正例3:
<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />
<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);
文章转载自: