濮阳市网站建设_网站建设公司_SSG_seo优化
2025/12/18 11:08:10 网站建设 项目流程

# 本地消息表vs事务消息:异步方案怎么选?

在分布式事务的异步方案中,“本地消息表”和“事务消息”是最常用的两种——它们都基于“消息传递”实现最终一致性,适合“非实时依赖”的场景(如订单创建后异步通知库存扣减、物流派单)。
但两者的实现方式、侵入性、可靠性差异很大:前者靠数据库表“硬扛”消息可靠性,后者靠消息队列的原生机制“优雅”解决。

今天我们就拆解这两种方案的核心逻辑、实现步骤,对比它们的优缺点,并附上本地消息表的数据库设计和RocketMQ事务消息的实战代码,帮你搞懂“什么时候该用哪种”。

一、本地消息表:用数据库“硬扛”消息可靠性

本地消息表的核心思想很“朴素”:将跨服务的分布式事务,转化为“业务表+消息表”的本地事务。通过数据库的ACID特性保证“业务操作和消息记录”的原子性,再通过独立线程将消息投递到消息队列,最终达成一致。

1. 核心原理

假设场景:电商订单创建后,需要异步通知库存服务扣减库存。本地消息表的流程如下:

本地消息表流程

  1. 订单服务在本地数据库中,除了“订单表”,额外创建一张“消息表”(与订单表同库);
  2. 订单服务执行“创建订单”时,在同一个本地事务中,同时向“订单表”插入订单记录、向“消息表”插入“待发送”的库存扣减消息(确保订单创建和消息记录要么全成功,要么全失败);
  3. 订单服务启动独立的消息投递线程(或定时任务),不断扫描“消息表”中“待发送”状态的消息,投递到消息队列;
  4. 库存服务消费消息队列中的消息,执行扣减库存操作,完成后调用订单服务的“消息确认接口”;
  5. 订单服务收到确认后,将消息表中对应消息的状态改为“已完成”。

2. 实现步骤(附数据库设计)

步骤1:设计消息表(核心!)

消息表需记录消息的基本信息、状态、重试次数等,确保消息不丢失、不重复。

表结构设计

