标签: RocketMQ

  • RocketMQ 顺序消息队列“假死”:一个 NPE 引发的百万级积压与 ConsumeOrderly 死锁惨案

    某次核心交易链路报警,监控大盘上 RocketMQ 的 Consumer Lag 指标在短短十几分钟内飙升突破 200 万,业务侧反馈订单状态机完全停滞,P99 延迟直接变成一条横线(超时)。排查发现,问题根因极度低级:业务开发在处理顺序消息(Orderly)的消费逻辑时,漏抓了一个 NullPointerException。这个异常导致 RocketMQ 客户端为了保证严格的局部顺序,不断挂起当前队列并无限重试,彻底锁死了该 MessageQueue,后续百万级消息全部被堵死在单车道上。

    结论先行:与并发消费(Concurrent)将失败消息发往 Broker 端的 %RETRY% 队列不同,RocketMQ 的顺序消费在遇到异常时,默认会在 Consumer 本地客户端无限重试MaxReconsumeTimes 默认为 -1,即 Integer.MAX_VALUE)。 在 MessageListenerOrderly 中,绝对不能让未经捕获的异常抛出到框架层。务必严格使用 try-catch 包裹所有业务逻辑,并结合 msg.getReconsumeTimes() 实现阈值阻断与自定义死信队列(DLQ)降级。

    故障现场:200万Lag与“安静”的消费者

    排查过程中,第一反应是消费端挂了或者 Broker 存在毛刺。但看了下基础监控,Consumer 所在的 K8S Pod 的 CPU 和内存水位都很低,甚至可以说闲得发慌。

    执行 mqadmin consumerProgress 查看消费位点状态:

    # sh mqadmin consumerProgress -n x.x.x.x:9876 -g Order_Trade_Consumer_Group
    Topic             Broker Name  QID  Broker Offset  Consumer Offset  Client IP      Diff
    Trade_Order_Topic broker-a     0    150000         150000           10.0.x.x       0
    Trade_Order_Topic broker-a     1    152000         152000           10.0.x.x       0
    Trade_Order_Topic broker-a     2    3100500        100500           10.0.x.y       3000000  <-- 剧烈积压
    Trade_Order_Topic broker-a     3    149000         149000           10.0.x.y       0
    

    现象很明显:并不是整体消费能力不足,而是 broker-aQID=2 这一个队列卡死了。

    进到 10.0.x.y 这个 Pod 抓 jstack,发现大量 RocketMQ 的消费线程处于 TIMED_WAITING 状态:

    "ConsumeMessageThread_1" Id=85 RUNNABLE
        at java.lang.Thread.sleep(Native Method)
        at org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$ConsumeRequest.run(ConsumeMessageOrderlyService.java:470)
    

    再翻看业务日志,满屏都是同一个报错的死循环:

    java.lang.NullPointerException: user_id is null in payload
        at com.biz.order.listener.OrderStateMachineListener.consumeMessage(OrderStateMachineListener.java:45)
    

    业务代码极其奔放,直接在 consumeMessage 里抛出了 NPE,既没有 catch,也没有重试次数校验。

    底层原理解析:为什么并发消费没事,顺序消费就崩?

    很多开发习惯了 RocketMQ 的并发消费(Concurrent)模型。在并发模式下,如果 consumeMessage 抛出异常或返回 RECONSUME_LATER,RocketMQ 会将该消息重新发回 Broker 端的 %RETRY%ConsumerGroup 队列,并推进当前 MessageQueue 的消费位点。这样“毒消息”会被扔到一边,后续消息继续畅通无阻,最多重试 16 次后进入死信队列(DLQ)。

    但在顺序消费(Orderly)模型下,游戏规则变了。 顺序消费的核心语义是:前一条消息不消费成功,后一条消息绝对不能处理。

    为了保证局部有序,Consumer 在拉取到消息后,会向 Broker 申请锁(RebalanceImpl.lockMQPeriodically),锁定整个 MessageQueue,并生成一个 ProcessQueue。 当 MessageListenerOrderly 抛出异常,或者返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 时,我们看看 RocketMQ 内核是怎么处理的:

    // 摘自 ConsumeMessageOrderlyService.java 核心逻辑
    public void processConsumeResult(
        final ConsumeOrderlyStatus status,
        final ConsumeOrderlyContext context,
        final ConsumeRequest consumeRequest) {
    
        // ... 前置省略
        case SUSPEND_CURRENT_QUEUE_A_MOMENT:
            // 检查重试次数
            if (checkReconsumeTimes(msgs)) {
                // 如果超过最大重试次数,才发往 DLQ 并推进位点
                consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
            }
    }
    

    注意这里的 checkReconsumeTimes 逻辑。在并发消费中,默认最大重试次数是 16。但在顺序消费中,DefaultMQPushConsumer.maxReconsumeTimes 的默认值是 -1。 这意味着,只要业务抛出异常,客户端就会把当前 MessageQueue 挂起(默认 sleep 1秒),然后重新把这条消息拿出来再消费一次。无限循环,永不跳过。

    业务想要的是局部严格顺序,却没考虑过异常数据的降级处理。这就好比在单行道上,一辆车抛锚了,司机不仅不叫拖车,还坐在车里无限期尝试打火,导致后面的百万车流死死堵住。

    毁灭性后果与防御性修复

    这种积压是极其致命的。因为 MessageQueue 被无限重试的线程死死锁住,哪怕你重启 Consumer Pod,由于 Rebalance 机制,这批“毒消息”只会漂移到另一个 Pod 上,继续锁死那个 Pod 的消费线程。最终导致整个业务集群在处理特定 Shard Key 时彻底瘫痪。

    防御性编程不是挂在嘴边的废话,是不让你半夜爬起来擦屁股的救命稻草。 正确的顺序消息消费姿势,必须具备异常兜底主动降级能力:

    @Component
    public class RobustOrderlyListener implements MessageListenerOrderly {
    
        // 严禁无限重试,设定最大容忍次数
        private static final int MAX_RETRY_TIMES = 5;
    
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // 顺序消费默认 batch 为 1
            MessageExt msg = msgs.get(0);
    
            try {
                // 核心业务逻辑
                processBizLogic(msg);
                return ConsumeOrderlyStatus.SUCCESS;
    
            } catch (Throwable t) {
                // 拦截所有未知的 Throwable,严禁抛出到框架层
                int currentRetry = msg.getReconsumeTimes();
                log.warn("顺序消息消费异常, msgId:{}, retry:{}", msg.getMsgId(), currentRetry, t);
    
                if (currentRetry >= MAX_RETRY_TIMES) {
                    log.error("顺序消息重试到达上限,触发熔断降级。写入死信表并跳过. msgId:{}", msg.getMsgId());
                    try {
                        // 必须自己实现死信存储逻辑(如写入 DB/Redis/专用重试Topic)
                        saveToCustomDeadLetter(msg, t);
                    } catch (Exception e) {
                        log.error("写入自定义死信队列失败,继续挂起队列", e);
                        // 仅在降级系统也崩溃时,才允许挂起当前队列
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // 强制返回 SUCCESS 推进位点,释放队列拥堵
                    return ConsumeOrderlyStatus.SUCCESS;
                }
    
                // 未到重试上限,挂起队列一会再试
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
    }
    

    排查清单(同类问题速查)

    1. 单队列卡死确认:使用 mqadmin consumerProgress 检查。如果 Diff 极高且集中在极少数 QID,而其他队列 Diff 为 0,100% 是局部卡死(顺序消息死锁或单分片数据倾斜严重)。

    2. 重试次数默认值陷阱:检查 Consumer 初始化代码。如果使用顺序消费且未显式设置 consumer.setMaxReconsumeTimes(次数),默认会进入 -1(无限重试)模式。强烈建议根据业务容忍度显式设置为 3~5 次。

    3. 消费者线程堆栈查验:执行 jstack | grep ConsumeMessageOrderlyService。如果大量线程长期处于 TIMED_WAITINGsleep 状态,说明业务逻辑正在疯狂触发 SUSPEND

    4. 毒消息清理:一旦发生雪崩,如果业务代码无法立即修复,可使用 mqadmin resetOffsetByTime 强制将卡死队列的消费位点往后拨动(会跳过中间数据,需业务确认可接受),先让后续积压消息流转,事后再通过日志捞回丢失数据。

  • RocketMQ 生产环境 P99 抖动排查实战:PageCache 剧烈回收引发的 Broker Busy 与 Mmap 预热机制解析

    排查过程中,某高并发压测场景下的 RocketMQ 集群(v4.9.4)频繁爆出 [TIMEOUT_CLEAN_QUEUE]broker busy,发送延迟 P99 从 5ms 突增至 2000ms+。核心原因是 Linux PageCache 脏页回写与 mmap 缺页中断(Page Fault)阻塞了 Broker 写线程。结论先行:通过开启 RocketMQ 的 warmMapedFileEnable=truetransientStorePoolEnable=true,配合下调 OS 内核的 vm.dirty_background_ratio,可彻底斩断内核级阻塞,将 P99 稳定压制在 10ms 以内。

    故障现场与指标观测

    某次大促前夕的全链路压测中,单 Broker 节点 QPS 压到 4w 时,客户端开始出现大量的 MQBrokerException: broker busyRemotingTooMuchRequestException 报错。

    查看 Broker 端 store.logbroker.log,满屏如下报错:

    202X-XX-XX XX:XX:XX WARN [SendMessageThread_1] - [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 205ms, size of queue: 853
    202X-XX-XX XX:XX:XX WARN [SendMessageThread_2] - OS page cache busy, osPageCacheBusyTimeOutMills=1000
    

    调出监控看板:

    1. CPU Load:平时 4-5 左右,故障发生瞬间 Load Average 飙升至 40+。

    2. 磁盘 IOiostat -xdm 1 显示 await 偶尔飙高,但 util% 只有 50% 左右,磁盘并未彻底被打满。

    3. 内存指标free -m 显示 buff/cache 占用接近 85%,物理空闲内存(free)极少。

    此时通过 strace -p -T -e trace=mmap,munmap,write,pwrite64 抓取底层系统调用,发现部分写操作耗时极其离谱,甚至超过 1 秒。这就引出了一个经典的架构错觉:我都全异步了,为什么还会卡?

    为什么异步刷盘(ASYNC_FLUSH)依然会阻塞写线程?

    很多开发人员认为,只要 RocketMQ 配置了 flushDiskType=ASYNC_FLUSH,消息只要写到内存(PageCache)就算成功,磁盘 IO 慢绝不会影响发送延迟。这是一个极其致命的认知盲区。

    RocketMQ 的 CommitLog 默认采取 1GB 固定大小,通过 mmap(Memory Mapped Files)将物理文件映射到用户态的虚拟内存中。Broker 处理写请求的核心路径是: SendMessageProcessor -> CommitLog.putMessage() -> MappedFile.appendMessagesInner() -> ByteBuffer.put(data)

    问题就出在这个 ByteBuffer.put() 上。这虽然是内存操作,但在 Linux 内核视角下,它随时可能被阻塞,原因有二:

    1. 缺页中断(Minor/Major Page Fault): 当 Broker 滚动创建新的 1GB CommitLog 并执行 mmap 时,Linux 采用的是“延迟分配”策略。仅仅是建立了虚拟内存地址映射,并未分配实际物理页。当写线程第一次往这个地址 put 数据时,会触发内核缺页中断,内核需要去寻找空闲物理页并建立页表。如果此时系统物理内存紧张,内核触发直接回收(Direct Reclaim),写线程就会被死死卡住。

    2. PageCache 脏页回写阻塞: 当脏页积累到内核阈值(vm.dirty_ratio,默认 20%)时,Linux 会挂起所有尝试生成新脏页的用户进程,强行同步刷盘。此时你的 ByteBuffer.put() 会直接退化为同步阻塞写。

    深度解析:CommitLog Mmap 与 读写分离预热机制

    为了规避上述内核级别的阻塞,RocketMQ 提供了几项极为核心的防御性存储机制。

    1. 强制预热与内存锁定(warmMapedFileEnable)

    配置 warmMapedFileEnable=true 后,Broker 在创建新的 1GB MappedFile 时,会提前在后台线程中将其填满 0,强行触发所有的缺页中断,真正分配物理内存。 不仅如此,RocketMQ 还会调用 JNA 执行 mlockmadvise

    // 核心源码示意 (MappedFile.java)
    LibC.INSTANCE.mlock(pointer, 1024 * 1024 * 1024);
    LibC.INSTANCE.madvise(pointer, 1024 * 1024 * 1024, LibC.MADV_WILLNEED);
    

    mlock 直接告诉内核:“这 1GB 内存你给我锁死在 RAM 里,绝对不允许 Swap 出去!”。这就彻底消除了写消息时发生 Page Fault 的可能性。

    2. 堆外内存写池(transientStorePoolEnable)

    这是应对 PageCache 毛刺的终极武器(仅限异步刷盘有效)。 开启后,RocketMQ 会预先向 OS 申请一块 DirectByteBuffer 内存池(不受 JVM GC 影响,也暂时不进 PageCache)。 写数据路径变为:写请求 -> DirectByteBuffer -> 立即返回客户端成功。 后台 CommitRealTimeService 线程定期将 DirectByteBuffer 的数据写入 FileChannel(进入 PageCache),再由 FlushRealTimeService 线程异步刷盘。 这是一种极致的读写分离策略,彻底将“接收消息的写线程”与“PageCache 分配/刷盘”解耦。

    极客实战:RocketMQ 存储与内核参数双向调优

    解决此类抖动问题,绝不能只改应用配置,必须深入 OS 层联动调优。以下是我在生产环境经过验证的黄金配置标准。

    RocketMQ 核心配置 (broker.conf)

    # 强制使用异步刷盘
    flushDiskType=ASYNC_FLUSH
    # 开启堆外内存池缓冲,彻底解耦写请求与PageCache抖动
    transientStorePoolEnable=true
    # 开启Mmap预热与内存锁定,消除运行时缺页中断
    warmMapedFileEnable=true
    # 优化PageCache锁超时机制(如果发生抖动,快速失败,依赖重试)
    osPageCacheBusyTimeOutMills=1000
    

    Linux 内核 IO 参数调优 (/etc/sysctl.conf)

    光配 Broker 不够,必须改造内核的脏页回写策略:

    # 脏页占总内存的 5% 时,pdflush 后台线程开始异步刷盘(原默认10%)
    # 目的:提早刷盘,细水长流,避免积压
    vm.dirty_background_ratio = 5
    
    # 脏页占总内存的 40% 时,强制阻塞所有用户态写进程(原默认20%)
    # 目的:拉开与 background_ratio 的差距,给突发流量留足 Buffer
    vm.dirty_ratio = 40
    
    # 坚决不使用 Swap(避免mmap的内存被换出)
    vm.swappiness = 1
    
    # 预留给 OS 应急的物理内存(例如 128G 内存机器配 2G)
    # 目的:避免缺页中断时因无空闲内存触发直接回收(Direct Reclaim)引发系统停顿
    vm.min_free_kbytes = 2097152
    

    执行 sysctl -p 生效。经过这一套连招组合拳,压测 P99 稳如泰山,再也没有出现过 broker busy

    常见问题 (FAQ)

    Q1:开启 transientStorePoolEnable=true 后,如果 Broker 进程直接 Crash(如 OOM Killer),数据会丢失吗? 会。这就是享受极致低延迟的代价。该模式下数据首先写入 DirectByteBuffer,这是用户态进程的堆外内存。如果进程被 kill -9 或者 Crash,这部分尚未 commit 到 OS PageCache 的数据将会丢失。如果你对数据一致性要求极度苛刻(如金融交易),只能忍受延迟,关闭此项并使用 SYNC_FLUSH

    Q2:为什么消费重试队列(%RETRY%)里的消息会导致明显的磁盘 IO 升高和 Broker 负载增加? RocketMQ 是基于 CommitLog 的混合存储。正常消费是顺序读写(刚写完的数据大概率还在 PageCache 中,命中率极高)。但重试队列消费的是过去某个时间点的冷数据。这就迫使 Broker 产生大量的随机 IO(读磁盘),导致 PageCache 污染,驱逐掉热数据,从而引发全局性能下降。应对策略通常是单独隔离重试服务,或使用 NVMe SSD 扛随机 IO。

    Q3:遇到 [TIMEOUT_CLEAN_QUEUE]broker busy,除了存储层问题,还有什么原因? 如果磁盘 IO 不高,PageCache 也没问题,你需要检查是不是 JVM 发生了长时间的 Stop-The-World (STW)。尤其是 G1 GC 配置不当,或是业务代码向 RocketMQ 发送超大消息(如几 MB 的报文),导致 Broker 在反序列化/网络传输时消耗大量 CPU 和内存资源,阻塞了 Netty 的 Worker 线程。