kafka_learn
磊磊

Kafka 学习项目

这是一个用于学习 Apache Kafka 的 Spring Boot 项目,通过不同的 Topic 区分不同的学习场景。

技术栈

  • Java 8
  • Spring Boot 2.7.18
  • Spring Kafka

快速开始

1. 启动 Kafka(Docker)

1
2
cd dockerInneed
docker-compose up -d

2. 启动 Spring Boot 应用

1
mvn spring-boot:run

3. 测试接口

场景 接口
字符串消息 http://localhost:11111/string/test
JSON 对象 http://localhost:11111/json/test

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
src/main/java/com/example/kafkalearn/

├── KafkaLearnApplication.java # 主启动类

├── entity/
│ └── Order.java # 订单实体类(用于 JSON 场景)

├── config/
│ └── KafkaJsonConfig.java # Kafka 配置(字符串 + JSON)

├── controller/
│ ├── HelloController.java # 基础测试接口
│ ├── StringKafkaController.java # 字符串消息接口
│ └── JsonKafkaController.java # JSON 对象接口

└── kafka/
├── KafkaProducerService.java # 字符串生产者
├── KafkaConsumerService.java # 字符串消费者

└── json/
├── JsonProducerService.java # JSON 生产者
└── JsonConsumerService.java # JSON 消费者

学习场景说明

场景1:字符串消息(learn-topic)

学习内容:Kafka 基础 - 发送和接收简单的字符串消息

组件 文件 说明
Producer KafkaProducerService.java 发送字符串到 Kafka
Consumer KafkaConsumerService.java 从 Kafka 接收字符串
Controller StringKafkaController.java HTTP 测试接口

测试接口

1
2
3
4
5
# 快速测试
curl --noproxy '*' "http://localhost:11111/string/test"

# 自定义消息
curl --noproxy '*' "http://localhost:11111/string/send?message=HelloKafka"

场景2:JSON 对象(json-topic)

学习内容:进阶 - 发送和接收 Java 对象(自动序列化/反序列化)

组件 文件 说明
Entity Order.java 订单实体类
Producer JsonProducerService.java 发送 Order 对象到 Kafka
Consumer JsonConsumerService.java 从 Kafka 接收 Order 对象
Controller JsonKafkaController.java HTTP 测试接口
Config KafkaJsonConfig.java JSON 序列化配置

测试接口

1
2
3
4
5
# 快速测试(自动生成订单)
curl --noproxy '*' "http://localhost:11111/json/test"

# 自定义订单
curl --noproxy '*' "http://localhost:11111/json/order?orderId=ORD001&productName=iPhone&price=7999"

Topic 一览

Topic Group 用途
learn-topic learn-group 字符串消息学习
json-topic json-group JSON 对象学习

Kafka 核心概念

概念 说明
Producer 生产者,发送消息到 Kafka
Consumer 消费者,从 Kafka 接收消息
Topic 主题,消息的分类(你发到哪个 Topic,消费者就从哪个 Topic 收)
Consumer Group 消费者组,同组内消息只被一个消费者处理

常见问题 FAQ

Q1:Producer 会重复发送吗?

不用担心,Kafka 自动处理。

Spring Kafka 默认开启「幂等 Producer」,通过消息序号自动去重。


Q2:Consumer 会重复消费吗?

会!但不是每条都重复,是偶尔发生。

1
2
3
4
5
6
7
8
9
正常流程:
消费消息 → 处理完 → 自动 commit → Kafka 记录"已消费"

重复消费发生在:
消费消息 → 处理完 → 还没 commit → 应用突然挂了/重启

Kafka 不知道你处理完了

重新投递这条消息 → 重复消费!

需要你自己做幂等处理!


Q3:Redis 幂等和数据库幂等有什么区别?

对比 Redis SETNX 数据库查 ID
速度 快(内存) 慢(磁盘)
并发安全 ✅ 原子操作 ❌ 有并发漏洞
持久性 会过期 永久保存
推荐用法 第一道防线(快速过滤) 兜底(数据库唯一索引)

