花莲县网站建设_网站建设公司_网站建设_seo优化
2025/12/18 20:35:37 网站建设 项目流程

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器和客户端之间进行实时双向通信。

基本使用

1. 创建 WebSocket 连接

// 创建 WebSocket 连接
const socket = new WebSocket('ws://localhost:8080');// 或者使用安全连接
const secureSocket = new WebSocket('wss://example.com/socket');

2. WebSocket 事件

// 连接建立时触发
socket.onopen = function(event) {console.log('连接已建立');socket.send('Hello Server!');
};// 接收到消息时触发
socket.onmessage = function(event) {console.log('收到消息:', event.data);// 处理接收到的数据
};// 发生错误时触发
socket.onerror = function(error) {console.error('WebSocket 错误:', error);
};// 连接关闭时触发
socket.onclose = function(event) {console.log('连接关闭', event.code, event.reason);// 可以在这里尝试重连
};

完整示例

客户端示例

<!DOCTYPE html>
<html>
<head><title>WebSocket 示例</title>
</head>
<body><div><input type="text" id="messageInput" placeholder="输入消息"><button onclick="sendMessage()">发送</button></div><div id="messages"></div><script>// 创建 WebSocket 连接
        const socket = new WebSocket('ws://localhost:8080');const messagesDiv = document.getElementById('messages');// 连接建立
        socket.onopen = function() {addMessage('系统', '连接成功!');};// 接收消息
        socket.onmessage = function(event) {try {const data = JSON.parse(event.data);addMessage(data.sender, data.message);} catch (e) {addMessage('系统', event.data);}};// 错误处理
        socket.onerror = function(error) {addMessage('系统', '连接错误');};// 连接关闭
        socket.onclose = function() {addMessage('系统', '连接已关闭');};// 发送消息function sendMessage() {const input = document.getElementById('messageInput');const message = input.value.trim();if (message) {socket.send(JSON.stringify({type: 'message',content: message,timestamp: new Date().toISOString()}));input.value = '';}}// 显示消息function addMessage(sender, text) {const msgElement = document.createElement('div');msgElement.innerHTML = `<strong>${sender}:</strong> ${text}`;
            messagesDiv.appendChild(msgElement);messagesDiv.scrollTop = messagesDiv.scrollHeight;}// 关闭连接(页面卸载时)
        window.addEventListener('beforeunload', function() {if (socket.readyState === WebSocket.OPEN) {socket.close(1000, '用户离开页面');}});</script>
</body>
</html>

Node.js 服务器端示例

// 使用 ws 库
const WebSocket = require('ws');// 创建 WebSocket 服务器
const wss = new WebSocket.Server({ port: 8080 });console.log('WebSocket 服务器启动在 ws://localhost:8080');// 连接处理
wss.on('connection', function connection(ws) {console.log('新客户端连接');// 发送欢迎消息
    ws.send(JSON.stringify({type: 'system',message: '欢迎连接到服务器!'}));// 接收客户端消息ws.on('message', function incoming(message) {console.log('收到消息:', message);try {const data = JSON.parse(message);// 广播消息给所有客户端wss.clients.forEach(function each(client) {if (client !== ws && client.readyState === WebSocket.OPEN) {client.send(JSON.stringify({type: 'message',sender: '用户',message: data.content,timestamp: new Date().toISOString()}));}});} catch (error) {console.error('消息解析错误:', error);}});// 连接关闭ws.on('close', function() {console.log('客户端断开连接');});// 错误处理ws.on('error', function(error) {console.error('WebSocket 错误:', error);});
});

WebSocket 状态

// 检查连接状态
switch(socket.readyState) {case WebSocket.CONNECTING:  // 0 - 连接中console.log('连接中...');break;case WebSocket.OPEN:        // 1 - 已连接console.log('已连接');break;case WebSocket.CLOSING:     // 2 - 关闭中console.log('正在关闭...');break;case WebSocket.CLOSED:      // 3 - 已关闭console.log('已关闭');break;
}

