1.删除无用配置

2.Sse 添加心跳机制
This commit is contained in:
2025-07-18 19:04:33 +08:00
parent 4cd6ea0c3f
commit 0fbb59b18e
2 changed files with 23 additions and 22 deletions

View File

@@ -8,7 +8,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
@@ -16,7 +16,8 @@ import java.util.function.Consumer;
*/
@Slf4j
public class SseEmitterUtil {
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
/**
* 使用map对象便于根据userId来获取对应的SseEmitter或者放redis里面
*/
@@ -25,16 +26,30 @@ public class SseEmitterUtil {
public static SseEmitter connect(String userId) {
// 设置超时时间0表示不过期。默认30S超时时间未完成会抛出异常AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 设置超时时间为0表示永不超时或设置为较长的时间
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());
// 启动一个任务定期发送心跳
ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {
try {
// 发送心跳注释,防止代理或浏览器断开
sseEmitter.send(SseEmitter.event().comment("heartbeat"));
} catch (IOException e) {
sseEmitter.completeWithError(e);
}
}, 0, 10, TimeUnit.SECONDS); // 每10秒一次
return sseEmitter;
}
/**
* 给制定用户发送消息
@@ -113,6 +128,10 @@ public class SseEmitterUtil {
public static SseEmitter getSseEmitter(String userId) {
return sseEmitterMap.get(userId);
}
private static Runnable completionCallBack(String userId) {
return () -> {