CREATE TABLE `local_message` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`message_id` varchar(64) NOT NULL COMMENT '消息唯一ID(UUID)',
`business_id` varchar(64) NOT NULL COMMENT '业务ID(如订单ID)',
`topic` varchar(128) NOT NULL COMMENT '消息队列主题(如stock_topic)',
`content` text NOT NULL COMMENT '消息内容(JSON格式,如{"orderId":"1001","goodsId":5,"num":2})',
`status` tinyint(4) NOT NULL COMMENT '状态:0-待发送,1-发送中,2-已完成,3-发送失败',
`retry_count` int(11) NOT NULL DEFAULT 0 COMMENT '重试次数(默认0,最大3次)',
`next_retry_time` datetime DEFAULT NULL COMMENT '下次重试时间(用于退避策略)',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`) COMMENT '保证消息唯一,防重复插入',
KEY `idx_status_next_retry` (`status`,`next_retry_time`) COMMENT '优化消息扫描效率'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '本地消息表';

设计说明

  • message_id:全局唯一,用于幂等(避免重复投递);
  • status:核心字段,控制消息流转(待发送→发送中→已完成);
  • retry_countnext_retry_time:实现重试机制(失败后隔一段时间重试,避免频繁重试压垮系统);
  • 索引:uk_message_id防重复,idx_status_next_retry加速“待发送”消息的扫描。
步骤2:本地事务中同时操作业务表和消息表

订单服务创建订单时,在同一个事务中插入订单和消息:

@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private LocalMessageMapper messageMapper;
@Transactional
public void createOrder(OrderDTO orderDTO) {
// 1. 创建订单(业务操作)
Order order = new Order();
order.setOrderId(orderDTO.getOrderId());
order.setUserId(orderDTO.getUserId());
order.setStatus("PENDING");
orderMapper.insert(order);
// 2. 插入消息表(与业务操作在同一事务)
LocalMessage message = new LocalMessage();
message.setMessageId(UUID.randomUUID().toString());
message.setBusinessId(orderDTO.getOrderId());
message.setTopic("stock_topic");
message.setContent(JSON.toJSONString(new StockMessage(orderDTO.getOrderId(), orderDTO.getGoodsId(), orderDTO.getNum())));
message.setStatus(0); // 待发送
message.setCreateTime(new Date());
message.setUpdateTime(new Date());
messageMapper.insert(message);
}
}
步骤3:消息投递线程(确保消息发出去)

独立线程扫描“待发送”消息,投递到消息队列(以RabbitMQ为例):

@Component
public class MessageSenderTask {
@Autowired
private LocalMessageMapper messageMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
// 定时任务,每10秒扫描一次
@Scheduled(fixedRate = 10000)
public void sendPendingMessages() {
// 1. 查询待发送且重试次数<3的消息(加行锁,避免并发处理)
List<LocalMessage> messages = messageMapper.selectPendingMessages(3);for (LocalMessage msg : messages) {try {// 2. 标记为“发送中”messageMapper.updateStatus(msg.getId(), 1); // 状态:1-发送中// 3. 投递消息到队列rabbitTemplate.convertAndSend(msg.getTopic(), msg.getContent());// 4. 投递成功,标记为“已完成”messageMapper.updateStatus(msg.getId(), 2); // 状态:2-已完成} catch (Exception e) {// 5. 投递失败,更新重试次数和下次重试时间(退避策略:10s→30s→60s)int newRetryCount = msg.getRetryCount() + 1;Date nextRetryTime = new Date(System.currentTimeMillis() + getRetryDelay(newRetryCount) * 1000);messageMapper.updateRetryInfo(msg.getId(), newRetryCount, nextRetryTime, 3); // 状态:3-发送失败}}}// 退避策略:重试次数越多,间隔越长private int getRetryDelay(int retryCount) {return retryCount == 1 ? 10 : (retryCount == 2 ? 30 : 60);}}
步骤4:接收方处理消息(确保消息被消费)

库存服务消费消息,扣减库存,并调用“消息确认接口”:

// 库存服务消费消息
@Component
public class StockMessageConsumer {
@Autowired
private StockMapper stockMapper;
@Autowired
private OrderMessageClient orderMessageClient; // 调用订单服务的Feign接口
@RabbitListener(queues = "stock_queue")
public void handleMessage(String content) {
StockMessage msg = JSON.parseObject(content, StockMessage.class);
try {
// 1. 扣减库存(幂等处理:查是否已处理过该消息)
if (stockMapper.existsProcessedMessage(msg.getMessageId())) {
return; // 已处理,直接返回
}
stockMapper.deductStock(msg.getGoodsId(), msg.getNum());
// 2. 记录已处理消息(幂等用)
stockMapper.insertProcessedMessage(msg.getMessageId(), msg.getOrderId());
// 3. 调用订单服务的确认接口(可选,本地消息表可依赖定时任务最终确认)
orderMessageClient.confirmMessage(msg.getMessageId());
} catch (Exception e) {
// 消费失败,消息队列会重试(需开启RabbitMQ的重试机制)
throw new AmqpRejectAndDontRequeueException("处理失败,等待重试", e);
}
}
}

3. 核心优势与致命缺陷

优势:
缺陷:
  • 业务侵入性强:需要在业务库中新增消息表,且业务代码需耦合消息操作(如创建订单时必须插入消息);
  • 性能瓶颈:消息表与业务表同库,高频写入可能影响业务库性能;消息投递依赖定时任务扫描,有延迟(秒级);
  • 运维成本高:需维护消息表的索引、清理历史数据,避免表过大影响扫描效率。

二、事务消息:消息队列“原生支持”的优雅方案

事务消息是消息队列(如RocketMQ、RabbitMQ通过插件)提供的原生功能,核心通过“半消息+本地事务确认”机制,保证“业务操作和消息发送”的原子性,无需侵入业务数据库。

1. 核心原理

还是“订单创建后通知库存扣减”的场景,RocketMQ事务消息的流程如下:

RocketMQ事务消息流程

  1. 订单服务向RocketMQ发送“半消息”(Half Message):半消息是一种特殊消息,发送后被消息队列暂存,接收方不可见
  2. RocketMQ确认收到半消息后,订单服务执行本地事务(创建订单);
  3. 订单服务根据本地事务结果,向RocketMQ发送“确认(Commit)”或“回滚(Rollback)”指令:
    • 若本地事务成功(订单创建成功),发送“Commit”,半消息变为“可见”,库存服务可消费;
    • 若本地事务失败(订单创建失败),发送“Rollback”,半消息被删除;
  4. 若RocketMQ长时间未收到确认指令(如订单服务宕机),会主动调用订单服务的“事务回查接口”,确认本地事务状态,再决定Commit或Rollback。

2. 实现步骤(RocketMQ实战)

步骤1:引入RocketMQ依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
步骤2:配置RocketMQ生产者
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv地址
producer:
group: order_producer_group # 生产者组
步骤3:发送半消息+执行本地事务

订单服务实现RocketMQLocalTransactionListener接口,处理半消息发送后的本地事务和回查:

@Service
public class OrderTransactionService implements RocketMQLocalTransactionListener {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送半消息
public void createOrderWithTransaction(OrderDTO orderDTO) {
// 1. 构建半消息(消息体+事务ID)
Message<StockMessage> message = MessageBuilder.withPayload(new StockMessage(orderDTO.getOrderId(), orderDTO.getGoodsId(), orderDTO.getNum())).setHeader(RocketMQHeaders.TRANSACTION_ID, orderDTO.getOrderId()) // 用订单ID作为事务ID.build();// 2. 发送半消息(指定topic和监听器)rocketMQTemplate.sendMessageInTransaction("stock_trans_topic", // 事务消息主题message, // 消息体orderDTO // 额外参数(传给executeLocalTransaction方法));}// 执行本地事务(半消息发送成功后回调)@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {OrderDTO orderDTO = (OrderDTO) arg;try {// 执行本地事务:创建订单Order order = new Order();order.setOrderId(orderDTO.getOrderId());order.setUserId(orderDTO.getUserId());order.setStatus("PENDING");orderMapper.insert(order);// 本地事务成功,返回COMMITreturn RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {// 本地事务失败,返回ROLLBACKreturn RocketMQLocalTransactionState.ROLLBACK;}}// 事务回查(RocketMQ未收到确认时调用)@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 从消息头获取事务ID(订单ID)String orderId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID, String.class);// 查订单是否存在:存在→COMMIT,不存在→ROLLBACKOrder order = orderMapper.selectById(orderId);return order != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;}}
步骤4:接收方消费消息(确保幂等)

库存服务消费事务消息,扣减库存(需处理重复消费):

@Component
public class StockTransactionConsumer {
@Autowired
private StockMapper stockMapper;
@RocketMQMessageListener(
topic = "stock_trans_topic",
consumerGroup = "stock_consumer_group"
)
public class StockConsumer implements RocketMQListener<StockMessage> {@Overridepublic void onMessage(StockMessage msg) {// 幂等处理:查是否已消费过该消息(用messageId)if (stockMapper.existsConsumedMessage(msg.getMessageId())) {return;}// 扣减库存stockMapper.deductStock(msg.getGoodsId(), msg.getNum());// 记录已消费消息stockMapper.insertConsumedMessage(msg.getMessageId(), msg.getOrderId());}}}

3. 核心优势与局限

优势:
局限:
  • 依赖中间件:仅支持原生支持事务消息的队列(如RocketMQ、RabbitMQ+插件),Kafka需额外开发;
  • 学习成本高:需理解半消息、回查机制等概念,调试比本地消息表复杂;
  • 不适合长事务:回查机制有超时限制(如RocketMQ默认15分钟),超长时间的本地事务可能被误判。

三、本地消息表vs事务消息:对比与选型

维度本地消息表事务消息(RocketMQ)
核心依赖数据库(业务库+消息表)支持事务消息的消息队列
业务侵入性高(需加消息表,改业务代码)低(仅需消息发送/监听代码)
可靠性高(数据库持久化)高(半消息+回查机制)
性能中(定时扫描有延迟,影响业务库)高(异步投递,无业务库侵入)
适用场景中小系统、多中间件环境、简单异步场景核心业务、高并发、需解耦的场景
典型案例内部管理系统的跨库数据同步电商下单→库存扣减→物流通知

选型建议:

  1. 中小系统,团队技术实力有限:选本地消息表(实现简单,不依赖复杂中间件);
  2. 核心业务,高并发且需解耦:选事务消息(性能好,侵入性低,适合大规模系统);
  3. 已用Kafka且无法换中间件:选本地消息表(Kafka无原生事务消息,二次开发成本高);
  4. 对消息延迟敏感:选事务消息(无定时扫描,延迟更低)。

总结:异步方案的本质是“ trade-off”

本地消息表和事务消息都是通过“消息异步传递”实现最终一致性,核心差异在于“消息可靠性由谁保证”:前者靠数据库的“笨办法”硬扛,后者靠消息队列的“巧机制”优雅解决。

没有绝对的好坏,只有是否适合——中小系统追求“简单落地”,本地消息表足够;大规模系统追求“高性能+低侵入”,事务消息更优。

下一篇,我们将讲解分布式事务中最简单的方案——“最大努力通知”,看看它如何用“最少的成本”解决非核心业务的一致性问题。

(觉得有用的话,欢迎点赞收藏,关注后续系列文章~)

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询