java epoll select-ag凯发k8国际
点击上方 java后端,选择 设为星标
优质文章,及时送达作者 | hongjie
链接 | javadoop.com/post/nio-and-aio本文将介绍非阻塞 io 和异步 io,也就是大家耳熟能详的 nio 和 aio。很多初学者可能分不清楚异步和非阻塞的区别,只是在各种场合能听到异步非阻塞这个词。
本文会先介绍并演示阻塞模式,然后引入非阻塞模式来对阻塞模式进行优化,最后再介绍 jdk7 引入的异步 io,由于网上关于异步 io 的介绍相对较少,所以这部分内容我会介绍得具体一些。
希望看完本文,读者可以对非阻塞 io 和异步 io 的迷雾看得更清晰些,或者为初学者解开一丝丝疑惑也是好的。
阻塞模式 io
我们已经介绍过使用 java nio 包组成一个简单的客户端-服务端网络通讯所需要的 serversocketchannel、socketchannel 和 buffer,我们这里整合一下它们,给出一个完整的可运行的例子:
public classserver{publicstaticvoidmain(string[] args) throws ioexception {
serversocketchannel serversocketchannel = serversocketchannel.open();// 监听 8080 端口进来的 tcp 链接
serversocketchannel.socket().bind(new inetsocketaddress(8080));while (true) {// 这里会阻塞,直到有一个请求的连接进来
socketchannel socketchannel = serversocketchannel.accept();// 开启一个新的线程来处理这个请求,然后在 while 循环中继续监听 8080 端口
sockethandler handler = new sockethandler(socketchannel);new thread(handler).start();
}
}
}
这里看一下新的线程需要做什么,sockethandler:
public classsockethandlerimplementsrunnable{private socketchannel socketchannel;publicsockethandler(socketchannel socketchannel) {this.socketchannel = socketchannel;
}@overridepublicvoidrun() {
bytebuffer buffer = bytebuffer.allocate(1024);try {// 将请求数据读入 buffer 中int num;while ((num = socketchannel.read(buffer)) > 0) {// 读取 buffer 内容之前先 flip 一下
buffer.flip();// 提取 buffer 中的数据byte[] bytes = new byte[num];
buffer.get(bytes);
string re = new string(bytes, "utf-8");
system.out.println("收到请求:" re);// 回应客户端
bytebuffer writebuffer = bytebuffer.wrap(("我已经收到你的请求,你的请求内容是:" re).getbytes());
socketchannel.write(writebuffer);
buffer.clear();
}
} catch (ioexception e) {
ioutils.closequietly(socketchannel);
}
}
}
最后,贴一下客户端 socketchannel 的使用,客户端比较简单:
public classsocketchanneltest{publicstaticvoidmain(string[] args) throws ioexception {
socketchannel socketchannel = socketchannel.open();
socketchannel.connect(new inetsocketaddress("localhost", 8080));// 发送请求
bytebuffer buffer = bytebuffer.wrap("1234567890".getbytes());
socketchannel.write(buffer);// 读取响应
bytebuffer readbuffer = bytebuffer.allocate(1024);int num;if ((num = socketchannel.read(readbuffer)) > 0) {
readbuffer.flip();byte[] re = new byte[num];
readbuffer.get(re);
string result = new string(re, "utf-8");
system.out.println("返回值: " result);
}
}
}
上面介绍的阻塞模式的代码应该很好理解:来一个新的连接,我们就新开一个线程来处理这个连接,之后的操作全部由那个线程来完成。
那么,这个模式下的性能瓶颈在哪里呢?
非阻塞 io
说完了阻塞模式的使用及其缺点以后,我们这里就可以介绍非阻塞 io 了。非阻塞 io 的核心在于使用一个 selector 来管理多个通道,可以是 socketchannel,也可以是 serversocketchannel,将各个通道注册到 selector 上,指定监听的事件。之后可以只用一个线程来轮询这个 selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。我们就完全没有必要给每个通道都起一个线程。nio 中 selector 是对底层操作系统实现的一个抽象,管理通道状态其实都是底层系统实现的,这里简单介绍下在不同系统下的实现。select:上世纪 80 年代就实现了,它支持注册 fd_setsize(1024) 个 socket,在那个年代肯定是够用的,不过现在嘛,肯定是不行了。poll:1997 年,出现了 poll 作为 select 的替代者,最大的区别就是,poll 不再限制 socket 数量。select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 o(n)。所以,后来才催生了以下实现。epoll:2002 年随 linux 内核 2.5.44 发布,epoll 能直接返回具体的准备好的通道,时间复杂度 o(1)。除了 linux 中的 epoll,2000 年 freebsd 出现了 kqueue,还有就是,solaris 中有 /dev/poll。前面说了那么多实现,但是没有出现 windows,windows 平台的非阻塞 io 使用 select,我们也不必觉得 windows 很落后,在 windows 中 iocp 提供的异步 io 是比较强大的。我们回到 selector,毕竟 jvm 就是这么一个屏蔽底层实现的平台,我们面向 selector 编程就可以了。之前在介绍 selector 的时候已经了解过了它的基本用法,这边来一个可运行的实例代码,大家不妨看看:public classselectorserver{publicstaticvoidmain(string[] args) throws ioexception {selector selector = selector.open();
serversocketchannel server = serversocketchannel.open();
server.socket().bind(new inetsocketaddress(8080));// 将其注册到 selector 中,监听 op_accept 事件
server.configureblocking(false);
server.register(selector, selectionkey.op_accept);while (true) {int readychannels = selector.select();if (readychannels == 0) {continue;
}
set readykeys = selector.selectedkeys();// 遍历
iterator iterator = readykeys.iterator();while (iterator.hasnext()) {
selectionkey key = iterator.next();
iterator.remove();if (key.isacceptable()) {// 有已经接受的新的到服务端的连接
socketchannel socketchannel = server.accept();// 有新的连接并不代表这个通道就有数据,// 这里将这个新的 socketchannel 注册到 selector,监听 op_read 事件,等待数据
socketchannel.configureblocking(false);
socketchannel.register(selector, selectionkey.op_read);
} else if (key.isreadable()) {// 有数据可读// 上面一个 if 分支中注册了监听 op_read 事件的 socketchannel
socketchannel socketchannel = (socketchannel) key.channel();
bytebuffer readbuffer = bytebuffer.allocate(1024);int num = socketchannel.read(readbuffer);if (num > 0) {// 处理进来的数据...
system.out.println("收到数据:" new string(readbuffer.array()).trim());
bytebuffer buffer = bytebuffer.wrap("返回给客户端的数据...".getbytes());
socketchannel.write(buffer);
} else if (num == -1) {// -1 代表连接已经关闭
socketchannel.close();
}
}
}
}
}
}至于客户端,大家可以继续使用上一节介绍阻塞模式时的客户端进行测试。
nio.2 异步 io
more new io,或称 nio.2,随 jdk 1.7 发布,包括了引入异步 io 接口和 paths 等文件访问接口。异步这个词,我想对于绝大多数开发者来说都很熟悉,很多场景下我们都会使用异步。通常,我们会有一个线程池用于执行异步任务,提交任务的线程将任务提交到线程池就可以立马返回,不必等到任务真正完成。如果想要知道任务的执行结果,通常是通过传递一个回调函数的方式,任务结束后去调用这个函数。同样的原理,java 中的异步 io 也是一样的,都是由一个线程池来负责执行任务,然后使用回调或自己去查询结果。大部分开发者都知道为什么要这么设计了,这里再啰嗦一下。异步 io 主要是为了控制线程数量,减少过多的线程带来的内存消耗和 cpu 在线程调度上的开销。在 unix/linux 等系统中,jdk 使用了并发包中的线程池来管理任务,具体可以查看 asynchronouschannelgroup 的源码。在 windows 操作系统中,提供了一个叫做 i/o completion ports 的方案,通常简称为 iocp,操作系统负责管理线程池,其性能非常优异,所以在 windows 中 jdk 直接采用了 iocp 的支持,使用系统支持,把更多的操作信息暴露给操作系统,也使得操作系统能够对我们的 io 进行一定程度的优化。在 linux 中其实也是有异步 io 系统实现的,但是限制比较多,性能也一般,所以 jdk 采用了自建线程池的方式。本文还是以实用为主,想要了解更多信息请自行查找其他资料,下面对 java 异步 io 进行实践性的介绍。总共有三个类需要我们关注,分别是 asynchronoussocketchannel,asynchronousserversocketchannel 和 asynchronousfilechannel,只不过是在之前介绍的 filechannel、socketchannel 和 serversocketchannel 的类名上加了个前缀 asynchronous。java 异步 io 提供了两种使用方式,分别是返回 future 实例和使用回调函数。1、返回 future 实例
返回 java.util.concurrent.future 实例的方式我们应该很熟悉,jdk 线程池就是这么使用的。future 接口的几个方法语义在这里也是通用的,这里先做简单介绍。- future.isdone();判断操作是否已经完成,包括了正常完成、异常抛出、取消
- future.cancel(true);取消操作,方式是中断。参数 true 说的是,即使这个任务正在执行,也会进行中断。
- future.iscancelled();是否被取消,只有在任务正常结束之前被取消,这个方法才会返回 true
- future.get();这是我们的老朋友,获取执行结果,阻塞。
- future.get(10, timeunit.seconds);如果上面的 get() 方法的阻塞你不满意,那就设置个超时时间。
2、提供 completionhandler 回调函数
java.nio.channels.completionhandler 接口定义:public interfacecompletionhandler{voidcompleted(v result, a attachment);voidfailed(throwable exc, a attachment);}注意,参数上有个 attachment,虽然不常用,我们可以在各个支持的方法中传递这个参数值asynchronousserversocketchannel listener = asynchronousserversocketchannel.open().bind(null);// accept 方法的第一个参数可以传递 attachment
listener.accept(attachment, new completionhandler() {publicvoidcompleted(
asynchronoussocketchannel client, object attachment) {//
}publicvoidfailed(throwable exc, object attachment) {//
}
});
asynchronousfilechannel
网上关于 non-blocking io 的介绍文章很多,但是 asynchronous io 的文章相对就少得多了,所以我这边会多介绍一些相关内容。首先,我们就来关注异步的文件 io,前面我们说了,文件 io 在所有的操作系统中都不支持非阻塞模式,但是我们可以对文件 io 采用异步的方式来提高性能。下面,我会介绍 asynchronousfilechannel 里面的一些重要的接口,都很简单,读者要是觉得无趣,直接滑到下一个标题就可以了。实例化:asynchronousfilechannel channel = asynchronousfilechannel.open(paths.get("/users/hongjie/test.txt"));一旦实例化完成,我们就可以着手准备将数据读入到 buffer 中:bytebuffer buffer = bytebuffer.allocate(1024);future result = channel.read(buffer, 0);异步文件通道的读操作和写操作都需要提供一个文件的开始位置,文件开始位置为 0除了使用返回 future 实例的方式,也可以采用回调函数进行操作,接口如下:public abstract voidread(bytebuffer dst,
long position,
a attachment,
completionhandler handler);顺便也贴一下写操作的两个版本的接口:publicabstract future write(bytebuffer src, long position);public abstract voidwrite(bytebuffer src,
long position,
a attachment,
completionhandler handler);我们可以看到,aio 的读写主要也还是与 buffer 打交道,这个与 nio 是一脉相承的。另外,还提供了用于将内存中的数据刷入到磁盘的方法:publicabstractvoidforce(boolean metadata) throws ioexception;因为我们对文件的写操作,操作系统并不会直接针对文件操作,系统会缓存,然后周期性地刷入到磁盘。如果希望将数据及时写入到磁盘中,以免断电引发部分数据丢失,可以调用此方法。参数如果设置为 true,意味着同时也将文件属性信息更新到磁盘。还有,还提供了对文件的锁定功能,我们可以锁定文件的部分数据,这样可以进行排他性的操作。publicabstract future lock(long position, long size, boolean shared);position 是要锁定内容的开始位置,size 指示了要锁定的区域大小,shared 指示需要的是共享锁还是排他锁当然,也可以使用回调函数的版本:public abstract voidlock(long position,
long size,
boolean shared,
a attachment,
completionhandler handler);文件锁定功能上还提供了 trylock 方法,此方法会快速返回结果:publicabstract filelock trylock(long position, long size, boolean shared)
throws ioexception;这个方法很简单,就是尝试去获取锁,如果该区域已被其他线程或其他应用锁住,那么立刻返回 null,否则返回 filelock 对象。asynchronousfilechannel 操作大体上也就以上介绍的这些接口,还是比较简单的,这里就少一些废话早点结束好了。
asynchronousserversocketchannel
这个类对应的是非阻塞 io 的 serversocketchannel,大家可以类比下使用方式。我们就废话少说,用代码说事吧:package com.javadoop.aio;import java.io.ioexception;import java.net.inetsocketaddress;import java.net.socketaddress;import java.nio.bytebuffer;import java.nio.channels.asynchronousserversocketchannel;import java.nio.channels.asynchronoussocketchannel;import java.nio.channels.completionhandler;public classserver{publicstaticvoidmain(string[] args) throws ioexception {// 实例化,并监听端口asynchronousserversocketchannel server =
asynchronousserversocketchannel.open().bind(new inetsocketaddress(8080));// 自己定义一个 attachment 类,用于传递一些信息
attachment att = new attachment();
att.setserver(server);
server.accept(att, new completionhandler() {@overridepublicvoidcompleted(asynchronoussocketchannel client, attachment att) {try {
socketaddress clientaddr = client.getremoteaddress();
system.out.println("收到新的连接:" clientaddr);// 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
att.getserver().accept(att, this);
attachment newatt = new attachment();
newatt.setserver(server);
newatt.setclient(client);
newatt.setreadmode(true);
newatt.setbuffer(bytebuffer.allocate(2048));// 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
client.read(newatt.getbuffer(), newatt, new channelhandler());
} catch (ioexception ex) {
ex.printstacktrace();
}
}@overridepublicvoidfailed(throwable t, attachment att) {
system.out.println("accept failed");
}
});// 为了防止 main 线程退出try {
thread.currentthread().join();
} catch (interruptedexception e) {
}
}
}看一下 channelhandler 类:package com.javadoop.aio;import java.io.ioexception;import java.nio.bytebuffer;import java.nio.channels.completionhandler;import java.nio.charset.charset;public classchannelhandlerimplementscompletionhandler{@overridepublicvoidcompleted(integer result, attachment att) {if (att.isreadmode()) {// 读取来自客户端的数据
bytebuffer buffer = att.getbuffer();
buffer.flip();byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes);
string msg = new string(buffer.array()).tostring().trim();
system.out.println("收到来自客户端的数据: " msg);// 响应客户端请求,返回数据
buffer.clear();
buffer.put("response from server!".getbytes(charset.forname("utf-8")));
att.setreadmode(false);
buffer.flip();// 写数据到客户端也是异步
att.getclient().write(buffer, att, this);
} else {// 到这里,说明往客户端写数据也结束了,有以下两种选择:// 1. 继续等待客户端发送新的数据过来// att.setreadmode(true);// att.getbuffer().clear();// att.getclient().read(att.getbuffer(), att, this);// 2. 既然服务端已经返回数据给客户端,断开这次的连接try {
att.getclient().close();
} catch (ioexception e) {
}
}
}@overridepublicvoidfailed(throwable t, attachment att) {
system.out.println("连接断开");
}
}顺便再贴一下自定义的 attachment 类:public classattachment{private asynchronousserversocketchannel server;private asynchronoussocketchannel client;private boolean isreadmode;private bytebuffer buffer;// getter & setter
}这样,一个简单的服务端就写好了,接下来可以接收客户端请求了。上面我们用的都是回调函数的方式,读者要是感兴趣,可以试试写个使用 future 的。
asynchronoussocketchannel
其实,说完上面的 asynchronousserversocketchannel,基本上读者也就知道怎么使用 asynchronoussocketchannel 了,和非阻塞 io 基本类似。这边做个简单演示,这样读者就可以配合之前介绍的 server 进行测试使用了。package com.javadoop.aio;import java.io.ioexception;import java.net.inetsocketaddress;import java.nio.bytebuffer;import java.nio.channels.asynchronoussocketchannel;import java.nio.charset.charset;import java.util.concurrent.executionexception;import java.util.concurrent.future;public classclient{publicstaticvoidmain(string[] args) throws exception {asynchronoussocketchannel client = asynchronoussocketchannel.open();// 来个 future 形式的
future> future = client.connect(new inetsocketaddress(8080));// 阻塞一下,等待连接成功
future.get();
attachment att = new attachment();
att.setclient(client);
att.setreadmode(false);
att.setbuffer(bytebuffer.allocate(2048));byte[] data = "i am obot!".getbytes();
att.getbuffer().put(data);
att.getbuffer().flip();// 异步发送数据到服务端
client.write(att.getbuffer(), att, new clientchannelhandler());// 这里休息一下再退出,给出足够的时间处理数据
thread.sleep(2000);
}
}往里面看下 clientchannelhandler 类:package com.javadoop.aio;import java.io.ioexception;import java.nio.bytebuffer;import java.nio.channels.completionhandler;import java.nio.charset.charset;public classclientchannelhandlerimplementscompletionhandler{@overridepublicvoidcompleted(integer result, attachment att) {
bytebuffer buffer = att.getbuffer();if (att.isreadmode()) {// 读取来自服务端的数据
buffer.flip();byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
string msg = new string(bytes, charset.forname("utf-8"));
system.out.println("收到来自服务端的响应数据: " msg);// 接下来,有以下两种选择:// 1. 向服务端发送新的数据// att.setreadmode(false);// buffer.clear();// string newmsg = "new message from client";// byte[] data = newmsg.getbytes(charset.forname("utf-8"));// buffer.put(data);// buffer.flip();// att.getclient().write(buffer, att, this);// 2. 关闭连接try {
att.getclient().close();
} catch (ioexception e) {
}
} else {// 写操作完成后,会进到这里
att.setreadmode(true);
buffer.clear();
att.getclient().read(buffer, att, this);
}
}@overridepublicvoidfailed(throwable t, attachment att) {
system.out.println("服务器无响应");
}
}以上代码都是可以运行调试的,如果读者碰到问题,请在评论区留言。
asynchronous channel groups
为了知识的完整性,有必要对 group 进行介绍,其实也就是介绍 asynchronouschannelgroup 这个类。之前我们说过,异步 io 一定存在一个线程池,这个线程池负责接收任务、处理 io 事件、回调等。这个线程池就在 group 内部,group 一旦关闭,那么相应的线程池就会关闭。asynchronousserversocketchannels 和 asynchronoussocketchannels 是属于 group 的,当我们调用 asynchronousserversocketchannel 或 asynchronoussocketchannel 的 open() 方法的时候,相应的 channel 就属于默认的 group,这个 group 由 jvm 自动构造并管理。如果我们想要配置这个默认的 group,可以在 jvm 启动参数中指定以下系统变量:- java.nio.channels.defaultthreadpool.threadfactory此系统变量用于设置 threadfactory,它应该是 java.util.concurrent.threadfactory 实现类的全限定类名。一旦我们指定了这个 threadfactory 以后,group 中的线程就会使用该类产生。
- java.nio.channels.defaultthreadpool.initialsize此系统变量也很好理解,用于设置线程池的初始大小。
- asynchronouschannelgroup.withcachedthreadpool(executorservice executor, int initialsize)
- asynchronouschannelgroup.withfixedthreadpool(int nthreads, threadfactory threadfactory)
- asynchronouschannelgroup.withthreadpool(executorservice executor)
.withfixedthreadpool(10, executors.defaultthreadfactory());
asynchronousserversocketchannel server = asynchronousserversocketchannel.open(group);
asynchronoussocketchannel client = asynchronoussocketchannel.open(group);asynchronousfilechannels 不属于 group。但是它们也是关联到一个线程池的,如果不指定,会使用系统默认的线程池,如果想要使用指定的线程池,可以在实例化的时候使用以下方法:publicstatic asynchronousfilechannel open(path file,
set extends openoption> options,
executorservice executor,
fileattribute>... attrs) {
...
}到这里,异步 io 就算介绍完成了。
小结
我想,本文应该是说清楚了非阻塞 io 和异步 io 了,对于异步 io,由于网上的资料比较少,所以不免篇幅多了些。我们也要知道,看懂了这些,确实可以学到一些东西,多了解一些知识,但是我们还是很少在工作中将这些知识变成工程代码。一般而言,我们需要在网络应用中使用 nio 或 aio 来提升性能,但是,在工程上,绝不是了解了一些概念,知道了一些接口就可以的,需要处理的细节还非常多。这也是为什么 netty/mina 如此盛行的原因,因为它们帮助封装好了很多细节,提供给我们用户友好的接口,后面有时间我也会对 netty 进行介绍。-end-
学java,请关注公众号:java后端
喜欢文章,点个在看总结
以上是ag凯发k8国际为你收集整理的java epoll select_java 非阻塞 io 和异步 io的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: javascript 本地对象和内置对象
- 下一篇: