Linux JDK NIO 源码分析 引言 多路复用是一种模型,而实现这种模型有 select、poll 和 epoll,在 linux 下 NIO 的实现默认是 epoll
当一个 Socket 成功建立,客户端向服务端发数据,内核是最先知道这个 Socket 是可读的,但是业务层也想知道这个事情,那么怎么做呢?
借鉴观察者模式的思想,当这个事情发生了,内核就开始通知这个事情的监听者,也就是 epoll 对象,epoll 对象会触发的行为已经被 linux 内核写好了,就是把这个事件放到调用阻塞方法传来的一个参数(就绪队列)上
当业务层解除阻塞返回时,传递来的这个参数(就绪队列)上就是发生的事件
这个流程在代码怎么实现呢?
1 2 3 int epfd = epoll_create()
当有客户端来连接,业务层收到了一个 Socket,希望 epoll 对象监听这个 Socket 的读事件
1 2 3 int epoll_ctl (int epfd, int ops, int fd, int events)
业务层调用开始阻塞
1 2 3 int epoll_wait (int epfd, long pollAddress, int numfds, int timeout)
首先以一段经典的 NIO 代码开始:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public static void main (String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); serverSocketChannel.bind(new InetSocketAddress ("127.0.0.1" , 9097 )); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true ) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); SelectableChannel channel = selectionKey.channel(); if (selectionKey.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) channel; SocketChannel accept = serverChannel.accept(); accept.configureBlocking(false ); accept.register(selector, SelectionKey.OP_READ); } if (selectionKey.isReadable()) { SocketChannel socketChannel = (SocketChannel) channel; ByteBuffer byteBuffer = ByteBuffer.allocate(1024 ); int length = socketChannel.read(byteBuffer); if (length == -1 ) { channel.close(); } else { byteBuffer.flip(); byte [] buffer = new byte [byteBuffer.remaining()]; byteBuffer.get(buffer); System.out.println(new String (buffer)); } } } } }
重点其实就是上述代码的第2、6、8行,所以接下来也分为这三个部分进行讲解
Selector.open() 1 2 3 public static Selector open () throws IOException { return SelectorProvider.provider().openSelector(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static class Holder { static final SelectorProvider INSTANCE = provider(); @SuppressWarnings("removal") static SelectorProvider provider () { PrivilegedAction<SelectorProvider> pa = () -> { SelectorProvider sp; if ((sp = loadProviderFromProperty()) != null ) return sp; if ((sp = loadProviderAsService()) != null ) return sp; return sun.nio.ch.DefaultSelectorProvider.get(); }; return AccessController.doPrivileged(pa); } }
第1段代码块的第2行 SelectorProvider.provider() 其实是一个静态常量,即第2段代码块中的 INSTANCE 对象,由于是 static 代码块,所以会直接按照流程进行初始化。
第7行中首先从系统属性中读取参数,如果不存在则第9行通过 SPI 机制即 ServiceLoader 加载,如果仍然不存在,则在第11行通过默认提供的 provider 进行初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DefaultSelectorProvider { private static final SelectorProviderImpl INSTANCE; static { PrivilegedAction<SelectorProviderImpl> pa = EPollSelectorProvider::new ; INSTANCE = AccessController.doPrivileged(pa); } private DefaultSelectorProvider () { } public static SelectorProviderImpl get () { return INSTANCE; } }
分析 DefaultSelectorProvider,.get() 方法就是返回了 EPollSelectorProvider 创建出的一个实例,其实就是 EPollSelectorProvider。
回到最开始的代码中,SelectorProvider.provider().openSelector(),接下来就是执行 openSelector() 方法
1 2 3 4 5 6 7 public class EPollSelectorProvider extends SelectorProviderImpl { public AbstractSelector openSelector () throws IOException { return new EPollSelectorImpl (this ); } }
代码第5行创建出了我们平时在 linux 中说的 EpollSelector 实现(这里再说一下:多路复用是一个模型,select、poll、epoll 是对这个模型的实现 )
接下来看一下创建 EPollSelectorImpl 都做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 EPollSelectorImpl(SelectorProvider sp) throws IOException { super (sp); this .epfd = EPoll.create(); this .pollArrayAddress = EPoll.allocatePollArray(NUM_EPOLLEVENTS); try { this .eventfd = new EventFD (); IOUtil.configureBlocking(IOUtil.newFD(eventfd.efd()), false ); } catch (IOException ioe) { EPoll.freePollArray(pollArrayAddress); FileDispatcherImpl.closeIntFD(epfd); throw ioe; } EPoll.ctl(epfd, EPOLL_CTL_ADD, eventfd.efd(), EPOLLIN); }
第4行执行了 Epoll.create() 创建了一个 epoll 实例,epfd 即这个对象的描述符(linux万物皆文件,因此通过fd即可定位到对象),第5行分配了一部分空间并返回了空间的起始地址,这个地址在后续会用到,存放发生的事件用。
第8、9、17行创建了一个特殊的 Event,并把它的 fd 赋值给了 EPollSelectorImpl 这个实例的一个属性,其实这个 fd 是为了唤醒 epoll 使用的,我们知道在 selector.select() 过程中,如果不设置超时时间,则 epoll 会一直阻塞下去,Java NIO 设置了一个特殊的 fd,当这个 fd 发生事件时,会唤醒 epoll。
到这里 Selector.open() 的逻辑就结束了
ServerSocketChannel.register() 初次看到这个方法时可能会觉得奇怪,因为这是 socket 的 register 方法,还需要传一个 selector 进来,我们知道多路复用其实是一个 selector 处理多个 socket 的事件,那么为什么是调用 socket 的注册方法传 selector 而不是调用 selector 的注册方法传入 channel 呢?源码会告诉我们答案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final SelectionKey register (Selector sel, int ops, Object att) throws ClosedChannelException { if ((ops & ~validOps()) != 0 ) throw new IllegalArgumentException (); if (!isOpen()) throw new ClosedChannelException (); synchronized (regLock) { if (isBlocking()) throw new IllegalBlockingModeException (); synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException (); SelectionKey k = findKey(sel); if (k != null ) { k.attach(att); k.interestOps(ops); } else { k = ((AbstractSelector)sel).register(this , ops, att); addKey(k); } return k; } } }
第4行到第7行其实就是做了一些校验,一般都不会有什么问题,例如 serverSocketChannel 只能监听 accept 事件。
第15行到23行是 register 的重点,因为这是第一次注册,所以 findKey 一定是找不到的,因此会执行第21行,而第21行将 selector 强转为了 AbstractSelector 并调用了注册方法,传入了 当前对象(ServerSocketChannel),关心的事件和一个 null,这里其实解答了我们开始的问题,最终还是调用了 selector 的注册方法,在注册完成后把返回值添加到了某个容器里。
我们首先看 selector 的 register 是如何实现的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private final Set<SelectionKey> keys; protected final SelectionKey register (AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException (); SelectionKeyImpl k = new SelectionKeyImpl ((SelChImpl)ch, this ); if (attachment != null ) k.attach(attachment); implRegister(k); keys.add(k); try { k.interestOps(ops); } catch (ClosedSelectorException e) { assert ch.keyFor(this ) == null ; keys.remove(k); k.cancel(); throw e; } return k; }
第10行创建了一个对象,这个对象的构造器有两个参数,一个是 channel,另一个是 this(selector 对象本身),也就是说,**这个 SelectionKeyImpl 本身就关联起了 channel 和 selector**
接下里的第16行把这个关联对象加到了第2行定义的这个 Set 里,注释其实说的很清楚,这个 Set 管理了这个 selector 对象注册的所有 key。
随后的第18行,为 SelectionKeyImpl 这个关联对象注册感兴趣的事件,
1 2 3 4 5 6 7 8 9 10 11 12 public SelectionKey interestOps (int ops) { ensureValid(); if ((ops & ~channel().validOps()) != 0 ) throw new IllegalArgumentException (); int oldOps = (int ) INTERESTOPS.getAndSet(this , ops); if (ops != oldOps) { selector.setEventOps(this ); } return this ; }protected abstract void setEventOps (SelectionKeyImpl ski) ;
第7行调用了第12行的抽象函数,这个抽象函数被 EPollSelectorImpl 重写了
1 2 3 4 5 6 7 8 private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque <>();@Override public void setEventOps (SelectionKeyImpl ski) { ensureOpen(); synchronized (updateLock) { updateKeys.addLast(ski); } }
分析上述代码,这个感兴趣的事件其实没有真正的注册到内核中,只是用一个队列缓存起来了~
上述这么多的代码,其实只是完成了本小节最开始那段代码中的k = ((AbstractSelector)sel).register(this, ops, att);,为了不用回头看,下面又复制了一份本小节最开始的那段代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final SelectionKey register (Selector sel, int ops, Object att) throws ClosedChannelException { if ((ops & ~validOps()) != 0 ) throw new IllegalArgumentException (); if (!isOpen()) throw new ClosedChannelException (); synchronized (regLock) { if (isBlocking()) throw new IllegalBlockingModeException (); synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException (); SelectionKey k = findKey(sel); if (k != null ) { k.attach(att); k.interestOps(ops); } else { k = ((AbstractSelector)sel).register(this , ops, att); addKey(k); } return k; } } }
在执行完第21行注册在 selector 中注册完成后,把返回的这个关联对象在 this(ServerSocketChannel) 中也添加起来,这里其实是添加到了一个 SelectionKey 的数组中,至于为什么在 selector 对象中用 Set 存储而在 Channel 对象中用数组存储,也是一个值得思考的问题
1 private SelectionKey[] keys = null ;
至此我们走完了第19-23行 else 的逻辑,现在开始回顾第15行的逻辑
1 2 3 4 5 6 7 8 9 private SelectionKey findKey (Selector sel) { assert Thread.holdsLock(keyLock); if (keys == null ) return null ; for (int i = 0 ; i < keys.length; i++) if ((keys[i] != null ) && (keys[i].selector() == sel)) return keys[i]; return null ; }
findKey 其实就是在当前的 SelectionKey[] 数组中找当前的 selector 是否添加过,如果已经添加过了,那么第18行会根据一些逻辑判断修改其注册的事件(当然也是添加到 EPollSelectorImpl 的 updateKeys 队列里了)。
现在解答一下为什么 Channel 中存储 SelectionKey 用数组,而 selector 中存储 SelectionKey 用 Set,每一个 SocketChannel 其实一般只有一个 Selector 注册,也就只有一个 SelectionKey,少有既注册 epoll 又注册 poll 这样的,而且缓冲区中的数据最后只能读取一次,注册多个 Selector 也没用;但 Selector 中可能注册多个 Channel,当客户端关闭连接时需要删除注册的 Channel,使用 Set 以 O(1) 的速度进行操作,这也体现了 NIO 高性能的追求。
selector.select() 这也是 NIO 中的最后一个重要的方法了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public final int select () throws IOException { return lockAndDoSelect(null , -1 ); }private int lockAndDoSelect (Consumer<SelectionKey> action, long timeout) throws IOException { synchronized (this ) { ensureOpen(); if (inSelect) throw new IllegalStateException ("select in progress" ); inSelect = true ; try { synchronized (publicSelectedKeys) { return doSelect(action, timeout); } } finally { inSelect = false ; } } }
代码的第9-13行使用二次检查防止重入(禁止在select的过程中再次select,而接下来才是真正的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Override protected int doSelect (Consumer<SelectionKey> action, long timeout) throws IOException { assert Thread.holdsLock(this ); int to = (int ) Math.min(timeout, Integer.MAX_VALUE); boolean blocking = (to != 0 ); boolean timedPoll = (to > 0 ); int numEntries; processUpdateQueue(); processDeregisterQueue(); try { begin(blocking); do { long startTime = timedPoll ? System.nanoTime() : 0 ; numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to); if (numEntries == IOStatus.INTERRUPTED && timedPoll) { long adjust = System.nanoTime() - startTime; to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); if (to <= 0 ) { numEntries = 0 ; } } } while (numEntries == IOStatus.INTERRUPTED); assert IOStatus.check(numEntries); } finally { end(blocking); } processDeregisterQueue(); return processEvents(numEntries, action); }
第7-10行,由于c语言的实现阻塞时间是一个int类型,因此这里需要转换一下。
第13行 processUpdateQueue,在上一小节中提到,会把需要注册或修改的事件存到 EPollSelectorImpl.updateKeys 这个队列里了,这里会先处理这些待更新的事件队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void processUpdateQueue () { assert Thread.holdsLock(this ); synchronized (updateLock) { SelectionKeyImpl ski; while ((ski = updateKeys.pollFirst()) != null ) { if (ski.isValid()) { int fd = ski.getFDVal(); SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski); assert (previous == null ) || (previous == ski); int newEvents = ski.translateInterestOps(); int registeredEvents = ski.registeredEvents(); if (newEvents != registeredEvents) { if (newEvents == 0 ) { EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0 ); } else { if (registeredEvents == 0 ) { EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents); } else { EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents); } } ski.registeredEvents(newEvents); } } } } }
第9行通过一个 map 拿到 SelectionKeyImpl 这个关联对象中 channel 的 fd,如果之前没有注册过,则 previous 为 null,否则肯定和之前一样,第11行将 jdk 的事件转换为内核中的事件表示,根据 newEvents 和之前注册事件 registeredEvents 的对比,决定调用 Epoll 的哪个方法(删除、增加、修改)
在处理完待更新对象后,处理取消注册的事件也是同理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 protected final void processDeregisterQueue () throws IOException { assert Thread.holdsLock(this ); assert Thread.holdsLock(publicSelectedKeys); synchronized (cancelledKeys) { SelectionKeyImpl ski; while ((ski = cancelledKeys.pollFirst()) != null ) { implDereg(ski); selectedKeys.remove(ski); keys.remove(ski); deregister(ski); SelectableChannel ch = ski.channel(); if (!ch.isOpen() && !ch.isRegistered()) ((SelChImpl)ch).kill(); } } }
注意第11、12行都调用了 Remove 方法,这也解释了上小节中为什么 SelectorImpl 存放 Selection 对象用 Set,因为可能需要频繁查找或移除,第15行从 channel 的数组中也移除。
在处理完变动的事件后,继续分析接下来的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 try { begin(blocking); do { long startTime = timedPoll ? System.nanoTime() : 0 ; numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to); if (numEntries == IOStatus.INTERRUPTED && timedPoll) { long adjust = System.nanoTime() - startTime; to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS); if (to <= 0 ) { numEntries = 0 ; } } } while (numEntries == IOStatus.INTERRUPTED); assert IOStatus.check(numEntries); } finally { end(blocking); } processDeregisterQueue();return processEvents(numEntries, action);
第4-17行的这个循环,其实是在第一小节中创建 epoll 时 JDK 注册了一个唤醒 epoll 用的事件,这里就是判断如果是被这个事件打断了(第16行),需要根据阻塞时间重新计算 timeout 的时间(第7到13行)
正常情况下发送了其他事件,就会退出循环,再次处理取消注册队列中的 SelectionKey,numEntries 就是返回发生事件数,接下来开始处理这些事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private int processEvents (int numEntries, Consumer<SelectionKey> action) throws IOException { assert Thread.holdsLock(this ); boolean interrupted = false ; int numKeysUpdated = 0 ; for (int i=0 ; i<numEntries; i++) { long event = EPoll.getEvent(pollArrayAddress, i); int fd = EPoll.getDescriptor(event); if (fd == eventfd.efd()) { interrupted = true ; } else { SelectionKeyImpl ski = fdToKey.get(fd); if (ski != null ) { int rOps = EPoll.getEvents(event); numKeysUpdated += processReadyEvents(rOps, ski, action); } } } if (interrupted) { clearInterrupt(); } return numKeysUpdated; }
第9行的 pollArrayAddress 其实在第一小节创建 selector 时候介绍过,是 epoll 实例用来存储 epoll_wait 函数返回事件的容器起始地址,因此可以通过这个起始地址和事件序获取到这个事件对象。
如果这个事件对象的描述符是我们在创建 epoll 对象时注册的那个中断 fd,则标记位 interrupted 修改为 true,否则从 Map<fd,SelectionKeyImpl> 这个 map 中拿到一个具体的关联对象(在本节处理 updateKeys 的队列时存放进去的)。
拿到关联对象意味着拿到了注册的事件,第17行processReadyEvents会比对真正发生的事件和注册的感兴趣的事件,如果满足某些条件才认为是应用层真正感兴趣的事件,存放到 selectedKeys 容器中,最后返回应用层真正感兴趣的事件数。
业务层在解除阻塞后调用 Set<SelectionKey> selectionKeys = selector.selectedKeys(); 其实就是返回的这些内容。因此在处理完成需要调用迭代器的 Remove 方法移除,因为 NIO 只告诉应用层这些关联对象中的 Channel 有事件,但并不知道应用层是否处理完成,需要主动移除。
总结 到这里 Java NIO 的源码就梳理结束了,以上内容均基于 jdk-17.0.17 的 linux 版本,如果是 Windows 则根本不会选 Epoll 的实现,版本不一致也可能会有小的 API 差异。
这是源码中几个重要的类的 uml 图,新手可能会很懵,什么 selector、SelectionKey、SelectorImpl 什么的,尤其是成员变量有的还一致~