通化市网站建设_网站建设公司_网站制作_seo优化
2026/1/21 10:33:03 网站建设 项目流程

SSE (Server-Sent Events) 通用配置指南

目录

  1. SSE 简介
  2. SSE vs WebSocket vs 轮询
  3. 服务端配置
  4. 客户端使用
  5. 最佳实践
  6. 常见问题
  7. 性能优化
  8. 安全考虑
  9. 生产环境部署

SSE 简介

什么是 SSE?

Server-Sent Events (SSE) 是一种基于 HTTP 的服务器推送技术,允许服务器向客户端单向发送实时数据。

SSE 特点

特性 说明
单向通信 服务器 → 客户端(推送消息)
基于 HTTP 使用标准 HTTP/HTTPS 协议
自动重连 浏览器自动处理断线重连
文本格式 仅支持文本数据(UTF-8 编码)
EventSource API 浏览器原生支持,无需额外库

适用场景

适合

  • 实时通知(新订单、新用户注册等)
  • 股票价格、汇率实时更新
  • 社交媒体动态推送
  • 系统状态监控
  • 日志实时输出
  • 进度条更新

不适合

  • 双向实时通信(聊天、游戏等)→ 使用 WebSocket
  • 二进制数据传输 → 使用 WebSocket
  • 需要高频率双向通信 → 使用 WebSocket

SSE vs WebSocket vs 轮询

特性 SSE WebSocket HTTP 轮询
通信方向 单向(服务器→客户端) 双向 客户端→服务器
连接方式 长连接 长连接 短连接
协议 HTTP/HTTPS WS/WSS HTTP/HTTPS
自动重连 ✅ 原生支持 ❌ 需手动实现 -
数据格式 文本 文本/二进制 JSON/XML
浏览器支持 现代浏览器全支持 IE10+ 全支持
服务器资源
实现复杂度 简单 中等 简单
防火墙穿透 ✅ 容易 ⚠️ 可能受阻 ✅ 容易
跨域 ✅ 支持 CORS ✅ 支持 CORS ✅ 支持 CORS

选择建议

场景 → 技术选择需要实时推送,只需服务器→客户端↓使用 SSE ✅需要双向实时通信↓使用 WebSocket ✅不需要实时,偶尔更新即可↓使用 HTTP 轮询 ✅需要兼容旧浏览器(IE9 及以下)↓使用 HTTP 轮询 ✅

服务端配置

1. ASP.NET Core (.NET 6+)

SSE 服务接口

// ISseService.cs
using Microsoft.AspNetCore.Http;public interface ISseService
{string AddConnection(HttpResponse response);Task BroadcastAsync(string eventType, string data);Task SendToClientAsync(string connectionId, string eventType, string data);int GetConnectedClientCount();Task RemoveConnectionAsync(string connectionId);
}

SSE 服务实现

// SseService.cs
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;public class SseService : ISseService
{private readonly ConcurrentDictionary<string, HttpResponse> _connections = new();private readonly ILogger<SseService> _logger;public SseService(ILogger<SseService> logger){_logger = logger;}public string AddConnection(HttpResponse response){var connectionId = Guid.NewGuid().ToString();_connections.TryAdd(connectionId, response);_logger.LogInformation($"SSE 连接添加: {connectionId}");return connectionId;}public async Task BroadcastAsync(string eventType, string data){foreach (var connection in _connections){await SendMessageAsync(connection.Value, eventType, data);}_logger.LogInformation($"SSE 广播发送至 {_connections.Count} 个客户端: {eventType}");}public async Task SendToClientAsync(string connectionId, string eventType, string data){if (_connections.TryGetValue(connectionId, out var response)){await SendMessageAsync(response, eventType, data);}}private async Task SendMessageAsync(HttpResponse response, string eventType, string data){try{await response.WriteAsync($"event: {eventType}\n");await response.WriteAsync($"data: {data}\n\n");await response.Body.FlushAsync();}catch (Exception ex){_logger.LogError($"SSE 消息发送错误: {ex.Message}");}}public int GetConnectedClientCount(){return _connections.Count;}public async Task RemoveConnectionAsync(string connectionId){if (_connections.TryRemove(connectionId, out var response)){try{await response.WriteAsync("event: close\ndata: Connection closed\n\n");await response.Body.FlushAsync();}catch { }}}
}

注册服务

// Program.cs
builder.Services.AddSingleton<ISseService, SseService>();

SSE 控制器

