分类: 故障排查与性能调优

  • RocketMQ 顺序消息队列“假死”:一个 NPE 引发的百万级积压与 ConsumeOrderly 死锁惨案

    某次核心交易链路报警,监控大盘上 RocketMQ 的 Consumer Lag 指标在短短十几分钟内飙升突破 200 万,业务侧反馈订单状态机完全停滞,P99 延迟直接变成一条横线(超时)。排查发现,问题根因极度低级:业务开发在处理顺序消息(Orderly)的消费逻辑时,漏抓了一个 NullPointerException。这个异常导致 RocketMQ 客户端为了保证严格的局部顺序,不断挂起当前队列并无限重试,彻底锁死了该 MessageQueue,后续百万级消息全部被堵死在单车道上。

    结论先行:与并发消费(Concurrent)将失败消息发往 Broker 端的 %RETRY% 队列不同,RocketMQ 的顺序消费在遇到异常时,默认会在 Consumer 本地客户端无限重试MaxReconsumeTimes 默认为 -1,即 Integer.MAX_VALUE)。 在 MessageListenerOrderly 中,绝对不能让未经捕获的异常抛出到框架层。务必严格使用 try-catch 包裹所有业务逻辑,并结合 msg.getReconsumeTimes() 实现阈值阻断与自定义死信队列(DLQ)降级。

    故障现场:200万Lag与“安静”的消费者

    排查过程中,第一反应是消费端挂了或者 Broker 存在毛刺。但看了下基础监控,Consumer 所在的 K8S Pod 的 CPU 和内存水位都很低,甚至可以说闲得发慌。

    执行 mqadmin consumerProgress 查看消费位点状态:

    # sh mqadmin consumerProgress -n x.x.x.x:9876 -g Order_Trade_Consumer_Group
    Topic             Broker Name  QID  Broker Offset  Consumer Offset  Client IP      Diff
    Trade_Order_Topic broker-a     0    150000         150000           10.0.x.x       0
    Trade_Order_Topic broker-a     1    152000         152000           10.0.x.x       0
    Trade_Order_Topic broker-a     2    3100500        100500           10.0.x.y       3000000  <-- 剧烈积压
    Trade_Order_Topic broker-a     3    149000         149000           10.0.x.y       0
    

    现象很明显:并不是整体消费能力不足,而是 broker-aQID=2 这一个队列卡死了。

    进到 10.0.x.y 这个 Pod 抓 jstack,发现大量 RocketMQ 的消费线程处于 TIMED_WAITING 状态:

    "ConsumeMessageThread_1" Id=85 RUNNABLE
        at java.lang.Thread.sleep(Native Method)
        at org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$ConsumeRequest.run(ConsumeMessageOrderlyService.java:470)
    

    再翻看业务日志,满屏都是同一个报错的死循环:

    java.lang.NullPointerException: user_id is null in payload
        at com.biz.order.listener.OrderStateMachineListener.consumeMessage(OrderStateMachineListener.java:45)
    

    业务代码极其奔放,直接在 consumeMessage 里抛出了 NPE,既没有 catch,也没有重试次数校验。

    底层原理解析:为什么并发消费没事,顺序消费就崩?

    很多开发习惯了 RocketMQ 的并发消费(Concurrent)模型。在并发模式下,如果 consumeMessage 抛出异常或返回 RECONSUME_LATER,RocketMQ 会将该消息重新发回 Broker 端的 %RETRY%ConsumerGroup 队列,并推进当前 MessageQueue 的消费位点。这样“毒消息”会被扔到一边,后续消息继续畅通无阻,最多重试 16 次后进入死信队列(DLQ)。

    但在顺序消费(Orderly)模型下,游戏规则变了。 顺序消费的核心语义是:前一条消息不消费成功,后一条消息绝对不能处理。

    为了保证局部有序,Consumer 在拉取到消息后,会向 Broker 申请锁(RebalanceImpl.lockMQPeriodically),锁定整个 MessageQueue,并生成一个 ProcessQueue。 当 MessageListenerOrderly 抛出异常,或者返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 时,我们看看 RocketMQ 内核是怎么处理的:

    // 摘自 ConsumeMessageOrderlyService.java 核心逻辑
    public void processConsumeResult(
        final ConsumeOrderlyStatus status,
        final ConsumeOrderlyContext context,
        final ConsumeRequest consumeRequest) {
    
        // ... 前置省略
        case SUSPEND_CURRENT_QUEUE_A_MOMENT:
            // 检查重试次数
            if (checkReconsumeTimes(msgs)) {
                // 如果超过最大重试次数,才发往 DLQ 并推进位点
                consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
            }
    }
    

    注意这里的 checkReconsumeTimes 逻辑。在并发消费中,默认最大重试次数是 16。但在顺序消费中,DefaultMQPushConsumer.maxReconsumeTimes 的默认值是 -1。 这意味着,只要业务抛出异常,客户端就会把当前 MessageQueue 挂起(默认 sleep 1秒),然后重新把这条消息拿出来再消费一次。无限循环,永不跳过。

    业务想要的是局部严格顺序,却没考虑过异常数据的降级处理。这就好比在单行道上,一辆车抛锚了,司机不仅不叫拖车,还坐在车里无限期尝试打火,导致后面的百万车流死死堵住。

    毁灭性后果与防御性修复

    这种积压是极其致命的。因为 MessageQueue 被无限重试的线程死死锁住,哪怕你重启 Consumer Pod,由于 Rebalance 机制,这批“毒消息”只会漂移到另一个 Pod 上,继续锁死那个 Pod 的消费线程。最终导致整个业务集群在处理特定 Shard Key 时彻底瘫痪。

    防御性编程不是挂在嘴边的废话,是不让你半夜爬起来擦屁股的救命稻草。 正确的顺序消息消费姿势,必须具备异常兜底主动降级能力:

    @Component
    public class RobustOrderlyListener implements MessageListenerOrderly {
    
        // 严禁无限重试,设定最大容忍次数
        private static final int MAX_RETRY_TIMES = 5;
    
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            // 顺序消费默认 batch 为 1
            MessageExt msg = msgs.get(0);
    
            try {
                // 核心业务逻辑
                processBizLogic(msg);
                return ConsumeOrderlyStatus.SUCCESS;
    
            } catch (Throwable t) {
                // 拦截所有未知的 Throwable,严禁抛出到框架层
                int currentRetry = msg.getReconsumeTimes();
                log.warn("顺序消息消费异常, msgId:{}, retry:{}", msg.getMsgId(), currentRetry, t);
    
                if (currentRetry >= MAX_RETRY_TIMES) {
                    log.error("顺序消息重试到达上限,触发熔断降级。写入死信表并跳过. msgId:{}", msg.getMsgId());
                    try {
                        // 必须自己实现死信存储逻辑(如写入 DB/Redis/专用重试Topic)
                        saveToCustomDeadLetter(msg, t);
                    } catch (Exception e) {
                        log.error("写入自定义死信队列失败,继续挂起队列", e);
                        // 仅在降级系统也崩溃时,才允许挂起当前队列
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // 强制返回 SUCCESS 推进位点,释放队列拥堵
                    return ConsumeOrderlyStatus.SUCCESS;
                }
    
                // 未到重试上限,挂起队列一会再试
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
    }
    

    排查清单(同类问题速查)

    1. 单队列卡死确认:使用 mqadmin consumerProgress 检查。如果 Diff 极高且集中在极少数 QID,而其他队列 Diff 为 0,100% 是局部卡死(顺序消息死锁或单分片数据倾斜严重)。

    2. 重试次数默认值陷阱:检查 Consumer 初始化代码。如果使用顺序消费且未显式设置 consumer.setMaxReconsumeTimes(次数),默认会进入 -1(无限重试)模式。强烈建议根据业务容忍度显式设置为 3~5 次。

    3. 消费者线程堆栈查验:执行 jstack | grep ConsumeMessageOrderlyService。如果大量线程长期处于 TIMED_WAITINGsleep 状态,说明业务逻辑正在疯狂触发 SUSPEND

    4. 毒消息清理:一旦发生雪崩,如果业务代码无法立即修复,可使用 mqadmin resetOffsetByTime 强制将卡死队列的消费位点往后拨动(会跳过中间数据,需业务确认可接受),先让后续积压消息流转,事后再通过日志捞回丢失数据。