// sse-server.js (CommonJS)
const http = require('http');
const express = require('express');
const cors = require('cors');

/**
 * Ends a response on a best-effort basis.
 *
 * @param {import('http').ServerResponse} res - Response to finish.
 * @returns {void}
 */
function endQuietly(res) {
  try {
    res.end();
  } catch {
    // Deliberately ignored: this only runs during cleanup, where a failure to
    // end an already-broken response must not abort the rest of the cleanup.
  }
}

/**
 * Converts broadcast data into the text carried by the `data:` field(s).
 *
 * @param {*} data - Buffer (decoded with `toString()`), string (used as-is),
 *   or any other value (passed through `JSON.stringify`).
 * @returns {string} Text payload.
 * @throws {TypeError} Whatever `JSON.stringify` throws for the given value.
 */
function toPayload(data) {
  if (Buffer.isBuffer(data)) return data.toString();
  if (typeof data === 'string') return data;
  // JSON.stringify returns undefined for undefined/functions/symbols; String()
  // keeps the text "undefined" that earlier versions put on the wire.
  return String(JSON.stringify(data));
}

/**
 * Builds one complete SSE frame.
 *
 * A raw line break inside a `data:` value would terminate the field (and a
 * blank line would terminate the whole event), so every payload line gets its
 * own `data:` field; the receiving side joins them back with "\n".
 *
 * @param {*} eventName - Event name; falsy or 'message' omits the `event:` line.
 * @param {string} payload - Text payload, possibly multi-line.
 * @returns {string} Frame text ending with the blank line that dispatches it.
 */
function formatSseFrame(eventName, payload) {
  let frame = '';
  if (eventName && eventName !== 'message') {
    // Line breaks in the name would inject extra fields; collapse them.
    frame += `event: ${String(eventName).replace(/[\r\n]+/g, ' ')}\n`;
  }
  // No event line => the browser's onmessage handler fires.
  for (const line of payload.split(/\r\n|\r|\n/)) {
    frame += `data: ${line}\n`;
  }
  return `${frame}\n`;
}

/**
 * Starts an Express-based Server-Sent Events endpoint and begins listening.
 *
 * @param {object} [options]
 * @param {number} [options.port=3312] - Port passed to `server.listen`.
 * @param {string} [options.path='/events'] - Route that serves the event stream.
 * @param {*} [options.corsOrigin='*'] - Value for the `cors` middleware's
 *   `origin` option, e.g. 'http://localhost:8080' or your front-end origin.
 *   A falsy value disables the middleware.
 * @param {number} [options.heartbeatMs=15000] - Interval between keep-alive
 *   comment lines sent to each client.
 * @returns {{
 *   app: object,
 *   server: import('http').Server,
 *   broadcast: function(*, *=): void,
 *   close: function(): Promise<void>,
 *   clientsCount: function(): number
 * }} Handles for the running server.
 */
function startSSE({
  port = 3312,
  path = '/events',
  corsOrigin = '*',
  heartbeatMs = 15000,
} = {}) {
  const app = express();
  // NOTE(review): credentials are enabled together with a default origin of
  // '*'; browsers refuse credentialed CORS responses for a wildcard origin, so
  // pass an explicit origin if clients connect with `withCredentials`.
  if (corsOrigin) app.use(cors({ origin: corsOrigin, credentials: true }));

  const clients = new Map(); // id -> res

  app.get(path, (req, res) => {
    res.set({
      'Content-Type': 'text/event-stream; charset=utf-8',
      'Cache-Control': 'no-cache, no-transform',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no',
    });
    // Only a plain string is a valid header value here. For the other shapes
    // the cors middleware accepts (true / RegExp / array / function) its own
    // header must not be overwritten with a stringified option.
    if (typeof corsOrigin === 'string') {
      res.set('Access-Control-Allow-Origin', corsOrigin);
    }
    res.flushHeaders?.();

    const id = `${Date.now()}-${Math.random().toString(36).slice(2)}`;
    clients.set(id, res);

    // Initial handshake (currently disabled):
    // res.write(`event: hello\n`);
    // res.write(`data: ${JSON.stringify({ connected: true, t: Date.now() })}\n\n`);
    console.log("sse连接成功");

    // Lines starting with ':' are SSE comments; they keep the connection busy
    // without dispatching an event on the client.
    const timer = setInterval(() => {
      if (!res.writableEnded) res.write(`: keep-alive ${Date.now()}\n\n`);
    }, heartbeatMs);

    req.on('close', () => {
      clearInterval(timer);
      clients.delete(id);
      endQuietly(res);
    });
  });

  // Simple health check.
  app.get('/health', (_req, res) => res.json({ ok: true, clients: clients.size }));

  const server = http.createServer(app);
  server.listen(port, () => {
    console.log(`[SSE] listening at http://127.0.0.1:${port}${path}`);
  });

  /**
   * Sends one event to every connected client.
   *
   * Two call forms are supported:
   *   broadcast(data)            - unnamed event (client `onmessage`)
   *   broadcast(eventName, data) - named event
   * The second argument being `undefined` selects the first form.
   *
   * @param {*} eventOrData - Event name, or the data when called with one argument.
   * @param {*} [maybeData] - Data (object / string / Buffer) for the named form.
   * @returns {void}
   */
  function broadcast(eventOrData, maybeData) {
    const hasEventName = typeof maybeData !== 'undefined';
    const eventName = hasEventName ? eventOrData : null;
    const data = hasEventName ? maybeData : eventOrData;

    // Serialize and frame once; the same text goes to every client.
    const frame = formatSseFrame(eventName, toPayload(data));

    for (const [id, res] of clients.entries()) {
      // Writing to an ended response is reported asynchronously rather than
      // thrown, so the try/catch below cannot see it; check explicitly.
      if (res.writableEnded || res.destroyed) {
        clients.delete(id);
        continue;
      }
      try {
        res.write(frame);
      } catch {
        clients.delete(id);
        endQuietly(res);
      }
    }
  }

  /**
   * Ends every client response, forgets all clients and stops the HTTP server.
   *
   * @returns {Promise<void>} Settles once `server.close` has invoked its
   *   callback; an error passed to that callback is not rethrown.
   */
  async function close() {
    for (const [, res] of clients.entries()) {
      endQuietly(res);
    }
    clients.clear();
    await new Promise((resolve) => server.close(resolve));
  }

  return { app, server, broadcast, close, clientsCount: () => clients.size };
}

module.exports = { startSSE };