Netty的EventLoopGroup是什么 近期看到了一个 手写EventLoopGroup理解原理 的视频,觉得挺有意思的。
EventLoopGroup 与 EventLoop 如果查看一下 Netty EventLoopGroup 的继承关系,我们会发现什么:
EventLoopGroup 继承了 ScheduledExecutorService,进而也继承了 ExecutorService,所以 EventLoopGroup 其实就是一个线程池,并且是能执行定时任务的线程池 。
1 2 3 4 public interface EventLoop extends OrderedEventExecutor , EventLoopGroup { @Override EventLoopGroup parent () ; }
而 EventLoop 又继承了 EventLoopGroup,所以 EventLoop 其实也是一个线程池,但它是使用一个线程执行所有任务的线程池 。
那 EventLoopGroup 相比 JDK 提供的线程池有什么不同呢?
EventLoopGroup 与 JDK 线程池 来看一段 demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) { EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup (6 ); for (int i = 0 ; i < 10 ; i++) { eventLoopGroup.execute(() -> System.out.println(Thread.currentThread().getName())); } ExecutorService executorService = new ThreadPoolExecutor ( 6 , 6 , 120L , TimeUnit.SECONDS, new ArrayBlockingQueue <>(1000000 ), new ThreadPoolExecutor .AbortPolicy() ); for (int i = 0 ; i < 10 ; i++) { executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } }
无论执行多少次,eventLoopGroup 总是 1-4 号线程打印了两次,而 executorService 则不固定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 defaultEventLoopGroup-2-1 defaultEventLoopGroup-2-2 defaultEventLoopGroup-2-3 defaultEventLoopGroup-2-4 defaultEventLoopGroup-2-5 defaultEventLoopGroup-2-6 defaultEventLoopGroup-2-1 defaultEventLoopGroup-2-2 defaultEventLoopGroup-2-4 defaultEventLoopGroup-2-3 pool-1-thread-2 pool-1-thread-1 pool-1-thread-4 pool-1-thread-1 pool-1-thread-3 pool-1-thread-6 pool-1-thread-2 pool-1-thread-1 pool-1-thread-5 pool-1-thread-4
据此我们合理推测:EventLoopGroup 里的线程其实是有顺序的,换句话说,EventLoopGroup 可以保证由某个线程来处理提交的任务。
Netty 的 “无锁串行化” 设计,要求一个 channel 的所有事件只能被同一个线程执行,这也是 Netty 设计 EventLoopGroup 的原因,它会把传进来的任务进行分组,交给对应的线程执行。
1 2 3 4 5 6 public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { @Override public void execute (Runnable command) { next().execute(command); } }
而 next() 方法的返回值就是一个 EventLoop:
1 2 3 4 5 6 7 public interface EventLoopGroup extends EventExecutorGroup { @Override EventLoop next () ; }
由于 next() 调度的原因,在 demo 中前 4 个 线程会再执行一次打印线程名的任务,同时由于 EventLoopGroup 继承了 ScheduledExecutorService,它还拥有执行定时任务的能力。
EventLoopGroup 与 JDK 线程池还有显著不同的一点,在它的构造方法里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected MultithreadEventExecutorGroup (int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { } } }
即:EventLoopGroup 会在创建时就启动所有的核心线程,这与常规的 JDK 线程池的工作方式截然不同。
由于 EventLoopGroup 的每一个线程都是线程池,线程池中会有任务队列存放来不及执行的任务,所以每一个 EventLoop 都有一个任务队列,这点也导致了一个显著不同:EventLoopGroup 每个线程绑定一个任务队列,而 JDK 线程池多线程共享线程队列,所以 JDK 线程池中的任务可能被任意空闲线程执行,存在线程切换和锁竞争。
整理一下 EventLoopGroup 与 JDK 线程池的区别:
EventLoopGroup 线程有顺序,可以保证指定线程处理提交的某个任务;
EventLoopGroup 在创建时就启动所有的核心线程,JDK 线程池则是来任务先创建到核心线程数,再加到任务队列,最后增加到最大线程数的逻辑;
EventLoopGroup 每个线程负责队列任务的全生命周期,而 JDK 线程池可能存在执行任务的线程切换。
手写 EventLoopGroup 由上述分析知,EventLoopGroup 其实是实现了执行普通任务和定时任务的线程池:
自定义 CusEventLoopGroup:
1 2 3 4 5 6 7 8 9 10 11 public interface CusEventLoopGroup { void execute (Runnable task) ; void schedule (Runnable command, long initialDelay, TimeUnit unit) ; void scheduleAtFixedRate (Runnable command, long initialDelay, long period, TimeUnit unit) ; CusEventLoop next () ; }
自定义 CusEventLoop:
1 2 3 public interface CusEventLoop extends CusEventLoopGroup { }
自定义 CusEventLoopGroup 实现类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class CusEventLoopGroupImpl implements CusEventLoopGroup { private final CusEventLoop[] children; AtomicInteger index = new AtomicInteger (0 ); public CusEventLoopGroupImpl (int threadNum) { children = new CusEventLoopImpl [threadNum]; for (int i = 0 ; i < threadNum; i++) { children[i] = new CusEventLoopImpl (); } } @Override public void execute (Runnable task) { next().execute(task); } @Override public void schedule (Runnable command, long initialDelay, TimeUnit unit) { next().schedule(command, initialDelay, unit); } @Override public void scheduleAtFixedRate (Runnable command, long initialDelay, long period, TimeUnit unit) { next().scheduleAtFixedRate(command, initialDelay, period, unit); } @Override public CusEventLoop next () { return children[index.getAndIncrement() % children.length]; } }
CusEventLoopGroupImpl 的两个特点:
创建时就初始化所有线程;
真正的实现调用 next() 获得的 CusEventLoop 来实现。
自定义 CusEventLoop 实现类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class CusEventLoopImpl implements CusEventLoop { private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue <>(1024 ); public CusEventLoopImpl () { Thread thread = new Thread (() -> { while (true ) { try { Runnable take = taskQueue.take(); take.run(); } catch (InterruptedException e) { throw new RuntimeException (e); } } }); thread.start(); } @Override public void execute (Runnable task) { if (!taskQueue.offer(task)) { throw new RuntimeException ("队列已满,无法再分配任务" ); } } @Override public void schedule (Runnable command, long initialDelay, TimeUnit unit) { } @Override public void scheduleAtFixedRate (Runnable command, long initialDelay, long period, TimeUnit unit) { } @Override public CusEventLoop next () { return this ; } }
由于 EventLoop 还需要执行定时任务,所以还需要一个队列存放定时任务,常见的实现有 DelayedWorkQueue、PriorityQueue,我们可以自定义任务类型实现 Delayed 接口,这个接口的方法可以知道距离下次执行的时间,以及如何与其他 task 进行比较:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class ScheduledTask implements Delayed , Runnable { private final Runnable command; @Getter private volatile long executeTime; private final long period; private final AtomicBoolean cancelled = new AtomicBoolean (false ); private final CusEventLoop eventLoop; public ScheduledTask (Runnable command, long delay, TimeUnit unit, CusEventLoop eventLoop) { this (command, delay, 0 , unit, eventLoop); } public ScheduledTask (Runnable command, long initialDelay, long period, TimeUnit unit, CusEventLoop eventLoop) { this .command = command; this .executeTime = System.currentTimeMillis() + unit.toMillis(initialDelay); this .period = unit.toMillis(period); this .eventLoop = eventLoop; } private void updateExecuteTime () { this .executeTime = System.currentTimeMillis() + period; } public void cancel () { cancelled.set(true ); } @Override public long getDelay (TimeUnit unit) { long delay = executeTime - System.currentTimeMillis(); return unit.convert(delay, TimeUnit.MILLISECONDS); } @Override public int compareTo (Delayed other) { if (this == other) { return 0 ; } ScheduledTask otherTask = (ScheduledTask) other; long diff = this .executeTime - otherTask.executeTime; return Long.compare(diff, 0 ); } @Override public void run () { if (!cancelled.get()) { try { command.run(); if (period > 0 ) { updateExecuteTime(); eventLoop.getScheduledTaskQueue().offer(this ); } } catch (Exception e) { e.printStackTrace(); } } } }
再添加完定时任务队列后,线程获取一个任务就比较复杂了,因为完全有可能在等待定时任务的过程中来了一个普通任务,重新实现的 CusEventLoopImpl 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 public class CusEventLoopImpl implements CusEventLoop { private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue <>(1024 ); private final DelayQueue<ScheduledTask> scheduledTaskQueue = new DelayQueue <>(); private final Runnable WAKE_UP = () -> { }; public CusEventLoopImpl () { Thread thread = new Thread (() -> { while (true ) { Runnable task = getTask(); if (task != null ) { task.run(); } } }); thread.start(); } private Runnable getTask () { ScheduledTask scheduledTask = scheduledTaskQueue.peek(); if (scheduledTask == null ) { Runnable task = null ; try { task = taskQueue.take(); if (task == WAKE_UP) { task = null ; } } catch (InterruptedException ignore) { } return task; } long executeTime = scheduledTask.getExecuteTime(); if (System.currentTimeMillis() >= executeTime) { return scheduledTaskQueue.poll(); } Runnable task = null ; try { task = taskQueue.poll(System.currentTimeMillis() - executeTime, TimeUnit.MILLISECONDS); if (task == WAKE_UP) { task = null ; } } catch (InterruptedException ignore) { } return task; } @Override public void execute (Runnable task) { if (!taskQueue.offer(task)) { throw new RuntimeException ("队列已满,无法再分配任务" ); } } @Override public void schedule (Runnable command, long initialDelay, TimeUnit unit) { ScheduledTask scheduledTask = new ScheduledTask (command, initialDelay, unit, this ); scheduledTaskQueue.offer(scheduledTask); execute(WAKE_UP); } @Override public void scheduleAtFixedRate (Runnable command, long initialDelay, long period, TimeUnit unit) { ScheduledTask scheduledTask = new ScheduledTask (command, initialDelay, period, unit, this ); scheduledTaskQueue.offer(scheduledTask); execute(WAKE_UP); } @Override public CusEventLoop next () { return this ; } @Override public Queue<ScheduledTask> getScheduledTaskQueue () { return scheduledTaskQueue; } }
由于固定频率执行的任务在执行完后还要加到定时任务队列中,所以需要在创建定时任务时传递一个 EventLoop(暴露出获取定时任务队列的方法)。同时如果需要实现指定线程执行:可以通过自定义 Runnable 添加一个指定线程在 EventLoopGroup 的索引,或重写 next() 方法来实现。
现在思考一个这样的场景:EventLoop 执行完任务,可能需要重新把任务添加到队列中,同时 acceptor 可能接收了新连接,需要把这个新连接的后续任务交给 EventLoop 处理,双方可能同时要添加任务到队列,Netty 使用了 MpscQueue 使多生产者可以无锁添加任务。
虽然这样的 demo 并不能处理网络请求,但描述了 netty 处理任务的大致流程。
小结 EventLoopGroup 底层是支持普通任务 + 定时任务的特殊线程池 ,继承了 JDK 定时线程池接口;而 EventLoop 既是组也是单个事件循环线程,一个 EventLoop 绑定唯一线程 ,独享任务队列。和 JDK 线程池对比有以下区别:
任务调度:EventLoopGroup 按固定规则分配线程,同一个 Channel 所有事件始终绑定同一个 EventLoop 线程 ,实现无锁串行化 ;JDK 线程池任务随机被空闲线程执行,存在线程切换与锁竞争。
线程创建:EventLoopGroup 初始化时就一次性创建并启动所有工作线程 ;JDK 线程池是按需懒创建线程。
队列模型:EventLoop 是每个线程独享一个任务队列 ;JDK 线程池是所有线程共享一个全局任务队列 。
Netty 的这套 EventLoop 模型,通过线程绑定、串行执行、队列隔离 ,规避了多线程锁竞争和上下文切换开销,是 Netty 高性能、高并发、低延迟的核心基石。