河源市网站建设_网站建设公司_域名注册_seo优化
2025/12/21 23:35:35 网站建设 项目流程

前言

今天我们来聊聊一个让很多开发者头疼的话题——MQ消息丢失问题。

有些小伙伴在工作中,一提到消息队列就觉得很简单,但真正遇到线上消息丢失时,排查起来却让人抓狂。

其实,我在实际工作中,也遇到过MQ消息丢失的情况。

今天这篇文章,专门跟大家一起聊聊这个话题,希望对你会有所帮助。

一、消息丢失的三大环节

在深入解决方案之前,我们先搞清楚消息在哪几个环节可能丢失:

1. 生产者发送阶段

网络抖动导致发送失败

生产者宕机未发送

Broker处理失败未返回确认

2. Broker存储阶段

内存消息未持久化,重启丢失

磁盘故障导致数据丢失

集群切换时消息丢失

3. 消费者处理阶段

自动确认模式下处理异常

消费者宕机处理中断

手动确认但忘记确认

理解了问题根源,接下来我们看5种实用的解决方案。

二、方案一:生产者确认机制

核心原理

生产者发送消息后等待Broker确认,确保消息成功到达。

这是防止消息丢失的第一道防线。

关键实现

// RabbitMQ生产者确认配置

@Bean

public RabbitTemplate rabbitTemplate() {

RabbitTemplate template = new RabbitTemplate(connectionFactory);

template.setConfirmCallback((correlationData, ack, cause) -> {

if (ack) {

// 消息成功到达Broker

messageStatusService.markConfirmed(correlationData.getId());

} else {

// 发送失败,触发重试

retryService.scheduleRetry(correlationData.getId());

}

});

return template;

}

// 可靠发送方法

public void sendReliable(String exchange, String routingKey, Object message) {

String messageId = generateId();

// 先落库保存发送状态

messageStatusService.saveSendingStatus(messageId, message);

// 发送持久化消息

rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {

msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

msg.getMessageProperties().setMessageId(messageId);

return msg;

}, new CorrelationData(messageId));

}

适用场景

对消息可靠性要求高的业务

金融交易、订单处理等关键业务

需要精确知道消息发送结果的场景

三、方案二:消息持久化机制

核心原理

将消息保存到磁盘,确保Broker重启后消息不丢失。

这是防止Broker端消息丢失的关键。

关键实现

// 持久化队列配置

@Bean

public Queue orderQueue() {

return QueueBuilder.durable("order.queue") // 队列持久化

.deadLetterExchange("order.dlx") // 死信交换机

.build();

}

// 发送持久化消息

public void sendPersistentMessage(Object message) {

rabbitTemplate.convertAndSend("order.exchange", "order.create", message, msg -> {

msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化

return msg;

});

}

// Kafka持久化配置

@Bean

public ProducerFactory<String, Object> producerFactory() {

Map<String, Object> props = new HashMap<>();

props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认

props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性

return new DefaultKafkaProducerFactory<>(props);

}

优缺点

优点:

有效防止Broker重启导致的消息丢失

配置简单,效果明显

缺点:

磁盘IO影响性能

需要足够的磁盘空间

四、方案三:消费者确认机制

核心原理

消费者处理完消息后手动向Broker发送确认,Broker收到确认后才删除消息。

这是保证消息不丢失的最后一道防线。

关键实现

// 手动确认消费者

@RabbitListener(queues = "order.queue")

public void handleMessage(Order order, Message message, Channel channel) {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

// 业务处理

orderService.processOrder(order);

// 手动确认

channel.basicAck(deliveryTag, false);

log.info("消息处理完成: {}", order.getOrderId());

} catch (Exception e) {

log.error("消息处理失败: {}", order.getOrderId(), e);

// 处理失败,重新入队

channel.basicNack(deliveryTag, false, true);

}

}

// 消费者容器配置

@Bean

public SimpleRabbitListenerContainerFactory containerFactory() {

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认

factory.setPrefetchCount(10); // 预取数量

factory.setConcurrentConsumers(3); // 并发消费者

return factory;

}

注意事项

确保业务处理完成后再确认

合理设置预取数量,避免内存溢出

处理异常时要正确使用NACK

五、方案四:事务消息机制

核心原理

通过事务保证本地业务操作和消息发送的原子性,要么都成功,要么都失败。

关键实现

// 本地事务表方案

@Transactional

public void createOrder(Order order) {

// 1. 保存订单到数据库

orderRepository.save(order);

// 2. 保存消息到本地消息表

LocalMessage localMessage = new LocalMessage();

localMessage.setBusinessId(order.getOrderId());

localMessage.setContent(JSON.toJSONString(order));

localMessage.setStatus(MessageStatus.PENDING);

localMessageRepository.save(localMessage);

// 3. 事务提交,本地业务和消息存储保持一致性

}

// 定时任务扫描并发送消息

@Scheduled(fixedDelay = 5000)

public void sendPendingMessages() {

List<LocalMessage> pendingMessages = localMessageRepository.findByStatus(MessageStatus.PENDING);

for (LocalMessage message : pendingMessages) {

try {

// 发送消息到MQ

rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());

// 更新消息状态为已发送

message.setStatus(MessageStatus.SENT);

localMessageRepository.save(message);

} catch (Exception e) {

log.error("发送消息失败: {}", message.getId(), e);

}

}

}

// RocketMQ事务消息

public void sendTransactionMessage(Order order) {

TransactionMQProducer producer = new TransactionMQProducer("order_producer");

// 发送事务消息

Message msg = new Message("order_topic", "create",

JSON.toJSONBytes(order));

TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

if (result.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {

log.info("事务消息提交成功");

}

}

适用场景

需要严格保证业务和消息一致性的场景

分布式事务场景

金融、电商等对数据一致性要求高的业务

六、方案五:消息重试与死信队列

核心原理

通过重试机制处理临时故障,通过死信队列处理最终无法消费的消息。

关键实现

// 重试队列配置

@Bean

public Queue orderQueue() {

return QueueBuilder.durable("order.queue")

.withArgument("x-dead-letter-exchange", "order.dlx") // 死信交换机

.withArgument("x-dead-letter-routing-key", "order.dead")

.withArgument("x-message-ttl", 60000) // 60秒后进入死信

.build();

}

// 死信队列配置

@Bean

public Queue orderDeadLetterQueue() {

return QueueBuilder.durable("order.dead.queue").build();

}

// 消费者重试逻辑

@RabbitListener(queues = "order.queue")

public void handleMessageWithRetry(Order order, Message message, Channel channel) {

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

orderService.processOrder(order);

channel.basicAck(deliveryTag, false);

} catch (TemporaryException e) {

// 临时异常,重新入队重试

channel.basicNack(deliveryTag, false, true);

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询