线程池
# 自定义阻塞队列
class BlockingQueue<T>{
private Deque<T> queue = new ArrayDeque<>();//任务队列
private ReentrantLock lock = new ReentrantLock();
private Condition fullWaitSet = lock.newCondition();
private Condition emptyWaitSet = lock.newCondition();
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
//阻塞获取
public T take(){
lock.lock();
try {
while(queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T task){
lock.lock();
try {
while(queue.size() == capacity){
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int size(){
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
//带超时阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while(queue.isEmpty()){
try {
//返回值是剩余时间
if(nanos<=0){
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
//带超时阻塞添加
public boolean offer(T task,long timeout,TimeUnit unit){
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while(queue.size() == capacity){
try {
if(nanos <= 0){
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
//拒绝策略
public void tryPut(RejectPolicy<T> rejectedPolicy,T task){
lock.lock();
try {
if(queue.size() == capacity){
rejectedPolicy.reject(this,task);
}else{
log.debug("加入任务队列{}",task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# 自定义线程池
class ThreadPool{
//阻塞队列
private BlockingQueue<Runnable> taskQueue;
//工作线程集合
private HashSet<Worker> workers = new HashSet<>();
private int coreSize;//核心线程
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
//执行任务
public void execute(Runnable task){
//任务数未超过coreSize直接给worker对象执行,超过就加入任务队列
synchronized(workers){
if(workers.size()<coreSize){
Worker worker = new Worker(task);
log.debug("新增worker{},{}",worker,task);
workers.add(worker);
worker.start();
}else{
//拒绝策略1.死等2.超时等待3.放弃执行4.抛出异常5.调用者自己执行
taskQueue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
//task不为空,执行任务;执行完毕,再接着从任务队列获取任务执行
while(task!=null||(task=taskQueue.poll(timeout,timeUnit))!=null){
try {
log.debug("正在执行...{}",task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized(workers){
log.debug("worker 被移除{}",this);
workers.remove(this);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
拒绝策略:
@FunctionalInterface
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
2
3
4
测试:
@Slf4j
public class TestPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,(queue,task)->{
// queue.put(task);//死等
// task.run();//调用者自己执行
});
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.execute(()->{
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 如何创建线程池?
通过
ThreadPoolExecutor
构造函数创建;UML类图:
Executor:只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中;
ExecutorService扩充:
- 可以为一个或多个异步任务生成Future方法;
- 管控线程池的方法,如停止线程池运行;
ThreadPoolExecutor提供几个public的setter方法:
通过Executor框架的工具类
Executors
来创建;
# 为什么需要线程池?
池化思想:
- 降低资源消耗,复用已创建的线程降低创建销毁的消耗;
- 提高响应速度,任务到达时,不需要等到线程创建就能立即执行;
- 提高资源统一管理;
# 线程池核心参数
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
corePoolSize:任务队列未超过容量,最大可同时运行的线程数量;
maximumPoolSize:任务队列超过容量,最大线程数;
workQueue:当前运行的线程数超过核心线程数,新任务就会存放队列中;
handler:拒绝策略
keepAliveTime:线程数超过核心线程数,如果超过keepAliveTime没有新任务提交,核心线程外的线程被回收;
# 拒绝策略
JDK提供4种已有策略:
AbortPolicy:丢弃任务,抛出 RejectedExecutionException 异常,默认
CallerRunsPolicy:由调用线程处理该任务;
DiscardPolicy:丢弃任务,不抛出异常;
DiscardOldestPolicy:丢弃队列中最早的任务,重新提交被拒绝任务;
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
- Netty 的实现,是创建一个新线程来执行任务
# 阻塞队列
# ArrayBlockingQueue
基于循环数组实现,无法扩容,入队出队使用相同的锁,并发度低;
# LinkedBlockingQueue
用链表实现,默认无界,入队出队使用两把独立的锁,并发度更高;
使用AtomicInteger类型变量的cnt,作为当前数据量;
# SynchronousQueue
没有缓冲,生产出来立即被消费
# PriorityBlockingQueue
优先级阻塞队列,按照元素大小排序,无界
# DelayQueue
延迟功能的阻塞队列,基于PriorityQueue实现,无界队列;
# Disruptor
无锁队列,底层采用CAS
缓存行填充,解决伪共享,在缓存行value前后都加入7个Long类型填充;
环形数组:预分配内存,在创建时就调用EventFactory将数组填满,数据也都是覆盖,避免GC
线程池内部通过一个工作队列去维护任务执行的,有一个根本性缺陷:连续争用。多个线程申请任务时,为了合理分配任务需要加锁,对比快速执行的任务来说,申请的损耗巨大;
Disruptor使用环形缓冲的数据结构,代替队列,避免申请任务时出现的连续争用的状况;
# Executors
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads,nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
1
2
3
4
5适用场景:任务量已知,相对耗时的任务
特点:
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
1
2
3
4适用场景:整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况
特点:
- 全是救急线程,且生存时间60s
- 队列采用了 SynchronousQueue,同步队列:想放任务就必须得取走一个
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
1
2
3
4
5
6适用场景:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放
区别:
与自己创建一个线程:自己创建的没有救急线程,若任务失败则终止,而newSingleThreadExecutor能创建一个线程保证池正常工作
与newFixedThreadPool(1) :
newSingleThreadExecutor() 线程个数始终为1,不能修改,FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口;
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改;
# 线程池提交任务
执行任务
void execute(Runnable command);
1带任务执行结果
<T> Future<T> submit(Callable<T> task);
1private static void func(ExecutorService pool) throws InterruptedException, ExecutionException { Future<String> future = pool.submit(() -> { log.debug("running"); Thread.sleep(1000); return "ok"; }); log.debug("{}", future.get()); }
1
2
3
4
5
6
7
8提交所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
1提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
1
# 线程池饥饿
固定大小线程池出现饥饿现象,案例代码:
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random random = new Random();
static String cooking() {
return MENU.get(random.nextInt(MENU.size()));
}
public static void main(String[] args) {
//两个工人全去处理点餐,没人做菜
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(()->{
log.debug("点餐中...");
Future<String>f = executorService.submit(()->{
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜:{}",f.get());
} catch (InterruptedException |ExecutionException e){
e.printStackTrace();
}
});
executorService.execute(()->{
log.debug("点餐中...");
Future<String>f = executorService.submit(()->{
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜:{}",f.get());
} catch (InterruptedException |ExecutionException e){
e.printStackTrace();
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
解决:采用不同线程池
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(()->{
log.debug("点餐中...");
Future<String>f = cookPool.submit(()->{
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜:{}",f.get());
} catch (InterruptedException |ExecutionException e){
e.printStackTrace();
}
});
waiterPool.execute(()->{
log.debug("点餐中...");
Future<String>f = cookPool.submit(()->{
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜:{}",f.get());
} catch (InterruptedException |ExecutionException e){
e.printStackTrace();
}
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 关闭线程池
- shutdown:不接受新任务等待处理完;
- shutdownNow:调用Thread.interrupt进行中断;
Spring封装过的线程池ThreadPoolTaskExecutor,默认会优雅关闭,实现了DisposableBean接口: