Disruptor缓冲区优化
# Disruptor为什么性能高?
无锁CAS,提高并发性;
缓存行填充避免缓存伪共享;
环形缓冲区,每隔槽之间填充padding变量,使得相邻的槽不会共享同一缓存行,这样一个线程修改一个槽时,不会影响到其它槽,减少缓存同步的开销;
环形缓冲区作为数据存储结构,生产者和消费者可以在缓冲区上独立进行读写操作,减少线程之间竞争;
# 等待策略
- BlockingWaitStrategy(阻塞等待策略): BlockingWaitStrategy 是最基本的等待策略,它使用 Object.wait() 和 Object.notifyAll() 方法来进行线程间的通信。 当消费者等待事件时,会释放 CPU 资源,降低了消费者线程的活跃度,适合于线程数较少的场景。 SleepingWaitStrategy(自旋等待策略);
- SleepingWaitStrategy 在消费者等待事件时使用自旋的方式,避免了阻塞,但在一定时间内如果没有获取到事件,会进入睡眠状态。 适用于对低延迟要求较高的场景,但可能会占用一定的 CPU 资源。 YieldingWaitStrategy(礼让等待策略);
- YieldingWaitStrategy 在消费者等待事件时会尝试进行自旋,如果自旋一定次数后仍未获取到事件,则会进行线程礼让(Yield)。 适用于对低延迟要求高的场景,但可能占用较多的 CPU 资源。 BusySpinWaitStrategy(忙等待策略);
- BusySpinWaitStrategy 是一种非常简单的等待策略,它会一直自旋等待事件的到来,不进行任何的线程礼让或睡眠。 适用于对延迟极为敏感的场景,但可能会占用大量的 CPU 资源。 PhasedBackoffWaitStrategy(分阶段退避等待策略);
- PhasedBackoffWaitStrategy 是一种自适应的等待策略,会根据不同的等待阶段选择不同的等待方式,例如自旋、睡眠等。 可以在不同的场景中平衡延迟和 CPU 资源占用;
# 缓冲区优化
# 事件监听器
public interface EventListener<E> {
void onEvent(E event);
/*k2:执行顺序*/
void onException(Throwable ex,long sequence,E event);
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 并发队列接口
public interface ParallelQueue<E> {
void add(E event);
void add(E... events);
boolean tryAdd(E event);
boolean tryAdd(E... events);
void start();
void shutDown();
boolean isShutDown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 并发队列处理器
public class ParallelQueueHandler<E> implements ParallelQueue<E> {
private RingBuffer<Holder> ringBuffer;
private EventListener<E> eventListener;
private WorkerPool<Holder> workerPool;
private ExecutorService executorService;
/**
* Disruptor 框架中的一个接口,用于在事件发布(publish)时将数据填充到事件对象中
*/
private EventTranslatorOneArg<Holder,E> eventTranslator;
public ParallelQueueHandler(Builder<E> builder){
this.executorService = Executors.newFixedThreadPool(builder.threads,new ThreadFactoryBuilder().setNameFormat("ParallelQueueHandler"+builder.namePrefix+"-pool-%d").build());
this.eventListener = builder.listener;
this.eventTranslator = new HolderEventTranslator();
// 创建RingBuffer
RingBuffer<Holder> ringBuffer = RingBuffer.create(builder.producerType,new HolderEventFactory(),builder.bufferSize,builder.waitStrategy);
// 通过RingBuffer 创建屏障
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// 创建多个消费者组
WorkHandler<Holder>[] workHandlers = new WorkHandler[builder.threads];
for (int i = 0; i< workHandlers.length;i++){
workHandlers[i] = new HolderWorkHandler();
}
//创建多个消费者线程池
WorkerPool<Holder> workerPool = new WorkerPool<>(ringBuffer,sequenceBarrier,new HolderExceptionHandler(),workHandlers);
// 设置多消费者的Sequence序号,统计消费进度
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
this.workerPool = workerPool;
}
@Override
public void add(E event) {
final RingBuffer<Holder> holderRing = ringBuffer;
if(holderRing == null){
process(this.eventListener,new IllegalStateException("ParallelQueueHandler is close"),event);
}
try {
ringBuffer.publishEvent(this.eventTranslator,event);
}catch (NullPointerException e){
process(this.eventListener,new IllegalStateException("ParallelQueueHandler is close"),event);
}
}
@Override
public void add(E... events) {
final RingBuffer<Holder> holderRing = ringBuffer;
if(holderRing == null){
process(this.eventListener,new IllegalStateException("ParallelQueueHandler is close"),events);
}
try {
ringBuffer.publishEvents(this.eventTranslator,events);
}catch (NullPointerException e){
process(this.eventListener,new IllegalStateException("ParallelQueueHandler is close"),events);
}
}
@Override
public boolean tryAdd(E event) {
final RingBuffer<Holder> holderRing = ringBuffer;
if(holderRing == null){
return false;
}
try {
return ringBuffer.tryPublishEvent(this.eventTranslator,event);
}catch (NullPointerException e){
return false;
}
}
@Override
public boolean tryAdd(E... events) {
final RingBuffer<Holder> holderRing = ringBuffer;
if(holderRing == null){
return false;
}
try {
return ringBuffer.tryPublishEvents(this.eventTranslator,events);
}catch (NullPointerException e){
return false;
}
}
@Override
public void start() {
this.ringBuffer = workerPool.start(executorService);
}
@Override
public void shutDown() {
RingBuffer<Holder> holder = ringBuffer;
ringBuffer = null;
if(holder ==null){
return;
}
if(workerPool != null){
workerPool.drainAndHalt();
}
if(executorService != null){
executorService.shutdown();
}
}
@Override
public boolean isShutDown() {
return ringBuffer == null;
}
/*缓存*/
public class Holder{
private E event;
public void setValue(E event){
this.event = event;
}
@Override
public String toString() {
return "Holder{" +
"event=" + event +
'}';
}
}
public static class Builder<E> {
private ProducerType producerType = ProducerType.MULTI;
private int bufferSize = 1024 * 16;
private int threads = 1;
private String namePrefix = "";
private WaitStrategy waitStrategy = new BlockingWaitStrategy();
private EventListener<E> listener;
public Builder<E> setProducerType(ProducerType producerType){
Preconditions.checkNotNull(producerType);
this.producerType = producerType;
return this;
}
public Builder<E> setBufferSize(int bufferSize){
Preconditions.checkArgument(Integer.bitCount(bufferSize) == 1);
this.bufferSize = bufferSize;
return this;
}
public Builder<E> setThreads(int threads){
Preconditions.checkArgument(threads > 0);
this.threads = threads;
return this;
}
public Builder<E> setNamePrefix(String namePrefix){
Preconditions.checkNotNull(namePrefix);
this.namePrefix = namePrefix;
return this;
}
public Builder<E> setWaitStrategy(WaitStrategy waitStrategy){
Preconditions.checkNotNull(waitStrategy);
this.waitStrategy = waitStrategy;
return this;
}
public Builder<E> setListener(EventListener<E> listener){
Preconditions.checkNotNull(listener);
this.listener = listener;
return this;
}
public ParallelQueueHandler<E> build(){
return new ParallelQueueHandler<>(this);
}
}
private class HolderEventTranslator implements EventTranslatorOneArg<Holder,E>{
@Override
public void translateTo(Holder holder, long l, E e) {
holder.setValue(e);
}
}
private class HolderEventFactory implements EventFactory<Holder>{
@Override
public Holder newInstance() {
return new Holder();
}
}
private class HolderWorkHandler implements WorkHandler<Holder>{
@Override
public void onEvent(Holder holder) throws Exception {
eventListener.onEvent(holder.event);
holder.setValue(null);
}
}
private class HolderExceptionHandler implements ExceptionHandler<Holder>{
@Override
public void handleEventException(Throwable throwable, long l, Holder event) {
Holder holder = (Holder) event;
try {
eventListener.onException(throwable,l,holder.event);
} catch (Exception e) {
// ?
} finally {
holder.setValue(null);
}
}
@Override
public void handleOnStartException(Throwable throwable) {
throw new UnsupportedOperationException(throwable);
}
@Override
public void handleOnShutdownException(Throwable throwable) {
throw new UnsupportedOperationException(throwable);
}
}
private static <E> void process(EventListener<E> listener,Throwable e,E event){
listener.onException(e,-1,event);
}
private static <E> void process(EventListener<E> listener,Throwable e,E... events){
for(E event: events){
process(listener,e,event);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# NettyCoreProcessor
原始的NettyCoreProcessor,直接处理每个 HTTP 请求,而 DisruptorNettyCoreProcessor 使用了 Disruptor 框架,将 HTTP 请求异步地添加到一个处理队列中,然后由 BatchEventListenerProcessor 来处理这个队列中的事件;
Disruptor是一个异步事件处理框架,因为网络请求通常是 I/O 密集型的操作,通过异步处理可以提高系统的吞吐量;
@Slf4j
public class DisruptorNettyCoreProcessor implements NettyProcessor {
private static final String THREAD_NAME_PREFIX = "gateway-queue-";
private Config config;
private NettyCoreProcessor nettyCoreProcessor;
private ParallelQueueHandler<HttpRequestWrapper> parallelQueueHandler;
public DisruptorNettyCoreProcessor(Config config, NettyCoreProcessor nettyCoreProcessor) {
this.config = config;
this.nettyCoreProcessor = nettyCoreProcessor;
ParallelQueueHandler.Builder<HttpRequestWrapper> builder = new ParallelQueueHandler.Builder<HttpRequestWrapper>()
.setBufferSize(config.getBufferSize())
.setThreads(config.getProcessThread())
.setProducerType(ProducerType.MULTI)
.setNamePrefix(THREAD_NAME_PREFIX)
.setWaitStrategy(config.getWaitStrategy());
BatchEventListenerProcessor batchEventListenerProcessor = new BatchEventListenerProcessor();
builder.setListener(batchEventListenerProcessor);
this.parallelQueueHandler = builder.build();
}
@Override
public void process(HttpRequestWrapper wrapper) {
this.parallelQueueHandler.add(wrapper);
}
@Override
public void start() {
parallelQueueHandler.start();
}
@Override
public void shutDown() {
parallelQueueHandler.shutDown();
}
public class BatchEventListenerProcessor implements EventListener<HttpRequestWrapper>{
@Override
public void onEvent(HttpRequestWrapper event) {
nettyCoreProcessor.process(event);
}
@Override
public void onException(Throwable ex, long sequence, HttpRequestWrapper event) {
FullHttpRequest request = event.getRequest();
ChannelHandlerContext ctx = event.getCtx();
try {
log.error("BatchEventListenerProcessor onException请求写回失败,request:{},errMsg:{} ",request,ex.getMessage(),ex);
//构建响应对象
FullHttpResponse fullHttpResponse = ResponseHelper.getHttpResponse(ResponseCode.INTERNAL_ERROR);
if(!HttpUtil.isKeepAlive(request)){
ctx.writeAndFlush(fullHttpResponse).addListener(ChannelFutureListener.CLOSE);
}else{
fullHttpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.writeAndFlush(fullHttpResponse);
}
}catch (Exception e){
log.error("BatchEventListenerProcessor onException请求写回失败,request:{},errMsg:{} ",request,e.getMessage(),e);
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Container
@Override
public void init() {
NettyCoreProcessor nettyCoreProcessor = new NettyCoreProcessor();
//如果启动要使用多生产者多消费组 那么我们读取配置
if(BUFFER_TYPE_PARALLEL.equals(config.getBufferType())){
//开启配置的情况下使用Disruptor
this.nettyProcessor = new DisruptorNettyCoreProcessor(config,nettyCoreProcessor);
}else{
this. nettyProcessor = nettyCoreProcessor;
}
this.nettyHttpServer = new NettyHttpServer(config, nettyProcessor);
this.nettyHttpClient = new NettyHttpClient(config,
nettyHttpServer.getEventLoopGroupWoker());
}
@Override
public void start() {
nettyProcessor.start();
nettyHttpServer.start();;
nettyHttpClient.start();
log.info("api gateway started!");
}
@Override
public void shutdown() {
nettyProcessor.shutDown();
nettyHttpServer.shutdown();
nettyHttpClient.shutdown();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32