Netty的EventLoopGroup是什么

Netty的EventLoopGroup是什么

近期看到了一个 手写EventLoopGroup理解原理 的视频,觉得挺有意思的。

EventLoopGroup 与 EventLoop

如果查看一下 Netty EventLoopGroup 的继承关系,我们会发现什么:

image-20260511153523304

EventLoopGroup 继承了 ScheduledExecutorService,进而也继承了 ExecutorService,所以 EventLoopGroup 其实就是一个线程池,并且是能执行定时任务的线程池

1
2
3
4
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}

而 EventLoop 又继承了 EventLoopGroup,所以 EventLoop 其实也是一个线程池,但它是使用一个线程执行所有任务的线程池

那 EventLoopGroup 相比 JDK 提供的线程池有什么不同呢?

EventLoopGroup 与 JDK 线程池

来看一段 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(6);
for (int i = 0; i < 10; i++) {
eventLoopGroup.execute(() -> System.out.println(Thread.currentThread().getName()));
}

ExecutorService executorService = new ThreadPoolExecutor(
6, 6, 120L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000000),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 10; i++) {
executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
}
}

无论执行多少次,eventLoopGroup 总是 1-4 号线程打印了两次,而 executorService 则不固定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
defaultEventLoopGroup-2-1
defaultEventLoopGroup-2-2
defaultEventLoopGroup-2-3
defaultEventLoopGroup-2-4
defaultEventLoopGroup-2-5
defaultEventLoopGroup-2-6
defaultEventLoopGroup-2-1
defaultEventLoopGroup-2-2
defaultEventLoopGroup-2-4
defaultEventLoopGroup-2-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-1
pool-1-thread-3
pool-1-thread-6
pool-1-thread-2
pool-1-thread-1
pool-1-thread-5
pool-1-thread-4

据此我们合理推测:EventLoopGroup 里的线程其实是有顺序的,换句话说,EventLoopGroup 可以保证由某个线程来处理提交的任务

Netty 的 “无锁串行化” 设计,要求一个 channel 的所有事件只能被同一个线程执行,这也是 Netty 设计 EventLoopGroup 的原因,它会把传进来的任务进行分组,交给对应的线程执行。

1
2
3
4
5
6
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public void execute(Runnable command) {
next().execute(command);
}
}

next() 方法的返回值就是一个 EventLoop:

1
2
3
4
5
6
7
public interface EventLoopGroup extends EventExecutorGroup {
/**
* Return the next {@link EventLoop} to use
*/
@Override
EventLoop next();
}

由于 next() 调度的原因,在 demo 中前 4 个 线程会再执行一次打印线程名的任务,同时由于 EventLoopGroup 继承了 ScheduledExecutorService,它还拥有执行定时任务的能力。

EventLoopGroup 与 JDK 线程池还有显著不同的一点,在它的构造方法里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {

}
}

}

即:EventLoopGroup 会在创建时就启动所有的核心线程,这与常规的 JDK 线程池的工作方式截然不同。

由于 EventLoopGroup 的每一个线程都是线程池,线程池中会有任务队列存放来不及执行的任务,所以每一个 EventLoop 都有一个任务队列,这点也导致了一个显著不同:EventLoopGroup 每个线程绑定一个任务队列,而 JDK 线程池多线程共享线程队列,所以 JDK 线程池中的任务可能被任意空闲线程执行,存在线程切换和锁竞争。

整理一下 EventLoopGroup 与 JDK 线程池的区别:

  • EventLoopGroup 线程有顺序,可以保证指定线程处理提交的某个任务;
  • EventLoopGroup 在创建时就启动所有的核心线程,JDK 线程池则是来任务先创建到核心线程数,再加到任务队列,最后增加到最大线程数的逻辑;
  • EventLoopGroup 每个线程负责队列任务的全生命周期,而 JDK 线程池可能存在执行任务的线程切换。

手写 EventLoopGroup

由上述分析知,EventLoopGroup 其实是实现了执行普通任务和定时任务的线程池:

自定义 CusEventLoopGroup:

