RocketMQ 顺序消息队列“假死”:一个 NPE 引发的百万级积压与 ConsumeOrderly 死锁惨案

某次核心交易链路报警,监控大盘上 RocketMQ 的 Consumer Lag 指标在短短十几分钟内飙升突破 200 万,业务侧反馈订单状态机完全停滞,P99 延迟直接变成一条横线(超时)。排查发现,问题根因极度低级:业务开发在处理顺序消息(Orderly)的消费逻辑时,漏抓了一个 NullPointerException。这个异常导致 RocketMQ 客户端为了保证严格的局部顺序,不断挂起当前队列并无限重试,彻底锁死了该 MessageQueue,后续百万级消息全部被堵死在单车道上。

结论先行:与并发消费(Concurrent)将失败消息发往 Broker 端的 %RETRY% 队列不同,RocketMQ 的顺序消费在遇到异常时,默认会在 Consumer 本地客户端无限重试MaxReconsumeTimes 默认为 -1,即 Integer.MAX_VALUE)。 在 MessageListenerOrderly 中,绝对不能让未经捕获的异常抛出到框架层。务必严格使用 try-catch 包裹所有业务逻辑,并结合 msg.getReconsumeTimes() 实现阈值阻断与自定义死信队列(DLQ)降级。

故障现场:200万Lag与“安静”的消费者

排查过程中,第一反应是消费端挂了或者 Broker 存在毛刺。但看了下基础监控,Consumer 所在的 K8S Pod 的 CPU 和内存水位都很低,甚至可以说闲得发慌。

执行 mqadmin consumerProgress 查看消费位点状态:

# sh mqadmin consumerProgress -n x.x.x.x:9876 -g Order_Trade_Consumer_Group
Topic             Broker Name  QID  Broker Offset  Consumer Offset  Client IP      Diff
Trade_Order_Topic broker-a     0    150000         150000           10.0.x.x       0
Trade_Order_Topic broker-a     1    152000         152000           10.0.x.x       0
Trade_Order_Topic broker-a     2    3100500        100500           10.0.x.y       3000000  <-- 剧烈积压
Trade_Order_Topic broker-a     3    149000         149000           10.0.x.y       0

现象很明显:并不是整体消费能力不足,而是 broker-aQID=2 这一个队列卡死了。

进到 10.0.x.y 这个 Pod 抓 jstack,发现大量 RocketMQ 的消费线程处于 TIMED_WAITING 状态:

"ConsumeMessageThread_1" Id=85 RUNNABLE
    at java.lang.Thread.sleep(Native Method)
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$ConsumeRequest.run(ConsumeMessageOrderlyService.java:470)

再翻看业务日志,满屏都是同一个报错的死循环:

java.lang.NullPointerException: user_id is null in payload
    at com.biz.order.listener.OrderStateMachineListener.consumeMessage(OrderStateMachineListener.java:45)

业务代码极其奔放,直接在 consumeMessage 里抛出了 NPE,既没有 catch,也没有重试次数校验。

底层原理解析:为什么并发消费没事,顺序消费就崩?

很多开发习惯了 RocketMQ 的并发消费(Concurrent)模型。在并发模式下,如果 consumeMessage 抛出异常或返回 RECONSUME_LATER,RocketMQ 会将该消息重新发回 Broker 端的 %RETRY%ConsumerGroup 队列,并推进当前 MessageQueue 的消费位点。这样“毒消息”会被扔到一边,后续消息继续畅通无阻,最多重试 16 次后进入死信队列(DLQ)。

但在顺序消费(Orderly)模型下,游戏规则变了。 顺序消费的核心语义是:前一条消息不消费成功,后一条消息绝对不能处理。

为了保证局部有序,Consumer 在拉取到消息后,会向 Broker 申请锁(RebalanceImpl.lockMQPeriodically),锁定整个 MessageQueue,并生成一个 ProcessQueue。 当 MessageListenerOrderly 抛出异常,或者返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 时,我们看看 RocketMQ 内核是怎么处理的:

