Skip to content

预计时间

1 周

学习目标

  • 理解同步 → 异步 → 事件驱动的演进
  • 掌握 Kafka 核心概念(Topic / Partition / Consumer Group)
  • 掌握 RabbitMQ 核心概念(Exchange / Queue / Binding)
  • 知道什么时候用哪个
  • 能设计异步任务流程

一、为什么需要消息队列?

1.1 三层演进

text
【第一层:同步调用】
  Web → 上传文档 → 解析 → Embedding → 返回结果
  用户上传一个 PDF,要等 30 秒才能看到"处理完成"

  问题:用户等太久、解析失败整个请求失败

【第二层:异步】
  Web → 上传文档 → 返回"已收到" → 后台慢慢处理
  用户立刻看到"处理中",然后轮询或 WebSocket 拿结果

  问题:上传服务和解析服务耦合。解析挂了,上传服务处理失败重试逻辑复杂
        加一个 Embedding 服务?改上传服务的代码

【第三层:事件驱动(MQ)】
  Web → 上传文档 → 发消息到 MQ → 返回"已收到"
  MQ → 解析服务(订阅) → 处理完 → 发消息到 MQ
  MQ → Embedding 服务(订阅) → 处理完 → 发消息到 MQ
  每个服务只关心自己的消息,不关心上下游是谁

  好处:解耦、削峰、容错、可扩展

1.2 MQ 解决的核心问题

问题MQ 怎么解决
解耦发布者和订阅者通过 MQ 通信,互不知道对方存在
削峰高峰期消息堆积在 MQ,消费者按自己的节奏处理
容错消费者挂了,消息不丢,重启后继续处理
顺序同一 Partition 内消息有序
可观测消息堆积量 = 生产速度 - 消费速度,天然监控指标

一句话总结

没有 MQ:A 直接调 B,A 要知道 B 在哪、B 挂了怎么办、B 慢了要不要重试。 有了 MQ:A 只往 MQ 发消息,B 只从 MQ 取消息,谁也不认识谁。


二、Kafka

2.1 核心概念

text
Producer ──→ [Topic: document-events]
                   ├── Partition 0: [msg0, msg3, msg6, ...]
                   ├── Partition 1: [msg1, msg4, msg7, ...]
                   └── Partition 2: [msg2, msg5, msg8, ...]

Consumer Group (解析服务)
  ├── Consumer A 消费 Partition 0 + 1
  └── Consumer B 消费 Partition 2

关键理念:
  - 同一 Partition 内消息有序
  - 同一 Consumer Group 内,一个 Partition 只能被一个 Consumer 消费
  - 不同 Consumer Group 之间互不影响(各自维护 Offset)

2.2 概念速记

概念一句话
Topic消息的分类,类似"主题频道"
PartitionTopic 的分片,分布式存储 + 并行消费的基础
Producer发消息的
Consumer收消息的
Consumer Group一组 Consumer,同一组的负载均衡,不同组的各自全量消费
Offset消息在 Partition 中的位置,Consumer 记录读到哪了
BrokerKafka 服务器节点

2.3 为什么 Kafka 吞吐量高?

text
1. 顺序写磁盘 — 追加写(append-only),比随机写快几百倍
2. 零拷贝 — sendfile() 系统调用,数据从磁盘直接到网卡,不经过用户态
3. 批量发送 — Producer 攒一批消息一起发
4. 分段存储 — 按 Partition 分段,并行读写

2.4 适合什么场景?

text
✅ 日志收集、用户行为追踪(高吞吐、可回溯)
✅ 数据管道(ETL → 数据仓库)
✅ 流处理(Kafka Streams / Flink)
✅ 事件溯源(消息可重复消费、永久保存)

❌ 低延迟 RPC(Kafka 有毫秒级延迟,RabbitMQ 更快)
❌ 复杂路由逻辑(RabbitMQ 的 Exchange 更灵活)

三、RabbitMQ

3.1 核心概念

text
Producer ──→ Exchange ──→ Queue ──→ Consumer

Exchange 有三种类型:
  Direct:  routing key 完全匹配 → 精确路由
  Topic:   routing key 通配符匹配 → 灵活路由
  Fanout:  广播到所有绑定的 Queue → 发布/订阅

ACK(确认机制):
  Consumer 处理完消息 → 发送 ACK → Broker 删除消息
  Consumer 挂了没发 ACK → Broker 把消息重新投递给其他 Consumer

3.2 概念速记

概念一句话
Exchange消息路由器,决定消息去哪个 Queue
Queue消息缓冲区
BindingExchange 到 Queue 的绑定规则
Routing KeyProducer 发消息时指定的路由标记
ACKConsumer 确认"我处理完了"

