SSE (Server-Sent Events) 通用配置指南
目录
- SSE 简介
- SSE vs WebSocket vs 轮询
- 服务端配置
- 客户端使用
- 最佳实践
- 常见问题
- 性能优化
- 安全考虑
- 生产环境部署
SSE 简介
什么是 SSE?
Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,允许服务器向客户端单向发送实时数据。
SSE 特点
| 特性 | 说明 |
|---|---|
| 单向通信 | 服务器 → 客户端(推送消息) |
| 基于 HTTP | 使用标准 HTTP/HTTPS 协议 |
| 自动重连 | 浏览器自动处理断线重连 |
| 文本格式 | 仅支持文本数据(UTF-8 编码) |
| EventSource API | 浏览器原生支持,无需额外库 |
适用场景
✅ 适合:
- 实时通知(新订单、新用户注册等)
- 股票价格、汇率实时更新
- 社交媒体动态推送
- 系统状态监控
- 日志实时输出
- 进度条更新
❌ 不适合:
- 双向实时通信(聊天、游戏等)→ 使用 WebSocket
- 二进制数据传输 → 使用 WebSocket
- 需要高频率双向通信 → 使用 WebSocket
SSE vs WebSocket vs 轮询
| 特性 | SSE | WebSocket | HTTP 轮询 |
|---|---|---|---|
| 通信方向 | 单向(服务器→客户端) | 双向 | 客户端→服务器 |
| 连接方式 | 长连接 | 长连接 | 短连接 |
| 协议 | HTTP/HTTPS | WS/WSS | HTTP/HTTPS |
| 自动重连 | ✅ 原生支持 | ❌ 需手动实现 | - |
| 数据格式 | 文本 | 文本/二进制 | JSON/XML |
| 浏览器支持 | 现代浏览器全支持(IE 不支持,需 polyfill) | IE10+ | 全支持 |
| 服务器资源 | 低 | 中 | 高 |
| 实现复杂度 | 简单 | 中等 | 简单 |
| 防火墙穿透 | ✅ 容易 | ⚠️ 可能受阻 | ✅ 容易 |
| 跨域 | ✅ 支持 CORS | ⚠️ 不受 CORS 约束(需服务端校验 Origin 头) | ✅ 支持 CORS |
选择建议
场景 → 技术选择
- 需要实时推送,只需服务器→客户端 → 使用 SSE ✅
- 需要双向实时通信 → 使用 WebSocket ✅
- 不需要实时,偶尔更新即可 → 使用 HTTP 轮询 ✅
- 需要兼容旧浏览器(IE9 及以下) → 使用 HTTP 轮询 ✅
服务端配置
1. ASP.NET Core (.NET 6+)
SSE 服务接口
// ISseService.cs
using Microsoft.AspNetCore.Http;

/// <summary>
/// Contract for a registry of open SSE responses that can push events to one
/// client or to all of them.
/// </summary>
public interface ISseService
{
    /// <summary>Registers a response and returns the connection id assigned to it.</summary>
    string AddConnection(HttpResponse response);

    /// <summary>Unregisters the connection with the given id.</summary>
    Task RemoveConnectionAsync(string connectionId);

    /// <summary>Sends one event to a single connection.</summary>
    Task SendToClientAsync(string connectionId, string eventType, string data);

    /// <summary>Sends one event to every registered connection.</summary>
    Task BroadcastAsync(string eventType, string data);

    /// <summary>Returns the number of registered connections.</summary>
    int GetConnectedClientCount();
}
SSE 服务实现
// SseService.cs
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Text;
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// In-memory registry of open SSE responses. Intended to be registered as a
/// singleton so every request sees the same connection table.
/// </summary>
public class SseService : ISseService
{
    /// <summary>A registered response plus the gate that serialises writes to it.</summary>
    private sealed class Connection
    {
        public Connection(HttpResponse response) => Response = response;

        public HttpResponse Response { get; }

        // A heartbeat and a broadcast can target the same response at the same
        // time; the gate keeps their frames from interleaving on the wire.
        public SemaphoreSlim WriteGate { get; } = new(1, 1);
    }

    private readonly ConcurrentDictionary<string, Connection> _connections = new();
    private readonly ILogger<SseService> _logger;

    public SseService(ILogger<SseService> logger)
    {
        _logger = logger;
    }

    /// <summary>Registers <paramref name="response"/> and returns a new GUID-based connection id.</summary>
    public string AddConnection(HttpResponse response)
    {
        var connectionId = Guid.NewGuid().ToString();
        _connections.TryAdd(connectionId, new Connection(response));
        _logger.LogInformation("SSE 连接添加: {ConnectionId}", connectionId);
        return connectionId;
    }

    /// <summary>
    /// Sends the event to every registered connection, one after another.
    /// Connections whose write fails are removed from the registry.
    /// </summary>
    public async Task BroadcastAsync(string eventType, string data)
    {
        var delivered = 0;
        foreach (var entry in _connections)
        {
            if (await TrySendAsync(entry.Value, eventType, data))
            {
                delivered++;
            }
            else
            {
                // A failed write means the client is gone; drop it so dead
                // connections do not accumulate and get retried on every broadcast.
                _connections.TryRemove(entry.Key, out _);
            }
        }

        _logger.LogInformation("SSE 广播发送至 {Count} 个客户端: {EventType}", delivered, eventType);
    }

    /// <summary>Sends the event to one connection; unknown ids are ignored.</summary>
    public async Task SendToClientAsync(string connectionId, string eventType, string data)
    {
        if (!_connections.TryGetValue(connectionId, out var connection))
        {
            return;
        }

        if (!await TrySendAsync(connection, eventType, data))
        {
            _connections.TryRemove(connectionId, out _);
        }
    }

    public int GetConnectedClientCount()
    {
        return _connections.Count;
    }

    /// <summary>
    /// Unregisters the connection and makes a best-effort attempt to tell the
    /// client the stream is closing. Removing an unknown id is a no-op.
    /// </summary>
    public async Task RemoveConnectionAsync(string connectionId)
    {
        if (_connections.TryRemove(connectionId, out var connection))
        {
            // Usually the client has already disconnected, so a failure here is
            // expected; TrySendAsync logs it and we carry on.
            await TrySendAsync(connection, "close", "Connection closed");
        }
    }

    /// <summary>Writes one frame and flushes it; returns false when the write failed.</summary>
    private async Task<bool> TrySendAsync(Connection connection, string eventType, string data)
    {
        var frame = FormatEvent(eventType, data);
        await connection.WriteGate.WaitAsync();
        try
        {
            await connection.Response.WriteAsync(frame);
            await connection.Response.Body.FlushAsync();
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "SSE 消息发送错误: {Message}", ex.Message);
            return false;
        }
        finally
        {
            connection.WriteGate.Release();
        }
    }

    /// <summary>
    /// Builds one SSE frame. Line breaks are stripped from the event name and
    /// every line of the payload gets its own <c>data:</c> field, because a raw
    /// newline inside a field would end the frame early.
    /// </summary>
    private static string FormatEvent(string eventType, string data)
    {
        var frame = new StringBuilder();
        frame.Append("event: ")
             .Append((eventType ?? string.Empty).Replace("\r", string.Empty).Replace("\n", string.Empty))
             .Append('\n');

        var lines = (data ?? string.Empty).Split(new[] { "\r\n", "\n", "\r" }, StringSplitOptions.None);
        foreach (var line in lines)
        {
            frame.Append("data: ").Append(line).Append('\n');
        }

        // The blank line terminates the event.
        frame.Append('\n');
        return frame.ToString();
    }
}
注册服务
// Program.cs
builder.Services.AddSingleton<ISseService, SseService>();
SSE 控制器
// SseController.cs
using Microsoft.AspNetCore.Cors;
using Microsoft.AspNetCore.Mvc;
using System;
using System.Threading.Tasks;