1
2
3
4
5
6
7
8
9
10
11
public interface CusEventLoopGroup {

void execute(Runnable task);

void schedule(Runnable command, long initialDelay, TimeUnit unit);

void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

CusEventLoop next();

}

自定义 CusEventLoop:

1
2
3
public interface CusEventLoop extends CusEventLoopGroup {

}

自定义 CusEventLoopGroup 实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class CusEventLoopGroupImpl implements CusEventLoopGroup {

private final CusEventLoop[] children;

AtomicInteger index = new AtomicInteger(0);

public CusEventLoopGroupImpl(int threadNum) {
children = new CusEventLoopImpl[threadNum];
for (int i = 0; i < threadNum; i++) {
children[i] = new CusEventLoopImpl();
}
}

@Override
public void execute(Runnable task) {
next().execute(task);
}

@Override
public void schedule(Runnable command, long initialDelay, TimeUnit unit) {
next().schedule(command, initialDelay, unit);
}

@Override
public void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
next().scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public CusEventLoop next() {
return children[index.getAndIncrement() % children.length];
}

}

CusEventLoopGroupImpl 的两个特点:

  • 创建时就初始化所有线程;
  • 真正的实现调用 next() 获得的 CusEventLoop 来实现。

自定义 CusEventLoop 实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class CusEventLoopImpl implements CusEventLoop {

private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(1024);

public CusEventLoopImpl() {
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable take = taskQueue.take();
take.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.start();
}

@Override
public void execute(Runnable task) {
if (!taskQueue.offer(task)) {
throw new RuntimeException("队列已满,无法再分配任务");
}
}

@Override
public void schedule(Runnable command, long initialDelay, TimeUnit unit) {

}

@Override
public void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {

}

@Override
public CusEventLoop next() {
return this;
}

}

由于 EventLoop 还需要执行定时任务,所以还需要一个队列存放定时任务,常见的实现有 DelayedWorkQueue、PriorityQueue,我们可以自定义任务类型实现 Delayed 接口,这个接口的方法可以知道距离下次执行的时间,以及如何与其他 task 进行比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 定时任务封装类,用于支持延迟执行和固定频率执行
*/
public class ScheduledTask implements Delayed, Runnable {

private final Runnable command;
@Getter
private volatile long executeTime; // 执行时间戳(毫秒)
private final long period; // 执行周期(毫秒),0表示一次性任务
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final CusEventLoop eventLoop;

public ScheduledTask(Runnable command, long delay, TimeUnit unit, CusEventLoop eventLoop) {
this(command, delay, 0, unit, eventLoop);
}

public ScheduledTask(Runnable command, long initialDelay, long period, TimeUnit unit, CusEventLoop eventLoop) {
this.command = command;
this.executeTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
this.period = unit.toMillis(period);
this.eventLoop = eventLoop;
}

/**
* 更新下一次执行时间(用于固定频率任务)
*/
private void updateExecuteTime() {
this.executeTime = System.currentTimeMillis() + period;
}

/**
* 取消任务
*/
public void cancel() {
cancelled.set(true);
}

@Override
public long getDelay(TimeUnit unit) {
long delay = executeTime - System.currentTimeMillis();
return unit.convert(delay, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed other) {
if (this == other) {
return 0;
}
ScheduledTask otherTask = (ScheduledTask) other;
long diff = this.executeTime - otherTask.executeTime;
return Long.compare(diff, 0);
}

@Override
public void run() {
if (!cancelled.get()) {
try {
command.run();
if (period > 0) {
updateExecuteTime();
eventLoop.getScheduledTaskQueue().offer(this);
}
} catch (Exception e) {
// 防止任务异常影响事件循环
e.printStackTrace();
}
}
}

}

再添加完定时任务队列后,线程获取一个任务就比较复杂了,因为完全有可能在等待定时任务的过程中来了一个普通任务,重新实现的 CusEventLoopImpl 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class CusEventLoopImpl implements CusEventLoop {

private final BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(1024);
private final DelayQueue<ScheduledTask> scheduledTaskQueue = new DelayQueue<>();
private final Runnable WAKE_UP = () -> {

};

public CusEventLoopImpl() {
Thread thread = new Thread(() -> {
while (true) {
Runnable task = getTask();
if (task != null) {
task.run();
}
}
});
thread.start();
}

private Runnable getTask() {
ScheduledTask scheduledTask = scheduledTaskQueue.peek();
if (scheduledTask == null) {
Runnable task = null;
try {
// 有可能在这里阻塞的时候来了一个可以执行的定时任务
task = taskQueue.take();
if (task == WAKE_UP) {
task = null;
}
} catch (InterruptedException ignore) {
}
return task;
}
long executeTime = scheduledTask.getExecuteTime();
if (System.currentTimeMillis() >= executeTime) {
// 这里注意, scheduledTaskQueue.poll() 未必是上面的 scheduledTask,有可能在这个判断的时间内有新的 scheduledTask 来了
// 但是既然能 poll 出来,说明它的执行时间肯定也到了
return scheduledTaskQueue.poll();
}

Runnable task = null;
try {
// 有可能在这里阻塞的时候来了一个可以执行的定时任务
task = taskQueue.poll(System.currentTimeMillis() - executeTime, TimeUnit.MILLISECONDS);
if (task == WAKE_UP) {
task = null;
}
} catch (InterruptedException ignore) {
}
return task;
}

@Override
public void execute(Runnable task) {
if (!taskQueue.offer(task)) {
throw new RuntimeException("队列已满,无法再分配任务");
}
}

@Override
public void schedule(Runnable command, long initialDelay, TimeUnit unit) {
ScheduledTask scheduledTask = new ScheduledTask(command, initialDelay, unit, this);
scheduledTaskQueue.offer(scheduledTask);
execute(WAKE_UP);
}

@Override
public void scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
ScheduledTask scheduledTask = new ScheduledTask(command, initialDelay, period, unit, this);
scheduledTaskQueue.offer(scheduledTask);
execute(WAKE_UP);
}

@Override
public CusEventLoop next() {
return this;
}

@Override
public Queue<ScheduledTask> getScheduledTaskQueue() {
return scheduledTaskQueue;
}
}