高级特性

1. 心跳检测

// 心跳检测
let heartbeatInterval;socket.onopen = function() {console.log('连接建立');// 开始心跳heartbeatInterval = setInterval(() => {if (socket.readyState === WebSocket.OPEN) {socket.send(JSON.stringify({ type: 'ping' }));}}, 30000);
};socket.onclose = function() {// 清除心跳
    clearInterval(heartbeatInterval);
};

2. 重连机制

class WebSocketClient {constructor(url) {this.url = url;this.socket = null;this.reconnectAttempts = 0;this.maxReconnectAttempts = 5;this.reconnectDelay = 1000;}connect() {this.socket = new WebSocket(this.url);this.socket.onopen = () => {console.log('连接成功');this.reconnectAttempts = 0;};this.socket.onclose = (event) => {console.log('连接断开,尝试重连...');this.reconnect();};this.socket.onerror = (error) => {console.error('连接错误:', error);};}reconnect() {if (this.reconnectAttempts < this.maxReconnectAttempts) {this.reconnectAttempts++;const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);setTimeout(() => {console.log(`第 ${this.reconnectAttempts} 次重连`);this.connect();}, delay);} else {console.error('重连次数已达上限');}}send(data) {if (this.socket.readyState === WebSocket.OPEN) {this.socket.send(data);}}
}

3. 二进制数据传输

// 发送二进制数据
socket.onopen = function() {// 发送 ArrayBufferconst buffer = new ArrayBuffer(4);const view = new Uint8Array(buffer);view[0] = 1;view[1] = 2;view[2] = 3;view[3] = 4;socket.send(buffer);// 发送 Blobconst blob = new Blob(['Hello'], { type: 'text/plain' });socket.send(blob);
};// 接收二进制数据
socket.binaryType = 'arraybuffer'; // 或 'blob'

socket.onmessage = function(event) {if (event.data instanceof ArrayBuffer) {// 处理 ArrayBufferconst view = new Uint8Array(event.data);console.log('收到二进制数据:', view);} else {// 处理文本数据console.log('收到文本数据:', event.data);}
};

Spring Boot 中使用 WebSocket

添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

基础配置类

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry config) {// 消息代理前缀config.enableSimpleBroker("/topic", "/queue");// 应用目的地前缀config.setApplicationDestinationPrefixes("/app");// 用户目的地前缀(一对一消息)config.setUserDestinationPrefix("/user");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册 WebSocket 端点registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();  // 支持 SockJS 降级// 也可以添加多个端点registry.addEndpoint("/ws-native").setAllowedOriginPatterns("*");}@Overridepublic void configureWebSocketTransport(WebSocketTransportRegistration registration) {// 配置传输限制registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KBregistration.setSendTimeLimit(20 * 1000);    // 发送超时 20秒registration.setSendBufferSizeLimit(512 * 1024); // 发送缓冲区限制 512KB
    }
}

控制器示例

@Controller
public class WebSocketController {// 注入消息模板
    @Autowiredprivate SimpMessagingTemplate messagingTemplate;/*** 处理客户端发送的消息* 目的地:/app/chat*/@MessageMapping("/chat")@SendTo("/topic/messages")public ChatMessage handleMessage(ChatMessage message) {message.setTimestamp(new Date());System.out.println("收到消息: " + message.getContent());return message;}/*** 发送广播消息*/@GetMapping("/broadcast")public void broadcast(String content) {ChatMessage message = new ChatMessage();message.setContent(content);message.setSender("系统");message.setTimestamp(new Date());// 发送到 /topic/messagesmessagingTemplate.convertAndSend("/topic/messages", message);}/*** 发送点对点消息*/@GetMapping("/sendToUser")public void sendToUser(String userId, String content) {ChatMessage message = new ChatMessage();message.setContent(content);message.setSender("管理员");message.setTimestamp(new Date());// 发送给指定用户:/user/{userId}/queue/messages
        messagingTemplate.convertAndSendToUser(userId, "/queue/messages", message);}
}// 消息实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {private String sender;private String content;private Date timestamp;
}