/// <summary>
/// HTTP surface for the SSE service: opens streams, reports the connection
/// count and triggers broadcasts.
/// </summary>
[ApiController]
[Route("api/[controller]")]
[EnableCors("any")] // a CORS policy with this exact name must be registered at startup
public class SseController : ControllerBase
{
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(30);

    private readonly ISseService _sseService;

    public SseController(ISseService sseService)
    {
        _sseService = sseService;
    }

    /// <summary>
    /// Opens an SSE stream: sends a "connected" event, then a "heartbeat" event
    /// every 30 seconds until the client disconnects.
    /// </summary>
    [HttpGet]
    [Route("connect")]
    public async Task Connect()
    {
        // The indexer overwrites; Add() throws if middleware already set the header.
        Response.Headers["Content-Type"] = "text/event-stream";
        Response.Headers["Cache-Control"] = "no-cache";
        Response.Headers["Connection"] = "keep-alive";
        Response.Headers["X-Accel-Buffering"] = "no"; // disable Nginx buffering

        var aborted = HttpContext.RequestAborted;
        var connectionId = _sseService.AddConnection(Response);
        try
        {
            await _sseService.SendToClientAsync(connectionId, "connected",
                Newtonsoft.Json.JsonConvert.SerializeObject(new
                {
                    connectionId,
                    message = "SSE connection established",
                    timestamp = DateTime.UtcNow
                }));

            while (!aborted.IsCancellationRequested)
            {
                // Passing the token ends the wait as soon as the client leaves,
                // instead of holding the connection for up to 30 more seconds.
                await Task.Delay(HeartbeatInterval, aborted);
                // Round-trip ("O") format is culture-independent.
                await _sseService.SendToClientAsync(connectionId, "heartbeat", DateTimeOffset.UtcNow.ToString("O"));
            }
        }
        catch (OperationCanceledException)
        {
            // Client disconnected; nothing to report.
        }
        finally
        {
            await _sseService.RemoveConnectionAsync(connectionId);
        }
    }

    /// <summary>Returns the number of currently registered connections.</summary>
    [HttpGet]
    [Route("count")]
    public IActionResult GetConnectionCount()
    {
        return Ok(new { count = _sseService.GetConnectedClientCount() });
    }

    /// <summary>
    /// Broadcasts one event to every connected client.
    /// NOTE(review): this endpoint is unauthenticated as written — protect it before exposing it publicly.
    /// </summary>
    [HttpPost]
    [Route("broadcast")]
    public async Task<IActionResult> Broadcast([FromBody] BroadcastRequest request)
    {
        if (request is null || string.IsNullOrWhiteSpace(request.EventType))
        {
            return BadRequest(new { success = false, error = "eventType is required" });
        }

        await _sseService.BroadcastAsync(request.EventType, request.Message ?? string.Empty);
        return Ok(new { success = true });
    }
}

/// <summary>Body of the broadcast request.</summary>
public class BroadcastRequest
{
    public string EventType { get; set; }
    public string Message { get; set; }
}
2. Node.js + Express
基本实现
// server.js
const express = require('express');
const cors = require('cors');

const app = express();
app.use(cors());
app.use(express.json());

// Every open SSE response.
const clients = new Set();

// Counter-based ids: Date.now() alone gives two clients the same id when they
// connect within the same millisecond.
let nextClientId = 1;

/**
 * Builds one SSE frame.
 * Line breaks are removed from the event name and each payload line gets its
 * own `data:` field, because a raw newline inside a field ends the frame early.
 * Non-string payloads are JSON-encoded.
 */
