预计时间
1 周
学习目标
- 理解同步 → 异步 → 事件驱动的演进
- 掌握 Kafka 核心概念(Topic / Partition / Consumer Group)
- 掌握 RabbitMQ 核心概念(Exchange / Queue / Binding)
- 知道什么时候用哪个
- 能设计异步任务流程
一、为什么需要消息队列?
1.1 三层演进
text
【第一层:同步调用】
Web → 上传文档 → 解析 → Embedding → 返回结果
用户上传一个 PDF,要等 30 秒才能看到"处理完成"
问题:用户等太久、解析失败整个请求失败
【第二层:异步】
Web → 上传文档 → 返回"已收到" → 后台慢慢处理
用户立刻看到"处理中",然后轮询或 WebSocket 拿结果
问题:上传服务和解析服务耦合。解析挂了,上传服务处理失败重试逻辑复杂
加一个 Embedding 服务?改上传服务的代码
【第三层:事件驱动(MQ)】
Web → 上传文档 → 发消息到 MQ → 返回"已收到"
MQ → 解析服务(订阅) → 处理完 → 发消息到 MQ
MQ → Embedding 服务(订阅) → 处理完 → 发消息到 MQ
每个服务只关心自己的消息,不关心上下游是谁
好处:解耦、削峰、容错、可扩展1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1.2 MQ 解决的核心问题
| 问题 | MQ 怎么解决 |
|---|---|
| 解耦 | 发布者和订阅者通过 MQ 通信,互不知道对方存在 |
| 削峰 | 高峰期消息堆积在 MQ,消费者按自己的节奏处理 |
| 容错 | 消费者挂了,消息不丢,重启后继续处理 |
| 顺序 | 同一 Partition 内消息有序 |
| 可观测 | 消息堆积量 = 生产速度 - 消费速度,天然监控指标 |
一句话总结
没有 MQ:A 直接调 B,A 要知道 B 在哪、B 挂了怎么办、B 慢了要不要重试。 有了 MQ:A 只往 MQ 发消息,B 只从 MQ 取消息,谁也不认识谁。
二、Kafka
2.1 核心概念
text
Producer ──→ [Topic: document-events]
├── Partition 0: [msg0, msg3, msg6, ...]
├── Partition 1: [msg1, msg4, msg7, ...]
└── Partition 2: [msg2, msg5, msg8, ...]
↓
Consumer Group (解析服务)
├── Consumer A 消费 Partition 0 + 1
└── Consumer B 消费 Partition 2
关键理念:
- 同一 Partition 内消息有序
- 同一 Consumer Group 内,一个 Partition 只能被一个 Consumer 消费
- 不同 Consumer Group 之间互不影响(各自维护 Offset)1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
2.2 概念速记
| 概念 | 一句话 |
|---|---|
| Topic | 消息的分类,类似"主题频道" |
| Partition | Topic 的分片,分布式存储 + 并行消费的基础 |
| Producer | 发消息的 |
| Consumer | 收消息的 |
| Consumer Group | 一组 Consumer,同一组的负载均衡,不同组的各自全量消费 |
| Offset | 消息在 Partition 中的位置,Consumer 记录读到哪了 |
| Broker | Kafka 服务器节点 |
2.3 为什么 Kafka 吞吐量高?
text
1. 顺序写磁盘 — 追加写(append-only),比随机写快几百倍
2. 零拷贝 — sendfile() 系统调用,数据从磁盘直接到网卡,不经过用户态
3. 批量发送 — Producer 攒一批消息一起发
4. 分段存储 — 按 Partition 分段,并行读写1
2
3
4
2
3
4
2.4 适合什么场景?
text
✅ 日志收集、用户行为追踪(高吞吐、可回溯)
✅ 数据管道(ETL → 数据仓库)
✅ 流处理(Kafka Streams / Flink)
✅ 事件溯源(消息可重复消费、永久保存)
❌ 低延迟 RPC(Kafka 有毫秒级延迟,RabbitMQ 更快)
❌ 复杂路由逻辑(RabbitMQ 的 Exchange 更灵活)1
2
3
4
5
6
7
2
3
4
5
6
7
三、RabbitMQ
3.1 核心概念
text
Producer ──→ Exchange ──→ Queue ──→ Consumer
Exchange 有三种类型:
Direct: routing key 完全匹配 → 精确路由
Topic: routing key 通配符匹配 → 灵活路由
Fanout: 广播到所有绑定的 Queue → 发布/订阅
ACK(确认机制):
Consumer 处理完消息 → 发送 ACK → Broker 删除消息
Consumer 挂了没发 ACK → Broker 把消息重新投递给其他 Consumer1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
3.2 概念速记
| 概念 | 一句话 |
|---|---|
| Exchange | 消息路由器,决定消息去哪个 Queue |
| Queue | 消息缓冲区 |
| Binding | Exchange 到 Queue 的绑定规则 |
| Routing Key | Producer 发消息时指定的路由标记 |
| ACK | Consumer 确认"我处理完了" |
3.3 适合什么场景?
text
✅ 业务消息——下单、发通知、发短信
✅ RPC 调用——需要即时响应的异步调用
✅ 复杂路由——按用户级别、地区分发消息
✅ 任务队列——每个任务确处理一次
❌ 海量日志(Kafka 更合适)
❌ 需要回放历史消息(RabbitMQ 消费完就删,Kafka 保留策略更灵活)1
2
3
4
5
6
7
2
3
4
5
6
7
四、Kafka vs RabbitMQ
| Kafka | RabbitMQ | |
|---|---|---|
| 定位 | 分布式流平台 | 消息代理 |
| 吞吐 | 极高(百万条/秒) | 中等(万条/秒) |
| 延迟 | 毫秒级 | 微秒级 |
| 消息持久化 | 默认持久化,可配置保留时间 | 消费后删除 |
| 路由 | 简单(Topic → Partition) | 复杂(Exchange + Binding) |
| 协议 | 自定义 TCP 协议 | AMQP 0-9-1 |
| 消息回溯 | ✅ 天然支持 | ❌ 消费完没了 |
| 运维复杂度 | 高 | 中 |
选型口诀
数据管道用 Kafka,业务消息用 RabbitMQ。
如果你需要:保留所有历史消息、TB 级吞吐 → Kafka 如果你需要:灵活路由、低延迟、简单运维 → RabbitMQ
五、消息可靠性
5.1 三种交付语义
text
at-most-once(最多一次)
消息可能丢,但不会重复
适用:日志收集(丢几条无所谓)
at-least-once(至少一次)← 最常用
消息不会丢,但可能重复
适用:大部分业务场景(配合幂等处理重复)
exactly-once(精确一次)← 最难实现
消息不丢、不重复
适用:金融交易、库存扣减1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
5.2 为什么会有重复?怎么处理?
text
重复的典型场景:
Consumer 处理完了,但网络超时没发 ACK
→ Broker 以为没处理 → 重新投递
→ Consumer 处理了两遍同一个消息
解决:幂等性
- 每条消息带唯一 ID (messageId)
- Consumer 用 Redis 记录处理过的 messageId
- 重复消息来了 → 查 Redis → 已处理 → 直接 ACK 跳过
代码思路:
if (redis.sismember('processed_messages', messageId)) {
channel.ack(msg); // 已处理,直接确认
return;
}
processMessage(msg);
redis.sadd('processed_messages', messageId);
redis.expire('processed_messages', 86400); // 24小时后清理
channel.ack(msg);1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
六、实战:文档处理异步流程
text
用户上传 document.pdf
↓
API 服务: 存储文件 → 发消息到 MQ
↓
Topic: document.uploaded
Message: { documentId, userId, filePath, fileType }
↓
┌─ 解析服务 (Consumer Group: parsers)
│ 监听 document.uploaded
│ 提取文本内容
│ 发消息 → Topic: document.parsed
│ Message: { documentId, text, pageCount }
│
├─ 切分服务 (Consumer Group: chunkers)
│ 监听 document.parsed
│ 文本分块
│ 发消息 → Topic: document.chunked
│ Message: { documentId, chunks: [...] }
│
└─ Embedding 服务 (Consumer Group: embedders)
监听 document.chunked
调用 Embedding API
存入 Vector DB
发消息 → Topic: document.indexed
Message: { documentId, status: 'ready' }
API 服务监听 document.indexed → 通知用户"处理完成"1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
javascript
// 简化版生产者代码(以 Kafka 为例,使用 kafkajs)
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
async function uploadDocument(documentId, filePath) {
// ... 存储文件 ...
await producer.send({
topic: 'document.uploaded',
messages: [{
key: documentId, // 同一个文档的事件到同一 Partition
value: JSON.stringify({
documentId,
userId: 'user123',
filePath,
fileType: 'pdf',
timestamp: Date.now()
})
}]
});
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
javascript
// 简化版消费者代码
const consumer = kafka.consumer({ groupId: 'parser-group' });
await consumer.subscribe({ topic: 'document.uploaded' });
await consumer.run({
eachMessage: async ({ message }) => {
const { documentId, filePath } = JSON.parse(message.value);
// 幂等检查
if (await isProcessed(documentId)) return;
// 处理
const text = await parseDocument(filePath);
// 记录已处理
await markProcessed(documentId);
// 发下一个事件
await producer.send({
topic: 'document.parsed',
messages: [{ key: documentId, value: JSON.stringify({ documentId, text }) }]
});
}
});1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
实践
- Docker 快速启动 Kafka:bash
docker run -p 9092:9092 apache/kafka:latest1 - 用 kafkajs 或 kafka-node 写一个 Producer 和 Consumer
- 模拟:Consumer 处理一半崩溃 → 重启观察消息是否重新投递
- 加幂等逻辑处理重复消息