所以就出现了NIO,非阻塞式线程模型,在连接请求时和读取数据时,都不会阻塞也可以叫 New Blocking IO,因为它可以设置为阻塞和非阻塞两种形态。
大概解释一下下面这堆代码
1、同样的,也是先创建一个socket连接通道,然后向外提供一个9000的监听端口。
2、设置死循环和接收连接请求时不阻塞,当有连接进来时,会去建立连接并放入到一个 SocketChannel 集合中,如下所示:
List<SocketChannel> channelList = new ArrayList<>();
3、然后每次去循环遍历这个数组去读取他么所拿到的数据,
public class NioServer {
// 保存客户端连接
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel,与BIO的serverSocket类似
ServerSocketChannel serverSocket = ServerSocketChannel.open();
// 绑定端口号 9000
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
System.out.println("服务启动成功");
while (true) {
// 这个 while 循环就一直在跑
// 非阻塞模式 accept 方法不会阻塞,否则会阻塞
// NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
SocketChannel socketChannel = serverSocket.accept();
// 如果有客户端进行连接
if (socketChannel != null) {
System.out.println("连接成功");
// 设置SocketChannel为非阻塞,读取数据前不阻塞
socketChannel.configureBlocking(false);
// 保存客户端连接在List中,将刚刚客户端与服务器建立的通道放到这个list中
channelList.add(socketChannel);
}
// 遍历连接进行数据读取,不管这个通道有木有数据都会遍历
Iterator<SocketChannel> iterator = channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
// 非阻塞模式read方法不会阻塞,否则会阻塞
int len = sc.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
// 如果客户端断开,把socket从集合中去掉
} else if (len == -1) {
iterator.remove();
System.out.println("客户端断开连接");
}
}
}
}
}
可以仿照 BIO 那篇博客的测试方法。
由上面代码我们可以分析出,由于我们将所有的连接通道都放到了一个list集合里面,所以每一次在循环遍历这个list的时候,会将所有建立过连接的通道全部遍历一遍,如果说现在有10000个通道都建立的连接,但是收发数据的通道只有100个,那么剩下的9900个通道的遍历就是无效的,这样很浪费资源。
在 NIO 的原始版本中,出现的问题是遍历了无效的连接通道(也就是没有数据收发的通道连接),多路复用器就相当于是一个服务员,中间人,他去收集哪些通道有数据发送,然后交给主线程来遍历读取数据,这样的话就解决了无效遍历的问题。
先是一个图的大概讲解,这个图有点抽象,后续有时间尽量改进
大概挑几个重点解释一下
首先多路复用器最重要的地方就是三个方法
// 创建一个多路复用器
Selector selector = Selector.open();
// 将我们创建的通道注册到多路复用器上
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
// 取出请求事件的集合列表
selector.select();
open():
open方法会linux的一个库函数 epoll_create 来创建 epoll 对象,是一个文件描述符,因为linux一切皆文件嘛,这里也可以理解为返回了一个 epoll 对象的索引。
register:
这里其实就是简单的注册,里面调用了这行代码,pollWrapper.add(fd),很明显,它是将我们客户端的请求channel放到pollWrapper集合中。
select():
这里会调用linux的两个库函数 epoll_ctl epoll_wait
epoll_ctl 是遍历我们上面那个pollWrapper集合,真正的将channel和epoll关联起来,通过操作系统的硬中断,当有客户端访问进来时,会将这个访问放到一个 rdlist 上,这个就是我们最后取出来的需要循环处理的事件。
epoll_wait 函数就是回调返回事件,如果没有就阻塞,所以这行代码会阻塞
public class NioSelectorServer {
public static void main(String[] args) throws IOException {
// 创建NIO ServerSocketChannel
ServerSocketChannel serverSocket = ServerSocketChannel.open();
// 往这个channel上绑定 9000 端口,客户端就通过这个 channel 来连接服务端
serverSocket.socket().bind(new InetSocketAddress(9000));
// 设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
// 打开Selector处理Channel,即创建 epoll 对象,返回了一 epoll 文件描述符
// 这里调用了linux的一个库函数 epoll_create 来创建 epoll 对象
// 也可以理解为返回了一个 epoll 对象的索引
Selector selector = Selector.open();
// 将我们创建的通道注册到 selector(多路复用器)上,并且selector对客户端accept连接操作感兴趣
// pollWrapper.add(fd) 会放到这个集合中
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务启动成功");
while (true) {
// 阻塞等待需要处理的事件发生
// 这里可以接收很多事件
// 如果是第一次循环的话,按照这个代码来说只能接收到连接事件
// 如果执行了里面的 while 循环,给它注册了其他的事件,那么它就可以感知到其他的事件的发生
// 这里会调用linux的两个库函数 epoll_ctl epoll_wait
// epoll_ctl 真正的将channel和epoll关联起来,通过操作系统的硬中断,当有客户端访问进来时,
// 会将这个访问放到一个 rdlist 上,这个就是我们最后取出来的需要循环处理的事件
// epoll_wait 函数就是回调返回事件,如果没有就阻塞,所以这行代码会阻塞
selector.select();
// 获取selector中注册的全部事件的 SelectionKey 实例
// 拿出所有感知的事件后弄一个迭代器
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 遍历SelectionKey对事件进行处理
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 如果是OP_ACCEPT事件,则进行连接获取和事件注册
if (key.isAcceptable()) {
// 拿出需要连接的客户端服务
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 将需要连接的客户端进行连接,并返回一个唯一channel,以便后续的读写操作
SocketChannel socketChannel = server.accept();
// 并设置读取操作时也不阻塞
socketChannel.configureBlocking(false);
// 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接成功");
}
// 如果是OP_READ事件,则进行读取和打印
else if (key.isReadable()) {
// 取出前面连接时创建的唯一的读写channel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 再定义一个存放数据的缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(128);
// 从读写channel中将需要读取的数据拿出来
int len = socketChannel.read(byteBuffer);
// 如果有数据,把数据打印出来
if (len > 0) {
System.out.println("接收到消息:" + new String(byteBuffer.array()));
} else if (len == -1) {
// 如果客户端断开连接,关闭Socket
System.out.println("客户端断开连接");
socketChannel.close();
}
}
//从事件集合里删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- igbc.cn 版权所有 湘ICP备2023023988号-5
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务