时序数据库中的性能优化技术:sync.Pool 在 CPU 密集型操作中的应用

本文翻译自 https://victoriametrics.com/blog/tsdb-performance-techniques-sync-pool/

在 VictoriaMetrics 的内部实现中,大量使用了 Go 语言标准库中的sync.Pool数据结构。sync.Pool的主要作用是存储临时的、可替换的对象,以供重复使用,从而减轻垃圾回收器的负担。你可以将sync.Pool视为一种线程安全的实现空闲列表的数据结构。

sync.Pool 特别有用的一个例子是重用bytes.Buffer 对象。bytes.Buffer对象特别适用于需要读取原始数据,并暂时将其存储在内存中的场景。例如,VictoriaMetrics 在从数据库解压缩数据以及在抓取时解析指标元数据时,大量使用了bytes.Buffer对象。

重要的是,bytes.Buffer对象本质上是分配了一部分内存并内置了一些容量和可用性跟踪的临时辅助对象。由于它们是临时的辅助对象,一个bytes.Buffer可以很容易地被另一个替换。

为了避免不必要的bytes.Buffer对象分配,并减轻 GC 的压力,VictoriaMetrics 内部使用了一种数据结构,利用sync.Pool来管理已经分配的bytes.Buffer对象的生命周期。

type ByteBufferPool struct {
    p sync.Pool
}

func (bbp *ByteBufferPool) Get() *ByteBuffer {
bbv := bbp.p.Get()
if bbv == nil {
return &ByteBuffer{}
}
return bbv.(*ByteBuffer)
}

func (bbp *ByteBufferPool) Put(bb *ByteBuffer) {
bb.Reset()
bbp.p.Put(bb)
}

上面的代码定义了一个名为ByteBufferPool的类型,其中包含一个sync.Pool,用于存储bytes.Buffer对象。该类型还定义了两个方法:

  • Get 方法要么从池中返回一个 bytes.Buffer,要么分配并返回一个新的对象。
  • Put 方法重置一个 bytes.Buffer,然后将其放回池中,以便稍后可以通过 Get 方法重用。

ByteBufferPool在 VictoriaMetrics 的代码库中被广泛使用,它显著减少了需要执行的新分配次数。不过在使用时需要注意以下问题:

对象窃取

sync.Pool的原理是是通过每个 Proccesser 的 localPool 实现的。当 goroutine 被调度到与特定 processer(P)相关联的特定 thread(M)上运行,并尝试从池中检索对象时,sync.Pool首先会在当前P的 localPool 中查找。如果在 localPool 中找不到对象,它会尝试从另一个 P 的 Pool 中“窃取”一个对象。由于需要跨 CPU 同步,窃取另一个池中的对象需要时间。如果窃取失败,则会分配一个新的对象。

由于 localPool 的存在,使用 sync.Pool 的最佳场景是在同一个 goroutine 中检索和释放对象, 这样这些对象将属于 goroutine 运行的同一个 P 的 localPool。这大大减少了从 pool 中检索对象和将其返回 Pool 的上下文切换。它防止了对象窃取,并减少了总体分配的对象数量,进一步减轻了垃圾回收器的压力。

一个次优但仍然可以使用 sync.Pool 的场景是,通过加锁,让对象由单个 goroutine 处理。例如,一个 goroutine 从 Pool 中检索一个对象,然后将其传递给另一个 goroutine,后者使用该对象并将其返回 Pool 中。

在同步处理的情况下,问题在于不同的 goroutine 更有可能被调度到不同的 thread(M)上,这意味着对象将从一个 P 的 localPool 中检索出来,并返回到另一个处理器的 pool 中。这增加了 sync.Pool 窃取对象的机会,从而降低了性能。

I/O密集型任务 在 I/O 密集型任务中,使用sync.Pool来重用对象的效率要远低于在 CPU 密集型任务中重用对象。

I/O 操作可能会比较慢并且是分散的,这意味着 I/O 操作所需时间具有很大的随机性。这可能导致 GetPut 调用的次数不均衡,从而导致对象重用效果不佳:

  • 如果 I/O 操作被挂起,sync.Pool中的对象就会一直占用内存;
  • 当 I/O 操作返回结果时,sync.Pool中的对象可能已经被GC,从而需要重新分配。

由于这些原因,在 I/O 密集型任务中,依赖 sync.Pool 来管理临时对象可能无法获得预期的性能提升。在 I/O 操作期间,内存中的对象可能会因为GC而丢失,这使得 sync.Pool 减少内存分配和垃圾回收压力的优势变得不再明显。此外,I/O 操作的不可预测性可能会导致 sync.Pool 中的资源闲置或频繁地进行对象分配,从而降低了其效率。

