RabbitMQ 深入浅出

- RabbitMQ MQ

RabbitMQ 知识架构

1. AMQP 协议基础

RabbitMQ 是 AMQP 0-9-1 协议的实现,其核心模型基于信道(Channel)、**交换机(Exchange)队列(Queue)**的协作。


2. 消息流转原理

消息从生产者到消费者的完整生命周期如下:

(1)生产者发送消息

  1. 连接建立:生产者通过 TCP 连接到 RabbitMQ 节点,并创建信道。
  2. 消息封装:消息包含有效负载(Playload) + 元数据(Headers、Routing Key、Delivery Mode 等)
  3. 发送到交换器:生产者通过basic.publish 方法将消息发送到指定的交换器。
    • 关键参数:exchange (目标交换器)、routing_key (路由键)、mandatory (消息无法路由时是否返回错误)。

(2)交换器路由消息

交换器根据类型绑定规则将消息路由到队列:

(3)队列存储消息

消息进入队列后,按**先进先出(FIFO)**顺序存储,但优先级队列(Priority Queue)可调整顺序。

(4)消息者拉取消息

  1. 订阅队列:消费者通过basic.consume 订阅队列,进入阻塞等待状态。
  2. 消息推送:RabbitMQ 通过basic.deliver 方法主动推送消息到消费者(Push 模式)。
    • 消费者也可通过basic.get 主动拉取消息(Pull 模式),但效率较低。
  3. 消息确认(ACK)
    • 自动 ACK:消息发送后立即从队列删除,风险高(消息可能未处理成功)。
    • 手动 ACK:消费者处理完成后发送basic.ack ,队列删除消息;若未 ACK 且连接断开,消息重新入队(Redelivery)。

3. 持久化与可靠性机制

(1)消息持久化

(2)发布者确认(Publisher Confrim)


4. 死信队列(DLX)原理

当消息无法被正常消费时,会被转发到私信交换器(Dead Letter Exchange)


5. 集群与高可用原理

(1)集群架构

(2)镜像队列(Mirrored Queue)

通过策略(Policy)定义 ha-mode(如 all),队列数据在集群节点间同步,基于 Raft 协议选举主节点。

(3)网络分区处理


6. 内存与磁盘管理


7. 性能优化原理

(1)预取(QoS)

(2)批量操作


8. RPC 模式原理

  +---------+          +------------+          +---------+
  | Client  | --(1)--> | RabbitMQ   | --(2)--> | Server  |
  |         | <--(4)-- | (RPC Queue)| <--(3)-- |         |
  +---------+          +------------+          +---------+
  步骤:
  1. 客户端发送请求到 RPC 队列,携带 `reply_to` 和 `correlation_id`。
  2. 服务端消费请求,处理完成后将结果发送到 `reply_to` 队列。
  3. 客户端监听 `reply_to` 队列,通过 `correlation_id` 匹配响应。

9. 流量削峰原理

10. 保证消息有序性

消息有序性

11. 高可用

高可用


12. 重要知识点

  1. 为什么使用消息队列
    • 核心场景:解耦(服务间异步通信)、削峰(缓冲高并发请求)、异步(非阻塞处理)。
    • 缺点:系统可用性降低(依赖 MQ)、复杂性增加(需处理消息丢失、重复、顺序性问题)。
  2. 如何保证消息不丢失?
    • 生产者:使用 Confirm 模式(异步确认)或事务(同步确认)。
    • Broker:持久化队列与消息,结合镜像队列。
    • 消费者:关闭自动 ACK,处理完成后手动发送 ACK。
  3. 如何避免消息重复消费?
    • 生产者去重:MQ 内部生成inner-msg-id 拦截重复投递。
    • 消费者幂等:业务层通过唯一标识(如订单 ID)校验,结合数据库唯一键或 Redis 原子操作。
  4. RabbitMQ 集群模式?
    • 普通集群:元数据同步,但队列数据仅存于创建节点,故障时需重新拉取数据。
    • 镜像集群:队列数据全节点同步,高可用但性能开销大,拓展性差。
  5. 消息顺序性如何保证?
    • 单队列消费者:同一业务的消息路由到同一队列,由单个消费者顺序处理。
    • 全局序列号:消息中添加序号,消费者按序处理。
  6. 消息堆积如何处理?
    • 临时扩容:增加队列 Partition 和消费者实例,并行消费积压数据。
    • 批量导出:将积压消息导出至临时 Topic,分片处理。