1. RocketMQ 架构概览

  • Producer(生产者):负责构造消息并发送到 Broker。

  • Broker(消息存储与转发):核心组件,负责:

    • 接收消息写入存储(CommitLog)

    • 构建消费索引(ConsumeQueue)

    • 提供拉取/推送能力(客户端侧通常是“拉取 + 本地推送式消费”)

  • Consumer(消费者):从 Broker 拉取消息并执行业务逻辑。

  • NameServer:轻量注册中心,维护 Topic 路由信息(Broker 地址、队列信息等)。


2. Producer(生产者)

2.1 发送链路的核心理解

典型写入链路可以理解为:

  1. Producer 发送消息到 Broker

  2. Broker 写入 CommitLog(物理日志,顺序写)

  3. Broker 构建/更新 ConsumeQueue(逻辑队列索引:指向 CommitLog 的偏移量等)

  4. Broker 返回 ACK(或不返回,取决于发送模式/刷盘复制策略)

写入 commit log + 指针索引给到 queue”非常关键:

  • CommitLog:全局顺序写,追求吞吐

  • ConsumeQueue:面向消费检索(topic + queue)快速定位消息


2.2 同步消息(Sync / 串行)

流程(概念) Producer 发送 → Broker(可能涉及主从复制/刷盘策略)→ 返回 ACK → 业务代码继续执行

理解

  • 优点:可靠性更好

  • 缺点:慢;如果不返回 ACK,会阻塞发送线程

机制

  • 发送线程阻塞,等待 Broker 响应(ACK)

失败表现

  • 发送失败(如超时、无可用 Broker、磁盘满等)会抛异常:

    • RemotingException / MQClientException / MQBrokerException(常见)

默认策略(重要)

  • 业务层面:不自动重试(不会替你写 try-catch 重试逻辑)

  • 客户端内部:存在节点切换与一定次数的发送重试(见 2.7 自动容错机制)

开发者责任

  • 必须 try-catch 捕获异常,并决定:

    • 本地重试(谨慎:避免阻塞主线程、避免无限重试)

    • 记录日志 + 走补偿

    • 降级/兜底


2.3 异步消息(Async / 并行)

流程(概念) Producer 发送 → Broker 存储 → 异步回调通知 ACK 同时业务代码不阻塞,可继续执行。

理解

  • 优点:快、吞吐高、调用线程不阻塞

  • 缺点:可靠性取决于你如何处理回调失败;“代码先往下走”意味着业务更容易漏掉异常处理

机制

  • 发送线程不阻塞,通过 SendCallback 回调拿结果

失败表现

  • 回调 onException(Throwable e) 被触发

默认策略

  • 同步/异步都一样:业务逻辑不自动重试,只是通知你失败了

开发者责任(推荐实践)

  • onException 中实现重试与兜底:

    • 失败消息进入本地内存队列(仅适合短期,应用重启会丢)

    • 更推荐:持久化(DB/本地消息表/可靠存储)+ 后台线程指数退避重试


2.4 单向消息(Oneway / 类 UDP)

流程(概念) Producer 发送 → 业务代码继续运行 不等待 ACK,也不关心成功与否。

理解

  • 优点:性能最高、延迟最低

  • 缺点:极不可靠(发送失败你也不知道)

失败表现

  • 没有反馈:即使网络断了、Broker 不可用,也不会抛异常/回调通知

适用场景

  • 日志收集

  • 非关键监控数据

  • 非核心指令(例如“写日志到磁盘”的触发指令等)

禁止场景

  • 订单、支付、库存等核心链路


2.5 延迟消息(Delay)

流程(概念) Producer 发送 → 消息在 Broker 等待(延迟级别/时间)→ 时间到后消费者可见并消费

理解与场景

  • 下单后未支付超时取消

  • 定时任务/延迟触发

  • “最终一致性”的补偿触发(例如 X 分钟后校验状态)

适当拓展: 延迟消息在 RocketMQ 常见实现是“延迟级别”,不同版本/配置支持的延迟精度不同;需要结合你们版本确认是固定等级还是支持任意时间。


2.6 批量消息(Batch)

流程(概念) Producer 一次发送多条消息集合 → 打包发送到同一 Broker / 同一 Topic / 同一 Queue

