Kafka 学习项目
这是一个用于学习 Apache Kafka 的 Spring Boot 项目,通过不同的 Topic 区分不同的学习场景。
技术栈
- Java 8
- Spring Boot 2.7.18
- Spring Kafka
快速开始
1. 启动 Kafka(Docker)
1 2
| cd dockerInneed docker-compose up -d
|
2. 启动 Spring Boot 应用
3. 测试接口
项目结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| src/main/java/com/example/kafkalearn/ │ ├── KafkaLearnApplication.java # 主启动类 │ ├── entity/ │ └── Order.java # 订单实体类(用于 JSON 场景) │ ├── config/ │ └── KafkaJsonConfig.java # Kafka 配置(字符串 + JSON) │ ├── controller/ │ ├── HelloController.java # 基础测试接口 │ ├── StringKafkaController.java # 字符串消息接口 │ └── JsonKafkaController.java # JSON 对象接口 │ └── kafka/ ├── KafkaProducerService.java # 字符串生产者 ├── KafkaConsumerService.java # 字符串消费者 │ └── json/ ├── JsonProducerService.java # JSON 生产者 └── JsonConsumerService.java # JSON 消费者
|
学习场景说明
场景1:字符串消息(learn-topic)
学习内容:Kafka 基础 - 发送和接收简单的字符串消息
| 组件 |
文件 |
说明 |
| Producer |
KafkaProducerService.java |
发送字符串到 Kafka |
| Consumer |
KafkaConsumerService.java |
从 Kafka 接收字符串 |
| Controller |
StringKafkaController.java |
HTTP 测试接口 |
测试接口:
1 2 3 4 5
| curl --noproxy '*' "http://localhost:11111/string/test"
curl --noproxy '*' "http://localhost:11111/string/send?message=HelloKafka"
|
场景2:JSON 对象(json-topic)
学习内容:进阶 - 发送和接收 Java 对象(自动序列化/反序列化)
| 组件 |
文件 |
说明 |
| Entity |
Order.java |
订单实体类 |
| Producer |
JsonProducerService.java |
发送 Order 对象到 Kafka |
| Consumer |
JsonConsumerService.java |
从 Kafka 接收 Order 对象 |
| Controller |
JsonKafkaController.java |
HTTP 测试接口 |
| Config |
KafkaJsonConfig.java |
JSON 序列化配置 |
测试接口:
1 2 3 4 5
| curl --noproxy '*' "http://localhost:11111/json/test"
curl --noproxy '*' "http://localhost:11111/json/order?orderId=ORD001&productName=iPhone&price=7999"
|
Topic 一览
| Topic |
Group |
用途 |
learn-topic |
learn-group |
字符串消息学习 |
json-topic |
json-group |
JSON 对象学习 |
Kafka 核心概念
| 概念 |
说明 |
| Producer |
生产者,发送消息到 Kafka |
| Consumer |
消费者,从 Kafka 接收消息 |
| Topic |
主题,消息的分类(你发到哪个 Topic,消费者就从哪个 Topic 收) |
| Consumer Group |
消费者组,同组内消息只被一个消费者处理 |
常见问题 FAQ
Q1:Producer 会重复发送吗?
不用担心,Kafka 自动处理。
Spring Kafka 默认开启「幂等 Producer」,通过消息序号自动去重。
Q2:Consumer 会重复消费吗?
会!但不是每条都重复,是偶尔发生。
1 2 3 4 5 6 7 8 9
| 正常流程: 消费消息 → 处理完 → 自动 commit → Kafka 记录"已消费"
重复消费发生在: 消费消息 → 处理完 → 还没 commit → 应用突然挂了/重启 ↓ Kafka 不知道你处理完了 ↓ 重新投递这条消息 → 重复消费!
|
需要你自己做幂等处理!
Q3:Redis 幂等和数据库幂等有什么区别?
| 对比 |
Redis SETNX |
数据库查 ID |
| 速度 |
快(内存) |
慢(磁盘) |
| 并发安全 |
✅ 原子操作 |
❌ 有并发漏洞 |
| 持久性 |
会过期 |
永久保存 |
| 推荐用法 |
第一道防线(快速过滤) |
兜底(数据库唯一索引) |
Q4:消费失败后怎么重试?
默认不会重试! 抛异常后消息直接丢了。
为什么?因为自动提交机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ┌─────────────────────────────────────────────────────────┐ │ Consumer 内部 │ │ │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │ 主线程 │ │ 后台线程 │ │ │ │ 处理消息 │ │ 定时提交 │ │ │ └─────────────────┘ └────────┬────────┘ │ │ │ │ │ │ │ 处理中... │ 每5秒执行 │ │ │ 抛异常了! │ │ │ │ ▼ │ │ │ "我消费到第100条了" │ │ │ │ │ │ │ ▼ │ │ │ ┌─────────────┐ │ │ │ │ Kafka │ │ │ │ │ 记录offset │ │ │ │ └─────────────┘ │ │ │ │ 主线程处理失败了,但后台线程不管,5秒到了照样提交 │ │ 结果:Kafka 以为你处理完了,消息就"丢"了 │ │ │ └─────────────────────────────────────────────────────────┘
|
关键理解:
- 自动提交是 Consumer 的后台线程定时(每5秒)把 offset 发给 Kafka
- 后台线程不管主线程处理成功没有,时间到了就提交
- 所以抛异常也会被提交,消息就丢了
解决方案:自己写业务重试逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @KafkaListener(topics = "order-topic") public void listen(Order order) { int maxRetry = 3; for (int i = 1; i <= maxRetry; i++) { try { orderService.process(order); return; } catch (Exception e) { log.warn("第{}次处理失败: {}", i, e.getMessage()); if (i == maxRetry) { saveToFailedTable(order, e.getMessage()); } } } }
|
Q5:完整的消费者最佳实践?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @KafkaListener(topics = "order-topic") public void listen(Order order) { String key = "processed:" + order.getOrderId(); Boolean isNew = redis.opsForValue().setIfAbsent(key, "PROCESSING", 5, TimeUnit.MINUTES); if (!isNew) { log.info("订单已处理/处理中,跳过: {}", order.getOrderId()); return; } int maxRetry = 3; for (int i = 1; i <= maxRetry; i++) { try { orderService.process(order); redis.opsForValue().set(key, "SUCCESS", 24, TimeUnit.HOURS); return; } catch (Exception e) { log.warn("第{}次处理失败: {}", i, e.getMessage()); if (i == maxRetry) { redis.delete(key); saveToFailedTable(order, e.getMessage()); } } } }
|
Q6:acks 配置是什么?
Producer 发消息时,等 Kafka 确认的级别:
| acks |
含义 |
速度 |
安全性 |
0 |
发了不管,不等确认 |
最快 |
可能丢 |
1 |
等 Leader 确认 |
中等 |
较安全 |
all |
等所有副本确认 |
最慢 |
最安全 |
副本:Kafka 把数据复制到多台服务器,防止某台挂了数据丢失。
一句话总结
| 问题 |
答案 |
| Producer 重复发送? |
Kafka 自动处理,不用管 |
| Consumer 重复消费? |
会偶尔发生,需要你做幂等 |
| 怎么做幂等? |
Redis SETNX + 数据库唯一索引 |
| 消费失败怎么办? |
代码里 for 循环重试,失败记录到表 |
Docker 服务
| 服务 |
端口 |
说明 |
| Kafka |
9092 |
消息队列服务 |
| Zookeeper |
2181 |
Kafka 协调服务 |
| Kafka UI |
8080 |
Web 管理界面 |
访问 http://localhost:8080 可以查看 Kafka 集群状态、Topic、消息等。
后续学习计划