生产消费者模式
# 生产消费者模式
特点:不需要生产消费者一一对应,消息队列容量有限,满时不再加入数据,空时不再消费数据,阻塞队列基于此实现!消息不会立刻被消费,属于异步模式
案例代码:
消息队列
class Message{ private int id; private Object message; public Message(int id, Object message) { this.id = id; this.message = message; } public int getId() { return id; } public Object getMessage() { return message; } } @Slf4j class MessageQueue{ private LinkedList<Message> queue; private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; queue = new LinkedList<>(); } public Message take(){ synchronized(queue) { while(queue.isEmpty()) { log.debug("没有消息了..."); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); log.debug("已消费消息{}",message); queue.notifyAll(); return message; } } public void put(Message message){ synchronized(queue){ while(queue.size() == capacity){ log.debug("队列已满..."); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); log.debug("已生产消息{}",message); queue.notifyAll(); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54测试
public class testPC { public static void main(String[] args) { MessageQueue queue = new MessageQueue(2); for (int i = 0; i < 3; i++) { int id = i; new Thread(()->{ queue.put(new Message(id,"值"+id)); },"生产者"+id).start(); } new Thread(()->{ while(true){ try { sleep(1000); Message message = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } },"消费者").start(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22