IO模型
# IO模型
应用程序 A向应用 B发送一条信息;
步骤:
- 应用A把消息发送到 TCP发送缓冲区;
- TCP发送缓冲区再把消息发送出去,经过网络传递后,消息会发送到B服务器的TCP接收缓冲区;
- B再从TCP接收缓冲区去读取属于自己的数据;
# BIO
什么是 BIO?
应用 B发起读取数据的请求(调用 recvfrom),内核数据没有准备好之前,B一直处于等待状态,直到内核将数据准备好,交给 B为止;
BIO缺点?
每一个连接到来,经过协调器的处理,就开启一个对应的线程进行接管。如果连接有 1000条,那就需要 1000个线程。线程资源是非常昂贵的,除了占用大量的内存,还会占用非常多的 CPU调度时间,所以 BIO在连接非常多的情况下,效率会变得非常低;
使用 ServerSocket
实现socket服务器,监听8888端口:
public class BIO {
static boolean stop = false;
public static void main(String[] args) throws Exception {
int connectionNum = 0;
//服务 ip + port
int port = 8888;
ExecutorService service = Executors.newCachedThreadPool();
//创建socket服务
ServerSocket serverSocket = new ServerSocket(port);
while (!stop) {
if (10 == connectionNum) {
stop = true;
}
Socket socket = serverSocket.accept();
service.execute(() -> {
try {
Scanner scanner = new Scanner(socket.getInputStream());
PrintStream printStream = new PrintStream(socket.getOutputStream());
while (!stop) {
String s = scanner.next().trim();
printStream.println("PONG:" + s);
}
} catch (Exception ex) {
ex.printStackTrace();
}
});
connectionNum++;
}
service.shutdown();
serverSocket.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# NIO
什么是 NIO?
内核数据没有准备好之前,直接通知 应用 B数据未准备好,B 下个时间节点继续询问内核数据是否准备好;
NIO缺点?
并发规模过大,所有请求不知道内核何时数据准备好,不断发送 recvfrom请求读取数据,资源浪费严重;
# 多路复用
什么是 IO多路复用?
思路:一个或多个线程监控多个网络请求(fd文件描述符,linux系统把所有网络请求以一个fd来标识),这样就可以只需要一个或几个线程就可以完成数据状态询问的操作(不必为每个fd创建一个对应的监控线程),当有数据准备就绪之后再分配对应的线程去读取数据,这么做就可以节省出大量的线程资源出来;
IO多路复用实现有:select , poll , epoll;
详细见:https://hedsay.github.io/rui-notes/pages/ebb907/
使用Selector
案例:
public class NIO {
static boolean stop = false;
public static void main(String[] args) throws Exception {
int connectionNum = 0;
int port = 8888;
ExecutorService service = Executors.newCachedThreadPool();
//创建了一个服务端ssc,并开启一个新的事件选择器,监听它的OP_ACCEPT事件。
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress("localhost", port));
Selector selector = Selector.open();
//共有4种事件类型。
// 分别是新连接事件(OP_ACCEPT)、连接就绪事件(OP_CONNECT)、读就绪事件(OP_READ)、写就绪事件(OP_WRITE)。
// 任何网络和文件操作,都可以抽象成这四个事件
//SelectionKey.OP_ACCEPT = 16 ssc.validOps()=16
ssc.register(selector, ssc.validOps());
while (!stop) {
if (10 == connectionNum) {
stop = true;
}
//在while循环里,使用select函数,阻塞在主线程里。
//所谓阻塞,就是操作系统不再分配CPU事件片到当前线程中,所以select函数是几乎不占用任何系统资源的
int num = selector.select();
if (num == 0) {
continue;
}
//一旦有新的事件到达,比如有新的连接到来,主线程就能够被调度到,程序就能够向下执行。
//这时候,就能够根据订阅的事件通知,持续获取订阅的事件。
//由于注册到selector的连接和事件可能会有多个,所以这些事件也会有多个。
//使用安全的迭代器循环进行处理,在处理完毕之后,将它删除。
//如果事件不删除的话,或者漏掉了某个事件的处理,会怎么样呢?
//后果还是比较严重的,由于事件总是存在,我们的程序会陷入无休无止的循环之中。
Iterator<SelectionKey> events = selector.selectedKeys().iterator();
while (events.hasNext()) {
SelectionKey event = events.next();
if (event.isAcceptable()) {
//NIO操作的对象是抽象的概念Channel,通过缓冲区进行数据交换
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//订阅OP_READ事件,数据流读取
sc.register(selector, SelectionKey.OP_READ);
connectionNum++;
} else if (event.isReadable()) {
try {
SocketChannel sc = (SocketChannel) event.channel();
//创建了一个1024字节的缓冲区,用于数据的读取
//如果连接中的数据,大于1024字节怎么办?
//水平触发 (level-triggered) 称作LT模式。只要缓冲区有数据,事件就会一直发生
//边缘触发 (edge-triggered) 称作ET模式。缓冲区有数据,仅会触发一次。事件想要再次触发,必须先将fd中的数据读完才行
//java的NIO使用了LT模式,LT模式频繁环唤醒线程,效率相比较ET模式低,所以Netty使用JNI的方式,实现了ET模式,效率上更高一些
ByteBuffer buf = ByteBuffer.allocate(1024);
//这依旧是阻塞的
int size = sc.read(buf);
if(-1==size){
sc.close();
}
String result = new String(buf.array()).trim();
ByteBuffer wrap = ByteBuffer.wrap(("PONG:" + result).getBytes());
sc.write(wrap);
} catch (Exception ex) {
ex.printStackTrace();
}
} else if (event.isWritable()) {
SocketChannel sc = (SocketChannel) event.channel();
}
events.remove();
}
}
service.shutdown();
ssc.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# Netty
# Reactor模式
多线程模型(区别单线程):
- acceptor线程用于监听服务端,接收客户端的TCP连接请求;
- 网络I/O操作——读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
- 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题;
缺点:百万客户端并发连接,或者服务端需要对客户端的握手信息进行安全认证,认证本身非常损耗性能;
解决:主从Reactor多线程模型
接受客户端连接的也是一个独立的 NIO线程池,acceptor接受到客户端 TCP 连接请求处理完成后,将新创建的SocketChannel注册到 I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作;
主从Reactor多线程模型工作流程:
Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过 Acceptor 处理客户端连接事件。
当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。
即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。
SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理。
当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理。
Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理。
Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过 send 向客户端发送响应数据。
一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个 SubReactor 线程。
# Netty架构
Boss线程对应着对连接的处理和分派,相当于mainReactor;Work线程 对应着subReactor,使用多线程负责读写事件的分发和处理;
说明:
- Netty 抽象出两组线程池:
BossGroup
和WorkerGroup
。每个线程池中都有NioEventLoop
线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
。 NioEventLoopGroup
相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个NioEventLoop
。NioEventLoop
表示一个不断循环的执行事件处理的线程,每个NioEventLoop
都包含一个Selector
,用于监听注册在其上的 Socket 网络连接(Channel
)。NioEventLoopGroup
可以含有多个线程,即可以含有多个NioEventLoop
。- 每个
BossNioEventLoop
中循环执行以下三个步骤:- select:轮询注册在其上的
ServerSocketChannel
的 accept 事件(OP_ACCEPT 事件); - processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个
NioSocketChannel
,并将其注册到某个WorkerNioEventLoop
上的Selector
上; - runAllTasks:再去以此循环处理任务队列中的其他任务;
- select:轮询注册在其上的
- 每个
WorkerNioEventLoop
中循环执行以下三个步骤:- select:轮询注册在其上的
NioSocketChannel
的 read/write 事件(OP_READ/OP_WRITE 事件); - processSelectedKeys:在对应的
NioSocketChannel
上处理 read/write 事件; - runAllTasks:再去以此循环处理任务队列中的其他任务;
- select:轮询注册在其上的
- 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
# Netty案例
# 服务端
public class TCPServer {
public static void main(String[] args) {
// 创建 BossGroup 和 WorkerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务器端的启动对象
//ServerBootstrap 服务器端的引导类,
// 一个 Netty 应用程序通常由一个引导类开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler)、绑定端口、发起连接等
ServerBootstrap bootstrap = new ServerBootstrap();
// 配置参数
bootstrap
// 设置线程组
.group(bossGroup, workerGroup)
// 说明服务器端通道的实现类(便于 Netty 做反射处理)
// 服务端首先创建一个 NioServerSocketChannel 作为服务器端通道,
// 每当接收一个客户端连接就产生一个 NioSocketChannel 应对该客户端
.channel(NioServerSocketChannel.class)
// 设置等待连接的队列的容量(当客户端连接请求速率大于 NioServerSocketChannel 接收速率的时候,会使用该队列做缓冲)
// option()方法用于给服务端的 ServerSocketChannel添加配置
.option(ChannelOption.SO_BACKLOG, 128)
// 设置连接保活
// childOption()方法用于给服务端 ServerSocketChannel
// 接收到的 SocketChannel 添加配置
.childOption(ChannelOption.SO_KEEPALIVE, true)
// handler()方法用于给 BossGroup 设置业务处理器
// childHandler()方法用于给 WorkerGroup 设置业务处理器
.childHandler(
// 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {
// 向 Pipeline 添加业务处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
// 可以继续调用 socketChannel.pipeline().addLast()
// 添加更多 Handler
}
}
);
System.out.println("server is ready...");
// 绑定端口,启动服务器,生成一个 channelFuture 对象,
// ChannelFuture 涉及到 Netty 的异步模型,后面展开讲
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道有数据可读时执行
* @param ctx 上下文对象,可以从中取得相关联的 Pipeline、Channel、客户端地址等
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收客户端发来的数据
System.out.println("client address: " + ctx.channel().remoteAddress());
// ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from client: " + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕后执行
* @param ctx 上下文对象
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 发送响应给客户端
ctx.writeAndFlush(
// Unpooled 类是 Netty 提供的专门操作缓冲区的工具类,
// copiedBuffer 方法返回的 ByteBuf 对象类似于
// NIO 中的 ByteBuffer,但性能更高
Unpooled.copiedBuffer(
"hello client! i have got your data.",
CharsetUtil.UTF_8
)
);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭与客户端的 Socket 连接
ctx.channel().close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 客户端
public class TCPClient {
public static void main(String[] args) {
//客户端需要一个事件循环组,可以看做 BossGroup
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 创建客户端的启动对象
//Bootstrap 客户端引导类
Bootstrap bootstrap = new Bootstrap();
// 配置参数
bootstrap
// 设置线程组
.group(eventLoopGroup)
// 说明客户端通道的实现类(便于 Netty 做反射处理)
// 客户端创建一个 NioSocketChannel 作为客户端通道,去连接服务器
.channel(NioSocketChannel.class)
// handler()方法用于给 BossGroup 设置业务处理器
.handler(
// 创建一个通道初始化对象
//当一个链接建立时,我们需要知道怎么来接收或者发送数据,
// 当然,我们有各种各样的Handler实现来处理它,
// 那么ChannelInitializer便是用来配置这些Handler,
// 它会提供一个ChannelPipeline,并把Handler加入到ChannelPipeline
new ChannelInitializer<SocketChannel>() {
// 向 Pipeline 添加业务处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
// 可以继续调用 socketChannel.pipeline().addLast()
// 添加更多 Handler
}
}
);
System.out.println("client is ready...");
// 启动客户端去连接服务器端,ChannelFuture 涉及到 Netty 的异步模型
//在Netty中所有的IO操作都是异步的,因此,你不能立刻得知消息是否被正确处理,
// 但是我们可以过一会等它执行完成或者直接注册一个监听,
// 具体的实现就是通过Future和ChannelFutures,
// 他们可以注册一个监听,当操作执行成功或失败时监听会自动触发。
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 向服务器发送数据
ctx.writeAndFlush(
// Unpooled 类是 Netty 提供的专门操作缓冲区的工具类,copiedBuffer 方法返回的 ByteBuf 对象类似于
// NIO 中的 ByteBuffer,但性能更高
Unpooled.copiedBuffer(
"hello server!",
CharsetUtil.UTF_8
)
);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收服务器端发来的数据
System.out.println("server address: "
+ ctx.channel().remoteAddress());
// ByteBuf 是 Netty 提供的类,比 NIO 的 ByteBuffer 性能更高
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("data from server: "
+ byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭与服务器端的 Socket 连接
ctx.channel().close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31