连接拦截器

@Component
public class WebSocketInterceptor extends ChannelInterceptorAdapter {@Overridepublic Message<?> preSend(Message<?> message, MessageChannel channel) {StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);if (StompCommand.CONNECT.equals(accessor.getCommand())) {// 连接建立时处理String token = accessor.getFirstNativeHeader("token");// 验证 token...System.out.println("用户连接: " + accessor.getSessionId());} else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {// 连接断开时处理System.out.println("用户断开: " + accessor.getSessionId());}return message;}
}

原生 Java WebSocket(JSR 356)

注解方式

@ServerEndpoint("/chat/{userId}")
@Component
public class ChatEndpoint {// 存储所有连接private static final Map<String, Session> sessions = new ConcurrentHashMap<>();// 存储用户ID和session的映射private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();/*** 连接建立时调用*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {System.out.println("连接建立: " + session.getId() + ", 用户: " + userId);// 保存连接
        sessions.put(session.getId(), session);userSessionMap.put(userId, session.getId());// 通知其他用户有新用户上线broadcast("系统", "用户 " + userId + " 上线了");}/*** 收到消息时调用*/@OnMessagepublic void onMessage(String message, Session session, @PathParam("userId") String userId) {System.out.println("收到消息: " + message + " from: " + userId);try {// 解析消息JSONObject json = new JSONObject(message);String content = json.getString("content");String toUserId = json.optString("to", null);if (toUserId != null) {// 私聊消息
                sendToUser(userId, toUserId, content);} else {// 群发消息
                broadcast(userId, content);}} catch (Exception e) {e.printStackTrace();}}/*** 连接关闭时调用*/@OnClosepublic void onClose(Session session, @PathParam("userId") String userId) {System.out.println("连接关闭: " + session.getId());// 移除连接
        sessions.remove(session.getId());userSessionMap.remove(userId);// 通知其他用户broadcast("系统", "用户 " + userId + " 下线了");}/*** 发生错误时调用*/@OnErrorpublic void onError(Session session, Throwable error) {System.out.println("连接错误: " + session.getId());error.printStackTrace();}/*** 广播消息给所有用户*/private void broadcast(String sender, String content) {JSONObject message = new JSONObject();message.put("sender", sender);message.put("content", content);message.put("timestamp", System.currentTimeMillis());message.put("type", "broadcast");// 发送给所有连接的客户端for (Session session : sessions.values()) {if (session.isOpen()) {try {session.getAsyncRemote().sendText(message.toString());} catch (Exception e) {e.printStackTrace();}}}}/*** 发送私聊消息*/private void sendToUser(String fromUserId, String toUserId, String content) {String toSessionId = userSessionMap.get(toUserId);if (toSessionId != null) {Session toSession = sessions.get(toSessionId);if (toSession != null && toSession.isOpen()) {try {JSONObject message = new JSONObject();message.put("sender", fromUserId);message.put("content", content);message.put("timestamp", System.currentTimeMillis());message.put("type", "private");toSession.getAsyncRemote().sendText(message.toString());} catch (Exception e) {e.printStackTrace();}}}}
}

编程方式(继承 Endpoint 类)

