Kafka消息重复消费怎么从业务层面做幂等
摘要:# Kafka消息重复消费,别慌!业务层幂等设计,我教你几招“土办法” 做消息队列的,谁没被重复消费坑过?我前两天刚处理一个线上问题,用户投诉说“优惠券怎么领了两次”,一查日志,好家伙,同一个订单ID,Kafka消费者重启那会儿,消息被老老实实处理了两遍…
Kafka消息重复消费,别慌!业务层幂等设计,我教你几招“土办法”
做消息队列的,谁没被重复消费坑过?我前两天刚处理一个线上问题,用户投诉说“优惠券怎么领了两次”,一查日志,好家伙,同一个订单ID,Kafka消费者重启那会儿,消息被老老实实处理了两遍。
说白了,Kafka只保证“至少一次”(at least once)的投递语义。 网络抖动、消费者重启、分区重平衡……随便一个场景都可能让你收到重复消息。指望中间件给你解决?不现实。这活儿,最后还得落到业务代码自己头上。
今天咱不聊那些高深的理论,就说说在业务代码里,怎么用最实在的办法,把“重复消费”变成“重复也没事”。
一、先想明白:你的业务怕重复吗?
不是所有消息都值得大动干戈做幂等。你得先分分类:
- “怕得要死”型:涉及真金白银的。比如支付成功通知、扣减库存、发放唯一优惠券。这种消息重复了,用户要投诉,财务要找你麻烦。
- “有点烦人”型:比如发送通知短信、更新某个统计计数。多发一次,用户可能觉得是系统bug,体验不好,但通常不会造成直接损失。
- “无所谓”型:比如更新用户最后登录时间、记录非关键的操作日志。重复执行一百遍,结果都一样。
我们今天主要对付第一种。对于后两种,有时候为了性能,容忍少量重复反而是更经济的选择。
二、业务幂等的核心思路:给操作加个“身份证”
想防止重复执行,最直白的想法就是:让同一个操作,无论来多少次,都只生效一次。
怎么实现?关键在于,你得能识别出“这是同一个操作”。这就引出了几个非常接地气的方案。
方案1:数据库“唯一索引” —— 简单粗暴,但好用
这是我最推荐优先考虑的方法,尤其适合有天然唯一标识的场景。
举个栗子:你的消息是“订单支付成功”,里面肯定带着order_id吧?处理这条消息的逻辑,通常是要把订单状态从“待支付”更新为“已支付”。
怎么做幂等?
在订单状态更新表(或者专门的消息处理记录表)里,给order_id字段加一个唯一索引。你的业务逻辑伪代码大概长这样:
// 伪代码,意会即可
@Transactional
public void handlePaymentSuccessMessage(String orderId) {
// 1. 先尝试插入一条处理记录
int inserted = messageProcessRecordDao.insertIfNotExists(orderId, "PAY_SUCCESS");
if (inserted == 0) {
// 插入失败,说明这个orderId已经处理过了
log.warn("订单{}的支付成功消息已处理,本次跳过", orderId);
return; // 直接返回,啥也不干
}
// 2. 插入成功,说明是第一次处理,放心执行核心业务
orderService.updateStatusToPaid(orderId);
// ... 其他后续逻辑,比如发券、通知物流等
}
优点:
- 简单:几乎不增加复杂度,利用数据库的特性。
- 可靠:数据库保证的原子性,并发情况下也安全。
- 通用:适用于任何有唯一业务ID的场景。
缺点/要注意的坑:
- 别用“查一下再决定”代替“插入唯一索引”!高并发下,两个请求可能同时查到“不存在”,然后都去执行业务,典型的“先查后改”并发问题。唯一索引(或类似的主键冲突)是防并发的关键。
- 这张表可能会变得很大,需要定期清理老旧数据,或者用可过期的存储(比如下面要说的Redis)。
方案2:Redis“占位符” —— 快就一个字
当你的业务对性能极其敏感,或者唯一标识很多、不想给数据库加太多压力时,Redis是个好帮手。
思路:在处理消息前,去Redis里用SET key value NX(如果不存在才设置)命令占个坑。设置成功了,就执行业务;设置失败了,说明有人处理过了,直接跳过。
public void handleMessageWithRedis(String bizId) {
String key = "msg_processed:" + bizId;
// 尝试设置一个过期时间为1小时的键,如果设置成功,返回true
Boolean firstProcess = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(1));
if (Boolean.FALSE.equals(firstProcess)) {
// 键已存在,重复消息
return;
}
// 键设置成功,第一次处理
doRealBusiness(bizId);
}
优点:
- 速度极快:Redis单命令操作,性能远超数据库。
- 自动过期:不用自己操心清理数据。
缺点:
- 可靠性依赖Redis:如果Redis挂了或者集群发生脑裂,可能导致幂等失效。所以通常用于对一致性要求稍低、但吞吐量极高的场景。
- 需要考虑原子性:确保“设置Redis”和“执行业务”在一个事务里,或者用Lua脚本保证原子性,否则还是可能出问题。
方案3:数据库“乐观锁” —— 适合更新场景
如果你的业务操作本身就是更新某条数据的状态,那用版本号(version)或状态条件做乐观锁,顺带就把幂等实现了。
还是用订单状态更新举例,假设订单表有个version字段。
-- 更新语句
UPDATE orders
SET status = 'PAID', version = version + 1
WHERE order_id = #{orderId} AND status = 'UNPAID';
-- 或者用 version
UPDATE orders
SET status = 'PAID', version = version + 1
WHERE order_id = #{orderId} AND version = #{oldVersion};
执行完这条SQL后,检查affected_rows(受影响的行数)。如果是0,说明要么订单不存在,要么状态已经不是UNPAID(可能之前已经支付过了),本次更新就可以视为重复消费而忽略。
优点:
- 浑然一体:业务逻辑和幂等控制合二为一,非常优雅。
- 无额外开销:不需要额外的表或存储。
缺点:
- 场景受限:只适用于“更新”操作,且更新条件要能区分出“第一次”和“第N次”。
- 需要精心设计:更新条件要设得好,不然可能把正常的业务更新也给拦掉了。
三、几个你可能没想到的“骚操作”与避坑指南
- 幂等Key怎么选? 最好用“业务ID+操作类型”。比如
order_12345_pay和order_12345_refund就是两个不同的操作,应该允许分别执行。只用order_12345,万一支付和退款消息乱序,可能就互相阻塞了。 - 消息本身没有唯一ID怎么办? 那就自己造一个。用几个关键字段拼起来(比如用户ID+商品ID+时间戳),或者更规范点,在生产者端就生成一个全局唯一的
message_id(比如UUID)塞到消息头里,消费者用这个ID做幂等。 - “最终一致性”的坑:你用了数据库唯一索引做幂等,业务也执行了,但后续的异步操作(比如调用外部接口发短信)失败了,整个事务回滚。这时候,你的幂等记录也回滚了。下次重试消息,业务又会执行一遍,可能造成重复发短信。所以,幂等记录最好在业务操作完成后,再单独提交,或者使用异步记录。
- 别过度设计:我看到有些团队,一上来就要搞分布式全局唯一序列号,引入ZooKeeper什么的。对于绝大多数业务,上面那三招真的够用了。先选最简单的,扛不住了再升级。
写在最后
Kafka消息重复消费,本质上是一个业务问题。技术方案只是工具,核心在于你对自己的业务流程和数据流向要有如指掌。
下次再遇到这类问题,别急着去翻Kafka的配置文档。先冷静下来,问自己三个问题:
- 这条消息重复了,最坏的结果是什么?(决定要不要做幂等)
- 这条消息里,有没有天然的唯一标识?(决定用什么做幂等Key)
- 我的业务场景,对一致性和性能的要求到底哪个更高?(决定用数据库还是Redis)
想清楚这些,代码写起来心里就有底了。好了,关于业务层幂等,我的经验差不多就这些。如果你有更奇葩的场景或者更好的“土办法”,欢迎来聊聊。