function formatSseEvent(eventType, data) {
  const name = String(eventType).replace(/[\r\n]/g, '');
  const text = typeof data === 'string' ? data : JSON.stringify(data) ?? '';
  const dataLines = text
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`)
    .join('\n');
  return `event: ${name}\n${dataLines}\n\n`;
}

// SSE connection endpoint
app.get('/sse', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no'); // disable Nginx buffering

  const clientId = `${Date.now()}-${nextClientId++}`;
  clients.add(res);
  console.log(`客户端连接: ${clientId}`);

  // Connection confirmation
  res.write(formatSseEvent('connected', { clientId, message: 'Connected' }));

  // Heartbeat every 30 seconds
  const heartbeat = setInterval(() => {
    res.write(formatSseEvent('heartbeat', new Date().toISOString()));
  }, 30000);

  // Client went away: stop the timer and forget the response
  req.on('close', () => {
    console.log(`客户端断开: ${clientId}`);
    clearInterval(heartbeat);
    clients.delete(res);
  });
});

// Broadcast a message to every client
app.post('/broadcast', (req, res) => {
  // Default the event name so a missing field does not become "event: undefined".
  const { eventType = 'message', message = '' } = req.body || {};
  const frame = formatSseEvent(eventType, message);
  clients.forEach((client) => {
    client.write(frame);
  });
  console.log(`广播消息发送至 ${clients.size} 个客户端`);
  res.json({ success: true, count: clients.size });
});

// Connection count
app.get('/count', (req, res) => {
  res.json({ count: clients.size });
});

app.listen(3000, () => {
  console.log('SSE 服务器运行在端口 3000');
});
3. Java + Spring Boot
SSE 控制器
// SseController.java
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * SSE endpoints: open a stream, broadcast to all streams, report the stream count.
 */
@RestController
@RequestMapping("/api/sse")
@CrossOrigin(origins = "*")
public class SseController {

    private static final long HEARTBEAT_SECONDS = 30;

    // All open SSE connections.
    private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // One shared timer for every connection, instead of one sleeping thread per
    // client. Daemon thread so it never blocks JVM shutdown.
    private final ScheduledExecutorService heartbeatScheduler =
            Executors.newSingleThreadScheduledExecutor(task -> {
                Thread thread = new Thread(task, "sse-heartbeat");
                thread.setDaemon(true);
                return thread;
            });

    public SseController() {
        heartbeatScheduler.scheduleAtFixedRate(
                this::sendHeartbeats, HEARTBEAT_SECONDS, HEARTBEAT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Opens an SSE connection with a one-hour timeout and queues a "connected" event.
     */
    @GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect() {
        SseEmitter emitter = new SseEmitter(60 * 60 * 1000L); // 1 hour timeout

        emitter.onCompletion(() -> {
            emitters.remove(emitter);
            System.out.println("SSE 连接完成");
        });
        emitter.onTimeout(() -> {
            emitters.remove(emitter);
            System.out.println("SSE 连接超时");
        });
        emitter.onError((e) -> {
            emitters.remove(emitter);
            System.err.println("SSE 连接错误: " + e.getMessage());
        });

        emitters.add(emitter);

        // Connection confirmation
        executor.execute(() -> {
            try {
                emitter.send(SseEmitter.event()
                        .name("connected")
                        .data("{\"message\":\"SSE connection established\"}"));
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }

    /**
     * Sends one heartbeat to every open connection and drops those that fail.
     */
    private void sendHeartbeats() {
        String now = new Date().toString();
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event().name("heartbeat").data(now));
            } catch (Exception e) {
                // Catch everything: an exception escaping a scheduleAtFixedRate
                // task cancels all future runs.
                emitters.remove(emitter);
                emitter.completeWithError(e);
            }
        }
    }

    /**
     * Queues the message for every open connection and reports how many were targeted.
     */
    @PostMapping("/broadcast")
    public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {
        int count = 0;
        for (SseEmitter emitter : emitters) {
            executor.execute(() -> {
                try {
                    emitter.send(SseEmitter.event()
                            .name(request.getEventType())
                            .data(request.getMessage()));
                } catch (IOException e) {
                    emitter.completeWithError(e);
                }
            });
            count++;
        }
        return ResponseEntity.ok(new Response(true, "消息已发送至 " + count + " 个客户端"));
    }

    /**
     * Returns the number of open connections.
     */
    @GetMapping("/count")
    public ResponseEntity<?> count() {
        return ResponseEntity.ok(new CountResponse(emitters.size()));
    }
}

// Request / response payloads

class BroadcastRequest {
    private String eventType;
    private String message;

    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}

class Response {
    private final boolean success;
    private final String message;

    Response(boolean success, String message) {
        this.success = success;
        this.message = message;
    }

    public boolean isSuccess() { return success; }
    public String getMessage() { return message; }
}

class CountResponse {
    private final int count;

    CountResponse(int count) {
        this.count = count;
    }

    public int getCount() { return count; }
}
4. Python + Flask
# app.py
from flask import Flask, Response, jsonify, request
import json
import queue
import time
import threading
from datetime import datetime

app = Flask(__name__)

HEARTBEAT_SECONDS = 30

# One queue.Queue per connected client. The broadcast endpoint puts frames into
# each queue and the client's generator drains its own queue.
clients = []
clients_lock = threading.Lock()


def format_sse(event_type, data):
    """Build one SSE frame.

    Line breaks are removed from the event name and each payload line gets its
    own ``data:`` field, because a raw newline inside a field ends the frame early.
    """
    name = str(event_type).replace('\r', '').replace('\n', '')
    lines = str(data).splitlines() or ['']
    body = ''.join(f"data: {line}\n" for line in lines)
    return f"event: {name}\n{body}\n"


@app.route('/sse')
def sse():
    """Open an SSE stream: a "connected" event, then broadcasts and heartbeats."""
    def event_stream():
        mailbox = queue.Queue(maxsize=100)
        client_id = id(mailbox)
        # Register inside the generator so the ``finally`` below always pairs
        # with the registration (an unstarted generator never runs ``finally``).
        with clients_lock:
            clients.append(mailbox)
        try:
            # Connection confirmation
            yield format_sse('connected', json.dumps({'clientId': client_id}))
            while True:
                try:
                    # Wait for a broadcast; the timeout doubles as the heartbeat timer.
                    yield mailbox.get(timeout=HEARTBEAT_SECONDS)
                except queue.Empty:
                    yield format_sse('heartbeat', datetime.now().isoformat())
        finally:
            with clients_lock:
                if mailbox in clients:
                    clients.remove(mailbox)
            print(f"客户端断开: {client_id}")

    return Response(
        event_stream(),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no',
        },
    )


@app.route('/broadcast', methods=['POST'])
def broadcast():
    """Queue one event for every connected client."""
    data = request.get_json(silent=True) or {}
    event_type = data.get('eventType', 'message')
    message = data.get('message', '')

    message_str = format_sse(event_type, message)
    with clients_lock:
        for mailbox in clients[:]:  # copy: the list is modified while iterating
            try:
                mailbox.put_nowait(message_str)
            except queue.Full:
                # The client is not draining its queue; stop queueing for it.
                clients.remove(mailbox)
        count = len(clients)

    return jsonify({'success': True, 'count': count})


@app.route('/count')
def count():
    """Return the number of connected clients."""
    with clients_lock:
        return jsonify({'count': len(clients)})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, threaded=True)
5. Go + Gin
// main.go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
)

// Client is one connected SSE client; frames queued on Channel are written to
// its response by sseHandler.
type Client struct {
	Channel chan string
	ID      string
}

// SSEService tracks the connected clients.
type SSEService struct {
	clients map[*Client]bool
	mu      sync.RWMutex
}

var sseService = &SSEService{
	clients: make(map[*Client]bool),
}

// formatSSE builds one SSE frame. Line breaks are removed from the event name
// and each payload line gets its own "data:" field, because a raw newline
// inside a field ends the frame early.
func formatSSE(eventType, data string) string {
	var b strings.Builder
	b.WriteString("event: ")
	b.WriteString(strings.NewReplacer("\r", "", "\n", "").Replace(eventType))
	b.WriteByte('\n')
	normalized := strings.NewReplacer("\r\n", "\n", "\r", "\n").Replace(data)
	for _, line := range strings.Split(normalized, "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteByte('\n')
	}
	b.WriteByte('\n')
	return b.String()
}

// AddClient registers a client.
func (s *SSEService) AddClient(client *Client) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.clients[client] = true
	log.Printf("客户端连接: %s, 当前连接数: %d", client.ID, len(s.clients))
}

// RemoveClient unregisters a client and closes its channel. Calling it again
// for the same client is a no-op.
func (s *SSEService) RemoveClient(client *Client) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.clients[client]; ok {
		delete(s.clients, client)
		close(client.Channel)
		log.Printf("客户端断开: %s, 当前连接数: %d", client.ID, len(s.clients))
	}
}

// Broadcast queues one event for every client without blocking; a client
// whose buffer is full is removed.
func (s *SSEService) Broadcast(eventType, data string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	message := formatSSE(eventType, data)
	for client := range s.clients {
		select {
		case client.Channel <- message:
		default:
			// Buffer full. RemoveClient needs the write lock, which cannot be
			// taken while this read lock is held, so remove asynchronously.
			go s.RemoveClient(client)
		}
	}
}

// GetClientCount returns the number of registered clients.
func (s *SSEService) GetClientCount() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.clients)
}

// sseHandler streams events to one client until the request ends or the
// client is removed.
func sseHandler(c *gin.Context) {
	client := &Client{
		ID:      generateID(),
		Channel: make(chan string, 100),
	}
	sseService.AddClient(client)
	defer sseService.RemoveClient(client)

	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("X-Accel-Buffering", "no")

	// Connection confirmation
	payload, err := json.Marshal(gin.H{
		"clientId": client.ID,
		"message":  "Connected",
	})
	if err != nil {
		c.AbortWithStatus(http.StatusInternalServerError)
		return
	}
	fmt.Fprint(c.Writer, formatSSE("connected", string(payload)))
	c.Writer.Flush()

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case message, ok := <-client.Channel:
			if !ok {
				// Channel closed by RemoveClient. Without this check a closed
				// channel is always ready and the loop would spin.
				return
			}
			fmt.Fprint(c.Writer, message)
			c.Writer.Flush()
		case <-ticker.C:
			fmt.Fprint(c.Writer, formatSSE("heartbeat", time.Now().Format(time.RFC3339)))
			c.Writer.Flush()
		case <-c.Request.Context().Done():
			return
		}
	}
}

// broadcastHandler broadcasts the posted event to every client.
func broadcastHandler(c *gin.Context) {
	var req struct {
		EventType string `json:"eventType"`
		Message   string `json:"message"`
	}
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	sseService.Broadcast(req.EventType, req.Message)
	c.JSON(http.StatusOK, gin.H{
		"success": true,
		"count":   sseService.GetClientCount(),
	})
}

// countHandler reports the number of connected clients.
func countHandler(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{"count": sseService.GetClientCount()})
}

func generateID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

func main() {
	r := gin.Default()

	// Allow cross-origin requests
	r.Use(func(c *gin.Context) {
		c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
		c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
		if c.Request.Method == "OPTIONS" {
			c.AbortWithStatus(http.StatusNoContent)
			return
		}
		c.Next()
	})

	r.GET("/api/sse/connect", sseHandler)
	r.POST("/api/sse/broadcast", broadcastHandler)
	r.GET("/api/sse/count", countHandler)

	log.Println("SSE 服务器运行在端口 8080")
	r.Run(":8080")
}
客户端使用
1. 原生 JavaScript (EventSource API)
基本连接
// 建立 SSE 连接
const eventSource = new EventSource('http://your-domain.com/api/sse/connect');

// Connection opened
eventSource.onopen = () => {
  console.log('SSE 连接已打开');
};

// Events without an explicit event name
eventSource.onmessage = (event) => {
  console.log('收到消息:', event.data);
};

// Named events
eventSource.addEventListener('connected', (event) => {
  const payload = JSON.parse(event.data);
  console.log('连接确认:', payload);
});

eventSource.addEventListener('heartbeat', (event) => {
  console.log('心跳:', event.data);
});

eventSource.addEventListener('notification', (event) => {
  showNotification(JSON.parse(event.data));
});

// Errors: EventSource retries on its own
eventSource.onerror = (error) => {
  console.error('SSE 连接错误:', error);
};

// Closes the connection for good (no automatic reconnect afterwards)
function closeConnection() {
  eventSource.close();
  console.log('SSE 连接已关闭');
}
React 示例
// SseComponent.jsx
import React, { useState, useEffect } from 'react';const SseComponent = () => {const [messages, setMessages] = useState([]);const [isConnected, setIsConnected] = useState(false);const [connectionCount, setConnectionCount] = useState(0);useEffect(() => {let eventSource;function connect() {eventSource = new EventSource('http://your-domain.com/api/sse/connect');eventSource.onopen = () => {setIsConnected(true);console.log('SSE 连接成功');};eventSource.onerror = () => {setIsConnected(false);console.log('SSE 连接断开,尝试重连...');};// 监听注册事件eventSource.addEventListener('register', (event) => {const data = JSON.parse(event.data);setMessages(prev => [data, ...prev]);showNotification(`新用户 ${data.username} 已注册`);});// 监听心跳eventSource.addEventListener('heartbeat', (event) => {console.log('心跳:', event.data);});}connect();// 清理return () => {if (eventSource) {eventSource.close();}};}, []);const showNotification = (message) => {// 显示浏览器通知if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}};return (<div><h3>SSE 实时消息</h3><div className={`status ${isConnected ? 'connected' : 'disconnected'}`}>{isConnected ? '✅ 已连接' : '❌ 未连接'}</div><div className="messages">{messages.map((msg, index) => (<div key={index} className="message"><strong>{msg.message}</strong><time>{new Date(msg.timestamp).toLocaleString()}</time></div>))}</div></div>);
};export default SseComponent;
Vue 3 示例
<!-- SseComponent.vue -->
<template>
  <div>
    <h3>SSE 实时消息</h3>
    <div :class="['status', isConnected ? 'connected' : 'disconnected']">
      {{ isConnected ? '✅ 已连接' : '❌ 未连接' }}
    </div>
    <div class="messages">
      <div v-for="(msg, index) in messages" :key="index" class="message">
        <strong>{{ msg.message }}</strong>
        <time>{{ formatTime(msg.timestamp) }}</time>
      </div>
    </div>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue';

const messages = ref([]);
const isConnected = ref(false);
let eventSource = null;

// Shows a browser notification when permission has already been granted.
const showNotification = (message) => {
  if ('Notification' in window && Notification.permission === 'granted') {
    new Notification('系统通知', { body: message });
  }
};

const formatTime = (timestamp) => new Date(timestamp).toLocaleString();

// Opens the stream and wires the status flag and event handlers.
const connect = () => {
  eventSource = new EventSource('http://your-domain.com/api/sse/connect');

  eventSource.onopen = () => {
    isConnected.value = true;
    console.log('SSE 连接成功');
  };

  eventSource.onerror = () => {
    isConnected.value = false;
    console.log('SSE 连接断开');
  };

  eventSource.addEventListener('register', (event) => {
    const payload = JSON.parse(event.data);
    messages.value.unshift(payload);
    showNotification(payload.message);
  });

  eventSource.addEventListener('heartbeat', (event) => {
    console.log('心跳:', event.data);
  });
};

onMounted(() => {
  connect();
  // Ask for notification permission if the user has not decided yet
  if ('Notification' in window && Notification.permission === 'default') {
    Notification.requestPermission();
  }
});

onUnmounted(() => {
  if (eventSource) {
    eventSource.close();
  }
});
</script>

<style scoped>
.status {
  padding: 10px;
  margin: 10px 0;
  border-radius: 4px;
}
.status.connected {
  background-color: #d4edda;
  color: #155724;
}
.status.disconnected {
  background-color: #f8d7da;
  color: #721c24;
}
.message {
  padding: 10px;
  border-left: 3px solid #007bff;
  margin: 10px 0;
  background-color: #f8f9fa;
}
</style>
Angular 示例
// sse.service.ts
import { Injectable, NgZone } from '@angular/core';
import { EventSourcePolyfill } from 'event-source-polyfill';

/**
 * Thin wrapper around EventSourcePolyfill that runs event handlers inside the
 * Angular zone so change detection sees their updates.
 */
@Injectable({
  providedIn: 'root'
})
export class SseService {
  private eventSource: EventSourcePolyfill | null = null;

  constructor(private ngZone: NgZone) {}

  /**
   * Opens a connection to `url` and registers one listener per key of
   * `eventHandlers`. Any connection opened earlier by this service is closed first.
   */
  connect(url: string, eventHandlers: { [eventType: string]: (data: any) => void }) {
    // Avoid leaking a previous stream when connect() is called twice.
    this.disconnect();

    const source = new EventSourcePolyfill(url, { withCredentials: true });
    this.eventSource = source;

    source.onopen = () => {
      console.log('SSE 连接成功');
    };

    source.onerror = (error: any) => {
      console.error('SSE 连接错误:', error);
    };

    // Register the caller's handlers
    Object.keys(eventHandlers).forEach(eventType => {
      source.addEventListener(eventType, (event: any) => {
        const payload = SseService.parsePayload(event.data);
        this.ngZone.run(() => eventHandlers[eventType](payload));
      });
    });
  }

  /** Closes the current connection, if any. */
  disconnect() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }

  /**
   * Parses JSON payloads and passes everything else through unchanged.
   * Not every event carries JSON: a plain-text payload, or an `error` event
   * with no data at all, would make a bare JSON.parse throw inside the listener.
   */
  private static parsePayload(raw: unknown): unknown {
    if (typeof raw !== 'string') {
      return raw;
    }
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  }
}
// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { SseService } from './sse.service';

/** Root component: shows the SSE connection status and the received messages. */
@Component({
  selector: 'app-root',
  template: `
    <div>
      <h3>SSE 实时消息</h3>
      <div [class]="'status ' + (isConnected ? 'connected' : 'disconnected')">
        {{ isConnected ? '✅ 已连接' : '❌ 未连接' }}
      </div>
      <div class="messages">
        <div *ngFor="let msg of messages" class="message">
          <strong>{{ msg.message }}</strong>
          <time>{{ formatTime(msg.timestamp) }}</time>
        </div>
      </div>
    </div>
  `,
  styles: [`
    .status.connected { background: #d4edda; color: #155724; padding: 10px; }
    .status.disconnected { background: #f8d7da; color: #721c24; padding: 10px; }
    .message { padding: 10px; border-left: 3px solid #007bff; margin: 10px 0; }
  `]
})
export class AppComponent implements OnInit, OnDestroy {
  messages: any[] = [];
  isConnected = false;

  constructor(private sseService: SseService) {}

  ngOnInit() {
    this.sseService.connect('http://your-domain.com/api/sse/connect', {
      connected: (data) => {
        this.isConnected = true;
        console.log('连接确认:', data);
      },
      register: (data) => {
        this.messages.unshift(data);
        this.showNotification(data.message);
      },
      heartbeat: (data) => {
        console.log('心跳:', data);
      },
      error: () => {
        this.isConnected = false;
      }
    });
  }

  ngOnDestroy() {
    this.sseService.disconnect();
  }

  // Called from the template, so it must not be private: strict/AOT template
  // checking rejects private members referenced in a template.
  formatTime(timestamp: string): string {
    return new Date(timestamp).toLocaleString();
  }

  private showNotification(message: string) {
    if ('Notification' in window && Notification.permission === 'granted') {
      new Notification('系统通知', { body: message });
    }
  }
}
2. 浏览器兼容性
Polyfill (支持旧浏览器)
<!-- Install event-source-polyfill -->
npm install event-source-polyfill<!-- or load it from a CDN -->
<script src="https://cdn.jsdelivr.net/npm/event-source-polyfill@1.0.25/src/eventsource.min.js"></script><script>
// Use the polyfill when it is present, otherwise the native EventSource
const EventSource = window.EventSourcePolyfill || window.EventSource;const eventSource = new EventSource('/api/sse/connect');
// Use it as usual...
</script>
3. 移动端支持
// 移动端 SSE 连接
// Opens the SSE connection used on mobile and returns it.
function connectMobile() {
  const source = new EventSource('http://your-domain.com/api/sse/connect', {
    // send cookies with the request
    withCredentials: true
  });

  source.onopen = () => {
    console.log('移动端 SSE 连接成功');
  };

  source.addEventListener('notification', (event) => {
    const payload = JSON.parse(event.data);

    // Vibrate where the device supports it
    if ('vibrate' in navigator) {
      navigator.vibrate(200);
    }

    showLocalNotification(payload.title, payload.body);
  });

  return source;
}

// Shows a local notification, asking for permission first when it is undecided.
function showLocalNotification(title, body) {
  if (!('Notification' in window)) {
    console.log('不支持通知');
    return;
  }

  const show = () => new Notification(title, { body, icon: '/icon.png' });

  if (Notification.permission === 'granted') {
    show();
    return;
  }

  if (Notification.permission !== 'denied') {
    Notification.requestPermission().then((permission) => {
      if (permission === 'granted') {
        show();
      }
    });
  }
}
最佳实践
1. 连接管理
自动重连
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 1000;
let retryCount = 0;

// Opens the stream and retries with exponential backoff (1s, 2s, 4s, ...)
// up to MAX_RETRIES times. Returns the EventSource that was opened.
function connect() {
  const eventSource = new EventSource('/api/sse/connect');

  eventSource.onopen = () => {
    retryCount = 0; // a successful open resets the backoff
    console.log('连接成功');
  };

  eventSource.onerror = () => {
    // Close first: otherwise the browser's built-in retry keeps this instance
    // alive while we also open a new one, and connections pile up.
    eventSource.close();

    retryCount++;
    if (retryCount > MAX_RETRIES) {
      console.error('重连次数超过限制');
      alert('连接失败,请刷新页面');
      return;
    }

    console.log(`重连中... (${retryCount}/${MAX_RETRIES})`);
    setTimeout(connect, BASE_DELAY_MS * 2 ** (retryCount - 1));
  };

  return eventSource;
}

connect();
心跳机制
服务端:
// 每 30 秒发送心跳
// Send a heartbeat every 30 seconds until the client disconnects.
while (!HttpContext.RequestAborted.IsCancellationRequested)
{
    // Passing the token ends the wait as soon as the client leaves; the
    // resulting OperationCanceledException must be handled by the surrounding code.
    await Task.Delay(30000, HttpContext.RequestAborted);
    // Round-trip ("O") format is culture-independent.
    await _sseService.SendToClientAsync(connectionId, "heartbeat", DateTimeOffset.UtcNow.ToString("O"));
}
客户端:
const HEARTBEAT_TIMEOUT_MS = 60000;
let lastHeartbeat = Date.now();

eventSource.addEventListener('heartbeat', () => {
  lastHeartbeat = Date.now();
  console.log('心跳正常');
});

// Check the connection every 10 seconds
setInterval(() => {
  if (Date.now() - lastHeartbeat <= HEARTBEAT_TIMEOUT_MS) {
    return;
  }

  console.warn('连接可能已断开');
  eventSource.close();
  // Restart the window; otherwise the check stays true and a new connection
  // is opened on every tick.
  lastHeartbeat = Date.now();
  // NOTE(review): the heartbeat listener above is bound to the old instance;
  // connect() must register it again on the connection it creates.
  connect();
}, 10000);
2. 消息格式设计
推荐格式
// 统一的消息格式
{
  "type": "notification",      // message type
  "id": "msg-123",             // message id (used for de-duplication)
  "timestamp": 1705699200000,  // timestamp
  "data": {                    // business payload
    "title": "新订单",
    "content": "订单 #12345 已创建",
    "url": "/orders/12345"
  }
}
服务端发送
// Build the unified envelope, then broadcast it as a "notification" event.
var payload = new
{
    title = "新订单",
    content = $"订单 #{orderId} 已创建",
    url = $"/orders/{orderId}"
};

var message = Newtonsoft.Json.JsonConvert.SerializeObject(new
{
    type = "notification",
    id = Guid.NewGuid().ToString(),
    timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
    data = payload
});

await _sseService.BroadcastAsync("notification", message);
客户端处理
const processedIds = new Set();

eventSource.addEventListener('notification', (event) => {
  const msg = JSON.parse(event.data);

  // Skip messages that were already handled
  if (processedIds.has(msg.id)) {
    return;
  }
  processedIds.add(msg.id);

  // Drop the oldest id once more than 1000 are remembered
  if (processedIds.size > 1000) {
    const [oldestId] = processedIds;
    processedIds.delete(oldestId);
  }

  handleNotification(msg);
});
3. 性能优化
连接池管理
// 服务端 - 限制连接数
const MAX_CONNECTIONS = 10000;

// Registers a connection, refusing once the cap is reached.
function addConnection(connection) {
  const atCapacity = connections.size >= MAX_CONNECTIONS;
  if (atCapacity) {
    throw new Error('连接数已达上限');
  }
  connections.add(connection);
}

// Client side - connection reuse
// Shares one EventSource between many subscribers.
class SseManager {
  constructor(url) {
    this.url = url;
    this.eventSource = null;
    this.listeners = new Map();
    // Event names that already have a native listener on the current EventSource.
    this.attachedTypes = new Set();
  }

  // Opens the connection once; later calls are no-ops.
  connect() {
    if (this.eventSource) {
      return; // already connected
    }
    this.eventSource = new EventSource(this.url);
    this.attachedTypes.clear();

    // Events WITHOUT an event name arrive here; route them by payload.type.
    this.eventSource.onmessage = (event) => {
      const data = SseManager.parse(event.data);
      this.dispatch(data && data.type, data);
    };

    // Events WITH an event name never reach onmessage, so each subscribed
    // name needs its own native listener.
    for (const eventType of this.listeners.keys()) {
      this.attach(eventType);
    }
  }

  // Subscribes callback to eventType; works before or after connect().
  on(eventType, callback) {
    if (!this.listeners.has(eventType)) {
      this.listeners.set(eventType, []);
    }
    this.listeners.get(eventType).push(callback);
    this.attach(eventType);
  }

  // Removes a previously registered callback.
  off(eventType, callback) {
    const listeners = this.listeners.get(eventType) || [];
    const index = listeners.indexOf(callback);
    if (index > -1) {
      listeners.splice(index, 1);
    }
  }

  // Closes the connection; subscriptions are kept for the next connect().
  close() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
      this.attachedTypes.clear();
    }
  }

  // Adds the native listener for eventType once per connection.
  attach(eventType) {
    if (!this.eventSource || this.attachedTypes.has(eventType)) {
      return;
    }
    this.attachedTypes.add(eventType);
    this.eventSource.addEventListener(eventType, (event) => {
      this.dispatch(eventType, SseManager.parse(event.data));
    });
  }

  // Calls every callback registered for eventType.
  dispatch(eventType, data) {
    const listeners = this.listeners.get(eventType) || [];
    // Copy so a callback may call off() while we iterate.
    listeners.slice().forEach((callback) => callback(data));
  }

  // Parses JSON payloads; anything else is passed through unchanged.
  static parse(raw) {
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  }
}

// Usage
const sseManager = new SseManager('/api/sse/connect');

// Log every notification that arrives
const logNotification = (data) => {
  console.log('通知:', data);
};

sseManager.connect();
sseManager.on('notification', logNotification);
批量发送
// 批量发送优化
/// <summary>
/// Serialises <paramref name="messages"/> into one JSON array and sends it to
/// every connection as a single event.
/// </summary>
public async Task BroadcastBatchAsync(string eventType, List<string> messages)
{
    string payload = Newtonsoft.Json.JsonConvert.SerializeObject(messages);

    foreach (var entry in _connections)
    {
        var response = entry.Value;
        await SendMessageAsync(response, eventType, payload);
    }
}
// 客户端批量处理
eventSource.addEventListener('notification-batch', (event) => {
  // The payload is an array of messages
  const batch = JSON.parse(event.data);
  batch.forEach((msg) => {
    processNotification(msg);
  });
});
4. 错误处理
服务端错误处理
try
{
    await _sseService.SendToClientAsync(connectionId, eventType, data);
}
catch (IOException)
{
    // The client went away: forget the connection.
    await _sseService.RemoveConnectionAsync(connectionId);
}
catch (Exception ex)
{
    _logger.LogError(ex, "SSE 发送消息失败");
}
客户端错误处理
eventSource.onerror = (error) => {
  // readyState tells a permanent close apart from an automatic retry
  switch (error.target.readyState) {
    case EventSource.CLOSED:
      console.error('连接已关闭');
      break;
    case EventSource.CONNECTING:
      console.warn('正在重连...');
      break;
    default:
      console.error('未知错误:', error);
  }
};
常见问题
Q1: SSE 连接频繁断开
原因:
- 代理服务器超时设置过短
- 防火墙或负载均衡器配置问题
- 客户端网络不稳定
解决方案:
- 调整代理超时(Nginx):
location /api/sse/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_read_timeout 86400s; # 24 hours
    proxy_send_timeout 86400s;
    # Without this nginx buffers the upstream response, so events reach the
    # client late.
    proxy_buffering off;
    chunked_transfer_encoding on;
}
- 调整心跳间隔:
// Choose the heartbeat interval from the proxy's timeout setting
await Task.Delay(25000); // shorter than the proxy timeout
Q2: 消息延迟或丢失
原因:
- 网络拥塞
- 客户端处理速度慢
- 缓冲区溢出
解决方案:
- 调整缓冲区大小:
// EventSource has no buffer-size setting: assigning MAX_BUFFER_SIZE would
// only create an unused property on the object. To avoid falling behind, keep
// event handlers cheap and defer heavy work.
var eventSource = new EventSource(url);
- 实现消息队列与失败重试:
// 服务端 - 消息队列
/// <summary>
/// FIFO queue in front of <see cref="ISseService.BroadcastAsync"/>. A message
/// that fails to broadcast stays at the head of the queue and is retried on
/// the next enqueue.
/// </summary>
public class MessageQueue
{
    private readonly ConcurrentQueue<Message> _queue = new();
    // Admits one drain loop at a time so messages are broadcast in order.
    private readonly SemaphoreSlim _drainGate = new(1, 1);
    private readonly ISseService _sseService;

    public MessageQueue(ISseService sseService)
    {
        _sseService = sseService;
    }

    /// <summary>Adds a message and drains the queue unless another caller is already draining.</summary>
    public async Task EnqueueAsync(Message message)
    {
        _queue.Enqueue(message);
        await ProcessQueueAsync();
    }

    private async Task ProcessQueueAsync()
    {
        // Outer loop: re-check after releasing the gate so a message enqueued
        // while the previous drainer was finishing is not left stranded.
        while (!_queue.IsEmpty)
        {
            if (!await _drainGate.WaitAsync(0))
            {
                return; // another caller is draining
            }

            try
            {
                // Peek first and dequeue only after success: a failed message
                // keeps its position instead of moving to the tail.
                while (_queue.TryPeek(out var message))
                {
                    try
                    {
                        await _sseService.BroadcastAsync(message.Type, message.Data);
                    }
                    catch (Exception)
                    {
                        // Best effort: stop here and retry on the next enqueue.
                        // TODO: log the failure once a logger is available.
                        return;
                    }

                    _queue.TryDequeue(out _);
                    await Task.Delay(100); // pause between sends
                }
            }
            finally
            {
                _drainGate.Release();
            }
        }
    }
}
Q3: 跨域问题
错误: Access-Control-Allow-Origin 错误
解决方案:
服务端(ASP.NET Core):
builder.Services.AddCors(options =>
{
    // The lambda parameter must not be called "builder": that name is already
    // taken by the enclosing local and redeclaring it is a compile error.
    options.AddPolicy("AllowSSE", policy =>
    {
        // AllowCredentials requires explicit origins; it cannot be combined
        // with AllowAnyOrigin.
        policy.WithOrigins("http://yourdomain.com")
              .AllowAnyMethod()
              .AllowAnyHeader()
              .AllowCredentials();
    });
});

app.UseCors("AllowSSE");
客户端:
// EventSource cannot send custom headers
// Make sure the server allows cross-origin requests
const eventSource = new EventSource('http://api.example.com/sse');
Q4: 浏览器兼容性问题
问题: IE 或旧版浏览器不支持 EventSource
解决方案:
// Use the polyfill
import { EventSourcePolyfill } from 'event-source-polyfill';

// With a module import the constructor is the imported binding;
// window.EventSourcePolyfill is only set by the <script>/CDN build.
const EventSourceImpl = EventSourcePolyfill || window.EventSource;

const eventSource = new EventSourceImpl('/api/sse/connect', {
  withCredentials: true
});
或使用轮询降级:
// Uses SSE when the browser has EventSource and falls back to polling otherwise.
class SseOrPolling {
  // onMessage: optional callback receiving { data } objects. Subclasses may
  // override the onMessage method instead of passing a callback.
  constructor(url, onMessage, pollIntervalMs = 3000) {
    this.url = url;
    this.pollIntervalMs = pollIntervalMs;
    this.usePolling = typeof EventSource === 'undefined';
    this.eventSource = null;
    this.interval = null;
    if (typeof onMessage === 'function') {
      this.onMessage = onMessage;
    }
  }

  // Default handler: does nothing.
  onMessage(event) {}

  connect() {
    if (this.usePolling) {
      this.startPolling();
    } else {
      this.startSSE();
    }
  }

  startSSE() {
    this.eventSource = new EventSource(this.url);
    // Wrap in an arrow function so `this` is the SseOrPolling instance, and so
    // a handler assigned after connect() is still picked up.
    this.eventSource.onmessage = (event) => this.onMessage(event);
  }

  startPolling() {
    this.interval = setInterval(() => {
      fetch(this.url)
        .then((res) => res.json())
        .then((data) => this.onMessage({ data: JSON.stringify(data) }))
        .catch((error) => console.error('轮询失败:', error));
    }, this.pollIntervalMs); // every 3 seconds by default
  }

  // Stops whichever transport is active.
  disconnect() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
    if (this.interval !== null) {
      clearInterval(this.interval);
      this.interval = null;
    }
  }
}
Q5: 内存泄漏
原因:
- 未正确关闭连接
- 事件监听器未清理
- 消息队列无限增长
解决方案:
// EventSource wrapper that keeps a bounded history of recent messages and
// releases everything in destroy().
class SseConnection {
  // maxBuffer: how many recent messages to keep (default 100).
  constructor(url, maxBuffer = 100) {
    this.url = url;
    this.maxBuffer = maxBuffer;
    this.eventSource = null;
    this.handlers = [];
    this.messageBuffer = [];
  }

  connect() {
    this.eventSource = new EventSource(this.url);
    this.eventSource.onmessage = (event) => {
      // Make room BEFORE pushing; checking with ">" let the buffer grow to
      // one entry more than the limit.
      while (this.messageBuffer.length >= this.maxBuffer) {
        this.messageBuffer.shift();
      }
      this.messageBuffer.push(event.data);

      // Notify subscribers
      this.handlers.forEach((handler) => handler(event.data));
    };
  }

  // Subscribes a handler to every message.
  on(handler) {
    this.handlers.push(handler);
  }

  // Unsubscribes a handler so long-lived connections do not retain it.
  off(handler) {
    const index = this.handlers.indexOf(handler);
    if (index > -1) {
      this.handlers.splice(index, 1);
    }
  }

  // Closes the connection and drops handlers and buffered messages.
  destroy() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
    this.handlers = [];
    this.messageBuffer = [];
  }
}
性能优化
1. 服务端优化
连接池限制
/// <summary>
/// Caps the number of simultaneous SSE connections with a counting semaphore:
/// acquire a slot on connect, release it on disconnect.
/// </summary>
public class SseConnectionLimiter
{
    private readonly int _maxConnections;
    private readonly SemaphoreSlim _semaphore;

    public SseConnectionLimiter(int maxConnections)
    {
        _semaphore = new SemaphoreSlim(maxConnections, maxConnections);
        _maxConnections = maxConnections;
    }

    /// <summary>Number of slots still free.</summary>
    public int AvailableConnections
    {
        get { return _semaphore.CurrentCount; }
    }

    /// <summary>Takes a slot without waiting; returns false when none is free.</summary>
    public async Task<bool> TryAcquireAsync()
    {
        bool acquired = await _semaphore.WaitAsync(0);
        return acquired;
    }

    /// <summary>Gives a slot back.</summary>
    public void Release()
    {
        _semaphore.Release();
    }
}
消息压缩
using System.IO.Compression;
using System.Runtime.CompilerServices;

// One compressor per response: an SSE body is a single long-lived gzip stream,
// so the compressor must outlive individual messages. The table does not keep
// the response alive.
private static readonly ConditionalWeakTable<HttpResponse, GZipStream> s_gzipStreams = new();

/// <summary>
/// Writes <paramref name="data"/> to the response through a per-response gzip
/// stream and flushes it so the client receives it immediately.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The response has already started without <c>Content-Encoding: gzip</c>.
/// </exception>
public async Task SendCompressedAsync(HttpResponse response, string data)
{
    if (!response.HasStarted)
    {
        response.Headers["Content-Encoding"] = "gzip";
    }
    else if (response.Headers["Content-Encoding"].ToString() != "gzip")
    {
        // Headers are already on the wire; compressed bytes would corrupt the stream.
        throw new InvalidOperationException("Response has already started without gzip content encoding.");
    }

    // leaveOpen: the compressor must never close response.Body. Creating and
    // disposing a GZipStream per message would close the body after the first one.
    var gzip = s_gzipStreams.GetValue(
        response,
        r => new GZipStream(r.Body, CompressionMode.Compress, leaveOpen: true));

    var bytes = System.Text.Encoding.UTF8.GetBytes(data);
    await gzip.WriteAsync(bytes, 0, bytes.Length);

    // Push the compressed bytes out now instead of waiting for more input.
    await gzip.FlushAsync();
    await response.Body.FlushAsync();
}
异步处理
// 使用 Channel 进行异步消息传递
/// <summary>
/// Decouples producers from delivery: callers enqueue messages on a channel
/// and a single background loop broadcasts them in order.
/// </summary>
public class SseBroadcaster
{
    private readonly Channel<string> _channel;

    public SseBroadcaster()
    {
        _channel = Channel.CreateUnbounded<string>();
        Task.Run(ProcessMessagesAsync);
    }

    /// <summary>Queues a message for broadcast.</summary>
    public async Task BroadcastAsync(string message)
    {
        await _channel.Writer.WriteAsync(message);
    }

    private async Task ProcessMessagesAsync()
    {
        await foreach (var message in _channel.Reader.ReadAllAsync())
        {
            try
            {
                // Await each broadcast before taking the next message. Starting
                // a new task per message lets two broadcasts run at once, which
                // reorders messages and hides their exceptions.
                await SendToAllClientsAsync(message);
            }
            catch (Exception)
            {
                // One failed broadcast must not stop the loop.
                // TODO: log the failure once a logger is available.
            }
        }
    }
}
2. 客户端优化
节流处理
// Processes at most one message per throttleMs. Messages arriving in between
// replace each other; the latest one is processed when the window ends.
class ThrottledSseHandler {
  constructor(throttleMs = 100) {
    this.throttleMs = throttleMs;
    this.lastMessageTime = 0;
    this.pendingMessage = null;
    this.timeout = null;
  }

  handle(message) {
    const now = Date.now();
    const elapsed = now - this.lastMessageTime;
    this.pendingMessage = message;

    // Always cancel the trailing timer first. If it is left armed when a
    // message is processed immediately, it fires later and processes that
    // same message a second time.
    clearTimeout(this.timeout);
    this.timeout = null;

    if (elapsed >= this.throttleMs) {
      this.processMessage(message);
      this.lastMessageTime = now;
      return;
    }

    // Wait only for the rest of the current window.
    this.timeout = setTimeout(() => {
      this.timeout = null;
      this.processMessage(this.pendingMessage);
      this.lastMessageTime = Date.now();
    }, this.throttleMs - elapsed);
  }

  processMessage(message) {
    // Actual processing goes here
    console.log('处理消息:', message);
  }
}

// Usage
const handler = new ThrottledSseHandler(200);

// Feed every "data" event through the throttle
eventSource.addEventListener('data', (event) => {
  handler.handle(event.data);
});
批量更新 UI
// Collects items and appends them to the DOM in batches: when batchSize items
// are waiting, or delayMs after the last item was added.
class BatchedUiUpdater {
  // containerId: id of the element that receives the batches.
  constructor(batchSize = 10, delayMs = 100, containerId = 'messages') {
    this.batch = [];
    this.batchSize = batchSize;
    this.delayMs = delayMs;
    this.containerId = containerId;
    this.timeout = null;
  }

  add(item) {
    this.batch.push(item);
    if (this.batch.length >= this.batchSize) {
      this.flush();
    } else {
      clearTimeout(this.timeout);
      this.timeout = setTimeout(() => this.flush(), this.delayMs);
    }
  }

  flush() {
    // A flush triggered by batch size must also cancel the pending timer.
    clearTimeout(this.timeout);
    this.timeout = null;

    if (this.batch.length === 0) return;

    // One DOM insertion for the whole batch
    const fragment = document.createDocumentFragment();
    this.batch.forEach((item) => {
      fragment.appendChild(this.createElement(item));
    });
    document.getElementById(this.containerId).appendChild(fragment);
    this.batch = [];
  }

  // Builds the element for one item. Override to customise rendering.
  createElement(item) {
    const element = document.createElement('div');
    element.textContent = typeof item === 'string' ? item : JSON.stringify(item);
    return element;
  }
}
安全考虑
1. 认证与授权
Token 认证
// Option 1: token in the URL
// NOTE: URLs are commonly recorded in server and proxy logs; prefer option 2.
const token = localStorage.getItem('token');
// Encode the token so characters such as "+", "&" or "=" survive the query string.
const tokenEventSource = new EventSource(`/api/sse/connect?token=${encodeURIComponent(token)}`);

// Option 2: cookie (recommended)
// The server sets an HttpOnly cookie at login
// EventSource sends the cookie automatically
const eventSource = new EventSource('/api/sse/connect');
服务端验证:
/// <summary>
/// Opens an SSE connection after validating the caller's token, taken from the
/// "token" query parameter or, when that is empty, the "auth_token" cookie.
/// </summary>
[HttpGet]
[Route("connect")]
public async Task<IActionResult> Connect()
{
    // Request.Query[...] returns StringValues, a struct, so it is never null
    // and cannot be the left operand of "??". Convert it and test for empty.
    string token = HttpContext.Request.Query["token"].ToString();
    if (string.IsNullOrEmpty(token))
    {
        token = HttpContext.Request.Cookies["auth_token"];
    }

    if (string.IsNullOrEmpty(token) || !_authService.ValidateToken(token))
    {
        return Unauthorized();
    }

    // Establish the connection
    // ...

    // The stream has been written directly to the response; nothing more to render.
    return new EmptyResult();
}
2. 限流
/// <summary>
/// Fixed-window rate limiter: each client id may make a limited number of
/// requests per one-minute window, which starts at the client's first request.
/// </summary>
/// <remarks>Entries of idle clients are never evicted, so memory grows with the number of distinct ids.</remarks>
public class SseRateLimiter
{
    private readonly Dictionary<string, (int Count, DateTime ResetTime)> _clients = new();
    // Dictionary is not safe for concurrent writers; every access goes through this gate.
    private readonly object _gate = new();
    private readonly int _maxRequestsPerMinute;

    /// <param name="maxRequestsPerMinute">Requests allowed per client per window; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxRequestsPerMinute"/> is zero or negative.</exception>
    public SseRateLimiter(int maxRequestsPerMinute = 60)
    {
        if (maxRequestsPerMinute <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxRequestsPerMinute));
        }

        _maxRequestsPerMinute = maxRequestsPerMinute;
    }

    /// <summary>Records one request for <paramref name="clientId"/> and returns whether it is within the limit.</summary>
    public bool IsAllowed(string clientId)
    {
        var now = DateTime.UtcNow;

        lock (_gate)
        {
            // First request, or the previous window has ended: start a new window.
            if (!_clients.TryGetValue(clientId, out var window) || now >= window.ResetTime)
            {
                _clients[clientId] = (1, now.AddMinutes(1));
                return true;
            }

            if (window.Count >= _maxRequestsPerMinute)
            {
                return false;
            }

            _clients[clientId] = (window.Count + 1, window.ResetTime);
            return true;
        }
    }
}
3. 数据加密
/// <summary>
/// Encrypts <paramref name="data"/> with AES using a fresh random IV.
/// </summary>
/// <param name="data">Plain text to encrypt.</param>
/// <param name="key">Key text; its UTF-8 bytes are used as the AES key, so it must encode to a valid AES key length.</param>
/// <returns>Base64 of the 16-byte IV followed by the cipher text. The receiver must split off the IV before decrypting.</returns>
public string EncryptMessage(string data, string key)
{
    using var aes = Aes.Create();
    aes.Key = Encoding.UTF8.GetBytes(key);
    // A constant IV makes equal messages encrypt to equal cipher text. Use a
    // new random IV per message and ship it with the cipher text.
    aes.GenerateIV();

    using var encryptor = aes.CreateEncryptor();
    using var ms = new MemoryStream();
    ms.Write(aes.IV, 0, aes.IV.Length);

    using (var cs = new CryptoStream(ms, encryptor, CryptoStreamMode.Write))
    using (var sw = new StreamWriter(cs))
    {
        sw.Write(data);
    }

    // ToArray still works after the stream has been closed by the writers above.
    return Convert.ToBase64String(ms.ToArray());
}
生产环境部署
1. Nginx 配置
upstream backend {
    server 127.0.0.1:5000;
}

server {
    listen 80;
    server_name your-domain.com;

    # SSE endpoints
    location /api/sse/ {
        proxy_pass http://backend;

        # Required for SSE
        proxy_set_header Connection '';
        proxy_http_version 1.1;
        chunked_transfer_encoding on;

        # Same forwarded headers as the /api/ location below, so the backend
        # sees the original host and client address on SSE requests too
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_read_timeout 86400s; # 24 hours
        proxy_send_timeout 86400s;

        # No buffering or caching
        proxy_buffering off;
        proxy_cache off;

        # Upstream connect timeout
        proxy_connect_timeout 60s;

        # Pass the event-stream headers through
        proxy_pass_header Content-Type;
        proxy_pass_header Cache-Control;
    }

    # Other API requests
    location /api/ {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
2. Docker 部署
# Dockerfile
# Runtime image
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80
# Install curl so a curl-based container health check can run.
# NOTE(review): the aspnet runtime image is not expected to ship curl — confirm for your tag.
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Build stage
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
# Copy the project file first so the restore layer is cached until it changes
COPY ["SmartConv/SmartConv.csproj", "SmartConv/"]
RUN dotnet restore "SmartConv/SmartConv.csproj"
COPY . .
WORKDIR "/src/SmartConv"
RUN dotnet build "SmartConv.csproj" -c Release -o /app/build

# Publish stage
FROM build AS publish
RUN dotnet publish "SmartConv.csproj" -c Release -o /app/publish

# Final image: runtime plus published output
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "SmartConv.dll"]
# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8080:80"   # host 8080 -> container 80
    environment:
      - ASPNETCORE_ENVIRONMENT=Production
      - ASPNETCORE_URLS=http://+:80
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost/health"]
      interval: 30s
      timeout: 10s
      retries: 3
3. Kubernetes 部署
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sse-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sse-app
  template:
    metadata:
      labels:
        app: sse-app
    spec:
      containers:
        - name: sse-app
          image: your-registry/sse-app:latest
          ports:
            - containerPort: 80
          env:
            - name: ASPNETCORE_ENVIRONMENT
              value: "Production"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Restart the container when /health stops answering
          livenessProbe:
            httpGet:
              path: /health
              port: 80
            initialDelaySeconds: 30
            periodSeconds: 10
          # Only route traffic once /ready answers
          readinessProbe:
            httpGet:
              path: /ready
              port: 80
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: sse-app-service
spec:
  selector:
    app: sse-app
  ports:
    - protocol: TCP
      port: 80
      targetPort: 80
  type: LoadBalancer
4. 监控与日志
// 添加监控指标
/// <summary>Plain snapshot of SSE counters.</summary>
public class SseMetrics
{
    public int TotalConnections { get; set; }
    public int ActiveConnections { get; set; }
    public long MessagesSent { get; set; }
    public long BytesTransferred { get; set; }
}

/// <summary>Publishes SSE connection and message counts as Prometheus metrics.</summary>
public class SseMetricsExporter
{
    private readonly Counter _connectionsTotal;
    private readonly Gauge _connectionsActive;
    private readonly Counter _messagesTotal;

    public SseMetricsExporter()
    {
        _connectionsTotal = Metrics.CreateCounter("sse_connections_total", "Total SSE connections");
        _connectionsActive = Metrics.CreateGauge("sse_connections_active", "Active SSE connections");
        _messagesTotal = Metrics.CreateCounter("sse_messages_total", "Total SSE messages sent");
    }

    /// <summary>Counts a new connection in both the total and the active gauge.</summary>
    public void RecordConnection()
    {
        _connectionsTotal.Inc();
        _connectionsActive.Inc();
    }

    /// <summary>Lowers the active gauge by one.</summary>
    public void RecordDisconnection() => _connectionsActive.Dec();

    /// <summary>Counts one sent message.</summary>
    public void RecordMessage() => _messagesTotal.Inc();
}
5. 负载均衡注意事项
重要: 每条 SSE 连接都固定在接受它的那个节点上,连接列表保存在该节点的进程内存中。多节点部署时,一个节点上的广播无法到达其他节点上的客户端;如果节点还保存了每个客户端的状态,重连时需要会话保持(Sticky Session)回到同一节点。
upstream backend {
    # Route each client IP to the same server (session affinity)
    ip_hash;
    server 127.0.0.1:5000;
    server 127.0.0.1:5001;
    server 127.0.0.1:5002;
}
或者使用 Redis 存储连接状态,实现跨节点广播。
总结
SSE 是实现服务器推送的简单高效方案,特别适合:
✅ 实时通知
✅ 数据更新推送
✅ 系统监控
✅ 日志流式输出
相比 WebSocket,SSE 更简单、更容易部署、且天然支持自动重连。
核心要点
- 配置正确的响应头: Content-Type: text/event-stream
- 实现心跳机制: 保持连接活跃
- 处理连接异常: 自动重连
- 优化性能: 连接池、批量处理
- 安全保障: 认证、限流、加密
- 生产部署: 代理配置、会话保持
快速开始
// 客户端
const eventSource = new EventSource('/api/sse/connect');

// Log the parsed payload of every "notification" event
eventSource.addEventListener('notification', (event) => {
  const payload = JSON.parse(event.data);
  console.log(payload);
});
// 服务端
// The indexer overwrites an existing value; Add() throws when the header is
// already present.
Response.Headers["Content-Type"] = "text/event-stream";
Response.Headers["Cache-Control"] = "no-cache";

// One event: the "event:" and "data:" fields, then a blank line to end it.
await Response.WriteAsync($"event: notification\ndata: {message}\n\n");
// Flush so the client receives the event immediately.
await Response.Body.FlushAsync();
祝开发顺利!