Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • 线程

  • 共享模型

    • 管程
    • JMM
    • 乐观锁
    • 线程池
      • 自定义阻塞队列
      • 自定义线程池
      • 如何创建线程池?
      • 为什么需要线程池?
      • 线程池核心参数
      • 拒绝策略
      • 阻塞队列
        • ArrayBlockingQueue
        • LinkedBlockingQueue
        • SynchronousQueue
        • PriorityBlockingQueue
        • DelayQueue
        • Disruptor
      • Executors
      • 线程池提交任务
      • 线程池饥饿
      • 关闭线程池
    • J.U,C
    • 异步编程
    • 多线程编程
    • 动态线程池
  • 非共享模型

  • 并行

  • 多线程设计模式

  • JUC
  • 共享模型
Nreal
2023-11-01
目录

线程池

# 自定义阻塞队列

class BlockingQueue<T>{
    private Deque<T> queue = new ArrayDeque<>();//任务队列
    private ReentrantLock lock = new ReentrantLock();
    private Condition fullWaitSet = lock.newCondition();
    private Condition emptyWaitSet = lock.newCondition();
    private int capacity;
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }
    //阻塞获取
    public T take(){
        lock.lock();
        try {
            while(queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
    //阻塞添加
    public void put(T task){
        lock.lock();
        try {
            while(queue.size() == capacity){
                try {
                    log.debug("等待加入任务队列 {} ...", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }
    public int size(){
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
    //带超时阻塞获取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while(queue.isEmpty()){
                try {
                    //返回值是剩余时间
                    if(nanos<=0){
                        return null;
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
    //带超时阻塞添加
    public boolean offer(T task,long timeout,TimeUnit unit){
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while(queue.size() == capacity){
                try {
                    if(nanos <= 0){
                        return false;
                    }
                    log.debug("等待加入任务队列 {} ...", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
    //拒绝策略
    public void tryPut(RejectPolicy<T> rejectedPolicy,T task){
        lock.lock();
        try {
            if(queue.size() == capacity){
                rejectedPolicy.reject(this,task);
            }else{
                log.debug("加入任务队列{}",task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

# 自定义线程池

class ThreadPool{
    //阻塞队列
    private BlockingQueue<Runnable> taskQueue;
    //工作线程集合
    private HashSet<Worker> workers = new HashSet<>();
    private int coreSize;//核心线程
    private long timeout;
    private TimeUnit timeUnit;
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapcity);
        this.rejectPolicy = rejectPolicy;
    }

    //执行任务
    public void execute(Runnable task){
        //任务数未超过coreSize直接给worker对象执行,超过就加入任务队列
        synchronized(workers){
            if(workers.size()<coreSize){
                Worker worker = new Worker(task);
                log.debug("新增worker{},{}",worker,task);
                workers.add(worker);
                worker.start();
            }else{
                //拒绝策略1.死等2.超时等待3.放弃执行4.抛出异常5.调用者自己执行
                taskQueue.tryPut(rejectPolicy,task);
            }
        }
    }
    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            //task不为空,执行任务;执行完毕,再接着从任务队列获取任务执行
            while(task!=null||(task=taskQueue.poll(timeout,timeUnit))!=null){
                try {
                    log.debug("正在执行...{}",task);
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized(workers){
                log.debug("worker 被移除{}",this);
                workers.remove(this);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

拒绝策略:

@FunctionalInterface
interface RejectPolicy<T>{
    void reject(BlockingQueue<T> queue,T task);
}
1
2
3
4

测试:

@Slf4j
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,(queue,task)->{
//            queue.put(task);//死等
//            task.run();//调用者自己执行
        });
        for (int i = 0; i < 4; i++) {
            int j = i;
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("{}",j);
            });
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 如何创建线程池?

  1. 通过ThreadPoolExecutor构造函数创建;

    UML类图:

    Executor:只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中;

    ExecutorService扩充:

    1. 可以为一个或多个异步任务生成Future方法;
    2. 管控线程池的方法,如停止线程池运行;

    ThreadPoolExecutor提供几个public的setter方法:

  2. 通过Executor框架的工具类Executors来创建;

# 为什么需要线程池?

池化思想:

  1. 降低资源消耗,复用已创建的线程降低创建销毁的消耗;
  2. 提高响应速度,任务到达时,不需要等到线程创建就能立即执行;
  3. 提高资源统一管理;

# 线程池核心参数

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                          int maximumPoolSize,//线程池的最大线程数
                          long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                          TimeUnit unit,//时间单位
                          BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                          ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                          RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                           ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  • corePoolSize:任务队列未超过容量,最大可同时运行的线程数量;

  • maximumPoolSize:任务队列超过容量,最大线程数;

  • workQueue:当前运行的线程数超过核心线程数,新任务就会存放队列中;

  • handler:拒绝策略

  • keepAliveTime:线程数超过核心线程数,如果超过keepAliveTime没有新任务提交,核心线程外的线程被回收;

# 拒绝策略

JDK提供4种已有策略:

  • AbortPolicy:丢弃任务,抛出 RejectedExecutionException 异常,默认

  • CallerRunsPolicy:由调用线程处理该任务;

  • DiscardPolicy:丢弃任务,不抛出异常;

  • DiscardOldestPolicy:丢弃队列中最早的任务,重新提交被拒绝任务;

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
  • Netty 的实现,是创建一个新线程来执行任务

# 阻塞队列

# ArrayBlockingQueue

基于循环数组实现,无法扩容,入队出队使用相同的锁,并发度低;

# LinkedBlockingQueue

用链表实现,默认无界,入队出队使用两把独立的锁,并发度更高;

使用AtomicInteger类型变量的cnt,作为当前数据量;

# SynchronousQueue

没有缓冲,生产出来立即被消费

# PriorityBlockingQueue

优先级阻塞队列,按照元素大小排序,无界

# DelayQueue

延迟功能的阻塞队列,基于PriorityQueue实现,无界队列;

# Disruptor

  1. 无锁队列,底层采用CAS

  2. 缓存行填充,解决伪共享,在缓存行value前后都加入7个Long类型填充;

  3. 环形数组:预分配内存,在创建时就调用EventFactory将数组填满,数据也都是覆盖,避免GC

    线程池内部通过一个工作队列去维护任务执行的,有一个根本性缺陷:连续争用。多个线程申请任务时,为了合理分配任务需要加锁,对比快速执行的任务来说,申请的损耗巨大;

    Disruptor使用环形缓冲的数据结构,代替队列,避免申请任务时出现的连续争用的状况;

# Executors

  1. newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
         return new ThreadPoolExecutor(nThreads,nThreads,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
    }
    
    1
    2
    3
    4
    5

    适用场景:任务量已知,相对耗时的任务

    特点:

    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • 阻塞队列是无界的,可以放任意数量的任务
  2. newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>());
    }
    
    1
    2
    3
    4

    适用场景:整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

    特点:

    • 全是救急线程,且生存时间60s
    • 队列采用了 SynchronousQueue,同步队列:想放任务就必须得取走一个
  3. newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(1, 1,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>()));
    }
    
    1
    2
    3
    4
    5
    6

    适用场景:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放

    区别:

    • 与自己创建一个线程:自己创建的没有救急线程,若任务失败则终止,而newSingleThreadExecutor能创建一个线程保证池正常工作

    • 与newFixedThreadPool(1) :

      1. newSingleThreadExecutor() 线程个数始终为1,不能修改,FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口;

      2. Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改;

# 线程池提交任务

  • 执行任务

    void execute(Runnable command);
    
    1
  • 带任务执行结果

    <T> Future<T> submit(Callable<T> task);
    
    1
    private static void func(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> future = pool.submit(() -> {
            log.debug("running");
            Thread.sleep(1000);
            return "ok";
        });
        log.debug("{}", future.get());
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
  • 提交所有任务

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    
    1
  • 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    
    1

# 线程池饥饿

固定大小线程池出现饥饿现象,案例代码:

public class TestDeadLock {
    static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random random = new Random();
    static String cooking() {
        return MENU.get(random.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        //两个工人全去处理点餐,没人做菜
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(()->{
            log.debug("点餐中...");
            Future<String>f = executorService.submit(()->{
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜:{}",f.get());
            } catch (InterruptedException |ExecutionException e){
                e.printStackTrace();
            }
        });
        executorService.execute(()->{
            log.debug("点餐中...");
            Future<String>f = executorService.submit(()->{
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜:{}",f.get());
            } catch (InterruptedException |ExecutionException e){
                e.printStackTrace();
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

解决:采用不同线程池

public static void main(String[] args) {
    ExecutorService waiterPool = Executors.newFixedThreadPool(1);
    ExecutorService cookPool = Executors.newFixedThreadPool(1);
    waiterPool.execute(()->{
        log.debug("点餐中...");
        Future<String>f = cookPool.submit(()->{
            log.debug("做菜");
            return cooking();
        });
        try {
            log.debug("上菜:{}",f.get());
        } catch (InterruptedException |ExecutionException e){
            e.printStackTrace();
        }
    });
    waiterPool.execute(()->{
        log.debug("点餐中...");
        Future<String>f = cookPool.submit(()->{
            log.debug("做菜");
            return cooking();
        });
        try {
            log.debug("上菜:{}",f.get());
        } catch (InterruptedException |ExecutionException e){
            e.printStackTrace();
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 关闭线程池

  • shutdown:不接受新任务等待处理完;
  • shutdownNow:调用Thread.interrupt进行中断;

Spring封装过的线程池ThreadPoolTaskExecutor,默认会优雅关闭,实现了DisposableBean接口:

乐观锁
J.U,C

← 乐观锁 J.U,C→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式