因此,在设计系统时,应根据任务的特性(I/O 密集型还是 CPU 密集型)来决定是否以及如何在这些场景中使用 sync.Pool。对于 I/O 密集型任务,可能需要考虑其他优化策略,例如使用异步 I/O、缓冲区管理或其他减少 I/O 延迟的方法。

ByteBufferPool使用示例

在 VictoriaMetrics 中,ByteBufferPool的一个使用场景是对存储数据进行解压缩。解压缩操作是 CPU 密集型的,它会分配一些临时内存,因此非常适合使用ByteBufferPool。下面的代码示例说明了在解压缩操作期间如何使用 ByteBufferPool

bb := bbPool.Get() // acquire buffer from pool
// perform decompressing in acquired buffer
bb.B, err = DecompressZSTD(bb.B[:0], src)
if err != nil {
    return nil, fmt.Errorf("cannot decompress: %w", err)
}
// unmarshal from temporary buffer to destination buffer
dst, err = unmarshalInt64NearestDelta(dst, bb.B)
bbPool.Put(bb) // release buffer to the pool, so it can be reused

上述代码从bbPoolByteBufferPool类型)中获取了一个bytes.Buffer对象。接着,它对已经从磁盘读取的一个数据块进行解压缩,并将结果存入这个bytes.Buffer中。需要注意的是,数据块已经从磁盘读取,因此解压缩操作完全是 CPU 密集型的,同时也保持了GetPut调用之间的平衡。

代码在解压缩数据块后,会将其写入目标dst,然后将buffer返回到池pool中,以供重用。需要注意的是,不要返回bb或者保留对它的引用,因为在返回给pool之后,它可能会在任何时候被另一个协程获取并进行修改。

带层级的ByteBuffer

到目前为止,本文中的例子都假设所有的bytes.Buffer是可互换的。虽然从技术上讲这是正确的,但在实际应用中,缓冲区的大小各不相同。如果一个使用少量内存的代码从池中接收到一个大缓冲区,或者反之,这可能会导致内存使用效率低下。

例如,一个指标抓取的目标可能会公开 100 个指标,而另一个可能会公开 10,000 个。抓取每个 targets 的 vmagent 的 goroutine 需要不同大小的缓冲区。在上述的ByteBufferPool实现中,调用代码无法控制它接收到的缓冲区的大小。因此,抓取 goroutine 可能会接收到一个比所需更小的缓冲区,并花费额外的时间进行扩展。这将逐渐用更大的缓冲区替换池中较小的缓冲区,从而增加了整体的内存使用量。

你可以通过将ByteBufferPool分割成多个级别或“桶”来提高其效率。

每个级别包含不同范围的缓冲区大小,对池的请求可以根据预期需求请求特定大小的缓冲区。这种设计允许更精细地管理内存,通过确保每个任务或操作获得合适大小的缓冲区来优化内存使用和性能。

// pools contains pools for byte slices of various capacities.
//
// pools[0] is for capacities from 0 to 8
// pools[1] is for capacities from 9 to 16
// pools[2] is for capacities from 17 to 32
// ...
// pools[n] is for capacities from 2^(n+2)+1 to 2^(n+3)
//
// Limit the maximum capacity to 2^18, since there are no performance benefits
// in caching byte slices with bigger capacities.
var pools [17]sync.Pool

上面的代码片段展示了在 VictoriaMetrics 的 leveledbytebufferpool 包中这些级别是如何表示的。缓存池的最大容量被限制为2^18字节,因为我们发现存储大于这个限制的缓冲区的 RAM 成本比重新创建这些缓冲区的成本要高。

为 pool 分桶之后,修改Get方法使其能够提供预期大小。这样,池就能够返回适当大小的缓冲区。以下是在vmagent抓取示例中如何使用此功能的代码片段:

func (sw *scrapeWork) scrape() {
    body := leveledbytebufferpool.Get(sw.previousResponseBodyLength)
    body.B = sw.ReadData(body.B[:0])
    sw.processScrapedData(body)
    leveledbytebufferpool.Put(body)
}

上面的代码片段根据最后一次请求抓取目标所需的大小来获取一个缓冲区。由于抓取目标暴露的指标数量在每次抓取时变化不大,因此这是一个关于本次需要多大缓冲区的合理猜测。由于函数本身可以确定缓冲区的大小,所以Put函数的签名自上次以来没有改变。


这是一个从 https://juejin.cn/post/7368701816442470451 下的原始话题分离的讨论话题