```plaintext
# Client-side core source (rocketmq-client module)
rocketmq-client/src/main/java/org/apache/rocketmq/client/
├── producer/                                 // producer core
│   ├── DefaultMQProducer.java                // producer facade (public API)
│   ├── DefaultMQProducerImpl.java            // producer implementation (all logic lands here)
│   ├── SendResult.java                       // send-result wrapper
│   └── SendCallback.java                     // async-send callback interface
├── consumer/                                 // consumer core
│   ├── DefaultMQPushConsumer.java            // Push-consumer facade
│   ├── DefaultMQPushConsumerImpl.java        // Push-consumer implementation
│   ├── DefaultMQPullConsumer.java            // Pull-consumer core class
│   ├── MessageListener.java                  // top-level consume-listener interface
│   ├── listener/                             // listener implementations
│   │   ├── MessageListenerConcurrently.java  // concurrent-consume listener
│   │   └── MessageListenerOrderly.java       // orderly-consume listener
│   ├── PullMessageService.java               // background pull service (the heart of Push mode)
│   ├── ConsumeMessageService.java            // consume service (concurrent / orderly variants)
│   └── offset/                               // offset-store implementations
│       ├── RemoteBrokerOffsetStore.java      // clustering mode (stored on the Broker)
│       └── LocalFileOffsetStore.java         // broadcasting mode (stored locally)
├── common/                                   // shared classes
│   ├── Message.java                          // message wrapper (topic/tag/body, etc.)
│   ├── MessageQueue.java                     // message queue (topic/broker/queueId)
│   └── consumer/
│       └── OffsetStore.java                  // offset-store interface
└── MQClientInstance.java                     // client core instance (connections/heartbeat/routing)

# Broker-side core storage source (rocketmq-store module)
rocketmq-store/src/main/java/org/apache/rocketmq/store/
├── DefaultMessageStore.java                  // message-store core class
├── commitlog/                                // CommitLog storage
│   └── CommitLog.java                        // message write/read core
├── consumequeue/                             // consume-queue storage
│   └── ConsumeQueue.java                     // consume-queue index management
├── index/                                    // index-file management
│   └── IndexFile.java                        // key/time index implementation
└── config/
    └── ConsumerOffsetManager.java            // offset persistence (consumerOffset.json)
```

- `DefaultMQProducer`: facade class that exposes only `start()`, `send()`, and similar APIs; all logic is delegated to `DefaultMQProducerImpl`.
- `DefaultMQProducerImpl`: the producer core, containing startup, message sending, route lookup, and all remaining logic.
- `MQClientInstance`: manages connections to the NameServer and Brokers and maintains routing info; it is the shared dependency of both producers and consumers.
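Before walking the internals, a minimal usage sketch of the facade API; the group/topic names and NameServer address are placeholders:

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerQuickStart {
    public static void main(String[] args) throws Exception {
        // Facade: configuration only; all work is delegated to DefaultMQProducerImpl
        DefaultMQProducer producer = new DefaultMQProducer("DEMO_PRODUCER_GROUP");
        producer.setNamesrvAddr("127.0.0.1:9876"); // placeholder NameServer address
        producer.start();                          // triggers DefaultMQProducerImpl.start()
        Message msg = new Message("DEMO_TOPIC", "TAG_A", "hello".getBytes());
        SendResult result = producer.send(msg);    // sync send: sendDefaultImpl(...) under the hood
        System.out.printf("sendStatus=%s msgId=%s%n", result.getSendStatus(), result.getMsgId());
        producer.shutdown();
    }
}
```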
```java
// 1. Producer startup core (DefaultMQProducerImpl.start())
public void start() throws MQClientException {
    this.checkConfig(); // validate group, namesrv address, and other config
    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer);
    this.mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
    if (!this.mQClientFactory.start()) {
        throw new MQClientException("failed to start MQClientInstance", null);
    }
    this.startScheduledTask(); // start the route-refresh task (every 30s by default)
}

// 2. Message-send core (DefaultMQProducerImpl.sendDefaultImpl())
private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode,
                                   SendCallback sendCallback, long timeout) {
    Validators.checkMessage(msg, this.defaultMQProducer); // validate the message
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // fetch route info
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        throw new MQClientException("no route info for this topic", null);
    }
    MessageQueue mq = this.selectOneMessageQueue(topicPublishInfo); // pick one queue
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); // ship to the Broker
}
```

- `DefaultMQPushConsumer`: Push-consumer facade class exposing the configuration and startup APIs.
- `DefaultMQPushConsumerImpl`: Push-consumer implementation that manages the whole pull/consume pipeline.
- `PullMessageService`: background thread that pulls messages from the Broker in a loop (Push mode is "pseudo push": underneath, the client actively pulls).
- `ConsumeMessageService`: the consume thread pool, split into `ConsumeMessageConcurrentlyService` (concurrent) and `ConsumeMessageOrderlyService` (orderly).
- `OffsetStore`: offset-store interface, implemented by `RemoteBrokerOffsetStore` (clustering mode, stored on the Broker) and `LocalFileOffsetStore` (broadcasting mode, stored locally).
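For orientation, a matching Push-consumer quick-start sketch wiring these pieces together; group/topic names and the address are again placeholders:

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ConsumerQuickStart {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DEMO_CONSUMER_GROUP");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DEMO_TOPIC", "*"); // all tags
        // Invoked on ConsumeMessageConcurrentlyService's thread pool; messages arrive
        // because PullMessageService pulls in the background ("pseudo push")
        consumer.registerMessageListener((MessageListenerConcurrently)
                (List<MessageExt> msgs, ConsumeConcurrentlyContext ctx) -> {
            msgs.forEach(m -> System.out.println(new String(m.getBody())));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // offset then committed via OffsetStore
        });
        consumer.start(); // triggers DefaultMQPushConsumerImpl.start()
    }
}
```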
```java
// 1. Consumer startup core (DefaultMQPushConsumerImpl.start())
public void start() throws MQClientException {
    this.checkConfig(); // validate listener, group, and other config
    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer);
    this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
    this.pullMessageService.start();    // start the pull thread
    this.consumeMessageService.start(); // start the consume thread pool
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // heartbeat to the Brokers
}

// 2. Message-pull core (PullMessageService.pullMessage())
private void pullMessage(final PullRequest pullRequest) {
    DefaultMQPushConsumerImpl consumerImpl = this.defaultMQPushConsumerImpl;
    MessageQueue mq = pullRequest.getMessageQueue();
    // Build the pull-request header (consumerGroup/topic/offset/batch size)
    PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
    requestHeader.setConsumerGroup(consumerImpl.groupName());
    requestHeader.setTopic(mq.getTopic());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setQueueOffset(pullRequest.getNextOffset());
    // Send the pull request to the Broker
    PullResult pullResult = consumerImpl.pullKernelImpl(mq, requestHeader, consumerImpl.getPullTimeoutMillis());
    // Handle the pull result
    if (pullResult.getPullStatus() == PullStatus.FOUND) {
        // Submit the batch to the consume thread pool
        consumerImpl.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), mq, pullRequest);
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
        this.executePullRequestImmediately(pullRequest); // keep pulling the next batch
    }
}

// 3. Concurrent-consume core (ConsumeMessageConcurrentlyService.ConsumeRequest.run())
class ConsumeRequest implements Runnable {
    @Override
    public void run() {
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        // Invoke the user-defined consume logic
        ConsumeConcurrentlyStatus status = listener.consumeMessage(msgs, context);
        if (status == ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
            // Consumed successfully: commit the offset
            if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.isAutoCommit()) {
                ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.commitOffsetAsync();
            }
        } else {
            // Consumption failed: retry later
            ConsumeMessageConcurrentlyService.this.submitConsumeRequestLater(msgs, mq);
        }
    }
}
```

| File / directory | Path | Core role |
|---|---|---|
| commitlog | store/commitlog/ | Stores the raw content of all messages (append-only; 1 GB per file by default; file name = starting offset) |
| consumequeue | store/consumequeue/{Topic}/{QueueId}/ | Logical index file per topic/queue, storing "commitlog offset + message size + tag hashcode" (20 bytes per entry) |
| index file | store/index/ | Key/time index for fast message lookup (optional) |
| readQueue / writeQueue | Logical concept (mapped onto consumequeue) | writeQueue = queues producers write to, readQueue = queues consumers read from; counts match by default |
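To make the 20-byte entry layout concrete, here is a standalone sketch (plain java.nio, no RocketMQ dependency; all values are illustrative) that encodes one consumequeue entry and shows how a logical offset maps to a byte position in the file:

```java
import java.nio.ByteBuffer;

public class ConsumeQueueEntrySketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 + 4 + 8 bytes, as in ConsumeQueue

    public static void main(String[] args) {
        // Encode: commitlog physical offset + message size + tag hashcode
        ByteBuffer entry = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        entry.putLong(1_073_741_824L);     // 8B: offset into the commitlog
        entry.putInt(256);                 // 4B: stored message size
        entry.putLong("TAG_A".hashCode()); // 8B: tag hashcode (used for broker-side tag filtering)
        entry.flip();

        // Decode: logical offset N maps to byte position N * 20 in the consumequeue file
        long logicalOffset = 0;
        long bytePos = logicalOffset * CQ_STORE_UNIT_SIZE;
        System.out.printf("entry #%d at byte %d: commitlogOffset=%d size=%d tagsCode=%d%n",
                logicalOffset, bytePos, entry.getLong(), entry.getInt(), entry.getLong());
    }
}
```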
```java
// DefaultMessageStore.java: message-write core
public CompletableFuture<PutMessageResult> putMessage(final MessageExtBrokerInner msg) {
    if (this.shutdown) {
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
    }
    // 1. Get the CommitLog's memory-mapped file (MappedFile)
    MappedFileQueue mappedFileQueue = this.commitLog.getMappedFileQueue();
    MappedFile mappedFile = mappedFileQueue.getLastMappedFile();
    if (null == mappedFile) {
        mappedFile = mappedFileQueue.createMappedFile(null);
    }
    // 2. Write the message into the CommitLog (in memory)
    PutMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    // 3. Generate the ConsumeQueue index entry
    if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        this.commitLog.putMessagePositionInfo(result.getAppendMessageResult(), msg);
        this.consumeQueueService.wakeup(); // trigger the ConsumeQueue flush
    }
    // 4. Flush to disk (sync or async)
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStoreConfig.getFlushDiskType()) {
        CompletableFuture<PutMessageStatus> flushResultFuture = mappedFile.flush(0);
        flushResultFuture.get(); // block until the flush completes
    } else {
        this.flushCommitLogService.wakeup();
    }
    return CompletableFuture.completedFuture(result);
}

// ConsumeQueue.java: index-write core
public boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode) {
    // 1. Compute the current index position (the ConsumeQueue index number for this offset)
    long logicOffset = this.getMaxPhysicOffset() / ConsumeQueue.CQ_STORE_UNIT_SIZE;
    // 2. Get the ConsumeQueue's MappedFile
    MappedFile mappedFile = this.mappedFileQueue.getMappedFileByOffset(logicOffset);
    // 3. Write the index entry (20 bytes: 8B commitlog offset + 4B message size + 8B tag hashcode)
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    byteBuffer.putLong(offset);
    byteBuffer.putInt(size);
    byteBuffer.putLong(tagsCode);
    // 4. Mark the write as complete
    mappedFile.wrotePosition(byteBuffer.position());
    return true;
}
```
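The write path above leans on memory-mapped files. Below is a minimal standalone sketch of that mechanism in plain java.nio; the path and sizes are placeholders, and the real MappedFile additionally tracks write positions, reference counts, and group commit:

```java
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapAppendSketch {
    public static void main(String[] args) throws Exception {
        Path file = Path.of("/tmp/commitlog-demo"); // placeholder path
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map a fixed-size region up front, like RocketMQ's 1 GB commitlog segments
            MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024);
            byte[] msg = "hello-commitlog".getBytes();
            mapped.putInt(msg.length); // length prefix, then body: append-only write in memory
            mapped.put(msg);
            mapped.force(); // flush to disk; this is what SYNC_FLUSH waits for
        }
    }
}
```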
```java
// TopicConfig.java: read/write queue configuration
public class TopicConfig {
    private String topicName;
    private int readQueueNums = 4;  // default read-queue count
    private int writeQueueNums = 4; // default write-queue count

    // Producers select a queue by hashing over writeQueueNums only
    public int getWriteQueueNums() {
        return writeQueueNums;
    }

    // Consumers select a queue by hashing over readQueueNums only
    public int getReadQueueNums() {
        return readQueueNums;
    }
}
```
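A small sketch of why the two counts should normally match: producers spread writes over writeQueueNums, while consumers are only assigned queue ids below readQueueNums, so a larger write count strands messages in queues nobody reads. The counts here are illustrative:

```java
public class QueueCountSketch {
    public static void main(String[] args) {
        int writeQueueNums = 6; // illustrative: already scaled up for producers
        int readQueueNums = 4;  // illustrative: not yet scaled up for consumers
        for (int i = 0; i < 100; i++) {
            // Producer-side pick: hash the key over the write-queue count
            int queueId = Math.abs(("key-" + i).hashCode() % writeQueueNums); // abs: hashCode may be negative
            if (queueId >= readQueueNums) {
                System.out.println("message " + i + " lands in queue " + queueId
                        + ": written but unread until readQueueNums is raised");
            }
        }
    }
}
```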
```java
// RemoteBrokerOffsetStore.java: update the in-memory offset table
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = new AtomicLong(offset);
            this.offsetTable.put(mq, offsetOld);
        }
        // Only update when the new offset is larger (prevents offset regression)
        if (increaseOnly) {
            MixAll.compareAndIncreaseOnly(offsetOld, offset);
        } else {
            offsetOld.set(offset);
        }
    }
}

// Commit offsets to the Broker (synchronously)
public void persistAll(final Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty()) {
        return;
    }
    // Build the commit request
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (MessageQueue mq : mqs) {
        AtomicLong offset = this.offsetTable.get(mq);
        if (offset != null) {
            offsetSerializeWrapper.getOffsetTable().put(mq, offset.get());
        }
    }
    // Send the request to the Broker, which persists it into consumerOffset.json
    this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
        this.groupName, offsetSerializeWrapper, 1000 * 5
    );
}

// Broker side: ConsumerOffsetManager.java persists into consumerOffset.json
public void persist() {
    String jsonString = this.encode(false);
    if (jsonString != null) {
        try {
            // Write consumerOffset.json
            FileUtil.writeFileContent(this.consumerOffsetPath, jsonString);
        } catch (IOException e) {
            log.error("Persist consumer offset exception", e);
        }
    }
}
```
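The increase-only guard is a compare-and-set loop. A standalone sketch of the same pattern, mirroring what the source above describes for `MixAll.compareAndIncreaseOnly`:

```java
import java.util.concurrent.atomic.AtomicLong;

public class IncreaseOnlySketch {
    // Returns true only if target was advanced; never moves the value backwards
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long current = target.get();
        while (value > current) {
            if (target.compareAndSet(current, value)) {
                return true; // won the race and advanced the offset
            }
            current = target.get(); // lost the race: re-read and re-check
        }
        return false; // value <= current: a stale commit, ignored to avoid offset regression
    }

    public static void main(String[] args) {
        AtomicLong offset = new AtomicLong(100);
        System.out.println(compareAndIncreaseOnly(offset, 95));  // false: stale commit ignored
        System.out.println(compareAndIncreaseOnly(offset, 120)); // true: offset advanced
    }
}
```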
```java
// LocalFileOffsetStore.java: persist to a local file
public void persist(MessageQueue mq) {
    if (mq != null) {
        // Build the local file path: ~/.rocketmq_offsets/<group>/<broker>/<topic>/<queueId>
        String fileName = this.storePath + File.separator + this.groupName
            + File.separator + mq.getBrokerName()
            + File.separator + mq.getTopic()
            + File.separator + mq.getQueueId();
        // Read the current in-memory offset
        Long offset = this.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
        if (offset != null) {
            // Write it to the local file (as a plain number)
            FileUtil.writeFileContent(fileName, offset + "");
        }
    }
}

// Load the offset from the local file (called at consumer startup)
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        String fileName = this.storePath + File.separator + this.groupName
            + File.separator + mq.getBrokerName()
            + File.separator + mq.getTopic()
            + File.separator + mq.getQueueId();
        File file = new File(fileName);
        if (file.exists()) {
            // Read the file content (the offset value)
            String content = FileUtil.readFileContent(file);
            return Long.parseLong(content);
        }
    }
    return -1; // file missing: return -1 (consume from the beginning)
}
```
{ "ORDER_CONSUMER_GROUP": { // 消费者组 "ORDER_TOPIC": { // Topic "0": 100, // 队列0的Offset(消费到ConsumeQueue的第100条索引) "1": 95 // 队列1的Offset } }, "PAY_CONSUMER_GROUP": { "PAY_TOPIC": { "0": 200 } } }Offset 是 RocketMQ 标记 “消费者消费到哪个位置” 的核心标识,值为 “下一条要消费的消息在 ConsumeQueue 中的索引序号”(当前消费最后一条的索引序号 + 1),核心作用:
```bash
# 1. Query the per-queue offsets of a consumer group on a topic
sh mqadmin queryConsumerOffset -n 127.0.0.1:9876 -g CONSUMER_GROUP -t TOPIC_NAME
# Sample output: Queue ID:0, Broker Name:broker-a, Queue Offset:100, Broker Offset:150
# Queue Offset = the group's progress (ConsumeQueue index); Broker Offset = newest position in the queue

# 2. Query the group's overall consumption progress
sh mqadmin consumerProgress -n 127.0.0.1:9876 -g CONSUMER_GROUP

# 3. Query the max/min offsets of a topic's queues (the stored message range)
sh mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC_NAME
```
On the Broker, the persisted file can also be inspected directly:

```bash
cat ${ROCKETMQ_HOME}/store/config/consumerOffset.json
```
```java
DefaultMQAdminExt admin = new DefaultMQAdminExt();
admin.setNamesrvAddr("127.0.0.1:9876");
admin.start();
// Query a queue's committed consumer offset
MessageQueue mq = new MessageQueue("TOPIC_NAME", "broker-a", 0);
long consumerOffset = admin.queryConsumerOffset("CONSUMER_GROUP", mq);
// Query the queue's max offset
long maxOffset = admin.maxOffset(mq);
// Query the queue's min offset
long minOffset = admin.minOffset(mq);
admin.shutdown();
```

In broadcasting mode, read the local offset file directly:

```bash
cat ~/.rocketmq_offsets/CONSUMER_GROUP/broker-a:10911/TOPIC_NAME/0
```
```bash
# 1. Reset to a specific offset value
sh mqadmin resetOffset -n 127.0.0.1:9876 -g CONSUMER_GROUP -t TOPIC_NAME -o 100

# 2. Reset to a point in time (e.g. 2026-01-19 10:00:00)
sh mqadmin resetOffset -n 127.0.0.1:9876 -g CONSUMER_GROUP -t TOPIC_NAME -s 20260119100000
```
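The same reset can be driven programmatically through the admin client. A hedged sketch assuming `DefaultMQAdminExt#resetOffsetByTimestamp` from the rocketmq-tools module (verify the exact signature against your client version); the article's own consumer-side variant follows after it:

```java
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class ResetOffsetSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("127.0.0.1:9876");
        admin.start();
        long timestamp = System.currentTimeMillis() - 3600_000L; // rewind one hour (illustrative)
        // isForce=true also rewinds groups that are currently online
        Map<MessageQueue, Long> result =
                admin.resetOffsetByTimestamp("TOPIC_NAME", "CONSUMER_GROUP", timestamp, true);
        result.forEach((mq, offset) ->
                System.out.printf("queue %d reset to offset %d%n", mq.getQueueId(), offset));
        admin.shutdown();
    }
}
```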
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TOPIC_NAME", "*");
consumer.start();
// Fetch all queues of the topic
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC_NAME");
for (MessageQueue mq : mqs) {
    // Option 1: reset to a specific offset
    consumer.resetOffsetToBroker(mq, 100, false);
    // Option 2: reset to a point in time
    long timestamp = LocalDateTime.of(2026, 1, 19, 10, 0, 0)
        .toInstant(ZoneOffset.of("+8")).toEpochMilli();
    consumer.resetOffsetToTimestamp(mq, timestamp);
}
```
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setAutoCommit(false); // disable auto commit (the key step)
consumer.subscribe("TOPIC_NAME", "*");

// Concurrent consumption with manual commit
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        // Run the business logic
        processBusiness(msgs);
        // Option 1: synchronous commit (reliable, blocking)
        consumer.commitSync(context.getMessageQueue());
        // Option 2: asynchronous commit (fast, non-blocking)
        consumer.commitAsync(context.getMessageQueue(), new CommitCallback() {
            @Override
            public void onSuccess(MessageQueue mq, long offset) {
                System.out.println("offset committed: " + offset);
            }
            @Override
            public void onException(MessageQueue mq, long offset, Throwable e) {
                System.err.println("offset commit failed, retrying");
                consumer.commitSync(mq); // retry on failure
            }
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

// Orderly consumption with manual commit
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            processOrder(msgs);
            context.setAutoCommit(false);
            consumer.commitSync(context.getMessageQueue());
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            context.setSuspendCurrentQueueTimeMillis(1000);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});
```

When consumption fails, the listener signals it by returning `RECONSUME_LATER` (concurrent) or `SUSPEND_CURRENT_QUEUE_A_MOMENT` (orderly); the client then calls `sendMessageBack()` to hand the message back to the Broker for retry.
```java
// Global client config: maximum retry count (default 16)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
consumer.setMaxReconsumeTimes(3); // enter the dead-letter queue after 3 retries
```

The Broker-side global equivalent in `broker.conf`:

```properties
retryTimesWhenConsumeFailed=3
```

Once the retry count reaches the `setMaxReconsumeTimes` threshold, the message is moved to the dead-letter queue.
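The threshold check itself is a one-liner. A standalone sketch of the routing rule the Broker applies (the full source follows below), using the standard `%RETRY%`/`%DLQ%` topic-name prefixes:

```java
public class RetryRoutingSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // what MixAll.getRetryTopic(group) produces
    static final String DLQ_PREFIX = "%DLQ%";     // what MixAll.getDLQTopic(group) produces

    // Decide where a failed message goes next, as the Broker does on sendMessageBack
    static String routeOnFailure(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes + 1 >= maxReconsumeTimes) {
            return DLQ_PREFIX + consumerGroup;   // retries exhausted: park in the dead-letter queue
        }
        return RETRY_PREFIX + consumerGroup;     // not yet exhausted: schedule another retry
    }

    public static void main(String[] args) {
        System.out.println(routeOnFailure("ORDER_CONSUMER_GROUP", 0, 3)); // %RETRY%ORDER_CONSUMER_GROUP
        System.out.println(routeOnFailure("ORDER_CONSUMER_GROUP", 2, 3)); // %DLQ%ORDER_CONSUMER_GROUP
    }
}
```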
```java
// ConsumerManageProcessor.java: handles dead-letter-queue creation
public RemotingCommand handleSendMessageBackRequest(ChannelHandlerContext ctx, RemotingCommand request) {
    final SendMessageBackRequestHeader requestHeader =
        (SendMessageBackRequestHeader) request.decodeCommandCustomHeader(SendMessageBackRequestHeader.class);
    // 1. Read the retry count
    int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
    reconsumeTimes++; // one more retry
    // 2. Has the maximum retry count been reached?
    if (reconsumeTimes >= this.brokerController.getBrokerConfig().getRetryTimesWhenConsumeFailed()) {
        // 3. Threshold reached: create the dead-letter queue and move the message there
        String deadLetterTopic = MixAll.getDLQTopic(requestHeader.getConsumerGroup());
        // Create the dead-letter queue if it does not exist
        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            deadLetterTopic,
            1, // a dead-letter topic has 1 queue by default
            PermName.PERM_WRITE | PermName.PERM_READ,
            0
        );
        // 4. Send the message to the dead-letter queue
        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
        Message newMsg = new Message(deadLetterTopic, msgExt.getBody());
        newMsg.setTags(msgExt.getTags());
        newMsg.setKeys(msgExt.getKeys());
        this.brokerController.getSendMessageProcessor().sendMessage(ctx, newMsg);
        return RemotingCommand.createResponseCommand(null);
    }
    // Threshold not reached: send to the retry queue
    String retryTopic = MixAll.getRetryTopic(requestHeader.getConsumerGroup());
    // Create the retry queue if it does not exist
    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        retryTopic,
        this.brokerController.getBrokerConfig().getDefaultTopicQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ,
        0
    );
    // Send to the retry queue
    // ... retry-queue send logic omitted
    return RemotingCommand.createResponseCommand(null);
}
```

The dead-letter topic is named `%DLQ%` + consumer-group name (e.g. `%DLQ%CONSUMER_GROUP`).
```bash
# 1. Manually create the dead-letter topic (%DLQ% + consumer-group name)
sh mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t %DLQ%CONSUMER_GROUP -b broker-a -r 1 -w 1

# 2. Inspect dead-letter messages
sh mqadmin queryMsgByQueue -n 127.0.0.1:9876 -t %DLQ%CONSUMER_GROUP -i 0 -s 0 -e 100

# 3. Re-consume dead-letter messages (reset the offset)
sh mqadmin resetOffset -n 127.0.0.1:9876 -g CONSUMER_GROUP -t %DLQ%CONSUMER_GROUP -o 0
```
```java
// A dedicated consumer that listens on the dead-letter queue
DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("DLQ_CONSUMER_GROUP");
dlqConsumer.setNamesrvAddr("127.0.0.1:9876");
// Subscribe to the dead-letter topic (%DLQ% + original consumer-group name)
dlqConsumer.subscribe("%DLQ%ORIGIN_CONSUMER_GROUP", "*");
dlqConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        // Handle dead-letter messages (manual review, replay, flag as abnormal, etc.)
        for (MessageExt msg : msgs) {
            System.out.println("handling dead-letter message: " + new String(msg.getBody()));
            // Example: call a compensation hook for the failed message
            compensateFailedMessage(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // Handling failed: log and mark it (avoid endless retries)
        log.error("failed to handle dead-letter message", e);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // dead-letter messages are not retried again
    }
});
dlqConsumer.start();
System.out.println("dead-letter consumer started");
```
```properties
# Retention time of dead-letter messages (default 3 days, in ms)
fileReservedTime=259200000
# Global maximum retry count for the retry queue
retryTimesWhenConsumeFailed=16
# Whether the dead-letter queue is enabled (default: on)
dlqEnable=true
# Flush mode for retry-queue messages (synchronous flush is more reliable)
flushDiskType=SYNC_FLUSH
```

| Dimension | Clustering mode (default) | Broadcasting mode |
|---|---|---|
| Consumption scope | Instances in the group share the full message set | Every instance independently consumes the full message set |
| Storage dependency | commitlog + consumequeue (on the Broker) | commitlog + consumequeue (on the Broker) |
| Offset storage | consumerOffset.json on the Broker (easy to recover) | Local files under ~/.rocketmq_offsets/ (easy to lose) |
| Retry mechanism | Supported, configurable retry count, with a dead-letter queue | No retry, no dead-letter queue |
| Consumption progress | Shared across the group | Independent per instance |
| Typical scenarios | Core business (orders / payments / inventory) | Notifications / config push / log sync |
| Crash recovery | Offset recovered from the Broker; normally no duplicate consumption | If the local files are lost, consumption restarts from the beginning (duplicates) |
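Switching between the two modes is a single client setting. A minimal sketch; the group/topic names are placeholders:

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class BroadcastingConsumerSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("NOTIFY_CONSUMER_GROUP");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // BROADCASTING: every instance gets every message; offsets go to LocalFileOffsetStore.
        // The default is CLUSTERING: instances split the queues; offsets go to RemoteBrokerOffsetStore.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("CONFIG_PUSH_TOPIC", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) ->
                ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
        consumer.start();
    }
}
```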
```java
DefaultMQProducer producer = new DefaultMQProducer("PRODUCER_GROUP");
producer.start();
String orderId = "ORDER_001"; // partition key: all messages of one order go to the same queue
Message msg = new Message("ORDER_TOPIC", "TAG", ("order message: " + orderId).getBytes());
// Select the queue by partition key
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String orderId = (String) arg;
        int index = Math.abs(orderId.hashCode() % mqs.size()); // abs: hashCode may be negative
        return mqs.get(index);
    }
}, orderId);
```
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMER_GROUP");
consumer.setConsumeThreadMin(1); // pin the consume pool to a single thread
consumer.setConsumeThreadMax(1); // (the client exposes a min/max pair rather than a single setter)
consumer.subscribe("ORDER_TOPIC", "*");
// Use the orderly listener (a per-queue lock guarantees serial consumption)
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // Process order messages serially (create → pay → deduct stock)
        for (MessageExt msg : msgs) {
            processOrder(msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
// Note: multiple instances split the queues between them, but per-queue order still holds;
// a single instance is only required when strict global order across all queues is needed
consumer.start();
```

- Use `flushDiskType=SYNC_FLUSH` (synchronous flush) for core business; asynchronous flush is acceptable for non-core traffic.
- Keep `readQueueNums = writeQueueNums` (the default). When scaling out, raise `readQueueNums` first and `writeQueueNums` second (reverse the order when shrinking), so no queue is ever written but unread.