网关
/** * 生成日志链路追踪id,并传入header中. * * @author breggor */ @Slf4j @AllArgsConstructor public class TraceGlobalFilter implements GlobalFilter, Ordered { @Override public Mono<Void> filter(final ServerWebExchange exchange, final GatewayFilterChain chain) { ServerHttpRequest rawRequest = exchange.getRequest(); String path = rawRequest.getURI().getPath(); String traceId = StringUtils.defaultString(rawRequest.getHeaders().getFirst(Constants.MDC_TRACE_ID), String.valueOf(System.currentTimeMillis())); MDC.put(Constants.MDC_TRACE_ID, traceId); log.info("[Gateway] - path={}, traceId={}, requestIp={} ====> {}", path, traceId, getIpAddr(rawRequest), MDC.getCopyOfContextMap()); ServerHttpRequest request = rawRequest.mutate().header(Constants.MDC_TRACE_ID, traceId).build(); return chain.filter(exchange.mutate().request(request).build()).doFinally(x -> MDC.clear()); }业务层过滤器
/** * header拦截器把header数据放到MDC. * * @author breggor */ @Slf4j @Component @Order(Ordered.HIGHEST_PRECEDENCE) public class HeaderFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(final HttpServletRequest request, final HttpServletResponse response, final FilterChain chain) throws ServletException, IOException { try { MDC.put(Constants.MDC_USER_ID, StringUtils.defaultString(request.getHeader(Constants.MDC_USER_ID), Constants.DASH)); MDC.put(Constants.MDC_COMPANY_ID, StringUtils.defaultString(request.getHeader(Constants.MDC_COMPANY_ID), Constants.DASH)); String shopId = request.getHeader(Constants.MDC_SHOP_ID); if (StringUtils.isBlank(shopId)) { try { // 获取线下店铺编码 String shopCode = request.getHeader(Constants.MDC_OFFLINE_SHOP_CODE); if (StringUtils.isNotBlank(shopCode)) { shopId = String.valueOf(CodeUtil.toId(shopCode)); } } catch (Exception e) { log.warn("获取线下店铺编码异常", e); } } MDC.put(Constants.MDC_SHOP_ID, StringUtils.defaultString(shopId, Constants.DASH)); String traceId = request.getHeader(Constants.MDC_TRACE_ID); MDC.put(Constants.MDC_TRACE_ID, StringUtils.defaultString(traceId, String.valueOf(System.currentTimeMillis()))); RequestThread.addParam(Constants.ENV, StringUtils.defaultString(request.getHeader(Constants.ENV))); chain.doFilter(request, response); } finally { MDC.clear(); RequestThread.clear(); } } }mq消费
public <T extends BaseEvent> void process(final String key, final Message<T> message, final Consumer<T> function) { String cacheKey = RedisKeyUtil.generate(REDIS_REPEAT_PREFIX_KEY, key); KeyInfo keyInfo = KeyInfo.builder() .prefix(REDIS_REPEAT_PREFIX_KEY) .keys(new String[]{key}) .waitTime(3) .timeUnit(TimeUnit.SECONDS) .build(); // 获取消息体 T event = message.getPayload(); try { // 获取链路追踪id String traceId = getHeaderValue(message, RocketMQConsts.Header.TRACE_ID); MDC.put(Constants.MDC_TRACE_ID, StringUtils.isBlank(traceId) ? key : traceId); // 设置调度tag环境 String env = getHeaderValue(message, RocketMQConsts.Header.ENV); RequestThread.addParam(Constants.ENV, env); //对相同key的消息加锁,保证相同的消息是串行消费 lockService.lock(keyInfo, () -> { //判断该消息是否已处理 if (redisCache.exists(cacheKey)) { log.info("[MQ消息-已处理]--[{}]:{}", key, ReflectUtils.processObject(event)); return null; } log.info("[MQ消息-开始处理]--[{}]:{}", key, ReflectUtils.processObject(event)); function.accept(event); log.info("[MQ消息-处理完成]--{}", key); //缓存处理结果一小时,一个小时内不重复消费 redisCache.setCacheObject(cacheKey, DateUtil.now(), 60, TimeUnit.MINUTES); return null; }); } catch (LockException e) { //加锁失败,说明有线程正在消费 log.warn("[MQ消息-正在处理]--[{}]:{}", key, ReflectUtils.processObject(event)); throw e; } catch (Exception e) { log.error("[MQ消息-处理异常]--[{}]:{}", key, event, e); throw e; } finally { MDC.clear(); RequestThread.clear(); } }🎯 关键点:两个完全独立的线程上下文
1. 网关和业务服务是独立的进程/实例
java
// 网关进程(可能是Spring Cloud Gateway) public class GatewayApp { public static void main(String[] args) { // 独立JVM进程,端口8080 SpringApplication.run(GatewayApp.class, args); } } // 业务服务进程(可能是Spring Boot Web应用) public class BusinessApp { public static void main(String[] args) { // 独立JVM进程,端口8081 SpringApplication.run(BusinessApp.class, args); } } // 结论:两个不同的Tomcat实例,不同的线程池2. 各自的线程池管理
java
// 网关线程池(以Tomcat为例) "http-nio-8080-exec-1" // 网关Tomcat线程1 "http-nio-8080-exec-2" // 网关Tomcat线程2 "http-nio-8080-exec-3" // 网关Tomcat线程3 // 业务服务线程池 "http-nio-8081-exec-1" // 业务服务Tomcat线程1 "http-nio-8081-exec-2" // 业务服务Tomcat线程2 "http-nio-8081-exec-3" // 业务服务Tomcat线程3 // 完全独立的线程池!互不干扰
📊 为什么各自需要清理?
原因1:防止内存泄漏
java
// MDC基于ThreadLocal,不清理会内存泄漏 public class MDC { private static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>(); public static void put(String key, String value) { Map<String, String> map = CONTEXT.get(); if (map == null) { map = new HashMap<>(); CONTEXT.set(map); // 绑定到当前线程 } map.put(key, value); } } // Tomcat线程池会复用线程 for (int i = 0; i < 1000; i++) { // 线程池的线程"http-nio-8080-exec-1"被重复使用 // 如果不清理,第2次请求会看到第1次请求的traceId! }原因2:线程生命周期独立
java
// 模拟一次完整请求 public class ThreadLifecycleDemo { public static void main(String[] args) { // 请求1进入 Thread gatewayThread = Thread.currentThread(); // 网关线程-GW1 // 网关设置MDC MDC.put("traceId", "TRACE-001"); System.out.println("网关MDC: " + MDC.get("traceId")); // TRACE-001 // 转发请求到业务服务(网络调用) // 此时网关线程可能处理其他请求! // 业务服务收到请求 Thread businessThread = Thread.currentThread(); // 业务线程-BIZ2 // 这是完全不同的线程! // 业务服务设置MDC(从header获取) MDC.put("traceId", "TRACE-001"); System.out.println("业务MDC: " + MDC.get("traceId")); // TRACE-001 // 各自清理 MDC.clear(); // 网关清理自己的 MDC.clear(); // 业务清理自己的 } }原因3:可能存在的嵌套/重试场景
java
// 场景:网关重试机制 public class RetryScenario { public void processRequest() { try { // 第1次调用业务服务 callBusinessService(); // 如果失败,重试(可能用不同线程) if (needRetry()) { // 网关可能用新线程重试 retryWithNewThread(); } } finally { MDC.clear(); // 必须清理当前线程 } } private void retryWithNewThread() { // 新线程,需要重新设置MDC executor.submit(() -> { MDC.put("traceId", "TRACE-001"); // 重新设置 callBusinessService(); MDC.clear(); // 清理新线程 }); } }⚠️ 如果不清理会怎样?
场景演示:MDC污染
java
// 测试代码:模拟不清理MDC的问题 public class MDCPollutionTest { @Test void testMDCPollution() throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1); // 第1个任务:设置MDC但不清理 executor.submit(() -> { MDC.put("traceId", "TRACE-001"); MDC.put("userId", "USER-1001"); System.out.println("任务1: traceId=" + MDC.get("traceId")); // 故意不清理! }); Thread.sleep(100); // 等待任务完成 // 第2个任务:复用同一个线程 executor.submit(() -> { // 问题:看到了上个任务的MDC! System.out.println("任务2(污染): traceId=" + MDC.get("traceId")); // TRACE-001 System.out.println("任务2(污染): userId=" + MDC.get("userId")); // USER-1001 }); executor.shutdown(); } } // 输出: // 任务1: traceId=TRACE-001 // 任务2(污染): traceId=TRACE-001 ❌ 错误! // 任务2(污染): userId=USER-1001 ❌ 错误!实际Tomcat线程复用场景
java
// Tomcat请求处理 public class TomcatThreadReuse { // Tomcat配置:最大线程数=200,最小空闲线程=10 // 请求1:用户A下单 // 线程:http-nio-8080-exec-5 MDC.put("traceId", "TRACE-A"); MDC.put("userId", "USER-A"); // 处理订单... // 如果不清理:MDC中留有USER-A的信息 // 请求2:用户B查询(可能复用同一个线程) // 线程:http-nio-8080-exec-5(线程复用!) // 日志会错误地输出:用户B的请求,traceId却是TRACE-A,userId是USER-A log.info("查询用户订单"); // 错误关联了用户A的信息! }🔧 正确的清理模式
模式1:try-finally模式(你的HeaderFilter)
java
public class HeaderFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, ...) { try { // 1. 设置MDC MDC.put("traceId", traceId); MDC.put("userId", userId); // 2. 业务处理 chain.doFilter(request, response); } finally { // 3. 必须清理! MDC.clear(); RequestThread.clear(); } } }模式2:Reactive模式(你的TraceGlobalFilter)
java
public class TraceGlobalFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // WebFlux响应式编程 return chain.filter(exchange.mutate().request(request).build()) .doFinally(x -> MDC.clear()); // 响应式清理 // doFinally保证无论如何都会执行 // 类似try-finally,但响应式风格 } }模式3:AOP环绕模式
java
@Aspect @Component public class MDCAspect { @Around("@annotation(org.springframework.web.bind.annotation.GetMapping)") public Object aroundController(ProceedingJoinPoint joinPoint) throws Throwable { try { // 前置:设置MDC(如果需要) setupMDC(); // 执行业务 return joinPoint.proceed(); } finally { // 后置:清理MDC MDC.clear(); } } }📝 清理的最佳实践
1. 必须清理的资源
java
finally { // 1. MDC清理(ThreadLocal) MDC.clear(); // 2. RequestThread清理(自定义ThreadLocal) RequestThread.clear(); // 3. 其他ThreadLocal资源 SecurityContextHolder.clearContext(); // Spring Security // 4. 数据库连接(通常由连接池管理) // 5. 事务上下文(由Spring管理) }2. 清理时机验证
java
// 验证清理是否必要 @SpringBootTest public class MDCCleanupTest { @Autowired private TestRestTemplate restTemplate; @Test void testConcurrentRequests() { // 模拟并发请求 List<CompletableFuture<String>> futures = new ArrayList<>(); for (int i = 0; i < 10; i++) { final int userId = i; futures.add(CompletableFuture.supplyAsync(() -> { // 每个请求应该有独立的traceId HttpHeaders headers = new HttpHeaders(); headers.add("traceId", "TRACE-" + userId); ResponseEntity<String> response = restTemplate.exchange( "/api/test", HttpMethod.GET, new HttpEntity<>(headers), String.class ); return response.getBody(); })); } // 所有请求应该互不干扰 // 如果不清理MDC,会有交叉污染 } }3. 监控MDC泄漏
java
// 监控ThreadLocal泄漏 public class MDCLeakMonitor { @Scheduled(fixedDelay = 60000) // 每分钟检查 public void monitorMDCLeak() { // 检查Tomcat线程是否携带MDC ThreadGroup group = Thread.currentThread().getThreadGroup(); Thread[] threads = new Thread[group.activeCount()]; group.enumerate(threads); for (Thread thread : threads) { if (thread.getName().startsWith("http-nio")) { // 模拟访问线程的MDC(通过反射) if (hasMDCData(thread)) { log.warn("检测到MDC泄漏!线程: {}", thread.getName()); } } } } }🎯 为什么需要两次清理:总结
| 维度 | 网关清理 | 业务服务清理 | 原因 |
|---|---|---|---|
| 线程 | 网关Tomcat线程 | 业务Tomcat线程 | 不同线程池 |
| 进程 | 网关JVM进程 | 业务JVM进程 | 不同进程 |
| 内存 | 网关线程的ThreadLocal | 业务线程的ThreadLocal | 独立内存空间 |
| 目的 | 防止网关线程污染 | 防止业务线程污染 | 各自负责 |
| 必要性 | 必须 | 必须 | 线程池会复用线程 |
核心原则:
谁设置,谁清理:每个服务负责清理自己设置的ThreadLocal
线程独立:每个线程的ThreadLocal是独立的,需要各自清理
防御性编程:即使认为"应该不会复用",也要清理
框架要求:Tomcat等容器明确要求清理线程局部变量
最终结论:
你的代码中网关和业务服务都清理MDC是完全正确的,这是分布式系统中防止内存泄漏和上下文污染的最佳实践。如果只有一方清理,另一方就会出现MDC残留,导致严重的数据错乱问题。