Q4:消费失败后怎么重试?

默认不会重试! 抛异常后消息直接丢了。

为什么?因为自动提交机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────────────────────┐
│ Consumer 内部 │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 主线程 │ │ 后台线程 │ │
│ │ 处理消息 │ │ 定时提交 │ │
│ └─────────────────┘ └────────┬────────┘ │
│ │ │ │
│ │ 处理中... │ 每5秒执行 │
│ │ 抛异常了! │ │
│ │ ▼ │
│ │ "我消费到第100条了" │
│ │ │ │
│ │ ▼ │
│ │ ┌─────────────┐ │
│ │ │ Kafka │ │
│ │ │ 记录offset │ │
│ │ └─────────────┘ │
│ │
│ 主线程处理失败了,但后台线程不管,5秒到了照样提交 │
│ 结果:Kafka 以为你处理完了,消息就"丢"了 │
│ │
└─────────────────────────────────────────────────────────┘

关键理解

  • 自动提交是 Consumer 的后台线程定时(每5秒)把 offset 发给 Kafka
  • 后台线程不管主线程处理成功没有,时间到了就提交
  • 所以抛异常也会被提交,消息就丢了

解决方案:自己写业务重试逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@KafkaListener(topics = "order-topic")
public void listen(Order order) {
int maxRetry = 3;

for (int i = 1; i <= maxRetry; i++) {
try {
orderService.process(order);
return; // 成功,退出
} catch (Exception e) {
log.warn("第{}次处理失败: {}", i, e.getMessage());
if (i == maxRetry) {
// 彻底失败,记录到失败表,后续人工处理
saveToFailedTable(order, e.getMessage());
}
}
}
}

Q5:完整的消费者最佳实践?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@KafkaListener(topics = "order-topic")
public void listen(Order order) {
String key = "processed:" + order.getOrderId();

// 1. 幂等检查(Redis 快速过滤)
Boolean isNew = redis.opsForValue().setIfAbsent(key, "PROCESSING", 5, TimeUnit.MINUTES);
if (!isNew) {
log.info("订单已处理/处理中,跳过: {}", order.getOrderId());
return;
}

// 2. 业务重试
int maxRetry = 3;
for (int i = 1; i <= maxRetry; i++) {
try {
orderService.process(order);

// 3. 成功:标记完成(长过期时间)
redis.opsForValue().set(key, "SUCCESS", 24, TimeUnit.HOURS);
return;

} catch (Exception e) {
log.warn("第{}次处理失败: {}", i, e.getMessage());
if (i == maxRetry) {
// 4. 彻底失败:删除标记,记录失败
redis.delete(key);
saveToFailedTable(order, e.getMessage());
}
}
}
}

Q6:acks 配置是什么?

Producer 发消息时,等 Kafka 确认的级别:

acks 含义 速度 安全性
0 发了不管,不等确认 最快 可能丢
1 等 Leader 确认 中等 较安全
all 等所有副本确认 最慢 最安全

副本:Kafka 把数据复制到多台服务器,防止某台挂了数据丢失。


一句话总结

问题 答案
Producer 重复发送? Kafka 自动处理,不用管
Consumer 重复消费? 会偶尔发生,需要你做幂等
怎么做幂等? Redis SETNX + 数据库唯一索引
消费失败怎么办? 代码里 for 循环重试,失败记录到表

Docker 服务

服务 端口 说明
Kafka 9092 消息队列服务
Zookeeper 2181 Kafka 协调服务
Kafka UI 8080 Web 管理界面

访问 http://localhost:8080 可以查看 Kafka 集群状态、Topic、消息等。


后续学习计划

  • 消息确认机制(ACK)
  • 消费者组详解
  • 分区与并行消费
  • 消息重试与死信队列
由 Hexo 驱动 & 主题 Keep
访客数 访问量