@ServerEndpoint("/game")
public class GameEndpoint extends Endpoint {private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());@Overridepublic void onOpen(Session session, EndpointConfig config) {System.out.println("新连接: " + session.getId());sessions.add(session);// 添加消息处理器session.addMessageHandler(new MessageHandler.Whole<String>() {@Overridepublic void onMessage(String message) {System.out.println("收到: " + message);// 处理游戏逻辑
                handleGameMessage(session, message);}});// 发送欢迎消息try {JSONObject welcome = new JSONObject();welcome.put("type", "welcome");welcome.put("message", "欢迎加入游戏!");welcome.put("sessionId", session.getId());session.getBasicRemote().sendText(welcome.toString());} catch (IOException e) {e.printStackTrace();}}@Overridepublic void onClose(Session session, CloseReason closeReason) {System.out.println("连接关闭: " + session.getId());sessions.remove(session);// 通知其他玩家
        broadcastPlayerLeft(session.getId());}@Overridepublic void onError(Session session, Throwable thr) {System.err.println("连接错误: " + session.getId());thr.printStackTrace();}private void handleGameMessage(Session session, String message) {try {JSONObject json = new JSONObject(message);String type = json.getString("type");switch (type) {case "move":// 处理移动
                    handlePlayerMove(session, json);break;case "chat":// 处理聊天
                    handleChatMessage(session, json);break;default:System.out.println("未知消息类型: " + type);}} catch (Exception e) {e.printStackTrace();}}private void handlePlayerMove(Session session, JSONObject moveData) {// 处理玩家移动逻辑// 广播给所有玩家
        broadcastGameUpdate(moveData);}private void handleChatMessage(Session session, JSONObject chatData) {// 广播聊天消息JSONObject broadcastMsg = new JSONObject();broadcastMsg.put("type", "chat");broadcastMsg.put("sender", session.getId());broadcastMsg.put("message", chatData.getString("message"));broadcastMsg.put("timestamp", System.currentTimeMillis());broadcast(broadcastMsg.toString());}private void broadcast(String message) {synchronized (sessions) {for (Session s : sessions) {if (s.isOpen()) {try {s.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}}}}}
}

 配置文件

application.yml 配置

spring:websocket:# WebSocket 配置enabled: trueserver:# 服务器配置port: 8080servlet:context-path: /api# 自定义配置
websocket:max-sessions: 1000heartbeat-interval: 30000max-message-size: 128KB

心跳检测和连接管理

@Component
public class WebSocketHeartbeat {@Autowiredprivate SimpMessagingTemplate messagingTemplate;private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);@PostConstructpublic void init() {// 每30秒发送一次心跳scheduler.scheduleAtFixedRate(() -> {try {messagingTemplate.convertAndSend("/topic/heartbeat", Map.of("timestamp", System.currentTimeMillis(), "type", "heartbeat"));} catch (Exception e) {e.printStackTrace();}}, 0, 30, TimeUnit.SECONDS);}@PreDestroypublic void destroy() {scheduler.shutdown();}
}

消息编码器/解码器

// 自定义消息编解码器
@Component
public class ChatMessageConverter implements MessageConverter {@Overridepublic Message<?> toMessage(Object payload, MessageHeaders headers) {if (payload instanceof ChatMessage) {ChatMessage msg = (ChatMessage) payload;byte[] bytes = serializeMessage(msg);return MessageBuilder.createMessage(bytes, headers);}return null;}@Overridepublic Object fromMessage(Message<?> message, Class<?> targetClass) {if (targetClass == ChatMessage.class) {byte[] bytes = (byte[]) message.getPayload();return deserializeMessage(bytes);}return null;}private byte[] serializeMessage(ChatMessage message) {try {return new ObjectMapper().writeValueAsBytes(message);} catch (Exception e) {throw new RuntimeException("序列化失败", e);}}private ChatMessage deserializeMessage(byte[] bytes) {try {return new ObjectMapper().readValue(bytes, ChatMessage.class);} catch (Exception e) {throw new RuntimeException("反序列化失败", e);}}
}

集群支持

@Configuration
@EnableRedisRepositories
public class RedisConfig {@Beanpublic RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return container;}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));return template;}
}// Redis 广播消息
@Component
public class RedisMessagePublisher {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);}
}@Component
public class RedisMessageSubscriber implements MessageListener {@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 处理从 Redis 收到的消息// 转发给 WebSocket 客户端String channel = new String(pattern);String msg = new String(message.getBody());messagingTemplate.convertAndSend("/topic/" + channel, msg);}
}

Spring Boot 的 STOMP 实现更加完整和易于使用,而原生 WebSocket 则更加灵活。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询