某次核心交易链路报警,监控大盘上 RocketMQ 的 Consumer Lag 指标在短短十几分钟内飙升突破 200 万,业务侧反馈订单状态机完全停滞,P99 延迟直接变成一条横线(超时)。排查发现,问题根因极度低级:业务开发在处理顺序消息(Orderly)的消费逻辑时,漏抓了一个 NullPointerException。这个异常导致 RocketMQ 客户端为了保证严格的局部顺序,不断挂起当前队列并无限重试,彻底锁死了该 MessageQueue,后续百万级消息全部被堵死在单车道上。
结论先行:与并发消费(Concurrent)将失败消息发往 Broker 端的 %RETRY% 队列不同,RocketMQ 的顺序消费在遇到异常时,默认会在 Consumer 本地客户端无限重试(MaxReconsumeTimes 默认为 -1,即 Integer.MAX_VALUE)。
在 MessageListenerOrderly 中,绝对不能让未经捕获的异常抛出到框架层。务必严格使用 try-catch 包裹所有业务逻辑,并结合 msg.getReconsumeTimes() 实现阈值阻断与自定义死信队列(DLQ)降级。
故障现场:200万Lag与“安静”的消费者
排查过程中,第一反应是消费端挂了或者 Broker 存在毛刺。但看了下基础监控,Consumer 所在的 K8S Pod 的 CPU 和内存水位都很低,甚至可以说闲得发慌。
执行 mqadmin consumerProgress 查看消费位点状态:
# sh mqadmin consumerProgress -n x.x.x.x:9876 -g Order_Trade_Consumer_Group
Topic Broker Name QID Broker Offset Consumer Offset Client IP Diff
Trade_Order_Topic broker-a 0 150000 150000 10.0.x.x 0
Trade_Order_Topic broker-a 1 152000 152000 10.0.x.x 0
Trade_Order_Topic broker-a 2 3100500 100500 10.0.x.y 3000000 <-- 剧烈积压
Trade_Order_Topic broker-a 3 149000 149000 10.0.x.y 0
现象很明显:并不是整体消费能力不足,而是 broker-a 的 QID=2 这一个队列卡死了。
进到 10.0.x.y 这个 Pod 抓 jstack,发现大量 RocketMQ 的消费线程处于 TIMED_WAITING 状态:
"ConsumeMessageThread_1" Id=85 RUNNABLE
at java.lang.Thread.sleep(Native Method)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$ConsumeRequest.run(ConsumeMessageOrderlyService.java:470)
再翻看业务日志,满屏都是同一个报错的死循环:
java.lang.NullPointerException: user_id is null in payload
at com.biz.order.listener.OrderStateMachineListener.consumeMessage(OrderStateMachineListener.java:45)
业务代码极其奔放,直接在 consumeMessage 里抛出了 NPE,既没有 catch,也没有重试次数校验。
底层原理解析:为什么并发消费没事,顺序消费就崩?
很多开发习惯了 RocketMQ 的并发消费(Concurrent)模型。在并发模式下,如果 consumeMessage 抛出异常或返回 RECONSUME_LATER,RocketMQ 会将该消息重新发回 Broker 端的 %RETRY%ConsumerGroup 队列,并推进当前 MessageQueue 的消费位点。这样“毒消息”会被扔到一边,后续消息继续畅通无阻,最多重试 16 次后进入死信队列(DLQ)。
但在顺序消费(Orderly)模型下,游戏规则变了。 顺序消费的核心语义是:前一条消息不消费成功,后一条消息绝对不能处理。
为了保证局部有序,Consumer 在拉取到消息后,会向 Broker 申请锁(RebalanceImpl.lockMQPeriodically),锁定整个 MessageQueue,并生成一个 ProcessQueue。
当 MessageListenerOrderly 抛出异常,或者返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 时,我们看看 RocketMQ 内核是怎么处理的:
// 摘自 ConsumeMessageOrderlyService.java 核心逻辑
public void processConsumeResult(
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest) {
// ... 前置省略
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// 检查重试次数
if (checkReconsumeTimes(msgs)) {
// 如果超过最大重试次数,才发往 DLQ 并推进位点
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
}
注意这里的 checkReconsumeTimes 逻辑。在并发消费中,默认最大重试次数是 16。但在顺序消费中,DefaultMQPushConsumer.maxReconsumeTimes 的默认值是 -1。
这意味着,只要业务抛出异常,客户端就会把当前 MessageQueue 挂起(默认 sleep 1秒),然后重新把这条消息拿出来再消费一次。无限循环,永不跳过。
业务想要的是局部严格顺序,却没考虑过异常数据的降级处理。这就好比在单行道上,一辆车抛锚了,司机不仅不叫拖车,还坐在车里无限期尝试打火,导致后面的百万车流死死堵住。
毁灭性后果与防御性修复
这种积压是极其致命的。因为 MessageQueue 被无限重试的线程死死锁住,哪怕你重启 Consumer Pod,由于 Rebalance 机制,这批“毒消息”只会漂移到另一个 Pod 上,继续锁死那个 Pod 的消费线程。最终导致整个业务集群在处理特定 Shard Key 时彻底瘫痪。
防御性编程不是挂在嘴边的废话,是不让你半夜爬起来擦屁股的救命稻草。 正确的顺序消息消费姿势,必须具备异常兜底和主动降级能力:
@Component
public class RobustOrderlyListener implements MessageListenerOrderly {
// 严禁无限重试,设定最大容忍次数
private static final int MAX_RETRY_TIMES = 5;
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 顺序消费默认 batch 为 1
MessageExt msg = msgs.get(0);
try {
// 核心业务逻辑
processBizLogic(msg);
return ConsumeOrderlyStatus.SUCCESS;
} catch (Throwable t) {
// 拦截所有未知的 Throwable,严禁抛出到框架层
int currentRetry = msg.getReconsumeTimes();
log.warn("顺序消息消费异常, msgId:{}, retry:{}", msg.getMsgId(), currentRetry, t);
if (currentRetry >= MAX_RETRY_TIMES) {
log.error("顺序消息重试到达上限,触发熔断降级。写入死信表并跳过. msgId:{}", msg.getMsgId());
try {
// 必须自己实现死信存储逻辑(如写入 DB/Redis/专用重试Topic)
saveToCustomDeadLetter(msg, t);
} catch (Exception e) {
log.error("写入自定义死信队列失败,继续挂起队列", e);
// 仅在降级系统也崩溃时,才允许挂起当前队列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
// 强制返回 SUCCESS 推进位点,释放队列拥堵
return ConsumeOrderlyStatus.SUCCESS;
}
// 未到重试上限,挂起队列一会再试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
排查清单(同类问题速查)
-
单队列卡死确认:使用
mqadmin consumerProgress检查。如果Diff极高且集中在极少数QID,而其他队列Diff为 0,100% 是局部卡死(顺序消息死锁或单分片数据倾斜严重)。 -
重试次数默认值陷阱:检查 Consumer 初始化代码。如果使用顺序消费且未显式设置
consumer.setMaxReconsumeTimes(次数),默认会进入-1(无限重试)模式。强烈建议根据业务容忍度显式设置为 3~5 次。 -
消费者线程堆栈查验:执行
jstack。如果大量线程长期处于| grep ConsumeMessageOrderlyService TIMED_WAITING或sleep状态,说明业务逻辑正在疯狂触发SUSPEND。 -
毒消息清理:一旦发生雪崩,如果业务代码无法立即修复,可使用
mqadmin resetOffsetByTime强制将卡死队列的消费位点往后拨动(会跳过中间数据,需业务确认可接受),先让后续积压消息流转,事后再通过日志捞回丢失数据。