
[RIP‐73] Pop Consumption Improvement Based on RocksDB

lizhimins edited this page Dec 10, 2024 · 1 revision

RIP-73: Pop Consumption Improvement Based on RocksDB

Status

Background & Motivation

What do we need to do

  • Will we add a new module? No.
  • Will we add new APIs? No.
  • Will we add a new feature? Yes.

Why should we do that

Are there any problems with our current project?

Pop consumption is a new consumption model. Its advantages are that it is lightweight, keeps the client stateless, and does not require exclusive queue ownership for load balancing. However, the current Pop implementation has a large codebase, generates heavy disk write traffic when the buffer is disabled, and requires complex state maintenance when the buffer is enabled. We therefore propose a new implementation that stores Pop state in RocksDB. It cleanly resolves the compatibility issues between the PopRetry topic v1 and v2 formats as well as the probabilistic retry problem. Other improvements include fully asynchronous operation, an optimized cache design, faster encoding and decoding, and fairer message access. In a performance test run in the same environment with a 1:1 read-write ratio against the current mainline (version 5.3.1), the new Pop implementation saves 28% of overall broker CPU usage when the buffer is disabled, and reduces overall CPU usage by 4.5% when the buffer is enabled, with the Pop part alone showing a 17% reduction.

What can we benefit from proposed changes?

  1. The new POP KV implementation no longer relies on scheduled (timer / delay) messages.
  2. The new POP KV implementation significantly reduces the codebase, to about one-third of the original (excluding comments, unit tests, and switch logic).
  3. In the old implementation, the Pop revive log is written to the CommitLog, which occupies disk space and causes read amplification. This solution effectively addresses both issues.
  4. Performance improvement. Because most consumers return a successful consumption response immediately after receiving a message, the existing implementation already includes a cache design (PopBufferMerge). Performance testing shows that with the new implementation:
    1. When consumers ACK quickly and the buffer is enabled, overall CPU usage drops by 4.5%, with the Pop part showing a 17% reduction.
    2. When consumers ACK slowly (cache entries expire and are evicted) or the buffer is disabled, overall broker CPU usage drops by 28%.
    3. Under a large number of change invisible time operations, performance improves several-fold.

Goals

What problem is this proposal designed to solve?

As mentioned above, the proposal aims to eliminate the dependency of Pop consumption on scheduled messages, reduce the codebase and implementation complexity, and improve performance to some extent.

Non-Goals

What problem is this proposal NOT designed to solve?

  1. To maintain compatibility, the protocol for Pop consumption will not be modified.
  2. Metrics related to Pop can be measured more accurately, but this will not be included in this submission.

Changes

Architecture


1. Pop Definition

The following terms are used in Pop consumption:

  1. Pop Queue: A Pop request issued for a specific queue returns only messages from that queue and its retry messages.
  2. Pop Broker: Also known as Pop -1. A Pop request issued for a user topic on the broker can return messages from multiple queues.
  3. Pop Handle: In Pop mode, messages obtained by the consumer do not carry a handle by default (for example, pop time, invisible time, and offset information). The client computes the handle from information in the header using PutIfAbsent.
  4. CK Mock: A CK (checkpoint) generated only to commit an offset, without the MarkBitSets design.
  5. Notify On-Demand Wakeup: After new messages are written to the CommitLog and dispatched, long polling must be notified so that it can return them. If a new message does not match the Tag/SQL rules of the subscription, it does not trigger a wakeup. This strategy effectively reduces CPU usage in message-bus scenarios.
  6. Retry Compatibility: In Pop, the underscore in %RETRY%Group_Topic makes it impossible to reliably parse the Group and Topic back out of the name. Retry v2 therefore uses '+' as the delimiter.
  7. Retry Restoration: To address overly long topic names in the storage layer, the broker decodes and re-encodes retrieved Retry messages during encoding, which introduces computational overhead. Note that normal topics are marked 0 and retry topics 1, and lower-version clients cannot handle a retry value of 2. Clients therefore only receive a flag at the protocol level indicating whether a message is a retry, while the broker distinguishes the two retry formats precisely.
  8. Revive: The state-merging process in Pop (called Buffer merge in the Pop Buffer). When a CK that has not been cancelled out by an ACK is found, the message's retry count is incremented by 1 and the next invisible time is calculated according to the backoff strategy.
  9. Pop Orderly: Ordered consumption in Pop, scoped to a single queue (not to a single Pop request). Suppose the first response returns ABC: a. If A is acknowledged, the second request returns BC or BCD. b. If B and C are acknowledged, consumption blocks by default when the handle is not carried; after the invisible time elapses, the next request returns A, AB, or ABC depending on the number of messages requested.
  10. Ack Orderly: When an ACK for an ordered message is submitted, there may be cases where restCount > 0 but long polling is suspended. The system then notifies long polling again so that ordered messages are delivered in real time.
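To illustrate the Retry Compatibility term, here is a minimal sketch of why the v1 underscore delimiter is ambiguous while '+' is not. It assumes names of the form %RETRY%{group}_{topic} (v1) and %RETRY%{group}+{topic} (v2), and that group and topic names never contain '+'. The class RetryTopicNames and its methods are hypothetical, not RocketMQ's own helpers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating v1 vs. v2 Pop retry topic names. */
final class RetryTopicNames {
    static final String PREFIX = "%RETRY%";

    static String v1(String group, String topic) { return PREFIX + group + "_" + topic; }

    static String v2(String group, String topic) { return PREFIX + group + "+" + topic; }

    /** v1: every '_' is a possible split point, so several (group, topic) pairs may match. */
    static List<String[]> parseV1Candidates(String retryTopic) {
        String body = retryTopic.substring(PREFIX.length());
        List<String[]> candidates = new ArrayList<>();
        for (int i = body.indexOf('_'); i >= 0; i = body.indexOf('_', i + 1)) {
            candidates.add(new String[] {body.substring(0, i), body.substring(i + 1)});
        }
        return candidates;
    }

    /** v2: '+' is assumed never to appear in a group or topic name, so the split is unique. */
    static String[] parseV2(String retryTopic) {
        String body = retryTopic.substring(PREFIX.length());
        int i = body.indexOf('+');
        if (i < 0) {
            throw new IllegalArgumentException("not a v2 retry topic: " + retryTopic);
        }
        return new String[] {body.substring(0, i), body.substring(i + 1)};
    }
}
```

For group order_group and topic pay_topic, the v1 name %RETRY%order_group_pay_topic yields three candidate splits, while the v2 name splits back into exactly the original pair.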

2. Pop Consume Service


In the new implementation, we use RocksDB to store Pop checkpoints, and introducing a KV model significantly reduces the complexity of state storage. Common operations in Pop consumption map to KV operations as follows:

  • Write CK => Put KV: the key must include group + topic + queueId + offset and the pop time.
  • Write AK => Delete by key: there is no need to decode the value.
  • Revive => Prefix scan: use the seek-to-first capability of the KV store to perform a prefix scan.
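The mapping above can be sketched with an ordered in-memory map standing in for RocksDB: TreeMap.tailMap plays the role of an iterator seek, followed by a scan that stops once keys leave the prefix. The key layout (group@topic@queueId@offset@popTime, zero-padded so lexicographic order matches numeric order), the class name, and the value fields are assumptions for illustration, not the format used by the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the RocksDB store that holds Pop checkpoints (CKs). */
final class PopCheckpointStore {

    /** Simplified CK value: when the message was popped and how long it stays invisible. */
    static final class Ck {
        final long popTime;
        final long invisibleTime;
        Ck(long popTime, long invisibleTime) { this.popTime = popTime; this.invisibleTime = invisibleTime; }
        long visibleAt() { return popTime + invisibleTime; }
    }

    private final NavigableMap<String, Ck> kv = new TreeMap<>();

    /** Zero-padding keeps lexicographic key order identical to numeric order. */
    static String key(String group, String topic, int queueId, long offset, long popTime) {
        return String.format("%s@%s@%010d@%020d@%020d", group, topic, queueId, offset, popTime);
    }

    /** Write CK => Put KV. */
    void writeCk(String group, String topic, int queueId, long offset, long popTime, long invisibleTime) {
        kv.put(key(group, topic, queueId, offset, popTime), new Ck(popTime, invisibleTime));
    }

    /** Write AK => Delete by key; the stored value never has to be decoded. */
    boolean ack(String group, String topic, int queueId, long offset, long popTime) {
        return kv.remove(key(group, topic, queueId, offset, popTime)) != null;
    }

    /**
     * Revive => seek to the first key with the prefix and scan while it still matches.
     * CKs whose invisible time has elapsed are removed and returned; per the Revive
     * definition they would be redelivered as retries (not modelled here).
     */
    List<String> revive(String group, String topic, long now) {
        String prefix = group + "@" + topic + "@";
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Ck> e : kv.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // ran past the prefix range, like an iterator leaving the prefix
            }
            if (e.getValue().visibleAt() <= now) {
                expired.add(e.getKey());
            }
        }
        for (String k : expired) {
            kv.remove(k);
        }
        return expired;
    }

    int size() { return kv.size(); }
}
```

Because the ACK path only needs to rebuild the key from the handle, it never reads the stored value, which is what makes Delete by Key cheap.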

3. Pop Consume Cache

In actual production, messages delivered to consumers can usually be acknowledged as successfully consumed very quickly, so the current version introduces PopBufferService to cache in-flight messages. We implemented a simpler cache and use ConsumerOffsetManager to manage both commit offsets and pull offsets, aligning offset management fully with the pull model.
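To make the difference between the pull offset and the commit offset concrete, here is a minimal per-queue sketch in the style of the pull model. The class and method names are hypothetical and do not reflect ConsumerOffsetManager's API; it assumes the commit offset is the smallest offset still in flight, so everything below it has been acknowledged.

```java
import java.util.TreeSet;

/**
 * Hypothetical per-queue tracker: the pull offset is where the next pop starts,
 * and the commit offset is the smallest offset that is still in flight.
 */
final class QueueOffsetTracker {
    private final TreeSet<Long> inflight = new TreeSet<>();
    private long pullOffset;

    QueueOffsetTracker(long startOffset) { this.pullOffset = startOffset; }

    /** Pops {@code batch} messages: they become in-flight and the pull offset advances. */
    synchronized long[] pop(int batch) {
        long[] offsets = new long[batch];
        for (int i = 0; i < batch; i++) {
            offsets[i] = pullOffset;
            inflight.add(pullOffset++);
        }
        return offsets;
    }

    /** Acknowledging is idempotent: a repeated ACK for the same offset is a no-op. */
    synchronized void ack(long offset) { inflight.remove(offset); }

    /** Everything below the commit offset has been acknowledged. */
    synchronized long commitOffset() { return inflight.isEmpty() ? pullOffset : inflight.first(); }

    synchronized long pullOffset() { return pullOffset; }
}
```

After popping offsets 100 to 102 and acknowledging only 101, the commit offset stays at 100 while the pull offset is already 103; once all three are acknowledged, the two offsets coincide.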

4. Frequently Asked Questions

1. Why Caffeine Is Not Used as the Cache

Caffeine provides asynchronous eviction and abstractions such as a removal listener, as well as some blocking-queue implementations that reduce contention and improve performance. However, in our proof of concept (POC) it performed worse than a native map and introduced some nondeterministic timing issues.

2. Message Real-Time Issues
  1. Retry Messages:
  • Currently, with the underscore-concatenated v1 naming, retry messages are not delivered in real time; v2 must be enabled to solve this.

  • In the KV version, revive does not depend on scheduled messages, so even v1 can support real-time delivery. Real-time delivery under v1 can also be achieved by adding context to the message properties: specifically, the notify long polling logic is called when retry messages are written during revive.

  2. Ordered Messages:
  • When an ACK is processed, the notify mechanism is triggered, even if the first attempt to acquire the lock fails due to concurrency.
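The notify calls described above (when revive writes a retry message and when an ordered message is acknowledged), combined with the Notify On-Demand Wakeup rule, can be sketched as a small long-polling registry. This is a hypothetical illustration: the class, the method names, the tag set standing in for Tag/SQL matching, and the convention that an empty set subscribes to all tags are all assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical long-polling registry that wakes only requests whose filter matches. */
final class LongPollingRegistry {
    private static final class Waiter {
        final Set<String> tags; // empty set = subscribe to every tag (assumption)
        final CompletableFuture<String> wakeup = new CompletableFuture<>();
        Waiter(Set<String> tags) { this.tags = tags; }
    }

    private final List<Waiter> waiters = new ArrayList<>();

    /** A Pop request that finds no messages parks here until it is notified. */
    CompletableFuture<String> suspend(Set<String> tags) {
        Waiter w = new Waiter(tags);
        synchronized (waiters) {
            waiters.add(w);
        }
        return w.wakeup;
    }

    /**
     * Called after a new message is dispatched, after revive writes a retry message,
     * or after an ordered-message ACK. Only matching waiters are woken; returns how many.
     */
    int onMessageArrived(String tag) {
        List<Waiter> matched = new ArrayList<>();
        synchronized (waiters) {
            for (Iterator<Waiter> it = waiters.iterator(); it.hasNext(); ) {
                Waiter w = it.next();
                if (w.tags.isEmpty() || w.tags.contains(tag)) {
                    it.remove();
                    matched.add(w);
                }
            }
        }
        // Complete outside the lock so callbacks never run while it is held.
        for (Waiter w : matched) {
            w.wakeup.complete(tag);
        }
        return matched.size();
    }
}
```

A waiter subscribed to TagA stays parked when a TagB message arrives, which is where the CPU saving of on-demand wakeup comes from.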

3. Compatibility Issues with Revive queueId
  • In the native FS implementation, the revive queueId is computed as an incrementing value, and the revive and ACK actions only handle their specific queues.

  • In the KV/RocksDB version, the persisted state is shared and does not depend on a specific revive queueId. To maintain protocol compatibility, the new implementation still returns a revive queueId to the client; the RPC selection logic also relies on it during rollback.


4. Fixing Inflight Issues

Because of the filtering logic, the AtomicLong used in the current version cannot keep an accurate in-flight count when the broker restarts, when consumption takes longer than 10 seconds, when ACKs are written to storage, or when the user sends multiple ACKs, and the count drifts further over time. In the KV version, only messages held in the buffer are counted, which is accurate in most scenarios. If the user consumes very slowly, the reported inflight value may still be too low, but this is no worse than the current version.
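As a minimal, hypothetical illustration of why a bare counter drifts while counting buffer entries does not, the sketch below sends the same duplicate ACK to both. The class and the handle strings are made up, and only the duplicate-ACK case is modelled, not restarts or filtering.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical comparison of two ways to count in-flight messages. */
final class InflightCounters {
    /** Counter style: increment on pop, decrement on every ACK received. */
    private final AtomicLong counter = new AtomicLong();
    /** Buffer style: the count is the size of the set of unacknowledged handles. */
    private final Set<String> buffer = ConcurrentHashMap.newKeySet();

    void onPop(String handle) {
        counter.incrementAndGet();
        buffer.add(handle);
    }

    void onAck(String handle) {
        counter.decrementAndGet(); // a duplicate ACK decrements a second time
        buffer.remove(handle);     // removing an absent handle is a no-op
    }

    long counterInflight() { return counter.get(); }

    long bufferInflight() { return buffer.size(); }
}
```

After two pops and a duplicated ACK for the first message, the counter reports 0 in-flight messages while the buffer correctly reports 1.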

5. The Last Message Appears Unconsumed

One possible reason is that the try-lock path and the change-invisible-time/ACK path in PopBufferMergeService#scanCommitOffset run concurrently. When the buffer entry is removed, the offset commit fails, so the last message appears unconsumed. Once new messages arrive, the offset commit succeeds and no backlog remains.

Interface Design/Change

  • Method signature changes? Nothing specific.
  • Method behavior changes? Nothing specific.
  • CLI command changes? Nothing specific.
  • Log format or content changes? Nothing specific.

Compatibility, Deprecation, and Migration Plan

  • Are backward and forward compatibility taken into consideration? Yes
  • Are there deprecated APIs? No
  • How do we migrate? See the detailed description below.

Unit Test Coverage

Switch to the New Implementation

Enable the new KV implementation and switch traffic to it at an appropriate time.

As an optimization of the existing Pop logic, an ideal solution would be to create a common abstraction layer and then switch between the new and old implementations entirely through configuration. However, this approach would inevitably require moving a large amount of code, potentially compromising the reliability of the existing production code. Therefore, a "forwarding" approach was chosen instead:

  1. After POP KV is enabled, logic that "forwards" requests to the new implementation is added directly at the Processor layer.
  2. Consequently, there are two switches: Init and Enable.
  • Init: initializes the service.
  • Enable: the service accepts new traffic for writing.

This approach minimizes disruption to the existing codebase while allowing a gradual transition to the new implementation.

In the old implementation, the recorded (persisted) state mainly consists of:

  1. The ck, ak, and ckMock records in PopRevive, together with the processing offset.
  2. The ck and ak records in scheduled messages that are waiting to be dequeued; this part is very hard to export.
  3. The ordered-consumption state, stored in ConsumerOrderInfo.

The forwarding check at the Processor layer looks roughly like this:

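@Override
public RemotingCommand processRequest(
    final ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

    if (brokerConfig.isPopConsumerKVServiceEnable()) {
        // Enable switch on: forward to the new RocksDB (KV) based implementation.
        // The helper names below are illustrative only.
        return processByKvService(ctx, request);
    }
    // Enable switch off: fall back to the original file-system (FS) based pop implementation.
    return processByFsService(ctx, request);
}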

Rollback to the Old Version (Based on the Native File System)

Under the new implementation, Pop does not rely on the scheduled (delay) message feature, so the only state (corresponding to checkpoints) consists of the records in the KV store and ConsumerOrderInfo.

  1. Disable the popConsumerKVServiceEnable switch. During the upgrade period the Pop service remains active, and the hot update falls back to the old implementation.
  2. After step 1, use the rollback RPC (a remoting command) to export the records in RocksDB directly as old-style checkpoint scheduled messages. It is sufficient for these scheduled messages to be written to the CommitLog successfully; even if a system failure occurs during rollback, the switch-over remains safe.
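Step 2 can be sketched as follows, assuming the export deletes a KV record only after its scheduled message has been written, so a failure part-way leaves the remaining records in place and the export can simply be rerun. The classes, the simplified record fields, and the `append` callback standing in for the CommitLog write are hypothetical; a crash between a write and its delete could export that record twice, so this sketch gives at-least-once export.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical rollback step: export KV checkpoints as old-style scheduled messages. */
final class CheckpointExporter {
    /** A checkpoint record as kept in the KV store (simplified). */
    static final class Ck {
        final long popTime;
        final long invisibleTime;
        Ck(long popTime, long invisibleTime) { this.popTime = popTime; this.invisibleTime = invisibleTime; }
    }

    /** A scheduled checkpoint message as the old implementation would consume it (simplified). */
    static final class ScheduledCk {
        final String key;
        final long deliverTime;
        ScheduledCk(String key, long deliverTime) { this.key = key; this.deliverTime = deliverTime; }
    }

    /**
     * Writes each record via {@code append} (returns true on a successful CommitLog write)
     * and deletes it from the KV store only afterwards. Returns the number exported.
     */
    static int export(TreeMap<String, Ck> kv, Predicate<ScheduledCk> append) {
        int exported = 0;
        for (Iterator<Map.Entry<String, Ck>> it = kv.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Ck> e = it.next();
            Ck ck = e.getValue();
            // The message becomes due when the invisible time elapses.
            if (!append.test(new ScheduledCk(e.getKey(), ck.popTime + ck.invisibleTime))) {
                break; // stop on failure; a rerun resumes from the first remaining record
            }
            it.remove();
            exported++;
        }
        return exported;
    }
}
```

If the write for the third of three records fails, the first two are exported and removed while the third stays in the KV store for the next attempt.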

Implementation Outline

We will implement the proposed changes in pull requests.

Rejected Alternatives

  • How do the alternatives solve the issue you proposed? No
  • Pros and cons of the alternatives. No
  • Why should we reject the above alternatives? No