由于固定频率执行的任务在执行完后还要加到定时任务队列中,所以需要在创建定时任务时传递一个 EventLoop(暴露出获取定时任务队列的方法)。同时如果需要实现指定线程执行:可以通过自定义 Runnable 添加一个指定线程在 EventLoopGroup 的索引,或重写 next() 方法来实现。

现在思考一个这样的场景:EventLoop 执行完任务,可能需要重新把任务添加到队列中,同时 acceptor 可能接收了新连接,需要把这个新连接的后续任务交给 EventLoop 处理,双方可能同时要添加任务到队列,Netty 使用了 MpscQueue 使多生产者可以无锁添加任务。

虽然这样的 demo 并不能处理网络请求,但描述了 netty 处理任务的大致流程。

小结

EventLoopGroup 底层是支持普通任务 + 定时任务的特殊线程池,继承了 JDK 定时线程池接口;而 EventLoop 既是组也是单个事件循环线程,一个 EventLoop 绑定唯一线程,独享任务队列。和 JDK 线程池对比有以下区别:

  • 任务调度:EventLoopGroup 按固定规则分配线程,同一个 Channel 所有事件始终绑定同一个 EventLoop 线程,实现无锁串行化;JDK 线程池任务随机被空闲线程执行,存在线程切换与锁竞争。
  • 线程创建:EventLoopGroup 初始化时就一次性创建并启动所有工作线程;JDK 线程池是按需懒创建线程。
  • 队列模型:EventLoop 是每个线程独享一个任务队列;JDK 线程池是所有线程共享一个全局任务队列

Netty 的这套 EventLoop 模型,通过线程绑定、串行执行、队列隔离,规避了多线程锁竞争和上下文切换开销,是 Netty 高性能、高并发、低延迟的核心基石。


Netty的EventLoopGroup是什么
https://zhuwenjie0716.github.io/2026/05/12/Netty的EventLoopGroup是什么/
作者
Wenjie Zhu
发布于
2026年5月12日
许可协议