Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
Home
  • 计算机网络
  • 操作系统
  • 数据结构与算法
  • 设计模式
  • JavaSE
  • JVM
  • JUC
  • Netty
  • CPP
  • QT
  • UE
  • Go
  • Gin
  • Gorm
  • HTML
  • CSS
  • JavaScript
  • vue2
  • TypeScript
  • vue3
  • react
  • Spring
  • SpringMVC
  • Mybatis
  • SpringBoot
  • SpringSecurity
  • SpringCloud
  • Mysql
  • Redis
  • 消息中间件
  • RPC
  • 分布式锁
  • 分布式事务
  • 个人博客
  • 弹幕视频平台
  • API网关
  • 售票系统
  • 消息推送平台
  • SaaS短链接系统
  • Linux
  • Docker
  • Git
GitHub (opens new window)
  • Netty学习笔记
    • Netty案例代码
    • Netty组件
      • EventLoopGroup
      • Channel
      • Future&Promise
      • Handler
  • 粘包与半包
  • 协议设计与解析
  • 案例—聊天室
  • Netty
Nreal
2023-12-01
目录

Netty学习笔记

# Netty案例代码

加入依赖:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>
1
2
3
4
5

服务端:

public class HelloServer {
    public static void main(String[] args) {
        //1.启动器,服务组装netty组件,启动服务器
        new ServerBootstrap()
                //2.boss,worker组
                .group(new NioEventLoopGroup())
                //3.选择服务器ServerSocketChannel 实现
                .channel(NioServerSocketChannel.class)
                //4.boss负责处理连接,worker负责处理读写,决定了worker能执行哪些操作(handler)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new StringDecoder());//将bytebuf转换为字符串
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                System.out.println(msg);
                            }
                        });
                    }
                })
                .bind(9527);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

客户端:

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("127.0.0.1", 9527)
                .sync()//阻塞方法,直到连接建立
                .channel()
                .writeAndFlush(new Date() + ": hello world!");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# Netty组件

# EventLoopGroup

事件循环EventLoop:

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件;

一个Channel与一个EventLoop绑定;

继承关系:

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法;
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop;
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup;

事件循环组EventLoopGroup :

是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全);

执行任务:

public static void main(String[] args) {
    //1.创建事件循环组
    EventLoopGroup group = new NioEventLoopGroup(2);//io事件,普通任务,定时任务
//        EventLoopGroup group = new DefaultEventLoopGroup();//普通任务,定时任务
    //2.获取下一个事件循环对象
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    //3.执行普通任务
    group.next().submit(()->{
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    //4.执行定时任务
    group.next().scheduleAtFixedRate(()->{
        System.out.println("ok");
    },0,1, TimeUnit.SECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# Channel

相关API:

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

ChannelFuture

客户端代码拆解:

//返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); 

channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
1
2
3
4
5
6
7
8
9
10
11
12
13

sync()方法作用:异步操作同步;

connect方法是异步非阻塞的,不等连接建立,方法执行就返回了,channelFuture对象不能立即获得channel对象:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);

System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  • 执行到 1 时,连接未建立,打印 [id: 0x2e1884dd]
  • 执行到 2 时,sync 方法是同步等待连接建立完成
  • 执行到 3 时,连接肯定建立了,打印 [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]

除了sync()方法,还可以采用回调方式:

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
    System.out.println(future.channel()); // 2
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  • 执行到 1 时,连接未建立,打印 [id: 0x749124ba]
  • ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印 [id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]

# Future&Promise

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;

    // 1. 线程池
    ExecutorService service = Executors.newFixedThreadPool(2);
    // 2. 提交任务
    Future<Integer> future = service.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            log.debug("执行计算");
            Thread.sleep(1000);
            return 50;
        }
    });
    // 3. 主线程通过 future 来获取结果
    log.debug("等待结果");
    log.debug("结果是 {}", future.get());
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;

    NioEventLoopGroup group = new NioEventLoopGroup();
    EventLoop eventLoop = group.next();
    Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            log.debug("执行计算");
            Thread.sleep(1000);
            return 70;
        }
    });
    //        log.debug("等待结果");
    //        log.debug("结果是 {}", future.get());
    future.addListener(new GenericFutureListener<Future<? super Integer>>(){
        @Override
        public void operationComplete(Future<? super Integer> future) throws Exception {
            log.debug("接收结果:{}", future.getNow());
        }
    });
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器;

    // 1. 准备 EventLoop 对象
    EventLoop eventLoop = new NioEventLoopGroup().next();
    // 2. 可以主动创建 promise, 结果容器
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    new Thread(() -> {
        // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
        log.debug("开始计算...");
        try {
            int i = 1 / 0;
            Thread.sleep(1000);
            promise.setSuccess(80);
        } catch (Exception e) {
            e.printStackTrace();
            promise.setFailure(e);//也可以填充异常结果
        }
    
    }).start();
    // 4. 接收结果的线程
    log.debug("等待结果...");
    log.debug("结果是: {}", promise.get());
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

