黄山市网站建设_网站建设公司_Redis_seo优化
2025/12/17 17:48:45 网站建设 项目流程

mq 发送消息

private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { if (StringUtils.isBlank(event.keys())) { throw new RuntimeException("keys是必填项"); } // 获取tag,默认使用类名 String tags = StringUtils.defaultString(event.tags(), event.getClass().getSimpleName()); // 构建消息 MessageBuilder<T> messageBuilder = MessageBuilder.withPayload(event) .setHeader(RocketMQHeaders.TAGS, tags) .setHeader(RocketMQHeaders.KEYS, event.keys()); String traceId = MDC.get(Constants.MDC_TRACE_ID); if (StringUtils.isNotBlank(traceId)) { messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } String env = RequestThread.getValue(Constants.ENV); if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } String desc = event.desc(); if (StringUtils.isNotBlank(desc)) { messageBuilder.setHeader(RocketMQConsts.Header.DESC, desc); } String producerApplicationName = environment.getProperty(Constants.SPRING_APPLICATION_NAME, DEFAULT_PRODUCER); messageBuilder.setHeader(RocketMQConsts.Header.PRODUCER, producerApplicationName); return messageBuilder; }

mq消费

public <T extends BaseEvent> void process(final String key, final Message<T> message, final Consumer<T> function) { String cacheKey = RedisKeyUtil.generate(REDIS_REPEAT_PREFIX_KEY, key); KeyInfo keyInfo = KeyInfo.builder() .prefix(REDIS_REPEAT_PREFIX_KEY) .keys(new String[]{key}) .waitTime(3) .timeUnit(TimeUnit.SECONDS) .build(); // 获取消息体 T event = message.getPayload(); try { // 获取链路追踪id String traceId = getHeaderValue(message, RocketMQConsts.Header.TRACE_ID); MDC.put(Constants.MDC_TRACE_ID, StringUtils.isBlank(traceId) ? key : traceId); // 设置调度tag环境 String env = getHeaderValue(message, RocketMQConsts.Header.ENV); RequestThread.addParam(Constants.ENV, env); //对相同key的消息加锁,保证相同的消息是串行消费 lockService.lock(keyInfo, () -> { //判断该消息是否已处理 if (redisCache.exists(cacheKey)) { log.info("[MQ消息-已处理]--[{}]:{}", key, ReflectUtils.processObject(event)); return null; } log.info("[MQ消息-开始处理]--[{}]:{}", key, ReflectUtils.processObject(event)); function.accept(event); log.info("[MQ消息-处理完成]--{}", key); //缓存处理结果一小时,一个小时内不重复消费 redisCache.setCacheObject(cacheKey, DateUtil.now(), 60, TimeUnit.MINUTES); return null; }); } catch (LockException e) { //加锁失败,说明有线程正在消费 log.warn("[MQ消息-正在处理]--[{}]:{}", key, ReflectUtils.processObject(event)); throw e; } catch (Exception e) { log.error("[MQ消息-处理异常]--[{}]:{}", key, event, e); throw e; } finally { MDC.clear(); RequestThread.clear(); } }

问题

为什么mq消费可以从mac中获取traceId 传递给消息请求头

这个代码展示了TraceId 如何从 HTTP 请求上下文传播到 MQ 消息的完整链路。让我详细解释为什么这里可以从 MDC 获取到 traceId:

🔄 完整的传播链路

🎯 为什么可以从 MDC 获取 traceId?

关键:在同一个线程内!

java

// 时间线分析(假设单线程同步处理): 1. HTTP请求进入 → HeaderFilter.doFilterInternal() ↓ 2. HeaderFilter设置MDC: String traceId = request.getHeader("traceId"); MDC.put("traceId", traceId); // 存入当前线程的ThreadLocal ↓ 3. 调用Controller → Service → MQ发送方法 ↓ 4. MQ发送方法执行: String traceId = MDC.get("traceId"); // 从同一个线程的ThreadLocal获取! ↓ 5. HeaderFilter清理: finally { MDC.clear(); } // 清理,但已经用过了

线程状态可视化

java

public class ThreadStateDemo { public void processOrder() { // 时间点1:HeaderFilter执行前 // 线程:http-nio-8080-exec-1 // MDC状态:{} // HeaderFilter执行 MDC.put("traceId", "TRACE-123"); // MDC状态:{"traceId": "TRACE-123"} // 时间点2:Controller执行 // 仍然是同一个线程:http-nio-8080-exec-1 String traceId1 = MDC.get("traceId"); // "TRACE-123" // 时间点3:Service执行,调用MQ发送 // 仍然是同一个线程:http-nio-8080-exec-1 sendMQEvent(); // sendMQEvent内部: String traceId2 = MDC.get("traceId"); // "TRACE-123" ✅ // 时间点4:HeaderFilter finally清理 MDC.clear(); // MDC状态:{} } private void sendMQEvent() { // 关键:这里和调用者在同一个线程! System.out.println("当前线程: " + Thread.currentThread().getName()); // 输出:http-nio-8080-exec-1 String traceId = MDC.get("traceId"); System.out.println("获取traceId: " + traceId); // "TRACE-123" } }

📊 完整的上下文传播表

阶段组件上下文存储位置如何获取是否同一线程
网关接收TraceGlobalFilter请求Headerrequest.getHeader()N/A
网关传递TraceGlobalFilter请求Headerheader.set()网关线程
业务服务接收HeaderFilter请求Headerrequest.getHeader()业务线程
业务服务存储HeaderFilterMDC (ThreadLocal)MDC.put()业务线程
Controller使用Controller方法MDCMDC.get()同一业务线程
Service使用Service方法MDCMDC.get()同一业务线程
MQ发送MQ发送器MDCMDC.get()同一业务线程
MQ消息头RocketMQ消息Message HeadersetHeader()同一业务线程

🔧 代码执行流程详解

1. 请求进入(设置MDC)

java

@Order(Ordered.HIGHEST_PRECEDENCE) public class HeaderFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, ...) { try { // 从HTTP header获取traceId String traceId = request.getHeader(Constants.MDC_TRACE_ID); // 设置到当前线程的MDC MDC.put(Constants.MDC_TRACE_ID, StringUtils.defaultString(traceId, String.valueOf(System.currentTimeMillis()))); // 继续处理请求 chain.doFilter(request, response); // → 进入Controller } finally { MDC.clear(); // 请求处理完后清理 } } }

2. 业务处理(使用MDC)

java

@Service public class OrderService { @Autowired private EventPublisher eventPublisher; public void createOrder(OrderRequest request) { // 这里可以直接使用MDC中的traceId String currentTraceId = MDC.get(Constants.MDC_TRACE_ID); log.info("创建订单,traceId: {}", currentTraceId); // 执行业务逻辑... Order order = saveOrder(request); // 发送MQ事件 OrderCreatedEvent event = new OrderCreatedEvent(order); eventPublisher.publish(event); // → 进入MQ发送代码 } }

3. MQ发送(从MDC获取)

java

public class EventPublisher { private <T extends BaseEvent> MessageBuilder<T> toMessageBuilder(final T event) { // 关键:这里和Service方法在同一个线程 // 所以可以获取到MDC中的traceId String traceId = MDC.get(Constants.MDC_TRACE_ID); // ✅ 能获取到! if (StringUtils.isNotBlank(traceId)) { // 设置到MQ消息头 messageBuilder.setHeader(RocketMQConsts.Header.TRACE_ID, traceId); } // 同样获取其他上下文 String env = RequestThread.getValue(Constants.ENV); // ThreadLocal if (StringUtils.isNotBlank(env)) { messageBuilder.setHeader(RocketMQConsts.Header.ENV, env); } return messageBuilder; } }

📝 总结:为什么能从MDC获取traceId
核心原因:MQ发送代码与HTTP请求处理在同一个Tomcat线程内执行。

线程连续性:HTTP请求线程 → Controller → Service → MQ发送,都在同一个线程

MDC的线程绑定:MDC基于ThreadLocal,线程内共享

生命周期匹配:在HeaderFilter清理MDC之前,MQ发送已经完成

设计意图:这正是MDC的设计目的——在请求处理链路中共享上下文

关键时间线:


所以你的MQ发送代码能正确获取traceId,这正是分布式追踪系统设计精妙之处:通过线程上下文在同步处理链路上自动传递追踪信息。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询