理解

  • 合理的批量可以减少网络 IO 与 RPC 次数,提高吞吐

  • 注意:批量消息通常要求同 topic、同 queue 策略一致,且单次包大小受限制(例如 4MB 限制常见)


2.7 顺序消息(Orderly / 局部有序)

  • RocketMQ 顺序消息通常只能保证局部有序

    • 同一 Topic 默认多个队列(如 4 条队列)

    • 通过 producer.send(msg, MessageQueueSelector, arg)

      • 常见策略:hash(key) % queueNum

      • 保证同一业务 key 的消息进入同一队列,从而在队列内保持顺序

关键理解

  • “顺序消息”本质是在队列粒度保证生产顺序一致,但消费端如果并发消费仍可能乱序(见 3.1)


2.8 RocketMQ 客户端的自动容错机制

虽然代码层面需要自己写重试,但 RocketMQ 客户端底层有重要的自动容错:

  • 场景:发送指定 Master,但该节点宕机/网络不通

  • 行为:客户端会检测异常,并尝试发送到该 Topic 的其它可用 Broker 节点(通常是同组其他 Master)

  • 配置:受 retryTimesWhenSendFailed(同步)/ retryTimesWhenSendAsyncFailed(异步)等参数影响(默认通常会尝试若干次,比如 2 次)

  • 局限:如果整个集群不可用,最终仍会失败并抛异常/回调异常

注意:这类“内部重试/切换”并不等同于你业务上的“保证最终投递成功”。 业务想要“最终一致性”,仍要靠补偿架构(见 2.9)。


2.9 消息发送失败/丢失的补偿策略(高可靠架构)

生产环境中只在 try-catch 里 Thread.sleep 重试是不够的,因为应用重启后内存重试任务会丢失

推荐方案:本地消息表(Local Message Table)+ 定时补偿

这是保证消息最终一致性的常见“黄金标准”(尤其金融/核心业务)。

(1) 本地事务:业务数据与消息记录同事务落库

  • 在业务数据库创建 message_log

  • 在执行核心业务(如扣减库存)的同一个本地事务中:

    1. 写入业务数据

    2. 插入消息记录,状态为 SENDING

  • 提交事务

    • 业务数据与消息要么同时成功,要么同时失败

(2) 发送消息:事务提交后投递 MQ

  • 提交后调用 producer.send()

  • 成功:更新 message_log 状态为 SENT

  • 失败:保持 SENDING,记录异常信息

(3) 定时补偿(关键)

  • 后台定时任务(例如每 5 秒扫描一次)

  • 查询 SENDING 且创建时间超过阈值(避免并发竞争)

  • 重发消息

  • 成功:更新为 SENT

  • 超过最大次数(如 10 次):标记 FAILED,报警 + 人工处理

优点

  • 不丢消息:只要 DB 不挂,消息记录就在;应用重启仍可继续补偿

  • 解耦:主业务流程不需要陷入复杂重试等待

死信队列(DLQ)作为最后兜底

  • 开启/关注死信队列

  • 人工排查、回放或修复数据


3. Consumer(消费者)

3.1 消费监听器:并发 vs 顺序

3.1.1 并发消费(默认)

consumer.registerMessageListener(new MessageListenerConcurrently(){...})
  • 默认并发模式:每个 consumer 通常有 20 个线程(常见默认值)

  • 对一个 topic(或 topic+tag)消费时,会对队列(默认如 4 条)进行轮询消费,类似 CPU 时间片轮转,提高吞吐

3.1.2 顺序消费

consumer.registerMessageListener(new MessageListenerOrderly(){...})
  • 疑问: Producer 已经把同一业务 key 的消息放进同一队列了,那 consumer 并发模式不是也能读到顺序吗?顺序模式还有什么意义?

核心答案

  • 顺序消息只能保证进入同一队列从而保证“队列内的存储顺序”。

  • 但并发消费下,同一队列的多条消息可能被多个线程同时处理,导致完成顺序不同,从而业务表现乱序:

    • 例如队列顺序 a、b、c、d

    • 并发执行耗时不同,可能变成 c 先完成,然后 b、d、a ……

    • 结果:业务侧还是乱序