// 摘自 ConsumeMessageOrderlyService.java 核心逻辑
public void processConsumeResult(
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest) {

    // ... 前置省略
    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
        // 检查重试次数
        if (checkReconsumeTimes(msgs)) {
            // 如果超过最大重试次数,才发往 DLQ 并推进位点
            consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
            this.submitConsumeRequestLater(
                consumeRequest.getProcessQueue(),
                consumeRequest.getMessageQueue(),
                context.getSuspendCurrentQueueTimeMillis());
            continueConsume = false;
        }
}

注意这里的 checkReconsumeTimes 逻辑。在并发消费中,默认最大重试次数是 16。但在顺序消费中,DefaultMQPushConsumer.maxReconsumeTimes 的默认值是 -1。 这意味着,只要业务抛出异常,客户端就会把当前 MessageQueue 挂起(默认 sleep 1秒),然后重新把这条消息拿出来再消费一次。无限循环,永不跳过。

业务想要的是局部严格顺序,却没考虑过异常数据的降级处理。这就好比在单行道上,一辆车抛锚了,司机不仅不叫拖车,还坐在车里无限期尝试打火,导致后面的百万车流死死堵住。

毁灭性后果与防御性修复

这种积压是极其致命的。因为 MessageQueue 被无限重试的线程死死锁住,哪怕你重启 Consumer Pod,由于 Rebalance 机制,这批“毒消息”只会漂移到另一个 Pod 上,继续锁死那个 Pod 的消费线程。最终导致整个业务集群在处理特定 Shard Key 时彻底瘫痪。

防御性编程不是挂在嘴边的废话,是不让你半夜爬起来擦屁股的救命稻草。 正确的顺序消息消费姿势,必须具备异常兜底主动降级能力:

@Component
public class RobustOrderlyListener implements MessageListenerOrderly {

    // 严禁无限重试,设定最大容忍次数
    private static final int MAX_RETRY_TIMES = 5;

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 顺序消费默认 batch 为 1
        MessageExt msg = msgs.get(0);

        try {
            // 核心业务逻辑
            processBizLogic(msg);
            return ConsumeOrderlyStatus.SUCCESS;

        } catch (Throwable t) {
            // 拦截所有未知的 Throwable,严禁抛出到框架层
            int currentRetry = msg.getReconsumeTimes();
            log.warn("顺序消息消费异常, msgId:{}, retry:{}", msg.getMsgId(), currentRetry, t);

            if (currentRetry >= MAX_RETRY_TIMES) {
                log.error("顺序消息重试到达上限,触发熔断降级。写入死信表并跳过. msgId:{}", msg.getMsgId());
                try {
                    // 必须自己实现死信存储逻辑(如写入 DB/Redis/专用重试Topic)
                    saveToCustomDeadLetter(msg, t);
                } catch (Exception e) {
                    log.error("写入自定义死信队列失败,继续挂起队列", e);
                    // 仅在降级系统也崩溃时,才允许挂起当前队列
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                // 强制返回 SUCCESS 推进位点,释放队列拥堵
                return ConsumeOrderlyStatus.SUCCESS;
            }

            // 未到重试上限,挂起队列一会再试
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}

排查清单(同类问题速查)

  1. 单队列卡死确认:使用 mqadmin consumerProgress 检查。如果 Diff 极高且集中在极少数 QID,而其他队列 Diff 为 0,100% 是局部卡死(顺序消息死锁或单分片数据倾斜严重)。

  2. 重试次数默认值陷阱:检查 Consumer 初始化代码。如果使用顺序消费且未显式设置 consumer.setMaxReconsumeTimes(次数),默认会进入 -1(无限重试)模式。强烈建议根据业务容忍度显式设置为 3~5 次。

  3. 消费者线程堆栈查验:执行 jstack | grep ConsumeMessageOrderlyService。如果大量线程长期处于 TIMED_WAITINGsleep 状态,说明业务逻辑正在疯狂触发 SUSPEND

  4. 毒消息清理:一旦发生雪崩,如果业务代码无法立即修复,可使用 mqadmin resetOffsetByTime 强制将卡死队列的消费位点往后拨动(会跳过中间数据,需业务确认可接受),先让后续积压消息流转,事后再通过日志捞回丢失数据。