// SseController.cs
using Microsoft.AspNetCore.Cors;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;[ApiController]
[Route("api/[controller]")]
[EnableCors("any")]
public class SseController : ControllerBase
{private readonly ISseService _sseService;public SseController(ISseService sseService){_sseService = sseService;}[HttpGet][Route("connect")]public async Task Connect(){// 设置 SSE 响应头Response.Headers.Add("Content-Type", "text/event-stream");Response.Headers.Add("Cache-Control", "no-cache");Response.Headers.Add("Connection", "keep-alive");Response.Headers.Add("X-Accel-Buffering", "no"); // Nginx 禁用缓冲var connectionId = _sseService.AddConnection(Response);try{// 发送连接确认消息await _sseService.SendToClientAsync(connectionId, "connected",Newtonsoft.Json.JsonConvert.SerializeObject(new{connectionId,message = "SSE connection established",timestamp = DateTime.Now}));// 保持连接活跃while (!HttpContext.RequestAborted.IsCancellationRequested){await Task.Delay(30000); // 每30秒发送心跳await _sseService.SendToClientAsync(connectionId, "heartbeat", DateTime.Now.ToString());}}catch{// 客户端断开}finally{await _sseService.RemoveConnectionAsync(connectionId);}}[HttpGet][Route("count")]public IActionResult GetConnectionCount(){return Ok(new { count = _sseService.GetConnectedClientCount() });}[HttpPost][Route("broadcast")]public async Task<IActionResult> Broadcast([FromBody] BroadcastRequest request){await _sseService.BroadcastAsync(request.EventType, request.Message);return Ok(new { success = true });}
}public class BroadcastRequest
{public string EventType { get; set; }public string Message { get; set; }
}

2. Node.js + Express

基本实现

// server.js
const express = require('express');
const cors = require('cors');
const app = express();app.use(cors());
app.use(express.json());// 存储所有客户端连接
const clients = new Set();// SSE 连接端点
app.get('/sse', (req, res) => {// 设置 SSE 响应头res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.setHeader('X-Accel-Buffering', 'no'); // Nginx 禁用缓冲const clientId = Date.now();clients.add(res);console.log(`客户端连接: ${clientId}`);// 发送连接确认res.write(`event: connected\ndata: ${JSON.stringify({ clientId, message: 'Connected' })}\n\n`);// 发送心跳const heartbeat = setInterval(() => {res.write(`event: heartbeat\ndata: ${new Date().toISOString()}\n\n`);}, 30000);// 客户端断开连接req.on('close', () => {console.log(`客户端断开: ${clientId}`);clearInterval(heartbeat);clients.delete(res);});
});// 广播消息
app.post('/broadcast', (req, res) => {const { eventType, message } = req.body;clients.forEach(client => {client.write(`event: ${eventType}\ndata: ${message}\n\n`);});console.log(`广播消息发送至 ${clients.size} 个客户端`);res.json({ success: true, count: clients.size });
});// 获取连接数
app.get('/count', (req, res) => {res.json({ count: clients.size });
});app.listen(3000, () => {console.log('SSE 服务器运行在端口 3000');
});

3. Java + Spring Boot

SSE 控制器

// SseController.java
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@RestController
@RequestMapping("/api/sse")
@CrossOrigin(origins = "*")
public class SseController {// 存储所有 SSE 连接private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();private final ExecutorService executor = Executors.newCachedThreadPool();/*** 建立 SSE 连接*/@GetMapping(value = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect() {SseEmitter emitter = new SseEmitter(60 * 60 * 1000L); // 1小时超时emitter.onCompletion(() -> {emitters.remove(emitter);System.out.println("SSE 连接完成");});emitter.onTimeout(() -> {emitters.remove(emitter);System.out.println("SSE 连接超时");});emitter.onError((e) -> {emitters.remove(emitter);System.err.println("SSE 连接错误: " + e.getMessage());});emitters.add(emitter);// 发送连接确认executor.execute(() -> {try {emitter.send(SseEmitter.event().name("connected").data("{\"message\":\"SSE connection established\"}"));} catch (IOException e) {emitter.completeWithError(e);}});// 发送心跳executor.execute(() -> {while (emitters.contains(emitter)) {try {Thread.sleep(30000);emitter.send(SseEmitter.event().name("heartbeat").data(new Date().toString()));} catch (Exception e) {break;}}});return emitter;}/*** 广播消息*/@PostMapping("/broadcast")public ResponseEntity<?> broadcast(@RequestBody BroadcastRequest request) {int count = 0;for (SseEmitter emitter : emitters) {executor.execute(() -> {try {emitter.send(SseEmitter.event().name(request.getEventType()).data(request.getMessage()));} catch (IOException e) {emitter.completeWithError(e);}});count++;}return ResponseEntity.ok(new Response(true, "消息已发送至 " + count + " 个客户端"));}/*** 获取连接数*/@GetMapping("/count")public ResponseEntity<?> count() {return ResponseEntity.ok(new CountResponse(emitters.size()));}
}// 请求/响应类
class BroadcastRequest {private String eventType;private String message;// getters and setters
}class Response {private boolean success;private String message;// constructor, getters
}class CountResponse {private int count;// constructor, getters
}

4. Python + Flask

# app.py
from flask import Flask, Response, jsonify, request
import json
import time
import threading
from datetime import datetimeapp = Flask(__name__)# 存储所有客户端连接
clients = []@app.route('/sse')
def sse():def event_stream():# 生成唯一客户端IDclient_id = id(event_stream)clients.append(event_stream)try:# 发送连接确认yield f"event: connected\ndata: {json.dumps({'clientId': client_id})}\n\n"# 保持连接活跃,发送心跳while True:time.sleep(30)  # 每30秒发送心跳yield f"event: heartbeat\ndata: {datetime.now().isoformat()}\n\n"except GeneratorExit:clients.remove(event_stream)print(f"客户端断开: {client_id}")return Response(event_stream(), mimetype='text/event-stream',headers={'Cache-Control': 'no-cache','Connection': 'keep-alive','X-Accel-Buffering': 'no'})@app.route('/broadcast', methods=['POST'])
def broadcast():data = request.jsonevent_type = data.get('eventType', 'message')message = data.get('message', '')# 发送给所有客户端message_str = f"event: {event_type}\ndata: {message}\n\n"for client in clients[:]:  # 使用切片避免迭代时修改try:client.send(message_str)except:clients.remove(client)return jsonify({'success': True, 'count': len(clients)})@app.route('/count')
def count():return jsonify({'count': len(clients)})if __name__ == '__main__':app.run(host='0.0.0.0', port=5000, threaded=True)

5. Go + Gin

// main.go
package mainimport ("encoding/json""fmt""log""net/http""sync""time""github.com/gin-gonic/gin"
)// 客户端连接
type Client struct {Channel chan stringID      string
}// SSE 服务
type SSEService struct {clients map[*Client]boolmu      sync.RWMutex
}var sseService = &SSEService{clients: make(map[*Client]bool),
}func (s *SSEService) AddClient(client *Client) {s.mu.Lock()defer s.mu.Unlock()s.clients[client] = truelog.Printf("客户端连接: %s, 当前连接数: %d", client.ID, len(s.clients))
}func (s *SSEService) RemoveClient(client *Client) {s.mu.Lock()defer s.mu.Unlock()if _, ok := s.clients[client]; ok {delete(s.clients, client)close(client.Channel)log.Printf("客户端断开: %s, 当前连接数: %d", client.ID, len(s.clients))}
}func (s *SSEService) Broadcast(eventType, data string) {s.mu.RLock()defer s.mu.RUnlock()message := fmt.Sprintf("event: %s\ndata: %s\n\n", eventType, data)for client := range s.clients {select {case client.Channel <- message:default:// 客户端接收缓冲区满,移除go s.RemoveClient(client)}}
}func (s *SSEService) GetClientCount() int {s.mu.RLock()defer s.mu.RUnlock()return len(s.clients)
}// SSE 处理器
func sseHandler(c *gin.Context) {client := &Client{ID:      generateID(),Channel: make(chan string, 100),}sseService.AddClient(client)defer sseService.RemoveClient(client)// 设置响应头c.Header("Content-Type", "text/event-stream")c.Header("Cache-Control", "no-cache")c.Header("Connection", "keep-alive")c.Header("X-Accel-Buffering", "no")// 发送连接确认c.SSEvent("connected", "").JSON(gin.H{"clientId": client.ID,"message":  "Connected",})c.Writer.Flush()// 发送心跳ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for {select {case message := <-client.Channel:// 发送消息fmt.Fprint(c.Writer, message)c.Writer.Flush()case <-ticker.C:// 发送心跳c.SSEvent("heartbeat", "").Send(time.Now().Format(time.RFC3339))c.Writer.Flush()case <-c.Request.Context().Done():return}}
}// 广播处理器
func broadcastHandler(c *gin.Context) {var req struct {EventType string `json:"eventType"`Message  string `json:"message"`}if err := c.ShouldBindJSON(&req); err != nil {c.JSON(400, gin.H{"error": err.Error()})return}sseService.Broadcast(req.EventType, req.Message)c.JSON(200, gin.H{"success": true,"count":   sseService.GetClientCount(),})
}// 获取连接数
func countHandler(c *gin.Context) {c.JSON(200, gin.H{"count": sseService.GetClientCount()})
}func generateID() string {return fmt.Sprintf("%d", time.Now().UnixNano())
}func main() {r := gin.Default()// 允许跨域r.Use(func(c *gin.Context) {c.Writer.Header().Set("Access-Control-Allow-Origin", "*")c.Writer.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")if c.Request.Method == "OPTIONS" {c.AbortWithStatus(204)return}c.Next()})r.GET("/api/sse/connect", sseHandler)r.POST("/api/sse/broadcast", broadcastHandler)r.GET("/api/sse/count", countHandler)log.Println("SSE 服务器运行在端口 8080")r.Run(":8080")
}

客户端使用

1. 原生 JavaScript (EventSource API)

基本连接

// 建立 SSE 连接
const eventSource = new EventSource('http://your-domain.com/api/sse/connect');// 监听连接打开
eventSource.onopen = function(event) {console.log('SSE 连接已打开');
};// 监听所有消息
eventSource.onmessage = function(event) {console.log('收到消息:', event.data);
};// 监听特定事件类型
eventSource.addEventListener('connected', function(event) {const data = JSON.parse(event.data);console.log('连接确认:', data);
});eventSource.addEventListener('heartbeat', function(event) {console.log('心跳:', event.data);
});eventSource.addEventListener('notification', function(event) {const data = JSON.parse(event.data);showNotification(data);
});// 错误处理
eventSource.onerror = function(error) {console.error('SSE 连接错误:', error);// EventSource 会自动尝试重连
};// 关闭连接
function closeConnection() {eventSource.close();console.log('SSE 连接已关闭');
}

React 示例

// SseComponent.jsx
import React, { useState, useEffect } from 'react';const SseComponent = () => {const [messages, setMessages] = useState([]);const [isConnected, setIsConnected] = useState(false);const [connectionCount, setConnectionCount] = useState(0);useEffect(() => {let eventSource;function connect() {eventSource = new EventSource('http://your-domain.com/api/sse/connect');eventSource.onopen = () => {setIsConnected(true);console.log('SSE 连接成功');};eventSource.onerror = () => {setIsConnected(false);console.log('SSE 连接断开,尝试重连...');};// 监听注册事件eventSource.addEventListener('register', (event) => {const data = JSON.parse(event.data);setMessages(prev => [data, ...prev]);showNotification(`新用户 ${data.username} 已注册`);});// 监听心跳eventSource.addEventListener('heartbeat', (event) => {console.log('心跳:', event.data);});}connect();// 清理return () => {if (eventSource) {eventSource.close();}};}, []);const showNotification = (message) => {// 显示浏览器通知if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}};return (<div><h3>SSE 实时消息</h3><div className={`status ${isConnected ? 'connected' : 'disconnected'}`}>{isConnected ? '✅ 已连接' : '❌ 未连接'}</div><div className="messages">{messages.map((msg, index) => (<div key={index} className="message"><strong>{msg.message}</strong><time>{new Date(msg.timestamp).toLocaleString()}</time></div>))}</div></div>);
};export default SseComponent;

Vue 3 示例

<!-- SseComponent.vue -->
<template><div><h3>SSE 实时消息</h3><div :class="['status', isConnected ? 'connected' : 'disconnected']">{{ isConnected ? '✅ 已连接' : '❌ 未连接' }}</div><div class="messages"><div v-for="(msg, index) in messages" :key="index" class="message"><strong>{{ msg.message }}</strong><time>{{ formatTime(msg.timestamp) }}</time></div></div></div>
</template><script setup>
import { ref, onMounted, onUnmounted } from 'vue';const messages = ref([]);
const isConnected = ref(false);
let eventSource = null;onMounted(() => {connect();// 请求通知权限if ('Notification' in window && Notification.permission === 'default') {Notification.requestPermission();}
});onUnmounted(() => {if (eventSource) {eventSource.close();}
});function connect() {eventSource = new EventSource('http://your-domain.com/api/sse/connect');eventSource.onopen = () => {isConnected.value = true;console.log('SSE 连接成功');};eventSource.onerror = () => {isConnected.value = false;console.log('SSE 连接断开');};eventSource.addEventListener('register', (event) => {const data = JSON.parse(event.data);messages.value.unshift(data);showNotification(data.message);});eventSource.addEventListener('heartbeat', (event) => {console.log('心跳:', event.data);});
}function showNotification(message) {if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}
}function formatTime(timestamp) {return new Date(timestamp).toLocaleString();
}
</script><style scoped>
.status {padding: 10px;margin: 10px 0;border-radius: 4px;
}
.status.connected {background-color: #d4edda;color: #155724;
}
.status.disconnected {background-color: #f8d7da;color: #721c24;
}
.message {padding: 10px;border-left: 3px solid #007bff;margin: 10px 0;background-color: #f8f9fa;
}
</style>

Angular 示例

// sse.service.ts
import { Injectable, NgZone } from '@angular/core';
import { EventSourcePolyfill } from 'event-source-polyfill';@Injectable({providedIn: 'root'
})
export class SseService {private eventSource: EventSource | null = null;constructor(private ngZone: NgZone) {}connect(url: string, eventHandlers: { [eventType: string]: (data: any) => void }) {// 使用 polyfill 支持更多浏览器this.eventSource = new EventSourcePolyfill(url, {withCredentials: true});this.eventSource.onopen = () => {console.log('SSE 连接成功');};this.eventSource.onerror = (error) => {console.error('SSE 连接错误:', error);};// 注册事件处理器Object.keys(eventHandlers).forEach(eventType => {this.eventSource!.addEventListener(eventType, (event: any) => {this.ngZone.run(() => {eventHandlers[eventType](JSON.parse(event.data));});});});}disconnect() {if (this.eventSource) {this.eventSource.close();this.eventSource = null;}}
}
// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { SseService } from './sse.service';@Component({selector: 'app-root',template: `<div><h3>SSE 实时消息</h3><div [class]="'status ' + (isConnected ? 'connected' : 'disconnected')">{{ isConnected ? '✅ 已连接' : '❌ 未连接' }}</div><div class="messages"><div *ngFor="let msg of messages" class="message"><strong>{{ msg.message }}</strong><time>{{ formatTime(msg.timestamp) }}</time></div></div></div>`,styles: [`.status.connected { background: #d4edda; color: #155724; padding: 10px; }.status.disconnected { background: #f8d7da; color: #721c24; padding: 10px; }.message { padding: 10px; border-left: 3px solid #007bff; margin: 10px 0; }`]
})
export class AppComponent implements OnInit, OnDestroy {messages: any[] = [];isConnected = false;constructor(private sseService: SseService) {}ngOnInit() {this.sseService.connect('http://your-domain.com/api/sse/connect', {connected: (data) => {this.isConnected = true;console.log('连接确认:', data);},register: (data) => {this.messages.unshift(data);this.showNotification(data.message);},heartbeat: (data) => {console.log('心跳:', data);},error: () => {this.isConnected = false;}});}ngOnDestroy() {this.sseService.disconnect();}private showNotification(message: string) {if ('Notification' in window && Notification.permission === 'granted') {new Notification('系统通知', { body: message });}}private formatTime(timestamp: string): string {return new Date(timestamp).toLocaleString();}
}

2. 浏览器兼容性

Polyfill (支持旧浏览器)

<!-- 安装 event-source-polyfill -->
npm install event-source-polyfill<!-- 或使用 CDN -->
<script src="https://cdn.jsdelivr.net/npm/event-source-polyfill@1.0.25/src/eventsource.min.js"></script><script>
// 使用 polyfill
const EventSource = window.EventSourcePolyfill || window.EventSource;const eventSource = new EventSource('/api/sse/connect');
// 正常使用...
</script>

3. 移动端支持

// 移动端 SSE 连接
function connectMobile() {const eventSource = new EventSource('http://your-domain.com/api/sse/connect', {// 移动端优化withCredentials: true});eventSource.onopen = () => {console.log('移动端 SSE 连接成功');};// 监听消息eventSource.addEventListener('notification', (event) => {const data = JSON.parse(event.data);// 移动端推送通知if ('vibrate' in navigator) {navigator.vibrate(200); // 震动}// 显示本地通知showLocalNotification(data.title, data.body);});return eventSource;
}// 显示本地通知(iOS/Android)
function showLocalNotification(title, body) {// 检查支持if (!('Notification' in window)) {console.log('不支持通知');return;}if (Notification.permission === 'granted') {new Notification(title, { body, icon: '/icon.png' });} else if (Notification.permission !== 'denied') {Notification.requestPermission().then(permission => {if (permission === 'granted') {new Notification(title, { body, icon: '/icon.png' });}});}
}

最佳实践

1. 连接管理

自动重连

const MAX_RETRIES = 5;
let retryCount = 0;function connect() {const eventSource = new EventSource('/api/sse/connect');eventSource.onerror = () => {retryCount++;if (retryCount <= MAX_RETRIES) {console.log(`重连中... (${retryCount}/${MAX_RETRIES})`);setTimeout(() => {connect();}, 1000 * retryCount); // 指数退避} else {console.error('重连次数超过限制');alert('连接失败,请刷新页面');}};eventSource.onopen = () => {retryCount = 0; // 重置重连计数console.log('连接成功');};
}connect();

心跳机制

服务端:

// 每 30 秒发送心跳
while (!HttpContext.RequestAborted.IsCancellationRequested)
{await Task.Delay(30000);await _sseService.SendToClientAsync(connectionId, "heartbeat", DateTime.Now.ToString());
}

客户端:

let lastHeartbeat = Date.now();eventSource.addEventListener('heartbeat', () => {lastHeartbeat = Date.now();console.log('心跳正常');
});// 检查连接状态
setInterval(() => {const now = Date.now();if (now - lastHeartbeat > 60000) { // 超过60秒未收到心跳console.warn('连接可能已断开');eventSource.close();connect(); // 重新连接}
}, 10000);

2. 消息格式设计

推荐格式

// 统一的消息格式
{"type": "notification",     // 消息类型"id": "msg-123",         // 消息ID(用于去重)"timestamp": 1705699200000, // 时间戳"data": {                 // 业务数据"title": "新订单","content": "订单 #12345 已创建","url": "/orders/12345"}
}

服务端发送

var message = Newtonsoft.Json.JsonConvert.SerializeObject(new
{type = "notification",id = Guid.NewGuid().ToString(),timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),data = new{title = "新订单",content = $"订单 #{orderId} 已创建",url = $"/orders/{orderId}"}
});await _sseService.BroadcastAsync("notification", message);

客户端处理

const processedIds = new Set();eventSource.addEventListener('notification', (event) => {const msg = JSON.parse(event.data);// 去重处理if (processedIds.has(msg.id)) {return;}processedIds.add(msg.id);// 清理旧的ID(避免内存泄漏)if (processedIds.size > 1000) {const firstId = processedIds.values().next().value;processedIds.delete(firstId);}// 处理消息handleNotification(msg);
});

3. 性能优化

连接池管理

// 服务端 - 限制连接数
const MAX_CONNECTIONS = 10000;function addConnection(connection) {if (connections.size >= MAX_CONNECTIONS) {throw new Error('连接数已达上限');}connections.add(connection);
}// 客户端 - 连接复用
class SseManager {constructor(url) {this.url = url;this.eventSource = null;this.listeners = new Map();}connect() {if (this.eventSource) {return; // 已连接}this.eventSource = new EventSource(this.url);// 事件分发this.eventSource.onmessage = (event) => {const data = JSON.parse(event.data);const listeners = this.listeners.get(data.type) || [];listeners.forEach(callback => callback(data));};}on(eventType, callback) {if (!this.listeners.has(eventType)) {this.listeners.set(eventType, []);}this.listeners.get(eventType).push(callback);}off(eventType, callback) {const listeners = this.listeners.get(eventType) || [];const index = listeners.indexOf(callback);if (index > -1) {listeners.splice(index, 1);}}close() {if (this.eventSource) {this.eventSource.close();this.eventSource = null;}}
}// 使用
const sseManager = new SseManager('/api/sse/connect');
sseManager.connect();
sseManager.on('notification', (data) => {console.log('通知:', data);
});

批量发送

// 批量发送优化
public async Task BroadcastBatchAsync(string eventType, List<string> messages)
{var batchData = Newtonsoft.Json.JsonConvert.SerializeObject(messages);foreach (var connection in _connections){await SendMessageAsync(connection.Value, eventType, batchData);}
}
// 客户端批量处理
eventSource.addEventListener('notification-batch', (event) => {const messages = JSON.parse(event.data); // 消息数组messages.forEach(msg => {processNotification(msg);});
});

4. 错误处理

服务端错误处理

try
{await _sseService.SendToClientAsync(connectionId, eventType, data);
}
catch (IOException ex)
{// 客户端断开连接await _sseService.RemoveConnectionAsync(connectionId);
}
catch (Exception ex)
{_logger.LogError(ex, "SSE 发送消息失败");
}

客户端错误处理

eventSource.onerror = (error) => {// 判断错误类型if (error.target.readyState === EventSource.CLOSED) {console.error('连接已关闭');} else if (error.target.readyState === EventSource.CONNECTING) {console.warn('正在重连...');} else {console.error('未知错误:', error);}
};

常见问题

Q1: SSE 连接频繁断开

原因

  • 代理服务器超时设置过短
  • 防火墙或负载均衡器配置问题
  • 客户端网络不稳定

解决方案

  1. 调整代理超时(Nginx):
location /api/sse/ {proxy_pass http://backend;proxy_set_header Connection '';proxy_http_version 1.1;proxy_read_timeout 86400s;  # 24小时proxy_send_timeout 86400s;chunked_transfer_encoding on;
}
  1. 调整心跳间隔
// 根据代理超时设置调整心跳间隔
await Task.Delay(25000); // 小于代理超时时间

Q2: 消息延迟或丢失

原因

  • 网络拥塞
  • 客户端处理速度慢
  • 缓冲区溢出

解决方案

  1. 调整缓冲区大小
// 客户端设置最大缓冲区
var eventSource = new EventSource(url);
eventSource.MAX_BUFFER_SIZE = 1024 * 1024; // 1MB
  1. 实现消息确认
// 服务端 - 消息队列
public class MessageQueue
{private readonly ConcurrentQueue<Message> _queue = new();public async Task EnqueueAsync(Message message){_queue.Enqueue(message);await ProcessQueueAsync();}private async Task ProcessQueueAsync(){while (_queue.TryDequeue(out var message)){try{await _sseService.BroadcastAsync(message.Type, message.Data);await Task.Delay(100); // 批次间隔}catch{_queue.Enqueue(message); // 重新入队break;}}}
}

Q3: 跨域问题

错误: Access-Control-Allow-Origin 错误

解决方案

服务端(ASP.NET Core):

builder.Services.AddCors(options =>
{options.AddPolicy("AllowSSE", builder =>{builder.WithOrigins("http://yourdomain.com").AllowAnyMethod().AllowAnyHeader().AllowCredentials();});
});app.UseCors("AllowSSE");

客户端:

// EventSource 不支持自定义 headers
// 确保服务端允许跨域
const eventSource = new EventSource('http://api.example.com/sse');

Q4: 浏览器兼容性问题

问题: IE 或旧版浏览器不支持 EventSource

解决方案

// 使用 polyfill
import { EventSourcePolyfill } from 'event-source-polyfill';const EventSource = window.EventSourcePolyfill || window.EventSource;const eventSource = new EventSource('/api/sse/connect', {withCredentials: true
});

或使用轮询降级:

class SseOrPolling {constructor(url) {this.url = url;this.usePolling = typeof EventSource === 'undefined';}connect() {if (this.usePolling) {this.startPolling();} else {this.startSSE();}}startSSE() {this.eventSource = new EventSource(this.url);this.eventSource.onmessage = this.onMessage;}startPolling() {this.interval = setInterval(() => {fetch(this.url).then(res => res.json()).then(data => this.onMessage({ data: JSON.stringify(data) }));}, 3000); // 每3秒轮询}
}

Q5: 内存泄漏

原因

  • 未正确关闭连接
  • 事件监听器未清理
  • 消息队列无限增长

解决方案

class SseConnection {constructor(url) {this.url = url;this.eventSource = null;this.handlers = [];this.messageBuffer = [];}connect() {this.eventSource = new EventSource(this.url);this.eventSource.onmessage = (event) => {// 限制缓冲区大小if (this.messageBuffer.length > 100) {this.messageBuffer.shift();}this.messageBuffer.push(event.data);// 触发处理器this.handlers.forEach(handler => handler(event.data));};}on(handler) {this.handlers.push(handler);}destroy() {// 清理资源if (this.eventSource) {this.eventSource.close();this.eventSource = null;}this.handlers = [];this.messageBuffer = [];}
}

性能优化

1. 服务端优化

连接池限制

public class SseConnectionLimiter
{private readonly SemaphoreSlim _semaphore;private readonly int _maxConnections;public SseConnectionLimiter(int maxConnections){_maxConnections = maxConnections;_semaphore = new SemaphoreSlim(maxConnections, maxConnections);}public async Task<bool> TryAcquireAsync(){return await _semaphore.WaitAsync(0);}public void Release(){_semaphore.Release();}public int AvailableConnections => _semaphore.CurrentCount;
}

消息压缩

using System.IO.Compression;public async Task SendCompressedAsync(HttpResponse response, string data)
{response.Headers.Add("Content-Encoding", "gzip");using var gzip = new GZipStream(response.Body, CompressionMode.Compress);using var writer = new StreamWriter(gzip);await writer.WriteAsync(data);
}

异步处理

// 使用 Channel 进行异步消息传递
public class SseBroadcaster
{private readonly Channel<string> _channel;public SseBroadcaster(){_channel = Channel.CreateUnbounded<string>();Task.Run(ProcessMessagesAsync);}public async Task BroadcastAsync(string message){await _channel.Writer.WriteAsync(message);}private async Task ProcessMessagesAsync(){await foreach (var message in _channel.Reader.ReadAllAsync()){// 异步广播_ = Task.Run(() => SendToAllClientsAsync(message));}}
}

2. 客户端优化

节流处理

class ThrottledSseHandler {constructor(throttleMs = 100) {this.throttleMs = throttleMs;this.lastMessageTime = 0;this.pendingMessage = null;this.timeout = null;}handle(message) {const now = Date.now();this.pendingMessage = message;if (now - this.lastMessageTime >= this.throttleMs) {this.processMessage(message);this.lastMessageTime = now;} else {clearTimeout(this.timeout);this.timeout = setTimeout(() => {this.processMessage(this.pendingMessage);this.lastMessageTime = Date.now();}, this.throttleMs);}}processMessage(message) {// 实际处理逻辑console.log('处理消息:', message);}
}// 使用
const handler = new ThrottledSseHandler(200);
eventSource.addEventListener('data', (e) => handler.handle(e.data));

批量更新 UI

class BatchedUiUpdater {constructor(batchSize = 10, delayMs = 100) {this.batch = [];this.batchSize = batchSize;this.delayMs = delayMs;this.timeout = null;}add(item) {this.batch.push(item);if (this.batch.length >= this.batchSize) {this.flush();} else {clearTimeout(this.timeout);this.timeout = setTimeout(() => this.flush(), this.delayMs);}}flush() {if (this.batch.length === 0) return;// 批量更新 DOMconst fragment = document.createDocumentFragment();this.batch.forEach(item => {const element = this.createElement(item);fragment.appendChild(element);});document.getElementById('messages').appendChild(fragment);this.batch = [];}
}

安全考虑

1. 认证与授权

Token 认证

// 方式1: URL 参数
const token = localStorage.getItem('token');
const eventSource = new EventSource(`/api/sse/connect?token=${token}`);// 方式2: Cookie(推荐)
// 服务端在登录时设置 HttpOnly Cookie
// EventSource 自动携带 Cookie
const eventSource = new EventSource('/api/sse/connect');

服务端验证:

[HttpGet]
[Route("connect")]
public async Task<IActionResult> Connect()
{// 从 URL 参数获取 Tokenvar token = HttpContext.Request.Query["token"];// 从 Cookie 获取 Tokenvar cookieToken = HttpContext.Request.Cookies["auth_token"];// 验证 Tokenif (!_authService.ValidateToken(token ?? cookieToken)){return Unauthorized();}// 建立连接// ...
}

2. 限流

public class SseRateLimiter
{private readonly Dictionary<string, (int Count, DateTime ResetTime)> _clients;private readonly int _maxRequestsPerMinute;public bool IsAllowed(string clientId){var now = DateTime.UtcNow;if (!_clients.ContainsKey(clientId)){_clients[clientId] = (1, now.AddMinutes(1));return true;}var (count, resetTime) = _clients[clientId];if (now >= resetTime){_clients[clientId] = (1, now.AddMinutes(1));return true;}if (count >= _maxRequestsPerMinute){return false;}_clients[clientId] = (count + 1, resetTime);return true;}
}

3. 数据加密

public string EncryptMessage(string data, string key)
{using var aes = Aes.Create();aes.Key = Encoding.UTF8.GetBytes(key);aes.IV = new byte[16];using var encryptor = aes.CreateEncryptor();using var ms = new MemoryStream();using (var cs = new CryptoStream(ms, encryptor, CryptoStreamMode.Write)){using var sw = new StreamWriter(cs);sw.Write(data);}return Convert.ToBase64String(ms.ToArray());
}

生产环境部署

1. Nginx 配置

upstream backend {server 127.0.0.1:5000;
}server {listen 80;server_name your-domain.com;# SSE 路径配置location /api/sse/ {proxy_pass http://backend;# SSE 必需配置proxy_set_header Connection '';proxy_http_version 1.1;chunked_transfer_encoding on;# 超时设置proxy_read_timeout 86400s;    # 24小时proxy_send_timeout 86400s;# 禁用缓冲proxy_buffering off;proxy_cache off;# 连接保持proxy_connect_timeout 60s;# 转发事件流头proxy_pass_header Content-Type;proxy_pass_header Cache-Control;}# 其他 API 请求location /api/ {proxy_pass http://backend;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header X-Forwarded-Proto $scheme;}
}

2. Docker 部署

# Dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["SmartConv/SmartConv.csproj", "SmartConv/"]
RUN dotnet restore "SmartConv/SmartConv.csproj"
COPY . .
WORKDIR "/src/SmartConv"
RUN dotnet build "SmartConv.csproj" -c Release -o /app/buildFROM build AS publish
RUN dotnet publish "SmartConv.csproj" -c Release -o /app/publishFROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "SmartConv.dll"]
# docker-compose.yml
version: '3.8'services:app:build: .ports:- "8080:80"environment:- ASPNETCORE_ENVIRONMENT=Production- ASPNETCORE_URLS=http://+:80restart: unless-stoppedhealthcheck:test: ["CMD", "curl", "-f", "http://localhost/health"]interval: 30stimeout: 10sretries: 3

3. Kubernetes 部署

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:name: sse-app
spec:replicas: 3selector:matchLabels:app: sse-apptemplate:metadata:labels:app: sse-appspec:containers:- name: sse-appimage: your-registry/sse-app:latestports:- containerPort: 80env:- name: ASPNETCORE_ENVIRONMENTvalue: "Production"resources:requests:memory: "256Mi"cpu: "250m"limits:memory: "512Mi"cpu: "500m"livenessProbe:httpGet:path: /healthport: 80initialDelaySeconds: 30periodSeconds: 10readinessProbe:httpGet:path: /readyport: 80initialDelaySeconds: 5periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:name: sse-app-service
spec:selector:app: sse-appports:- protocol: TCPport: 80targetPort: 80type: LoadBalancer

4. 监控与日志

// 添加监控指标
public class SseMetrics
{public int TotalConnections { get; set; }public int ActiveConnections { get; set; }public long MessagesSent { get; set; }public long BytesTransferred { get; set; }
}// Prometheus 指标
public class SseMetricsExporter
{private readonly Counter _connectionsTotal;private readonly Gauge _connectionsActive;private readonly Counter _messagesTotal;public SseMetricsExporter(){_connectionsTotal = Metrics.CreateCounter("sse_connections_total", "Total SSE connections");_connectionsActive = Metrics.CreateGauge("sse_connections_active", "Active SSE connections");_messagesTotal = Metrics.CreateCounter("sse_messages_total", "Total SSE messages sent");}public void RecordConnection(){_connectionsTotal.Inc();_connectionsActive.Inc();}public void RecordDisconnection(){_connectionsActive.Dec();}public void RecordMessage(){_messagesTotal.Inc();}
}

5. 负载均衡注意事项

重要: SSE 是有状态连接,需要会话保持(Sticky Session)。

upstream backend {ip_hash;  # 使用 IP 哈希实现会话保持server 127.0.0.1:5000;server 127.0.0.1:5001;server 127.0.0.1:5002;
}

或者使用 Redis 存储连接状态,实现跨节点广播。


总结

SSE 是实现服务器推送的简单高效方案,特别适合:

✅ 实时通知
✅ 数据更新推送
✅ 系统监控
✅ 日志流式输出

相比 WebSocket,SSE 更简单、更容易部署、且天然支持自动重连。

核心要点

  1. 配置正确的响应头: Content-Type: text/event-stream
  2. 实现心跳机制: 保持连接活跃
  3. 处理连接异常: 自动重连
  4. 优化性能: 连接池、批量处理
  5. 安全保障: 认证、限流、加密
  6. 生产部署: 代理配置、会话保持

快速开始

// 客户端
const eventSource = new EventSource('/api/sse/connect');
eventSource.addEventListener('notification', e => {console.log(JSON.parse(e.data));
});
// 服务端
Response.Headers.Add("Content-Type", "text/event-stream");
Response.Headers.Add("Cache-Control", "no-cache");await Response.WriteAsync($"event: notification\ndata: {message}\n\n");
await Response.Body.FlushAsync();

祝开发顺利!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询