常用API:

功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

案例:

  1. 同步处理任务成功

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    eventExecutors.execute(()->{
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("set success, {}",10);
        promise.setSuccess(10);
    });
    log.debug("start...");
    log.debug("{}",promise.getNow()); // 还没有结果
    log.debug("{}",promise.get());
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  2. 异步处理任务成功

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    
    //设置回调,异步接受结果
    promise.addListener(future->{
        //这里future就是上面promise
        log.debug("{}",future.getNow());
    });
    
    eventExecutors.execute(()->{
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("set success,{}",10);
        promise.setSuccess(10);
    });
    
    log.debug("start...");
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

    打印结果:

    [main] DEBUG com.Nreal.itheima.netty.demo02.T2 - start... [defaultEventLoop-1-1] DEBUG com.Nreal.itheima.netty.demo02.T2 - set success,10 [defaultEventLoop-1-1] DEBUG com.Nreal.itheima.netty.demo02.T2 - 10

# Handler

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline;

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果;
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工;

服务端:

public static void main(String[] args) {
    new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // 1. 通过 channel 拿到 pipeline
                    ChannelPipeline pipeline = ch.pipeline();
                    // 2. 添加处理器 head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail
                    // 打印 1 2 3 6 5 4
                    pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug("1");
                            ByteBuf buf = (ByteBuf) msg;
                            String name = buf.toString(Charset.defaultCharset());
                            super.channelRead(ctx, name);
                        }
                    });
                    pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                            log.debug("2");
                            Student student = new Student(name.toString());
                            // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
                            super.channelRead(ctx, student);
                        }
                    });

                    pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug("3,结果{}",msg,msg.getClass());
                            //无此行 不会出栈
                            ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                        }
                    });
                    pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                            log.debug("4");
                            super.write(ctx, msg, promise);
                        }
                    });
                    pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                            log.debug("5");
                            super.write(ctx, msg, promise);
                        }
                    });
                    pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                        @Override
                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                            log.debug("6");
                            super.write(ctx, msg, promise);
                        }
                    });
                }
            })
            .bind(8080);
}
@Data
@AllArgsConstructor
static class Student {
    private String name;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

客户端:

public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture channelFuture = new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override // 在连接建立后被调用
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder());
                }
            })
            .connect(new InetSocketAddress("localhost", 8080));
    System.out.println(channelFuture.getClass());
    Channel channel = channelFuture.sync().channel();
    log.debug("{}", channel);
    new Thread(()->{
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String line = scanner.nextLine();
            if ("q".equals(line)) {
                channel.close(); // close 异步操作 1s 之后
                break;
            }
            channel.writeAndFlush(line);
        }
    }, "input").start();

    // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
    ChannelFuture closeFuture = channel.closeFuture();
    /*log.debug("waiting close...");
    closeFuture.sync();
    log.debug("处理关闭之后的操作");*/
    System.out.println(closeFuture.getClass());
    closeFuture.addListener((ChannelFutureListener) future -> {
        log.debug("处理关闭之后的操作");
        group.shutdownGracefully();
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
粘包与半包

粘包与半包→

Theme by Vdoing | Copyright © 2021-2024
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式