RocketMQ 顺序消费的机制

  • RocketMQ 的顺序消费基于 MessageQueue(队列)粒度的锁,不是 consumer 实例的全局锁

  • 即使配置有 20 个线程:

    • 同一时刻每个队列最多由 1 个线程串行消费

    • 因此并发上限 ≈ 队列数

示例:Topic 有 4 个队列

  • Queue0 被线程 A 锁定并串行处理

  • Queue1 被线程 B 锁定并串行处理

  • Queue2 被线程 C 锁定并串行处理

  • Queue3 被线程 D 锁定并串行处理 其余线程空闲等待

关键点

  • Queue0 的慢不会影响 Queue1/2/3

  • 但 Queue0 内部一定是串行执行,保证顺序


3.2 消费失败后的策略:重试与死信队列

并发模式

  • 默认重试 16 次

  • 仍失败 → 进入死信队列(DLQ)

顺序模式

  • 无限重试(Integer.MAX_VALUE

总体流程

消费失败 → 自动重试(普通消息默认 16 次,顺序消息无限次) → 死信队列(DLQ) → 人工处理 / 也可自定义 consumer 定时消费 DLQ(例如 Topic:%DLQ%retry-consumer-group → 最终报警(邮件/告警平台)给运维/值班


4. Topic 与 Tag:为什么需要 Tag?

  • Topic:对一大类消息的划分(如 Order-Topic

  • Tag:对 Topic 下更细粒度的子分类

    • 例 carOrder、houseOrder 本质都是 order

    • 如果每个子业务都拆成一个 Topic 会冗余、管理成本高

    • 因此用 Tag 在同一 Topic 下再细分

补充:Tag 也常用于消费端的过滤与路由,但要注意不要把 Tag 当成“无限维度的字段过滤”,复杂条件更适合用 Properties + SQL92 过滤(见 6.3)。


5. 消费模型与消息生命周期

5.1 消费关系:Group 之间广播、Group 内竞争

  • 同一 Topic 被不同订阅它的消费者组(Consumer Group)消费时:

    • 组与组之间:广播关系(每个 Group 都会消费到同一条消息)

    • 组内多个 Consumer 实例:竞争关系(同一条消息只会被组内一个实例消费)


5.2 消息被消费完会消失吗?

不会。消息有自己的生命周期与清理策略:

  1. 若设置 TTL,则按 TTL 过期

  2. 若未设置 TTL,则常见默认保留时间为 72h(以实际配置为准)

  3. 磁盘写满 → 触发过期删除/拒绝写入等策略(见 9)

消费完成后,Group 只是在维护一个 offset(消费进度)

  • Broker 端消息仍保留在 CommitLog / ConsumeQueue 中

  • offset 类似“书签”,指向下一个待消费消息

  • 原消息不会因为“被消费”就立刻删除


5.3 同一消费者组内的消费模式

  1. 负载均衡(默认)

    • 一条消息只会投递给该 group 中的一个 consumer

    • 组内共享 offset

  2. 广播

    • 一条消息会投递给同 group 的所有 consumer

    • 每个 consumer 有自己的 offset

    • 应用场景:

      • 本地缓存同步(如每台机器刷新 Caffeine)

      • 本地日志/埋点上报

      • 配置热更新

    • WARNING:广播没有重试机制!


5.4 消费者组如何划分(重要约束)

同一消费者组(同一 group)通常必须满足:

  1. group 名称相同

  2. 订阅范围必须完全一致

    • 如订阅 *(所有 Tags)

    • 或订阅某 Tag

    • Tag1 || Tag2 如果订阅范围不同,可能导致:

    • 消息紊乱

    • 消息丢失

    • 重复消费


6. 消息结构:Message / MessageExt

RocketMQ 的消息结构不仅是“头 + 体”,更明确可分为三部分:

  1. Message Header(消息头):系统级元数据(路由、存储、基础控制)

  2. Message Body(消息体):业务数据(byte[])

  3. Message Properties(消息属性):扩展元数据(系统内置 + 用户自定义)

代码层面:

  • Producer 发送:Message

  • Consumer 接收:MessageExt(扩展了存储侧元数据)


6.1 Message Header(消息头)

特点:字段相对固定,由 RocketMQ 自动维护;用户通常不直接改大多数头字段(或不需要改)。

常见字段:

  • topic:主题(决定路由)

  • tags:标签(用于二级过滤)

  • keys:业务键(如订单号),便于查询轨迹

  • flag:生产者设置标记位,框架不处理

  • waitStoreMsgOK:是否等待存储确认(刷盘/复制相关)

  • transactionId:事务消息 ID (事务场景)

注:某些版本实现上 topic/tags/keys 可能落在 properties 中存储,但 SDK 逻辑上把它们视作标准头字段。


6.2 Message Body(消息体)

  • 类型:byte[]

  • 内容:JSON、Protobuf、图片流、或任意序列化数据

  • 限制:默认最大 4MB(可调但不建议过大,会影响吞吐与内存)

示例:

String body = "OrderID:1001,Action:PAY";
message.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));

6.3 Message Properties(消息属性)

特点:Map<String, String>,最灵活的扩展点。

A. 系统内置属性(示例)

  • UNIQ_KEY:消息全局唯一 ID(系统自动生成)

  • TAGS:对应 tags

  • KEYS:对应 keys

  • WAIT_STORE_MSG_OK:是否等待存储成功

  • TRAN_MSG:是否事务消息

  • PGROUP:生产者组

  • DELAY_TIME_LEVEL:延迟级别

  • RETRY_TOPIC:重试目标 Topic

  • MAX_RECONSUME_TIMES:最大重试次数

B. 用户自定义属性(支持 SQL92 过滤)

message.putUserProperty("orderId", "1001");
message.putUserProperty("userLevel", "VIP");
message.putUserProperty("region", "CN-HANGZHOU");

消费端过滤示例:

consumer.subscribe("TopicTest", "userLevel == 'VIP' && region == 'CN-HANGZHOU'");

6.4 Consumer 视角:MessageExt 增强字段

MessageExt 增加 Broker 存储相关元数据(发送时不存在,存储后生成):

  • msgId:Broker 全局唯一存储 ID(不同于 UNIQ_KEY)→ 精确查询/去重

  • queueOffset:在 ConsumeQueue 的逻辑偏移 → 记录消费进度

  • storeTimestamp:存入 Broker 时间 → 计算堆积时长

  • bornTimestamp:消息创建时间 → 计算链路耗时

  • storeHost / bornHost:存储/发送地址 → 追踪来源

  • reconsumeTimes:重试次数 → 判断是否重试消息

  • commitLogOffset:在 CommitLog 的物理偏移 → 底层定位


6.5 示例:查看完整结构

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 1. 获取消息体 (需要自己反序列化)
            String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
​
            // 2. 获取标准头信息
            String topic = msg.getTopic();
            String tags = msg.getTags();
            String keys = msg.getKeys();
​
            // 3. 获取所有属性 (包含系统属性和自定义属性)
            Map<String, String> props = msg.getProperties();
            String uniqKey = props.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            String myBizField = props.get("myCustomField");
​
            // 4. 获取存储元数据 (MessageExt 特有)
            long offset = msg.getQueueOffset();
            Date storeTime = new Date(msg.getStoreTimestamp());
            int retryTimes = msg.getReconsumeTimes();
​
            System.out.println("收到消息: " + body);
            System.out.println("唯一ID: " + uniqKey + ", 重试次数: " + retryTimes);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

实践建议 即使有 msgId / UNIQ_KEY,也建议业务自己生成唯一业务 key(如雪花、UUID、业务名+机房+IP+时间等),用于:

  • 业务追踪与排查

  • 幂等去重

  • 对账与回放


7. 消息重复消费问题(幂等性)

7.1 为什么会出现重复消费?

  1. 生产者多次投递

    • MQ 网络延迟/抖动

    • Producer 未收到 ACK 误以为失败而重试

    • 导致重复发送

  2. 消费者组扩容导致 Rebalance 重复消费

    • 例如 group 中原有 3 个 consumer 消费 4 个队列(1、1、2 分配)

    • 新增一个 consumer → 触发 rebalance → 变为 1、1、1、1

    • 因为消费过程不是原子:

      • 先执行业务,再提交 offset(或相反,具体实现/调用点导致风险)

      • 若“业务已完成但 offset 未提交”就发生 rebalance,新的 consumer 可能继续从旧 offset 拉取,造成重复消费


7.2 如何解决重复消费(核心:幂等)

  • 使用唯一标识(msgId / keys / 自定义业务 key)做去重

  • 推荐把去重 key 落到 MySQL(更稳),Redis 也可但要考虑:

    • 内存成本

    • Redis 故障导致去重信息丢失

推荐流程

  1. 准备去重表(例如 msg_dedup),把 key 设为 UNIQUE

  2. 消费前先尝试 INSERT key

    • 插入成功:说明首次消费 → 执行业务逻辑

    • 插入失败(唯一键冲突):说明重复 → 直接 ACK/跳过

  3. 若业务执行失败:

    • 回滚业务

    • 删除去重表对应 key(避免“失败但被认为已消费”)

适当拓展: 在高并发下注意“先去重再执行业务”的事务边界设计,避免出现“去重成功但业务失败”的不一致窗口(你已写到失败要 delete,方向正确)。


8. 消息堆积(积压)

8.1 什么是消息堆积?

Broker 队列中未被消费的消息大量堆积:

  • 生产速度 V(produce) >>> 消费速度 V(consume)

  • 单队列 offset 积压达到很大数量(例如 >= 10w)


8.2 为什么会出现堆积?

  1. 消费者实例太少 / 线程池小 / 接口耗时长(慢查询)

  2. 消费阻塞:死循环、死锁、数据库连接池耗尽等待

  3. 流量突增

  4. 硬件/网络问题

  5. 队列分配不均(消费热点)


8.3 怎么解决?

  1. 增加消费者实例、增加 Topic 队列数

    • 消费者数量 <= 队列数量,多了也没用(抢不到队列)

  2. 调大消费线程数(线程池调整)

    • RocketMQ 的消费线程数基本对应业务并发度

    • 想快就调大,想稳就调小

    • 不要随意再套一层“自建异步线程池”把问题复杂化(除非数据库连接池等确实需要隔离)

  3. 跳过非核心消息(如日志类)或降级处理

  4. 优化 SQL、减少 IO 耗时

  5. Producer 限流(从源头削峰)


9. 如何确保消息不丢失?

9.1 消息丢失的常见原因

  1. RocketMQ 理论上可做到非常高可靠;真丢多半是业务代码/配置问题

  2. 发送失败无兜底:异常不处理

  3. 使用单向发送(oneway)

  4. Broker 默认异步刷盘:写入内存 Buffer/PageCache 后未落盘即宕机

  5. 主从同步时丢失:主写入内存但未完成同步复制,主挂导致丢

  6. 消息堆积导致过期(默认 72h)

  7. 磁盘写满(例如达到 85%)拒绝写入


9.2 解决方案(按可靠性增强链路)

  1. 核心业务开启同步刷盘

    • 只有真正写入磁盘才返回 ACK(更慢但更稳)

  2. 主从复制开启同步双写/同步复制

    • 主必须等待从同步完成才返回 ACK(主线程会阻塞)

  3. 核心业务消息 TTL 设置更长(或调整保留策略)

  4. 运维与监控:

    • 实时监控磁盘水位、写入失败、堆积量、DLQ

    • 定期备份与演练


9.3 “Broker 明明有消息,但消费者消费丢了”常见原因

  1. 业务代码漏洞:先提交 offset,后执行业务,业务出错 offset 无法回滚

  2. 异常被吞:catch 了但手动返回成功 ACK

  3. 广播模式容易丢(且无重试机制)

  4. 不处理死信队列,不做兜底补偿


10. 最小落地清单(把关键点变成检查项)

这一节是“适当拓展”:不减少原内容的前提下,把笔记里的关键实践收敛成可执行检查项。

  • 核心链路禁止 oneway

  • Producer 发送失败必须有兜底(本地消息表/补偿任务)

  • Consumer 必须幂等(去重表/唯一业务 key)

  • 顺序业务必须使用 MessageListenerOrderly(且队列选择策略固定)

  • DLQ 必须有监控 + 人工/自动处理流程

  • 堆积监控(topic、group、queue 维度)与扩容预案

  • 磁盘水位监控(写满拒写是灾难性问题)

  • 广播模式只用于允许丢的场景(“无重试”)


两块二每分钟