舟山市网站建设_网站建设公司_GitHub_seo优化
2026/1/21 16:23:23 网站建设 项目流程

一、项目概述与架构设计

1.1 为什么选择Netty + SpringBoot?

性能优势对比:

  • 传统Tomcat WebSocket:单机连接数约1-2万,线程模型较重

  • Netty WebSocket:单机可支持10万+连接,基于NIO的Reactor模型

Netty核心优势:

  • 异步非阻塞IO,高并发处理能力强

  • 零拷贝技术减少内存复制

  • 内存池和对象池减少GC压力

  • 灵活的编解码器支持

1.2 整体架构设计

text

┌─────────────────────────────────────────────────┐ │ 客户端集群 │ └───────────────┬───────────────┬─────────────────┘ │ │ ┌───────────▼───────┬───────▼───────────┐ │ Nginx/负载均衡 │ │ └───────────┬───────┘ │ │ │ ┌───────────▼───────────────┐ ┌──────▼──────────┐ │ Netty WebSocket集群 │ │ SpringBoot │ │ - 连接管理 │ │ - 业务逻辑 │ │ - 消息转发 │ │ - 数据持久化 │ │ - 心跳检测 │ │ - API接口 │ └───────────┬───────────────┘ └──────┬──────────┘ │ │ ┌───────────▼───────────────────────────▼──────────┐ │ Redis集群 │ │ - 会话管理 │ │ - 分布式消息路由 │ └───────────┬──────────────────────────────────────┘ │ ┌───────────▼──────────┐ │ MySQL │ │ - 业务数据 │ └──────────────────────┘

二、快速开始:三分钟搭建

2.1 第一步:创建SpringBoot项目(30秒)

xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- pom.xml -->
<!-- The XML declaration must be the very first thing in the document, so it precedes this comment. -->
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <!-- Mandatory for every Maven POM. -->
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
    </parent>

    <!-- Placeholder coordinates: adjust to your project. The artifactId mirrors the jar name used by the Dockerfile. -->
    <groupId>com.example</groupId>
    <artifactId>websocket-server</artifactId>
    <version>1.0.0</version>

    <dependencies>
        <!-- Netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.79.Final</version>
        </dependency>

        <!-- Spring Boot web basics -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Redis, used for distributed sessions -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Utilities -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>

2.2 第二步:配置Netty服务器(60秒)

java

/** * Netty WebSocket服务器配置 */ @Component @Slf4j public class NettyWebSocketServer { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private Channel channel; @Value("${websocket.port:8080}") private int port; @Value("${websocket.path:/ws}") private String path; /** * 启动Netty服务器 */ @PostConstruct public void start() throws InterruptedException { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new WebSocketServerInitializer(path)); channel = bootstrap.bind(port).sync().channel(); log.info("Netty WebSocket服务器启动成功,端口:{},路径:{}", port, path); } catch (Exception e) { log.error("Netty服务器启动失败", e); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * 关闭Netty服务器 */ @PreDestroy public void destroy() { if (channel != null) { channel.close(); } bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); log.info("Netty WebSocket服务器已关闭"); } }

2.3 第三步:实现WebSocket处理器(90秒)

java

