Netty是怎么实现JavaNIO多路复用的?(源码)
- 开源代码
- 2025-09-22 00:27:02

目录 NIO多路复用实现事件循环是什么?核心源码(1)调用 NioEventLoopGroup 默认构造器(2)指定 SelectorProvider(3)创建 `Selector`(4)创建单线程和队列(5)单线程处理就绪IO事件
最近想再巩固一下NIO等多路复用的实现思路,本文通过Netty源码来进一步总结NIO多路复用的运用。
先上一组简单的NIO多路复用实现,
NIO多路复用实现服务端通过selector组件轮询处理就绪IO事件,一个线程可以支持处理多个网络连接。
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; public class NIOServer { private static final int PORT = 8080; private static final int BUFFER_SIZE = 1024; public static void main(String[] args) { try { // 1. 创建Selector Selector selector = Selector.open(); // 2. 创建ServerSocketChannel并绑定端口 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.socket().bind(new InetSocketAddress(PORT)); serverChannel.configureBlocking(false); // 非阻塞模式 // 3. 注册ServerSocketChannel到Selector,监听ACCEPT事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器启动,监听端口: " + PORT); while (true) { // 4. 阻塞等待就绪的事件(有新连接、数据可读等) selector.select(); // 5. 获取所有就绪的事件集合 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iter = selectedKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); // 必须手动移除,防止重复处理 if (key.isAcceptable()) { // 6. 处理ACCEPT事件:接受客户端连接 handleAcceptMethod(key, selector); } else if (key.isReadable()) { // 7. 处理READ事件:读取客户端数据 handleReadMethod(key); } } } } catch (IOException e) { ... } } ... }Netty 是基于Java NIO实现异步事件驱动的高性能网络应用框架,接下来结合Netty源码,看看在Netty的事件循环EventLoop中,是怎么实现NIO的多路复用的。
事件循环是什么?(1)NioEventLoop 是事件循环核心组件,基于Java NIO实现。
职责:负责I/O事件处理,通过Selector轮询Channel的读写事件(如accept、read、write),触发对应Handler执行。
作用:
每个NioEventLoop绑定一个独立线程,以串行无锁化方式处理所有注册到其上的Channel,确保线程安全。通过单线程处理多Channel的就绪事件,减少上下文切换,支撑高并发低延迟;(2)NioEventLoopGroup 为事件循环组,主要是为NioEventLoop提供线程资源调度和事件循环管理;
职责:每个 NioEventLoopGroup 管理一组 NioEventLoop,统一管理组内所有NioEventLoop的启动、任务执行、资源释放,通过next()方法轮询分配 EventLoop,实现负载均衡。
其中,
Boss Group:服务端专用,负责监听 TCP 连接(accept事件),将新连接分配给 Worker Group。Worker Group:处理已建立连接的 I/O 读写(read/write事件)及异步任务。 核心源码 (1)调用 NioEventLoopGroup 默认构造器 /** * boss 线程组,用于服务端接受客户端的连接 */ private EventLoopGroup bossGroup = new NioEventLoopGroup(); /** * worker 线程组,用于服务端接受客户端的数据读写 */ private EventLoopGroup workerGroup = new NioEventLoopGroup(); (2)指定 SelectorProvider可以看到 Netty 帮我们传入了 SelectorProvider.provider(),用于后续 NIO 创建 Selector 、Channel 等关键组件。
// io.netty.channel.nio.NioEventLoopGroup // 传入 SelectorProvider.provider() public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); } // 默认拒绝策略,直接排除异常 - RejectedExecutionHandlers.reject() public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } // 真正创建 EventLoop - 单线程 // 会在下面父类 MultithreadEventExecutorGroup 构造器中被调用 @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }super(…) 进入父类 MultithreadEventLoopGroup 构造器,默认创建 2倍CPU核心个数 的 EventLoop。
// io.netty.channel.MultithreadEventLoopGroup private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); ... } // 默认 EventLoop个数为 2倍CPU核心数 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // 构造器,开始初始化 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 新建 EventLoop 对象,在上述 NioEventLoopGroup 中实现了 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } (3)创建 Selector回头来看 NioEventLoop 构造器,每个 NioEventLoop 都会创建并维护一个 Selector 。
Tips: Netty 对 Selector 做了优化, 即下面的 unwrappedSelector,在初始化时,会调用 openSelector() 方法,强制替换 Selector 的 selectedKeys 为 SelectedSelectionKeySet(内部是数组实现)。
// io.netty.channel.nio.NioEventLoop NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { // 创建了两个任务队列,普通任务队列(taskQueue) 和 尾部任务队列(tailTaskQueue) super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); // 新建并维护 Selector 事件轮询器 this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; } // 默认使用 无锁高性能队列(如 MpscChunkedArrayQueue),特点是 多生产者单消费者(MPSC),避免多线程竞争。 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) { return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() : PlatformDependent.newMpscQueue(maxPendingTasks); } // 普通任务队列(taskQueue):处理IO事件以及用户提交的任务 channel.eventLoop().execute(...) // 尾部任务队列(tailTaskQueue):处理 低优先级或非紧急任务 (4)创建单线程和队列接着构造,每个 NioEventLoop 会维护 任务队列以及一个线程用于任务执行和调度。
进入父类 -> SingleThreadEventLoop 构造器
// io.netty.channel.SingleThreadEventLoop protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) { // 传入刚刚创建的 普通任务队列 super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler); // 传入尾部任务队列 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue"); }进入父类 -> SingleThreadEventExecutor 构造器,实现了execute() 方法进行任务调度,
// io.netty.util.concurrent.SingleThreadEventExecutor protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { super(parent); this.threadLock = new CountDownLatch(1); this.shutdownHooks = new LinkedHashSet(); this.state = 1; this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.executor = ThreadExecutorMap.apply(executor, this); // 任务队列,用于接收任务 this.taskQueue = (Queue)ObjectUtil.checkNotNull(taskQueue, "taskQueue"); this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } /** 核心执行任务逻辑 **/ private void execute(Runnable task, boolean immediate) { // 检查是否在事件循环线程内 boolean inEventLoop = this.inEventLoop(); // 添加任务到 任务队列中 this.addTask(task); // 若首次不在事件循环线程内,即首次提交任务,需要触发线程启动 if (!inEventLoop) { // 启动线程 this.startThread(); if (this.isShutdown()) { boolean reject = false; try { if (this.removeTask(task)) { reject = true; } } catch (UnsupportedOperationException var6) { } if (reject) { reject(); } } } if (!this.addTaskWakesUp && immediate) { this.wakeup(inEventLoop); } } /** 在这里 EventLoop 会启动一个线程, 用于执行任务 **/ private void startThread() { if (this.state == 1 && STATE_UPDATER pareAndSet(this, 1, 2)) { boolean success = false; try { this.doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER pareAndSet(this, 2, 1); } } } } /** this.executor 默认是 ThreadPerTaskExecutor,该执行器为每个任务创建一个新线程,因此每个 NioEventLoop 都会绑定一个独立线程 **/ private void doStartThread() { assert this.thread == null; // ThreadPerTaskExecutor 会创建一个新的线程 this.executor.execute(new Runnable() { public void run() { ... // 执行事件循环 - NioEventLoop 具体实现 run() SingleThreadEventExecutor.this.run(); ... } }); } (5)单线程处理就绪IO事件回到 NioEventLoop,查看 run() 方法处理就绪的I/O事件
// 主要框架如下, protected void run() { for (;;) { // 无限循环 // 1. 轮询 I/O 事件(strategy = Selector.select) // 2. 处理 I/O 事件(selector.selectedKeys() --> processSelectedKeys) // 3. 执行任务队列(runAllTasks) } }源码如下,
// io.netty.channel.nio.NioEventLoop @Override protected void run() { // 记录连续无有效事件时的 select 调用次数(用于检测空轮询) int selectCnt = 0; for (;;) { try { // (1) 超时等待 I/O事件就绪 -> select int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { // 任务队列为空,超时等待 I/O 事件 if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. github /netty/netty/issues/8566 rebuildSelector0(); // 出现异常,重置 select 计数 selectCnt = 0; handleLoopException(e); continue; } // (2) 执行IO事件以及执行任务 selectCnt++; // 递增 select 调用次数 cancelledKeys = 0; needsToSelectAgain = false; // 获取 I/O 时间占比(默认 50,即 50% 时间处理 I/O,50% 处理任务) final int ioRatio = this.ioRatio; boolean ranTasks; // 根据 ioRatio 分配 I/O 处理与任务执行时间 if (ioRatio == 100) {// 全时处理 I/O,任务无时间限制 try { if (strategy > 0) { // 有就绪I/O事件 processSelectedKeys();// 处理 I/O 事件(如 accept、read、write) } } finally { // 执行所有任务(普通队列 + 尾部队列) ranTasks = runAllTasks(); } } else if (strategy > 0) { // 限时处理IO事件 final long ioStartTime = System.nanoTime(); try { processSelectedKeys();// 处理 I/O 事件(如 accept、read、write) } finally { // 限时执行所有任务(普通队列 + 尾部队列) final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else {// 无 I/O 事件(strategy == 0),仅执行任务 ranTasks = runAllTasks(0); } // (3) 检测空轮询和异常唤醒 if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } // 正常情况:执行了任务或处理了 I/O 事件,重置计数 selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // 异常唤醒(如未处理事件却唤醒) selectCnt = 0; // 重置计数 } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Throwable t) { handleLoopException(t); } // 处理一些关闭逻辑... try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } } /** 查询就绪I/O事件,调用 select 方法 **/ private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } // Timeout will only be 0 if deadline is within 5 microsecs long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); } /** 处理就绪的IO事件,selector.selectedKeys() 获取就绪事件 **/ private void processSelectedKeys() { // selectedKeys 为null说明Netty启用了优化模式,通过反射替换 JDK 原生的 Selector 实现,将 selectedKeys 替换为高性能的数组结构(SelectedSelectionKeySet),实现高效添加和遍历,避免Hash冲突的开销,默认开启优化模式 if (selectedKeys != null) { // 优化模式,SelectedSelectionKeySet selectedKeys 数组存储就绪事件 processSelectedKeysOptimized(); } else { // 普通模式 processSelectedKeysPlain(selector.selectedKeys()); } } // 非优化模式下的处理IO事件 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (selectedKeys.isEmpty()) { return; } // 遍历 selectedKeys 并执行相应Channel的事件 Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } // run 方法已经设置 needsToSelectAgain = false if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps();// 获取就绪的操作类型 // 处理连接就绪事件(客户端已建立连接) if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT;// 移除 CONNECT 监听 k.interestOps(ops); unsafe.finishConnect(); } // 处理写就绪事件(数据可写) if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); // Socket 缓冲区可写,强制刷新待发送数据 } // 处理读就绪或 Accept 事件(数据可读/新连接) if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); // 触发读取操作(如读取数据或 Accept 连接) } } catch (CancelledKeyException ignored) { // Key 被取消,关闭 Channel unsafe.close(unsafe.voidPromise()); } } /** Netty是走的优化模式,主要在于遍历方式的区别,SelectedSelectionKeySet 是数组存储遍历高效,没有迭代器开销 **/ private void processSelectedKeysOptimized() { // 便利数组 for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See github /netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); // 同上 } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See github /netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } }总结,NIO多路复用核心思想就是单线程管理多通道:通过一个Selector(选择器)监控多个通道(Channel)的I/O事件(如连接、读、写),由单线程轮询就绪事件,避免为每个连接创建独立线程,实现非阻塞高效并发。本质上用事件通知机制替代线程轮询,以最小线程开销处理海量连接。
Netty是怎么实现JavaNIO多路复用的?(源码)由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“Netty是怎么实现JavaNIO多路复用的?(源码)”