mq 发送消息
private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { if (StringUtils.isBlank(event.keys())) { throw new RuntimeException("keys是必填项"); } // 获取tag,默认使用类名 String tags = StringUtils.defaultString(event.tags(), event.getClass().getSimpleName()); // 构建消息 MessageBuilder<T> messageBuilder = MessageBuilder.withPayload(event) .setHeader(RocketMQHeaders.TAGS, tags) .setHeader(RocketMQHeaders.KEYS, event.keys()); String traceId = MDC.get(Constants.MDC_TRACE_ID); if (StringUtils.isNotBlank(traceId)) { messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } String env = RequestThread.getValue(Constants.ENV); if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } String desc = event.desc(); if (StringUtils.isNotBlank(desc)) { messageBuilder.setHeader(RocketMQConsts.Header.DESC, desc); } String producerApplicationName = environment.getProperty(Constants.SPRING_APPLICATION_NAME, DEFAULT_PRODUCER); messageBuilder.setHeader(RocketMQConsts.Header.PRODUCER, producerApplicationName); return messageBuilder; }mq消费
public <T extends BaseEvent> void process(final String key, final Message<T> message, final Consumer<T> function) { String cacheKey = RedisKeyUtil.generate(REDIS_REPEAT_PREFIX_KEY, key); KeyInfo keyInfo = KeyInfo.builder() .prefix(REDIS_REPEAT_PREFIX_KEY) .keys(new String[]{key}) .waitTime(3) .timeUnit(TimeUnit.SECONDS) .build(); // 获取消息体 T event = message.getPayload(); try { // 获取链路追踪id String traceId = getHeaderValue(message, RocketMQConsts.Header.TRACE_ID); MDC.put(Constants.MDC_TRACE_ID, StringUtils.isBlank(traceId) ? key : traceId); // 设置调度tag环境 String env = getHeaderValue(message, RocketMQConsts.Header.ENV); RequestThread.addParam(Constants.ENV, env); //对相同key的消息加锁,保证相同的消息是串行消费 lockService.lock(keyInfo, () -> { //判断该消息是否已处理 if (redisCache.exists(cacheKey)) { log.info("[MQ消息-已处理]--[{}]:{}", key, ReflectUtils.processObject(event)); return null; } log.info("[MQ消息-开始处理]--[{}]:{}", key, ReflectUtils.processObject(event)); function.accept(event); log.info("[MQ消息-处理完成]--{}", key); //缓存处理结果一小时,一个小时内不重复消费 redisCache.setCacheObject(cacheKey, DateUtil.now(), 60, TimeUnit.MINUTES); return null; }); } catch (LockException e) { //加锁失败,说明有线程正在消费 log.warn("[MQ消息-正在处理]--[{}]:{}", key, ReflectUtils.processObject(event)); throw e; } catch (Exception e) { log.error("[MQ消息-处理异常]--[{}]:{}", key, event, e); throw e; } finally { MDC.clear(); RequestThread.clear(); } }问题
为什么mq消费可以从mac中获取traceId 传递给消息请求头
这个代码展示了TraceId 如何从 HTTP 请求上下文传播到 MQ 消息的完整链路。让我详细解释为什么这里可以从 MDC 获取到 traceId:
🔄 完整的传播链路
🎯 为什么可以从 MDC 获取 traceId?
关键:在同一个线程内!
java
// 时间线分析(假设单线程同步处理): 1. HTTP请求进入 → HeaderFilter.doFilterInternal() ↓ 2. HeaderFilter设置MDC: String traceId = request.getHeader("traceId"); MDC.put("traceId", traceId); // 存入当前线程的ThreadLocal ↓ 3. 调用Controller → Service → MQ发送方法 ↓ 4. MQ发送方法执行: String traceId = MDC.get("traceId"); // 从同一个线程的ThreadLocal获取! ↓ 5. HeaderFilter清理: finally { MDC.clear(); } // 清理,但已经用过了线程状态可视化
java
public class ThreadStateDemo { public void processOrder() { // 时间点1:HeaderFilter执行前 // 线程:http-nio-8080-exec-1 // MDC状态:{} // HeaderFilter执行 MDC.put("traceId", "TRACE-123"); // MDC状态:{"traceId": "TRACE-123"} // 时间点2:Controller执行 // 仍然是同一个线程:http-nio-8080-exec-1 String traceId1 = MDC.get("traceId"); // "TRACE-123" // 时间点3:Service执行,调用MQ发送 // 仍然是同一个线程:http-nio-8080-exec-1 sendMQEvent(); // sendMQEvent内部: String traceId2 = MDC.get("traceId"); // "TRACE-123" ✅ // 时间点4:HeaderFilter finally清理 MDC.clear(); // MDC状态:{} } private void sendMQEvent() { // 关键:这里和调用者在同一个线程! System.out.println("当前线程: " + Thread.currentThread().getName()); // 输出:http-nio-8080-exec-1 String traceId = MDC.get("traceId"); System.out.println("获取traceId: " + traceId); // "TRACE-123" } }📊 完整的上下文传播表
| 阶段 | 组件 | 上下文存储位置 | 如何获取 | 是否同一线程 |
|---|---|---|---|---|
| 网关接收 | TraceGlobalFilter | 请求Header | request.getHeader() | N/A |
| 网关传递 | TraceGlobalFilter | 请求Header | header.set() | 网关线程 |
| 业务服务接收 | HeaderFilter | 请求Header | request.getHeader() | 业务线程 |
| 业务服务存储 | HeaderFilter | MDC (ThreadLocal) | MDC.put() | 业务线程 |
| Controller使用 | Controller方法 | MDC | MDC.get() | 同一业务线程 |
| Service使用 | Service方法 | MDC | MDC.get() | 同一业务线程 |
| MQ发送 | MQ发送器 | MDC | MDC.get() | 同一业务线程 |
| MQ消息头 | RocketMQ消息 | Message Header | setHeader() | 同一业务线程 |
🔧 代码执行流程详解
1. 请求进入(设置MDC)
java
@Order(Ordered.HIGHEST_PRECEDENCE) public class HeaderFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, ...) { try { // 从HTTP header获取traceId String traceId = request.getHeader(Constants.MDC_TRACE_ID); // 设置到当前线程的MDC MDC.put(Constants.MDC_TRACE_ID, StringUtils.defaultString(traceId, String.valueOf(System.currentTimeMillis()))); // 继续处理请求 chain.doFilter(request, response); // → 进入Controller } finally { MDC.clear(); // 请求处理完后清理 } } }2. 业务处理(使用MDC)
java
@Service public class OrderService { @Autowired private EventPublisher eventPublisher; public void createOrder(OrderRequest request) { // 这里可以直接使用MDC中的traceId String currentTraceId = MDC.get(Constants.MDC_TRACE_ID); log.info("创建订单,traceId: {}", currentTraceId); // 执行业务逻辑... Order order = saveOrder(request); // 发送MQ事件 OrderCreatedEvent event = new OrderCreatedEvent(order); eventPublisher.publish(event); // → 进入MQ发送代码 } }3. MQ发送(从MDC获取)
java
public class EventPublisher { private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { // 关键:这里和Service方法在同一个线程 // 所以可以获取到MDC中的traceId String traceId = MDC.get(Constants.MDC_TRACE_ID); // ✅ 能获取到! if (StringUtils.isNotBlank(traceId)) { // 设置到MQ消息头 messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } // 同样获取其他上下文 String env = RequestThread.getValue(Constants.ENV); // ThreadLocal if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } return messageBuilder; } }📝 总结:为什么能从MDC获取traceId
核心原因:MQ发送代码与HTTP请求处理在同一个Tomcat线程内执行。
线程连续性:HTTP请求线程 → Controller → Service → MQ发送,都在同一个线程
MDC的线程绑定:MDC基于ThreadLocal,线程内共享
生命周期匹配:在HeaderFilter清理MDC之前,MQ发送已经完成
设计意图:这正是MDC的设计目的——在请求处理链路中共享上下文
关键时间线:
所以你的MQ发送代码能正确获取traceId,这正是分布式追踪系统设计精妙之处:通过线程上下文在同步处理链路上自动传递追踪信息。