3.3 适合什么场景?

text
✅ 业务消息——下单、发通知、发短信
✅ RPC 调用——需要即时响应的异步调用
✅ 复杂路由——按用户级别、地区分发消息
✅ 任务队列——每个任务确处理一次

❌ 海量日志(Kafka 更合适)
❌ 需要回放历史消息(RabbitMQ 消费完就删,Kafka 保留策略更灵活)

四、Kafka vs RabbitMQ

KafkaRabbitMQ
定位分布式流平台消息代理
吞吐极高(百万条/秒)中等(万条/秒)
延迟毫秒级微秒级
消息持久化默认持久化,可配置保留时间消费后删除
路由简单(Topic → Partition)复杂(Exchange + Binding)
协议自定义 TCP 协议AMQP 0-9-1
消息回溯✅ 天然支持❌ 消费完没了
运维复杂度

选型口诀

数据管道用 Kafka,业务消息用 RabbitMQ。

如果你需要:保留所有历史消息、TB 级吞吐 → Kafka 如果你需要:灵活路由、低延迟、简单运维 → RabbitMQ


五、消息可靠性

5.1 三种交付语义

text
at-most-once(最多一次)
  消息可能丢,但不会重复
  适用:日志收集(丢几条无所谓)

at-least-once(至少一次)← 最常用
  消息不会丢,但可能重复
  适用:大部分业务场景(配合幂等处理重复)

exactly-once(精确一次)← 最难实现
  消息不丢、不重复
  适用:金融交易、库存扣减

5.2 为什么会有重复?怎么处理?

text
重复的典型场景:
  Consumer 处理完了,但网络超时没发 ACK
  → Broker 以为没处理 → 重新投递
  → Consumer 处理了两遍同一个消息

解决:幂等性
  - 每条消息带唯一 ID (messageId)
  - Consumer 用 Redis 记录处理过的 messageId
  - 重复消息来了 → 查 Redis → 已处理 → 直接 ACK 跳过

代码思路:
  if (redis.sismember('processed_messages', messageId)) {
    channel.ack(msg);  // 已处理,直接确认
    return;
  }
  processMessage(msg);
  redis.sadd('processed_messages', messageId);
  redis.expire('processed_messages', 86400);  // 24小时后清理
  channel.ack(msg);

六、实战:文档处理异步流程

text
用户上传 document.pdf

API 服务: 存储文件 → 发消息到 MQ

Topic: document.uploaded
  Message: { documentId, userId, filePath, fileType }

┌─ 解析服务 (Consumer Group: parsers)
│   监听 document.uploaded
│   提取文本内容
│   发消息 → Topic: document.parsed
│   Message: { documentId, text, pageCount }

├─ 切分服务 (Consumer Group: chunkers)
│   监听 document.parsed
│   文本分块
│   发消息 → Topic: document.chunked
│   Message: { documentId, chunks: [...] }

└─ Embedding 服务 (Consumer Group: embedders)
    监听 document.chunked
    调用 Embedding API
    存入 Vector DB
    发消息 → Topic: document.indexed
    Message: { documentId, status: 'ready' }

API 服务监听 document.indexed → 通知用户"处理完成"
javascript
// 简化版生产者代码(以 Kafka 为例,使用 kafkajs)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

async function uploadDocument(documentId, filePath) {
  // ... 存储文件 ...

  await producer.send({
    topic: 'document.uploaded',
    messages: [{
      key: documentId,          // 同一个文档的事件到同一 Partition
      value: JSON.stringify({
        documentId,
        userId: 'user123',
        filePath,
        fileType: 'pdf',
        timestamp: Date.now()
      })
    }]
  });
}
javascript
// 简化版消费者代码
const consumer = kafka.consumer({ groupId: 'parser-group' });

await consumer.subscribe({ topic: 'document.uploaded' });

await consumer.run({
  eachMessage: async ({ message }) => {
    const { documentId, filePath } = JSON.parse(message.value);

    // 幂等检查
    if (await isProcessed(documentId)) return;

    // 处理
    const text = await parseDocument(filePath);

    // 记录已处理
    await markProcessed(documentId);

    // 发下一个事件
    await producer.send({
      topic: 'document.parsed',
      messages: [{ key: documentId, value: JSON.stringify({ documentId, text }) }]
    });
  }
});

实践

  1. Docker 快速启动 Kafka:
    bash
    docker run -p 9092:9092 apache/kafka:latest
  2. 用 kafkajs 或 kafka-node 写一个 Producer 和 Consumer
  3. 模拟:Consumer 处理一半崩溃 → 重启观察消息是否重新投递
  4. 加幂等逻辑处理重复消息