选择器 Selector 是 I/O 多路复用模型的核心组件,它可以监控实现了 SelectableChannel 接口的通道的就绪情况。基于多路复用(multiplexing) I/O 模型,单线程的 Java 程序能够处理数万个连接,极大提高了系统的并发数。
1. 多路复用 I/O 模型
I/O 多路复用模型是操作系统提供给应用程序的一种进行 I/O 操作的模型。应用程序通过 select/poll 系统调用监控多个 I/O 设备,一旦某个或者多个 I/O 设备的处于就绪状态(例如:可读)则返回,应用程序随后可对就绪的设备进行操作。
大致流程如下:
1)应用程序向内核发起 select 系统调用,该调用会阻塞应用程序。
2)内核等待数据到达。数据可能由 DMA 复制到内核缓冲区,也有可能是 CPU 进行复制。
3)数据准备完毕,select 调用返回。select 返回的是一个集和,可能有多个连接都已经就绪。
4)应用程序发起 read 系统调用。
5)操作系统将数据有内核缓冲区复制到用户缓冲区。
6)read 调用返回。
I/O 多路复用模型本质上是一种阻塞 I/O,进行读操作的 read 系统调用是阻塞的,select 的时候也是阻塞的。不过 I/O 多路复用模型的优势在于阻塞时可以等待多路 I/O 就绪,然后一并处理。与多线程处理多路 I/O 相比,它是单线程的,没有线程切换的开销,单位时间内能够处理多的连接数。
2. 选择器与通道关系
在 Java 中,通道 Channel 可以表示 I/O 连接,而选择器可以监控某些 I/O 事件就绪的通道,选择通道中就绪的 I/O 事件。这里的通道必须是实现了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 没有实现该接口,所以不支持选择器。
3. 选择键 SelectionKey
选择器 Selector 监控通道时监控的是通道中的事件,选择键 SelectionKey 就代表着 I/O 事件。程序通过调用 Selector.select() 方法来选中选择器所监控的通道中的就绪的 I/O 事件的集合,然后遍历集合,对事件作出相应的处理。
选择键 SelectionKey 可以表示 4 种事件,这 4 种事件使用 int 类型的常量来表示。
1)SelectionKey.OP_ACCEPT 表示 accept 事件就绪。例如:对于 ServerSocketChannel 来说,该事件就绪表示可以调用 accept() 方法来获得与客户端连接的通道 SocketChannel。
2)SelectionKey.OP_CONNECT 表示客户端与服务端连接成功。
3)SelectionKey.OP_READ 表示通道中已经有了可读数据,可以调用 read() 方法从通道中读取数据。
4)SelectionKey.OP_WRITE 表示写事件就绪,可以调用 write() 方法往通道中写入数据。
不同的通道所能够支持的 I/O 事件不同,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,可以查看通道的 javadoc 文档,或者调用通道的 validOps() 方法来进行判断。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持读事件。
4. 选择器使用步骤
4.1 获取选择器
与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法 open() 来得到的。
Selector selector = Selector.open(); // 获取一个选择器实例
4.2 获取可选择通道
能够被选择器监控的通道必须实现了 SelectableChannel 接口,并且需要将通道配置成非阻塞模式,否则后续的注册步骤会抛出 IllegalBlockingModeException。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口 socketChannel.configureBlocking(false); // 配置通道为非阻塞模式
4.3 将通道注册到选择器
通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。
通道的注册通过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,如果需要关注该通道的多个 I/O 事件,可以传入这些事件类型或运算之后的结果。这些事件必须是通道所支持的,否则抛出 IllegalArgumentException。
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 将套接字通过到注册到选择器,关注 read 和 write 事件
4.4 轮询 select 就绪事件
通过调用选择器的 Selector.select() 方法可以获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,然后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。
while (selector.select() > 0){ // 轮询,且返回时有就绪事件 Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合 ....... }
有 3 种方式可以 select 就绪事件:
1)select() 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。
2)select(long timeout) 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。
3)selectNode() 不阻塞,如果无就绪事件,则返回 0;如果有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。
4.5 处理就绪事件
每次可以 select 出一批就绪的事件,所以需要对这些事件进行迭代。从一个 SelectionKey 对象可以得到:1)就绪事件的对应的通道;2)就绪的事件。通过这些信息,就可以很方便地进行 I/O 操作。
for(SelectionKey key : keys){ if(key.isWritable()){ // 可写事件 if("Bye".equals( (line = scanner.nextLine()) )){ socketChannel.shutdownOutput(); socketChannel.close(); break; } buf.put(line.getBytes()); buf.flip(); socketChannel.write(buf); buf.compact(); } } keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。
需要注意的是,处理完 I/O 事件之后,需要清除选择键集合,避免下一轮循环的时候对同一事件重复处理。
5. 完整示例
下面给出一个完整的实例,实例中包含 TCP 客户端 TcpClient, UDP 客户端 UdpClient 和服务端 EchoServer。服务端 EchoServer 可以同时处理 UDP 请求和 TCP 请求,用户可以在客户端控制台输入内容,按回车发送给服务端,服务端打印客户端发送过来的内容。完整代码:https://github.com/Robothy/java-experiments/tree/main/nio/Selector
5.1 服务端
public class EchoServer { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // 获取选择器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道 serverSocketChannel.configureBlocking(false); // 服务器通道配置为非阻塞模式 serverSocketChannel.bind(new InetSocketAddress(9090)); // 绑定 TCP 端口 9090 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPT DatagramChannel datagramChannel = DatagramChannel.open(); // 打开套接字通道 datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式 datagramChannel.bind(new InetSocketAddress(9090)); // 绑定 UDP 端口 9090 datagramChannel.register(selector, SelectionKey.OP_READ); // 将通道注册到选择器 selector 中,注册事件为读取数据 ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一个 1024 字节的堆字节缓冲区 while (selector.select() > 0){ // 轮询已经就绪的注册的通道的 I/O 事件 Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪的 I/O 事件,即选择器键集合 for (SelectionKey key : keys){ // 遍历选择键,处理就绪事件 if(key.isAcceptable()){ // 选择键的事件的是 I/O 连接事件 SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操作,获取套接字连接通道 socketChannel.configureBlocking(false); // 配置为套接字通道为非阻塞模式 socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字通过到注册到选择器,关注 READ 事件 }else if(key.isReadable()){ // 选择键的事件是 READ StringBuilder sb = new StringBuilder(); if(key.channel() instanceof DatagramChannel){ // 选择的通道为数据报通道,客户端是通过 UDP 连接过来的 sb.append("UDP Client: "); datagramChannel.receive(buf); // 最多读取 1024 字节,数据报多出的部分自动丢弃 buf.flip(); while(buf.position() < buf.limit()) { sb.append((char)buf.get()); } buf.clear(); }else{ // 选择的通道为套接字通道,客户端时通过 TCP 连接过来的 sb.append("TCP Client: "); ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道 int size; while ( (size = channel.read(buf))>0){ buf.flip(); while (buf.position() < buf.limit()) { sb.append((char)buf.get()); } buf.clear(); } if (size == -1) { sb.append("Exit"); channel.close(); } } System.out.println(sb); } } keys.clear(); // 将选择键清空,防止下次循环时被重复处理 } } }
5.2 TCP 客户端
public class TcpClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_WRITE); Scanner scanner = new Scanner(System.in); String line; ByteBuffer buf = ByteBuffer.allocate(1024); while (selector.select() > 0){ Set<SelectionKey> keys = selector.selectedKeys(); for(SelectionKey key : keys){ if(key.isWritable()){ if("Bye".equals( (line = scanner.nextLine()) )){ socketChannel.shutdownOutput(); socketChannel.close(); break; } buf.put(line.getBytes()); buf.flip(); socketChannel.write(buf); buf.compact(); } } keys.clear(); if(!socketChannel.isOpen()) break; } } }
5.3 UDP 客户端
public class UdpClient { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); // 获取选择器 DatagramChannel datagramChannel = DatagramChannel.open(); // 打开一个数据报通道 datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式 datagramChannel.register(selector, SelectionKey.OP_WRITE); // 将通道的写事件注册到选择器 ByteBuffer buff = ByteBuffer.allocate(1024); // 分配字节缓冲区 Scanner scanner = new Scanner(System.in); // 创建扫描器,扫描控制台输入流 InetSocketAddress server = new InetSocketAddress("localhost", 9090); while (selector.select() > 0){ // 有就绪事件 Set<SelectionKey> keys = selector.selectedKeys(); // 获取选择键,即就绪的事件 for(SelectionKey key : keys){ // 遍历选择键 if(key.isWritable()){ // 如果当前选择键是读就绪 String line; if("Bye".equals( line = scanner.nextLine() )) { // 从控制台获取 1 行输入,并检查输入的是不是 Bye System.exit(0); // 正常退出 } buff.put(line.getBytes()); // 放入缓冲区 buff.flip(); // 将缓冲区置为读状态 datagramChannel.send(buff, server); // 往 I/O 写数据 buff.compact(); // 压缩缓冲区,保留没发送完的数据 } } keys.clear(); } } }
6. 小结
Selector 作为多路复用 I/O 模型的核心组件,能够同时监控多路 I/O 通道。选择器在 select 就绪事件地时候会阻塞,在处理 I/O 事件的时候也会阻塞,它的优势在于在阻塞的时候可以等待多路 I/O 就绪,是一种异步阻塞 I/O 模型。与多线程处理多路 I/O 相比,多路复用模型只需要单个线程即可处理万级连接,没有线程切换的开销。