/** * WebSocket通道初始化器 */ public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { private final String websocketPath; public WebSocketServerInitializer(String websocketPath) { this.websocketPath = websocketPath; } @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // HTTP编解码器 pipeline.addLast(new HttpServerCodec()); // 大数据流支持 pipeline.addLast(new ChunkedWriteHandler()); // HTTP消息聚合器 pipeline.addLast(new HttpObjectAggregator(65536)); // WebSocket协议处理器 pipeline.addLast(new WebSocketServerProtocolHandler( websocketPath, null, true, 65536 * 10)); // 心跳检测 pipeline.addLast(new IdleStateHandler(60, 0, 0)); // 自定义消息处理器 pipeline.addLast(new WebSocketFrameHandler()); } } /** * WebSocket消息处理器 */ @ChannelHandler.Sharable @Slf4j public class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 连接管理器 private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelActive(ChannelHandlerContext ctx) { channelGroup.add(ctx.channel()); log.info("客户端连接:{},当前连接数:{}", ctx.channel().id(), channelGroup.size()); } @Override public void channelInactive(ChannelHandlerContext ctx) { channelGroup.remove(ctx.channel()); log.info("客户端断开:{},当前连接数:{}", ctx.channel().id(), channelGroup.size()); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String request = msg.text(); log.info("收到消息:{}", request); // 业务处理 String response = processMessage(request); // 发送响应 ctx.writeAndFlush(new TextWebSocketFrame(response)); } private String processMessage(String message) { // 这里实现业务逻辑 return "服务器收到:" + message + ",时间:" + System.currentTimeMillis(); } /** * 广播消息 */ public void broadcast(String message) { channelGroup.writeAndFlush(new TextWebSocketFrame(message)); } }

三、高性能优化策略

3.1 连接管理与心跳检测

java

/** * 心跳检测处理器 */ public class HeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { // 读超时,关闭连接 ctx.close(); } } } } /** * 连接管理器 */ @Component @Slf4j public class ConnectionManager { // 使用ConcurrentHashMap存储连接信息 private final ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>(1024); // 使用Redis存储分布式会话 @Autowired private RedisTemplate<String, String> redisTemplate; /** * 添加连接 */ public void addConnection(String userId, Channel channel) { String channelId = channel.id().asLongText(); channelMap.put(channelId, channel); // 存储到Redis,用于分布式查找 String key = "websocket:user:" + userId; redisTemplate.opsForValue().set(key, channelId, 30, TimeUnit.MINUTES); // 绑定用户ID到Channel属性 channel.attr(AttributeKey.valueOf("userId")).set(userId); } /** * 根据用户ID发送消息 */ public void sendToUser(String userId, String message) { String key = "websocket:user:" + userId; String channelId = redisTemplate.opsForValue().get(key); if (channelId != null) { Channel channel = channelMap.get(channelId); if (channel != null && channel.isActive()) { channel.writeAndFlush(new TextWebSocketFrame(message)); } } } }

3.2 消息编解码优化

java

/** * 自定义消息编解码器 */ public class WebSocketMessageCodec extends MessageToMessageCodec<TextWebSocketFrame, WebSocketMessage> { private static final ObjectMapper objectMapper = new ObjectMapper(); @Override protected void encode(ChannelHandlerContext ctx, WebSocketMessage msg, List<Object> out) { try { String json = objectMapper.writeValueAsString(msg); out.add(new TextWebSocketFrame(json)); } catch (JsonProcessingException e) { throw new RuntimeException("消息编码失败", e); } } @Override protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame frame, List<Object> out) { try { WebSocketMessage msg = objectMapper.readValue( frame.text(), WebSocketMessage.class); out.add(msg); } catch (JsonProcessingException e) { throw new RuntimeException("消息解码失败", e); } } } /** * 消息实体类 */ @Data @Builder @AllArgsConstructor @NoArgsConstructor class WebSocketMessage { private Integer type; // 消息类型 private String from; // 发送者 private String to; // 接收者 private Object data; // 消息数据 private Long timestamp; // 时间戳 }

3.3 内存优化配置

yaml

# application.yml
# NOTE(review): the source text was flattened onto one line, so the nesting below is
# reconstructed; in particular, confirm whether rcvbuf/sndbuf belong under netty.pool.
# NOTE(review): the server class shown in this document only reads websocket.port and
# websocket.path (via @Value); confirm where the remaining keys are bound.
websocket:
  port: 8080
  path: /ws
  boss-threads: 1
  worker-threads: 8
  max-frame-size: 65536

server:
  port: 8081 # Spring Boot management port

# Netty memory tuning
netty:
  pool:
    # Use pooled ByteBuf
    allocator-type: pooled
    # Off-heap (direct) memory
    prefer-direct: true
    # Receive buffer size
    rcvbuf: 65536
    # Send buffer size
    sndbuf: 65536

java

/** * Netty内存优化配置 */ @Configuration public class NettyConfig { @Bean public ByteBufAllocator byteBufAllocator() { // 使用池化分配器,减少内存分配和GC return PooledByteBufAllocator.DEFAULT; } @Bean @ConditionalOnMissingBean public EventLoopGroup bossGroup() { // 单线程即可,因为主要处理连接请求 return new NioEventLoopGroup(1); } @Bean @ConditionalOnMissingBean public EventLoopGroup workerGroup() { // CPU核心数 * 2 int threads = Runtime.getRuntime().availableProcessors() * 2; return new NioEventLoopGroup(threads); } }

四、高级特性实现

4.1 分布式Session管理

java

/** * 分布式Session管理器 */ @Component @Slf4j public class DistributedSessionManager { @Autowired private StringRedisTemplate redisTemplate; // 使用Redisson实现分布式锁 @Autowired private RedissonClient redissonClient; private static final String SESSION_PREFIX = "websocket:session:"; private static final String USER_SESSION_PREFIX = "websocket:user_session:"; /** * 创建会话 */ public String createSession(String userId, Channel channel) { String sessionId = UUID.randomUUID().toString(); String channelId = channel.id().asLongText(); // 存储会话信息 Map<String, String> sessionData = new HashMap<>(); sessionData.put("userId", userId); sessionData.put("channelId", channelId); sessionData.put("loginTime", String.valueOf(System.currentTimeMillis())); sessionData.put("lastActiveTime", String.valueOf(System.currentTimeMillis())); // 存储到Redis,设置过期时间 String sessionKey = SESSION_PREFIX + sessionId; redisTemplate.opsForHash().putAll(sessionKey, sessionData); redisTemplate.expire(sessionKey, 30, TimeUnit.MINUTES); // 记录用户与会话的映射 String userSessionKey = USER_SESSION_PREFIX + userId; redisTemplate.opsForSet().add(userSessionKey, sessionId); redisTemplate.expire(userSessionKey, 30, TimeUnit.MINUTES); return sessionId; } /** * 发送消息到用户的所有设备 */ public void sendToUserAllDevices(String userId, String message) { String userSessionKey = USER_SESSION_PREFIX + userId; Set<String> sessionIds = redisTemplate.opsForSet().members(userSessionKey); if (sessionIds != null) { sessionIds.forEach(sessionId -> { String sessionKey = SESSION_PREFIX + sessionId; Map<Object, Object> sessionData = redisTemplate.opsForHash().entries(sessionKey); String channelId = (String) sessionData.get("channelId"); // 通过ChannelManager找到对应的Channel发送消息 // 这里需要实现ChannelManager来管理Channel }); } } }

4.2 消息可靠性保证

java

/** * 可靠消息处理器 */ @Slf4j public class ReliableMessageHandler extends ChannelInboundHandlerAdapter { // 消息确认映射 private final Map<String, MessageRecord> pendingMessages = new ConcurrentHashMap<>(1024); // 重试队列 private final PriorityBlockingQueue<RetryTask> retryQueue = new PriorityBlockingQueue<>(1000); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof WebSocketMessage) { WebSocketMessage message = (WebSocketMessage) msg; // 处理消息确认 if (message.getType() == MessageType.ACK.getValue()) { handleAck(message); return; } // 发送消息确认 sendAck(ctx, message); // 处理业务消息 ctx.fireChannelRead(msg); } else { ctx.fireChannelRead(msg); } } /** * 发送消息 */ public void sendMessage(Channel channel, WebSocketMessage message) { String messageId = generateMessageId(); message.setMessageId(messageId); // 记录待确认消息 MessageRecord record = new MessageRecord(message, channel, System.currentTimeMillis()); pendingMessages.put(messageId, record); // 发送消息 channel.writeAndFlush(message); // 启动重试任务 scheduleRetry(messageId); } private void scheduleRetry(String messageId) { RetryTask task = new RetryTask(messageId, System.currentTimeMillis() + 3000); retryQueue.offer(task); // 启动重试线程 CompletableFuture.runAsync(() -> { while (!Thread.currentThread().isInterrupted()) { try { RetryTask retryTask = retryQueue.take(); if (retryTask.getRetryTime() <= System.currentTimeMillis()) { retryMessage(retryTask.getMessageId()); } else { retryQueue.put(retryTask); Thread.sleep(1000); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); } }

4.3 流量控制与限流

java

/** * 流量控制处理器 */ public class TrafficControlHandler extends ChannelDuplexHandler { // 令牌桶限流器 private final RateLimiter rateLimiter = RateLimiter.create(1000); // 1000 QPS // 连接频率限制 private final Cache<String, Integer> connectRateCache = CacheBuilder.newBuilder() .expireAfterWrite(1, TimeUnit.MINUTES) .build(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String clientIp = getClientIp(ctx); // 检查连接频率 Integer count = connectRateCache.getIfPresent(clientIp); if (count != null && count > 10) { ctx.close(); // 超过10次/分钟,拒绝连接 return; } connectRateCache.put(clientIp, count == null ? 1 : count + 1); super.channelActive(ctx); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 限制发送速率 if (!rateLimiter.tryAcquire()) { promise.setFailure(new RateLimitException("发送速率过快")); return; } super.write(ctx, msg, promise); } private String getClientIp(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); if (channel.remoteAddress() instanceof InetSocketAddress) { InetSocketAddress address = (InetSocketAddress) channel.remoteAddress(); return address.getAddress().getHostAddress(); } return "unknown"; } } /** * 滑动窗口限流器 */ @Component @Slf4j public class SlidingWindowLimiter { // 滑动窗口:key为时间戳秒数,value为请求数 private final ConcurrentHashMap<Long, AtomicInteger> windows = new ConcurrentHashMap<>(); private final int limitPerSecond = 1000; // 每秒限制 private final int windowSize = 10; // 窗口大小(秒) /** * 尝试获取令牌 */ public boolean tryAcquire(String key) { long currentTime = System.currentTimeMillis() / 1000; // 清理过期窗口 cleanExpiredWindows(currentTime); // 计算当前窗口内的总请求数 int totalRequests = 0; for (long i = currentTime - windowSize + 1; i <= currentTime; i++) { AtomicInteger count = windows.get(i); if (count != null) { totalRequests += count.get(); } } // 判断是否超过限制 if (totalRequests >= limitPerSecond * windowSize) { return false; } // 记录当前秒的请求 windows.computeIfAbsent(currentTime, k -> new AtomicInteger()) 
.incrementAndGet(); return true; } private void cleanExpiredWindows(long currentTime) { long expiredTime = currentTime - windowSize; windows.keySet().removeIf(time -> time < expiredTime); } }

五、监控与运维

5.1 性能监控

java

/** * Netty指标监控 */ @Component @Slf4j public class NettyMetrics { private final MeterRegistry meterRegistry; private final ChannelGroup channelGroup; // 监控指标 private final AtomicLong totalConnections = new AtomicLong(0); private final AtomicLong activeConnections = new AtomicLong(0); private final AtomicLong totalMessages = new AtomicLong(0); private final AtomicLong messageProcessingTime = new AtomicLong(0); public NettyMetrics(MeterRegistry meterRegistry, ChannelGroup channelGroup) { this.meterRegistry = meterRegistry; this.channelGroup = channelGroup; initMetrics(); } private void initMetrics() { // 注册监控指标 Gauge.builder("websocket.connections.active", activeConnections::get) .description("活跃连接数") .register(meterRegistry); Gauge.builder("websocket.connections.total", totalConnections::get) .description("总连接数") .register(meterRegistry); Counter.builder("websocket.messages.total") .description("总消息数") .register(meterRegistry); Timer.builder("websocket.message.processing.time") .description("消息处理时间") .register(meterRegistry); // 定时收集指标 ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.scheduleAtFixedRate(this::collectMetrics, 0, 5, TimeUnit.SECONDS); } private void collectMetrics() { activeConnections.set(channelGroup.size()); // 收集内存使用情况 MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean() .getHeapMemoryUsage(); MemoryUsage nonHeapMemoryUsage = ManagementFactory.getMemoryMXBean() .getNonHeapMemoryUsage(); log.info("WebSocket监控指标 - " + "活跃连接: {}, " + "堆内存使用: {}MB, " + "非堆内存使用: {}MB", activeConnections.get(), heapMemoryUsage.getUsed() / 1024 / 1024, nonHeapMemoryUsage.getUsed() / 1024 / 1024); } /** * 记录消息处理时间 */ public void recordMessageProcessing(long startTime) { long duration = System.currentTimeMillis() - startTime; messageProcessingTime.addAndGet(duration); totalMessages.incrementAndGet(); } } /** * 监控端点 */ @RestController @RequestMapping("/monitor") @Slf4j public class MonitorController { @Autowired private 
ChannelGroup channelGroup; @Autowired private NettyMetrics nettyMetrics; @GetMapping("/connections") public Map<String, Object> getConnectionInfo() { Map<String, Object> result = new HashMap<>(); result.put("activeConnections", channelGroup.size()); result.put("channels", getChannelDetails()); return result; } private List<Map<String, Object>> getChannelDetails() { List<Map<String, Object>> channels = new ArrayList<>(); channelGroup.forEach(channel -> { Map<String, Object> channelInfo = new HashMap<>(); channelInfo.put("id", channel.id().asLongText()); channelInfo.put("active", channel.isActive()); channelInfo.put("remoteAddress", channel.remoteAddress()); Attribute<String> userIdAttr = channel.attr(AttributeKey.valueOf("userId")); if (userIdAttr != null) { channelInfo.put("userId", userIdAttr.get()); } channels.add(channelInfo); }); return channels; } }

5.2 日志与追踪

java

/** * 全链路追踪 */ public class TraceHandler extends ChannelDuplexHandler { private static final String TRACE_ID = "traceId"; private static final String SPAN_ID = "spanId"; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof WebSocketMessage) { WebSocketMessage message = (WebSocketMessage) msg; // 生成或获取traceId String traceId = message.getTraceId(); if (traceId == null) { traceId = generateTraceId(); message.setTraceId(traceId); } // 创建span Span span = createSpan(traceId, "websocket.process"); try { // 将trace信息存入Channel属性 ctx.channel().attr(AttributeKey.valueOf(TRACE_ID)).set(traceId); ctx.channel().attr(AttributeKey.valueOf(SPAN_ID)).set(span.getSpanId()); // 记录开始时间 ctx.channel().attr(AttributeKey.valueOf("startTime")) .set(System.currentTimeMillis()); // 继续处理 ctx.fireChannelRead(msg); } finally { // 结束span finishSpan(span); } } else { ctx.fireChannelRead(msg); } } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg instanceof WebSocketMessage) { WebSocketMessage message = (WebSocketMessage) msg; // 获取trace信息 String traceId = ctx.channel().attr(AttributeKey.valueOf(TRACE_ID)).get(); if (traceId != null) { message.setTraceId(traceId); } // 记录发送日志 log.info("发送消息,traceId: {}, 消息类型: {}", traceId, message.getType()); } ctx.write(msg, promise); } private String generateTraceId() { return UUID.randomUUID().toString().replace("-", ""); } private Span createSpan(String traceId, String operation) { Span span = new Span(); span.setTraceId(traceId); span.setSpanId(UUID.randomUUID().toString().replace("-", "")); span.setOperation(operation); span.setStartTime(System.currentTimeMillis()); // 存储span SpanStorage.storeSpan(span); return span; } private void finishSpan(Span span) { span.setEndTime(System.currentTimeMillis()); span.setDuration(span.getEndTime() - span.getStartTime()); // 记录到日志 log.info("Span完成: {}, 耗时: {}ms", span.getOperation(), span.getDuration()); } }

六、部署与优化

6.1 Docker容器化部署

dockerfile

# Dockerfile
FROM openjdk:11-jre-slim

# Set the time zone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

# The HEALTHCHECK below needs curl; slim base images generally do not ship it
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Create the application directory, including the heap-dump directory used by JAVA_OPTS
RUN mkdir -p /app/logs
WORKDIR /app

# Copy the JAR file
COPY target/websocket-server.jar /app/app.jar

# JVM tuning options
ENV JAVA_OPTS="-server \
    -XX:+UseG1GC \
    -XX:MaxGCPauseMillis=200 \
    -XX:+HeapDumpOnOutOfMemoryError \
    -XX:HeapDumpPath=/app/logs/heapdump.hprof \
    -Xms512m \
    -Xmx1024m \
    -XX:MaxDirectMemorySize=256m"

# Exposed ports: 8080 WebSocket, 8081 Spring Boot
EXPOSE 8080 8081

# Health check; same endpoint as the Kubernetes probes (/actuator/health)
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
    CMD curl -f http://localhost:8081/actuator/health || exit 1

# Start command; exec replaces the shell so the JVM receives stop signals directly
ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -jar /app/app.jar"]

6.2 Kubernetes部署配置

yaml

# websocket-deployment.yaml
# Deployment: three replicas of the WebSocket server.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: websocket-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: websocket
  template:
    metadata:
      labels:
        app: websocket
    spec:
      containers:
        - name: websocket
          image: websocket-server:latest
          ports:
            # 8080 carries WebSocket traffic, 8081 serves the health endpoint probed below
            - containerPort: 8080
              name: websocket
            - containerPort: 8081
              name: management
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: "prod"
            - name: REDIS_HOST
              value: "redis-cluster"
            # Heap sized as a percentage of the container memory limit
            - name: JAVA_OPTS
              value: "-XX:+UseContainerSupport -XX:InitialRAMPercentage=50.0 -XX:MaxRAMPercentage=80.0"
          livenessProbe:
            httpGet:
              path: /actuator/health
              port: 8081
            initialDelaySeconds: 60
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8081
            initialDelaySeconds: 30
            periodSeconds: 5
---
# Service: exposes the WebSocket port inside the cluster
apiVersion: v1
kind: Service
metadata:
  name: websocket-service
spec:
  selector:
    app: websocket
  ports:
    - port: 80
      targetPort: 8080
      name: websocket
  type: ClusterIP
---
# Ingress: routes /ws on websocket.example.com to the Service
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: websocket-ingress
  annotations:
    # Long read/send timeouts (3600) for long-lived connections
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
    # Hash upstream selection by client address
    nginx.ingress.kubernetes.io/upstream-hash-by: "$remote_addr"
spec:
  rules:
    - host: websocket.example.com
      http:
        paths:
          - path: /ws
            pathType: Prefix
            backend:
              service:
                name: websocket-service
                port:
                  number: 80

6.3 性能调优参数

java

/** * 性能调优配置类 */ @Configuration @Slf4j public class PerformanceConfig { @Bean public ServerBootstrap serverBootstrap(EventLoopGroup bossGroup, EventLoopGroup workerGroup) { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // TCP参数优化 .option(ChannelOption.SO_BACKLOG, 1024) // 连接队列大小 .option(ChannelOption.SO_REUSEADDR, true) // 地址重用 // 子通道参数 .childOption(ChannelOption.SO_KEEPALIVE, true) // 保持连接 .childOption(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法 .childOption(ChannelOption.SO_RCVBUF, 65536) // 接收缓冲区 .childOption(ChannelOption.SO_SNDBUF, 65536) // 发送缓冲区 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024, 64 * 1024)) // 水位线 // 内存分配器 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 连接超时 .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); return bootstrap; } @Bean public EventLoopGroup workerGroup() { // 根据CPU核心数动态设置线程数 int workerThreads = Math.max(1, Runtime.getRuntime().availableProcessors() * 2); return new NioEventLoopGroup(workerThreads, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("netty-worker-" + counter.incrementAndGet()); thread.setPriority(Thread.NORM_PRIORITY); thread.setDaemon(false); return thread; } }); } @Bean public ByteBufAllocator byteBufAllocator() { // 配置详细的ByteBuf分配器 return new PooledByteBufAllocator( true, // 优先使用堆外内存 0, // 堆内存arena数量,0表示使用默认值 0, // 堆外内存arena数量,0表示使用默认值 8192, // page大小 11, // maxOrder 64, // tinyCacheSize 256, // smallCacheSize 64 // normalCacheSize ); } }

七、故障排查与容灾

7.1 常见问题排查

java

/** * 故障排查工具类 */ @Component @Slf4j public class TroubleshootingUtils { /** * 诊断连接问题 */ public void diagnoseConnectionIssue(Channel channel) { if (channel == null) { log.warn("Channel is null"); return; } log.info("Channel诊断信息:"); log.info(" - Channel ID: {}", channel.id().asLongText()); log.info(" - 是否活跃: {}", channel.isActive()); log.info(" - 是否可写: {}", channel.isWritable()); log.info(" - 是否打开: {}", channel.isOpen()); log.info(" - 本地地址: {}", channel.localAddress()); log.info(" - 远程地址: {}", channel.remoteAddress()); // 检查缓冲区水位 if (channel.isWritable()) { log.info(" - 缓冲区状态: 正常"); } else { log.warn(" - 缓冲区状态: 高水位"); } // 检查EventLoop状态 EventLoop eventLoop = channel.eventLoop(); log.info(" - EventLoop: {}", eventLoop); log.info(" - EventLoop是否在事件循环中: {}", eventLoop.inEventLoop()); } /** * 诊断内存使用情况 */ public void diagnoseMemoryUsage() { Runtime runtime = Runtime.getRuntime(); long totalMemory = runtime.totalMemory(); long freeMemory = runtime.freeMemory(); long usedMemory = totalMemory - freeMemory; long maxMemory = runtime.maxMemory(); log.info("内存使用情况:"); log.info(" - 总内存: {}MB", totalMemory / 1024 / 1024); log.info(" - 已使用: {}MB", usedMemory / 1024 / 1024); log.info(" - 空闲内存: {}MB", freeMemory / 1024 / 1024); log.info(" - 最大内存: {}MB", maxMemory / 1024 / 1024); // 检查直接内存使用 try { Class<?> c = Class.forName("java.nio.Bits"); Field maxMemoryField = c.getDeclaredField("maxMemory"); Field reservedMemoryField = c.getDeclaredField("reservedMemory"); maxMemoryField.setAccessible(true); reservedMemoryField.setAccessible(true); Long maxDirectMemory = (Long) maxMemoryField.get(null); AtomicLong reservedMemory = (AtomicLong) reservedMemoryField.get(null); log.info("直接内存使用情况:"); log.info(" - 最大直接内存: {}MB", maxDirectMemory / 1024 / 1024); log.info(" - 已保留直接内存: {}MB", reservedMemory.get() / 1024 / 1024); } catch (Exception e) { log.warn("无法获取直接内存信息", e); } } /** * 生成诊断报告 */ public Map<String, Object> generateDiagnosticReport() { Map<String, Object> report = new LinkedHashMap<>(); 
// 系统信息 report.put("timestamp", System.currentTimeMillis()); report.put("jvmVersion", System.getProperty("java.version")); report.put("os", System.getProperty("os.name")); // 内存信息 Runtime runtime = Runtime.getRuntime(); Map<String, Object> memoryInfo = new HashMap<>(); memoryInfo.put("total", runtime.totalMemory()); memoryInfo.put("free", runtime.freeMemory()); memoryInfo.put("max", runtime.maxMemory()); memoryInfo.put("used", runtime.totalMemory() - runtime.freeMemory()); report.put("memory", memoryInfo); // 线程信息 ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); Map<String, Object> threadInfo = new HashMap<>(); threadInfo.put("threadCount", threadBean.getThreadCount()); threadInfo.put("peakThreadCount", threadBean.getPeakThreadCount()); report.put("threads", threadInfo); // GC信息 List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); List<Map<String, Object>> gcInfoList = new ArrayList<>(); for (GarbageCollectorMXBean gcBean : gcBeans) { Map<String, Object> gcInfo = new HashMap<>(); gcInfo.put("name", gcBean.getName()); gcInfo.put("collectionCount", gcBean.getCollectionCount()); gcInfo.put("collectionTime", gcBean.getCollectionTime()); gcInfoList.add(gcInfo); } report.put("garbageCollectors", gcInfoList); return report; } }

7.2 容灾与降级

java

/** * 熔断降级处理器 */ @Component @Slf4j public class CircuitBreakerHandler { private final Map<String, CircuitBreaker> breakers = new ConcurrentHashMap<>(); private static final int FAILURE_THRESHOLD = 10; private static final long TIMEOUT = 5000; // 5秒超时 private static final long RESET_TIMEOUT = 60000; // 60秒重置 public <T> T execute(String serviceName, Supplier<T> supplier) { CircuitBreaker breaker = breakers.computeIfAbsent(serviceName, k -> new CircuitBreaker(FAILURE_THRESHOLD, RESET_TIMEOUT)); if (breaker.isOpen()) { log.warn("熔断器已打开,服务降级: {}", serviceName); return fallback(serviceName); } try { CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier); T result = future.get(TIMEOUT, TimeUnit.MILLISECONDS); breaker.recordSuccess(); return result; } catch (TimeoutException e) { log.error("服务调用超时: {}", serviceName, e); breaker.recordFailure(); return fallback(serviceName); } catch (Exception e) { log.error("服务调用失败: {}", serviceName, e); breaker.recordFailure(); return fallback(serviceName); } } private <T> T fallback(String serviceName) { // 降级逻辑 if ("userService".equals(serviceName)) { return (T) new DefaultUser(); } return null; } /** * 熔断器实现 */ private static class CircuitBreaker { private final int failureThreshold; private final long resetTimeout; private int failureCount = 0; private long lastFailureTime = 0; private State state = State.CLOSED; enum State { CLOSED, OPEN, HALF_OPEN } public CircuitBreaker(int failureThreshold, long resetTimeout) { this.failureThreshold = failureThreshold; this.resetTimeout = resetTimeout; } public synchronized void recordFailure() { failureCount++; lastFailureTime = System.currentTimeMillis(); if (failureCount >= failureThreshold) { state = State.OPEN; log.warn("熔断器状态变更为OPEN"); // 定时恢复 ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.schedule(this::attemptReset, resetTimeout, TimeUnit.MILLISECONDS); } } public synchronized void recordSuccess() { if (state == State.HALF_OPEN) 
{ state = State.CLOSED; failureCount = 0; log.info("熔断器状态变更为CLOSED"); } } public synchronized boolean isOpen() { if (state == State.OPEN) { return true; } // 检查是否需要自动恢复 if (state == State.HALF_OPEN) { return false; } return false; } private synchronized void attemptReset() { if (state == State.OPEN) { state = State.HALF_OPEN; log.info("熔断器状态变更为HALF_OPEN,尝试恢复"); } } } }

总结

通过本文的详细介绍,我们完成了一个高性能WebSocket服务的完整构建。关键点总结:

核心优势:

  1. 高性能:Netty的Reactor模型支持10万+并发连接

  2. 低延迟:零拷贝和内存池优化

  3. 高可靠:完善的心跳、重连、消息确认机制

  4. 易扩展:分布式架构支持水平扩展

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询