Netty如何实现轻量级对象池 对于 Netty 这种高吞吐、低延迟的网络框架中,对象(尤其是 ByteBuf)的创建和释放极为频繁。如果每次都通过 new 来创建对象,容易产生以下问题:
GC 频繁 :大量短生命周期对象会频繁触发 Young GC,影响系统稳定。
延迟抖动 :GC 的 Stop-The-World 事件会导致请求延迟不稳定。
内存碎片 :频繁的分配和释放会导致堆内存碎片化。
Netty 实现了轻量级的对象池 Recycler,专门用来重用那些创建开销大、使用频繁的对象,从而显著降低 GC 压力,提升系统性能。它被广泛应用于 Netty 的核心组件中,例如 PooledDirectByteBuf。
Recycler 结构 Recycler 还是比较复杂的,核心结构如图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 【线程 A (比如 Netty 的 EventLoop)】 │ └─── Stack (线程A的私有栈,LIFO) │ ├─── [0 ] → Handle (对象X,空闲) ├─── [1 ] → Handle (对象Y,空闲) ├─── [2 ] → Handle (对象Z,使用中) ├─── ... │ └─── head (指针) ----------------------┐ │ 【WeakOrderQueue 链表 (接收其他线程的归还)】 │ │ │ ▼ │ WeakOrderQueue (for 线程B) ◄────────────┘ │ └─── Link1 → [Handle(对象M), Handle(对象N), ...] └─── Link2 → [Handle(对象P), ...] │ ▼ WeakOrderQueue (for 线程C) │ └─── Link1 → [Handle(对象Q), ...] │ ▼ WeakOrderQueue (for 线程D) ...
上图的核心组件包括:Stack 和 WeakOrderQueue。
当程序在调用 Recycler.get() 方法时,代码会首先尝试获取当前线程的 Stack,如果不存在就会创建,Stack 中存的是当前线程创建的已经空闲的对象。
Stack 中会有 head 指针指向一个 WeakOrderQueue 的链表,这里存其它线程归还给该线程的对象。例如线程A创建了一个对象,使用完后传递给了线程B处理,线程B使用完后进行归还,最终就会放到线程A 的某个 WeakOrderQueue 中(其中都是线程B归还的属于线程A创建的对象)。
注意对象会被封装为 DefaultHandle。
Recycler 获取对象 Stack 的核心定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static final class Stack <T> { final Recycler<T> parent; final WeakReference<Thread> threadRef; final AtomicInteger availableSharedCapacity; final int maxDelayedQueues; private final int maxCapacity; private final int ratioMask; private DefaultHandle<?>[] elements; private int size; private int handleRecycleCount = -1 ; private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head; }
获取对象的大致逻辑是:优先寻找线程私有的 Stack,再找其他线程归还给该线程的 WeakOrderQuene,如果都没有这样的对象,就新建一个。
1 2 3 4 5 6 7 8 9 10 11 12 public final T get () { if (maxCapacityPerThread == 0 ) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null ) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
可以看到对象是通过 stack.pop() 找到的,但该对象并不一定是本来就在 stack 中的,看下 pop 的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 DefaultHandle<T> pop () { int size = this .size; if (size == 0 ) { if (!scavenge()) { return null ; } size = this .size; } size --; DefaultHandle ret = elements[size]; elements[size] = null ; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException ("recycled multiple times" ); } ret.recycleId = 0 ; ret.lastRecycledId = 0 ; this .size = size; return ret; }
如果 Stack 的 elements 数组中有可用的对象实例,直接将对象实例弹出;如果 elements 数组中没有可用的对象实例,会调用 scavenge 方法。scavenge 翻译为拾荒,作用是从其他线程回收的对象实例中转移一些到 elements 数组中。
scavenge 方法会从 stack 对象 cursor 指针指向的 WeakOrderQueue 开始遍历,cursor 会保留上次遍历的位置,这样的好处是不会每次都优先遍历前面位置的 WeakOrderQueue,分摊跨线程回收对象的压力。
此外,每次移动 cursor 时,都会检查 WeakOrderQueue 对应的线程是否已经退出了,如果线程已经退出,那么线程中的对象实例都会被回收,然后将 WeakOrderQueue 节点从链表中移除。
每个 WeakOrderQueue 中都包含一个 Link 链表,Netty 每次会回收其中的一个 Link 节点所存储的对象。从图中可以看出,Link 内部会包含一个读指针 readIndex,每个 Link 节点默认存储 16 个对象,读指针到链表尾部就是可以用于回收的对象实例,每次回收对象时,readIndex 都会从上一次记录的位置开始回收。
Recycler 对象回收 Recycler 对象回收比获取对象要简单一些。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void recycle (Object object) { if (object != value) { throw new IllegalArgumentException ("object does not belong to handle" ); } Stack<?> stack = this .stack; if (lastRecycledId != recycleId || stack == null ) { throw new IllegalStateException ("recycled already" ); } stack.push(this ); }void push (DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); if (threadRef.get() == currentThread) { pushNow(item); } else { pushLater(item, currentThread); } }
可以看到会根据当前线程是否是该创建该对象的线程,走 pushNow 和 pushLater 的逻辑。
同线程回收 同线程回收对象的逻辑非常简单,就是直接向 Stack 的 elements 数组中添加数据,对象会被存放在栈顶指针指向的位置。如果超过了 Stack 的最大容量,那么对象会被直接丢弃,同样这里使用了 dropHandle 方法控制对象的回收速率,每 8 个对象会有一个被回收到 Stack 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void pushNow (DefaultHandle<?> item) { if ((item.recycleId | item.lastRecycledId) != 0 ) { throw new IllegalStateException ("recycled already" ); } item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this .size; if (size >= maxCapacity || dropHandle(item)) { return ; } if (size == elements.length) { elements = Arrays.copyOf(elements, min(size << 1 , maxCapacity)); } elements[size] = item; this .size = size + 1 ; }
异线程回收 异线程回收对象时,异线程会在自己的本地看是否有归还给原线程的 WeakOrderQueue,如果没有则新建,然后把要归还的对象添加到这个 WeakOrderQueue 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private void pushLater (DefaultHandle<?> item, Thread thread) { Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this ); if (queue == null ) { if (delayedRecycled.size() >= maxDelayedQueues) { delayedRecycled.put(this , WeakOrderQueue.DUMMY); return ; } if ((queue = WeakOrderQueue.allocate(this , thread)) == null ) { return ; } delayedRecycled.put(this , queue); } else if (queue == WeakOrderQueue.DUMMY) { return ; } queue.add(item); }
添加到 quene 的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void add (DefaultHandle<?> handle) { handle.lastRecycledId = id; Link tail = this .tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { if (!head.reserveSpace(LINK_CAPACITY)) { return ; } this .tail = tail = tail.next = new Link (); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; handle.stack = null ; tail.lazySet(writeIndex + 1 ); }
在向 WeakOrderQueue 写入对象之前,会先判断 Link 链表的 tail 节点是否还有空间存放对象。如果还有空间,直接向 tail Link 尾部写入数据,否则直接丢弃对象。如果 tail Link 已经没有空间,会新建一个 Link 之后再存放对象,新建 Link 之前会检查异线程帮助回收的对象总数超过了 Stack 设置的阈值,如果超过了阈值,那么对象也会被丢弃掉。
所以异线程B在归还对象给线程A时,经过以下的步骤:
获取自己的归还对象的容器 Map<Stack<?>, WeakOrderQueue>;
通过要归还对象关联的 Stack<?> (对象是线程A创建的那么就是线程A的 Stack)获取对应的 WeakOrderQueue,如果不存在则新建(注意新建时会关联到线程A的 WeakOrderQueue 链表上);
将要归还的对象添加到 WeakOrderQueue 中。
注意:是从线程B获取要归还给线程A的 WeakOrderQueue,而不是从线程A获取线程B要归还的 WeakOrderQueue。
小结 Netty 的轻量级对象池 Recycler 在 Netty 里面使用是非常频繁的,比较常用的像 PooledHeapByteBuf 和 PooledDirectByteBuf,分别对应的堆内存和堆外内存的池化实现。
我们在使用 PooledDirectByteBuf 的时候,并不是每次都去创建新的对象实例,而是从对象池中获取预先分配好的对象实例,不再使用 PooledDirectByteBuf 时,被回收归还到对象池中。
对象池和连接池有些相似之处,例如都复用对象减少创建销毁对象时的开销。不过连接池一般都是 Thread 的容器,向其中提交任务,选择一个 Thread 执行任务;而对象池是获取对象,由取对象的这个线程进行后续操作。