前言
今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。
有些小伙伴在工作中,一提到消息队列就觉得很简单,但真正遇到线上消息丢失时,排查起来却让人抓狂。
其实,我在实际工作中,也遇到过MQ消息丢失的情况。
今天这篇文章,专门跟大家一起聊聊这个话题,希望对你会有所帮助。
一、消息丢失的三大环节
在深入解决方案之前,我们先搞清楚消息在哪几个环节可能丢失:
1. 生产者发送阶段
网络抖动导致发送失败
生产者宕机未发送
Broker处理失败未返回确认
2. Broker存储阶段
内存消息未持久化,重启丢失
磁盘故障导致数据丢失
集群切换时消息丢失
3. 消费者处理阶段
自动确认模式下处理异常
消费者宕机处理中断
手动确认但忘记确认
理解了问题根源,接下来我们看5种实用的解决方案。
二、方案一:生产者确认机制
核心原理
生产者发送消息后等待Broker确认,确保消息成功到达。
这是防止消息丢失的第一道防线。
关键实现
// RabbitMQ生产者确认配置
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息成功到达Broker
messageStatusService.markConfirmed(correlationData.getId());
} else {
// 发送失败,触发重试
retryService.scheduleRetry(correlationData.getId());
}
});
return template;
}
// 可靠发送方法
public void sendReliable(String exchange, String routingKey, Object message) {
String messageId = generateId();
// 先落库保存发送状态
messageStatusService.saveSendingStatus(messageId, message);
// 发送持久化消息
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setMessageId(messageId);
return msg;
}, new CorrelationData(messageId));
}
适用场景
对消息可靠性要求高的业务
金融交易、订单处理等关键业务
需要精确知道消息发送结果的场景
三、方案二:消息持久化机制
核心原理
将消息保存到磁盘,确保Broker重启后消息不丢失。
这是防止Broker端消息丢失的关键。
关键实现
// 持久化队列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue") // 队列持久化
.deadLetterExchange("order.dlx") // 死信交换机
.build();
}
// 发送持久化消息
public void sendPersistentMessage(Object message) {
rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
return msg;
});
}
// Kafka持久化配置
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
return new DefaultKafkaProducerFactory<>(props);
}
优缺点
优点:
有效防止Broker重启导致的消息丢失
配置简单,效果明显
缺点:
磁盘IO影响性能
需要足够的磁盘空间
四、方案三:消费者确认机制
核心原理
消费者处理完消息后手动向Broker发送确认,Broker收到确认后才删除消息。
这是保证消息不丢失的最后一道防线。
关键实现
// 手动确认消费者
@RabbitListener(queues = "order.queue")
public void handleMessage(Order order, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 业务处理
orderService.processOrder(order);
// 手动确认
channel.basicAck(deliveryTag, false);
log.info("消息处理完成: {}", order.getOrderId());
} catch (Exception e) {
log.error("消息处理失败: {}", order.getOrderId(), e);
// 处理失败,重新入队
channel.basicNack(deliveryTag, false, true);
}
}
// 消费者容器配置
@Bean
public SimpleRabbitListenerContainerFactory containerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
factory.setPrefetchCount(10); // 预取数量
factory.setConcurrentConsumers(3); // 并发消费者
return factory;
}
注意事项
确保业务处理完成后再确认
合理设置预取数量,避免内存溢出
处理异常时要正确使用NACK
五、方案四:事务消息机制
核心原理
通过事务保证本地业务操作和消息发送的原子性,要么都成功,要么都失败。
关键实现
// 本地事务表方案
@Transactional
public void createOrder(Order order) {
// 1. 保存订单到数据库
orderRepository.save(order);
// 2. 保存消息到本地消息表
LocalMessage localMessage = new LocalMessage();
localMessage.setBusinessId(order.getOrderId());
localMessage.setContent(JSON.toJSONString(order));
localMessage.setStatus(MessageStatus.PENDING);
localMessageRepository.save(localMessage);
// 3. 事务提交,本地业务和消息存储保持一致性
}
// 定时任务扫描并发送消息
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);
for (LocalMessage message : pendingMessages) {
try {
// 发送消息到MQ
rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
// 更新消息状态为已发送
message.setStatus(MessageStatus.SENT);
localMessageRepository.save(message);
} catch (Exception e) {
log.error("发送消息失败: {}", message.getId(), e);
}
}
}
// RocketMQ事务消息
public void sendTransactionMessage(Order order) {
TransactionMQProducer producer = new TransactionMQProducer("order_producer");
// 发送事务消息
Message msg = new Message("order_topic", "create",
JSON.toJSONBytes(order));
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
log.info("事务消息提交成功");
}
}
适用场景
需要严格保证业务和消息一致性的场景
分布式事务场景
金融、电商等对数据一致性要求高的业务
六、方案五:消息重试与死信队列
核心原理
通过重试机制处理临时故障,通过死信队列处理最终无法消费的消息。
关键实现
// 重试队列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx") // 死信交换机
.withArgument("x-dead-letter-routing-key", "order.dead")
.withArgument("x-message-ttl", 60000) // 60秒后进入死信
.build();
}
// 死信队列配置
@Bean
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable("order.dead.queue").build();
}
// 消费者重试逻辑
@RabbitListener(queues = "order.queue")
public void handleMessageWithRetry(Order order, Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
orderService.processOrder(order);
channel.basicAck(deliveryTag, false);
} catch (TemporaryException e) {
// 临时异常,重新入队重试
channel.basicNack(deliveryTag, false, true);