Netty学习笔记
# Netty案例代码
加入依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
2
3
4
5
服务端:
public class HelloServer {
public static void main(String[] args) {
//1.启动器,服务组装netty组件,启动服务器
new ServerBootstrap()
//2.boss,worker组
.group(new NioEventLoopGroup())
//3.选择服务器ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
//4.boss负责处理连接,worker负责处理读写,决定了worker能执行哪些操作(handler)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());//将bytebuf转换为字符串
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(9527);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
客户端:
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 9527)
.sync()//阻塞方法,直到连接建立
.channel()
.writeAndFlush(new Date() + ": hello world!");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Netty组件
# EventLoopGroup
事件循环EventLoop:
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件;
一个Channel与一个EventLoop绑定;
继承关系:
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法;
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop;
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup;
事件循环组EventLoopGroup :
是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全);
执行任务:
public static void main(String[] args) {
//1.创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2);//io事件,普通任务,定时任务
// EventLoopGroup group = new DefaultEventLoopGroup();//普通任务,定时任务
//2.获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
//3.执行普通任务
group.next().submit(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//4.执行定时任务
group.next().scheduleAtFixedRate(()->{
System.out.println("ok");
},0,1, TimeUnit.SECONDS);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Channel
相关API:
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush() 方法将数据写入并刷出
ChannelFuture
客户端代码拆解:
//返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
2
3
4
5
6
7
8
9
10
11
12
13
sync()方法作用:异步操作同步;
connect方法是异步非阻塞的,不等连接建立,方法执行就返回了,channelFuture对象不能立即获得channel对象:
ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080); System.out.println(channelFuture.channel()); // 1 channelFuture.sync(); // 2 System.out.println(channelFuture.channel()); // 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
- 执行到 1 时,连接未建立,打印
[id: 0x2e1884dd]
- 执行到 2 时,sync 方法是同步等待连接建立完成
- 执行到 3 时,连接肯定建立了,打印
[id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了sync()方法,还可以采用回调方式:
ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080); System.out.println(channelFuture.channel()); // 1 channelFuture.addListener((ChannelFutureListener) future -> { System.out.println(future.channel()); // 2 });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
- 执行到 1 时,连接未建立,打印
[id: 0x749124ba]
- ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印
[id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]
# Future&Promise
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
// 1. 线程池 ExecutorService service = Executors.newFixedThreadPool(2); // 2. 提交任务 Future<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("执行计算"); Thread.sleep(1000); return 50; } }); // 3. 主线程通过 future 来获取结果 log.debug("等待结果"); log.debug("结果是 {}", future.get());
1
2
3
4
5
6
7
8
9
10
11
12
13
14netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("执行计算"); Thread.sleep(1000); return 70; } }); // log.debug("等待结果"); // log.debug("结果是 {}", future.get()); future.addListener(new GenericFutureListener<Future<? super Integer>>(){ @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("接收结果:{}", future.getNow()); } });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器;
// 1. 准备 EventLoop 对象 EventLoop eventLoop = new NioEventLoopGroup().next(); // 2. 可以主动创建 promise, 结果容器 DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop); new Thread(() -> { // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果 log.debug("开始计算..."); try { int i = 1 / 0; Thread.sleep(1000); promise.setSuccess(80); } catch (Exception e) { e.printStackTrace(); promise.setFailure(e);//也可以填充异常结果 } }).start(); // 4. 接收结果的线程 log.debug("等待结果..."); log.debug("结果是: {}", promise.get());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
常用API:
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
案例:
同步处理任务成功
DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("set success, {}",10); promise.setSuccess(10); }); log.debug("start..."); log.debug("{}",promise.getNow()); // 还没有结果 log.debug("{}",promise.get());
1
2
3
4
5
6
7
8
9
10
11
12
13
14异步处理任务成功
DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); //设置回调,异步接受结果 promise.addListener(future->{ //这里future就是上面promise log.debug("{}",future.getNow()); }); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("set success,{}",10); promise.setSuccess(10); }); log.debug("start...");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20打印结果:
[main] DEBUG com.Nreal.itheima.netty.demo02.T2 - start... [defaultEventLoop-1-1] DEBUG com.Nreal.itheima.netty.demo02.T2 - set success,10 [defaultEventLoop-1-1] DEBUG com.Nreal.itheima.netty.demo02.T2 - 10
# Handler
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline;
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果;
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工;
服务端:
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1. 通过 channel 拿到 pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2. 添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail
// 打印 1 2 3 6 5 4
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
Student student = new Student(name.toString());
// 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
super.channelRead(ctx, student);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3,结果{}",msg,msg.getClass());
//无此行 不会出栈
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student {
private String name;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
客户端:
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
System.out.println(channelFuture.getClass());
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close 异步操作 1s 之后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
/*log.debug("waiting close...");
closeFuture.sync();
log.debug("处理关闭之后的操作");*/
System.out.println(closeFuture.getClass());
closeFuture.addListener((ChannelFutureListener) future -> {
log.debug("处理关闭之后的操作");
group.shutdownGracefully();
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38