diff --git a/src/main/java/com/yupi/springbootinit/controller/SseController.java b/src/main/java/com/yupi/springbootinit/controller/SseController.java new file mode 100644 index 0000000..5ed4ebb --- /dev/null +++ b/src/main/java/com/yupi/springbootinit/controller/SseController.java @@ -0,0 +1,42 @@ +package com.yupi.springbootinit.controller; + +import cn.hutool.json.JSONUtil; +import com.yupi.springbootinit.model.entity.NewHosts; +import com.yupi.springbootinit.utils.SseEmitterUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +@Slf4j +@RestController +@RequestMapping("/sse") +public class SseController { + + /** + * 用于创建连接 + */ + @GetMapping("/connect/{tenantID}/{userId}") + public SseEmitter connect(@PathVariable String tenantID,@PathVariable String userId) { + return SseEmitterUtil.connect(tenantID+"-"+userId); + } + + /** + * 关闭连接 + */ + @GetMapping("/close/{tenantID}/{userId}") + public void close(@PathVariable String tenantID,@PathVariable String userId) { + SseEmitterUtil.removeUser(tenantID+"-"+userId); + } + + @GetMapping("/sse") + public void sse(String connectId,String message){ + // 构建推送消息体 + NewHosts newHosts = new NewHosts(); + newHosts.setId(123123213L); + newHosts.setHostsId(message); + SseEmitterUtil.sendMessage(connectId, JSONUtil.toJsonStr(newHosts)); + } +} \ No newline at end of file diff --git a/src/main/java/com/yupi/springbootinit/model/dto/host/HostInfoDTO.java b/src/main/java/com/yupi/springbootinit/model/dto/host/HostInfoDTO.java index 58ecac0..634f43e 100644 --- a/src/main/java/com/yupi/springbootinit/model/dto/host/HostInfoDTO.java +++ b/src/main/java/com/yupi/springbootinit/model/dto/host/HostInfoDTO.java @@ -86,4 +86,6 @@ public class HostInfoDTO { @ApiModelProperty(value = "入库人", example = "1001") private Integer creator; + + } diff --git a/src/main/java/com/yupi/springbootinit/model/entity/NewHosts.java b/src/main/java/com/yupi/springbootinit/model/entity/NewHosts.java index 5139d5e..475bf2c 100644 --- a/src/main/java/com/yupi/springbootinit/model/entity/NewHosts.java +++ b/src/main/java/com/yupi/springbootinit/model/entity/NewHosts.java @@ -10,40 +10,40 @@ import java.time.LocalDateTime; import java.util.Date; /* -* @author: ziin -* @date: 2025/6/10 18:54 -*/ + * @author: ziin + * @date: 2025/6/10 18:54 + */ @Data @ApiModel("主播信息DTO") public class NewHosts { /** - * 主键 - */ + * 主键 + */ @ApiModelProperty(value = "主键", example = "1") @TableId(value = "id", type = IdType.AUTO) private Long id; /** - * 主播id - */ + * 主播id + */ @ApiModelProperty(value = "主播id", example = "host123") private String hostsId; /** - * 主播等级 - */ + * 主播等级 + */ @ApiModelProperty(value = "主播等级", example = "A") private String hostsLevel; /** - * 主播金币 - */ + * 主播金币 + */ @ApiModelProperty(value = "主播金币", example = "1000") private Integer hostsCoins; /** - * 邀请类型 - */ + * 邀请类型 + */ @ApiModelProperty(value = "邀请类型", example = "1") private Integer invitationType; @@ -52,40 +52,40 @@ public class NewHosts { private Integer onlineFans; /** - * 粉丝数量 - */ + * 粉丝数量 + */ @ApiModelProperty(value = "粉丝数量", example = "5000") private Integer fans; /** - * 关注数量 - */ + * 关注数量 + */ @ApiModelProperty(value = "关注数量", example = "200") private Integer fllowernum; /** - * 昨日金币 - */ + * 昨日金币 + */ @ApiModelProperty(value = "昨日金币", example = "800") private Integer yesterdayCoins; /** - * 主播国家 - */ + * 主播国家 + */ @ApiModelProperty(value = "主播国家", example = "中国") private String country; /** - * 直播类型 娱乐,游戏 - */ + * 直播类型 娱乐,游戏 + */ @ApiModelProperty(value = "直播类型 娱乐,游戏", example = "娱乐") private String hostsKind; /** - * 租户 Id - */ + * 租户 Id + */ @ApiModelProperty(value = "租户 Id", example = "1") private Long tenantId; @@ -96,15 +96,15 @@ public class NewHosts { @ApiModelProperty(value = "租户 Id", example = "1") private Long userId; /** - * 入库人 - */ + * 入库人 + */ @ApiModelProperty(value = "入库人", example = "1001") private Long creator; /** - * 数据插入时间 - */ + * 数据插入时间 + */ private LocalDateTime createTime; diff --git a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java index 12943d2..3e7118c 100644 --- a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java +++ b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java @@ -8,6 +8,8 @@ import com.yupi.springbootinit.model.dto.host.QueryCountDTO; import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.vo.country.CountryInfoVO; import com.yupi.springbootinit.service.HostInfoService; +import com.yupi.springbootinit.utils.JsonUtils; +import com.yupi.springbootinit.utils.SseEmitterUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; @@ -49,10 +51,15 @@ public class HostInfoServiceImpl extends ServiceImpl i @Override public CompletableFuture saveHostInfo(List newHosts) { + try { StopWatch stopWatch = new StopWatch(); stopWatch.start(); saveBatch(newHosts); + newHosts.forEach(newHost -> { + SseEmitterUtil.sendMessage(newHost.getTenantId().toString()+"-"+newHost.getUserId().toString(), + JsonUtils.toJsonString(newHost)); + }); stopWatch.stop(); long totalTimeMillis = stopWatch.getTotalTimeMillis(); log.info("当前存储数据量大小 {}, 存储花费: {}ms",newHosts.size(),totalTimeMillis); diff --git a/src/main/java/com/yupi/springbootinit/utils/SseEmitterUtil.java b/src/main/java/com/yupi/springbootinit/utils/SseEmitterUtil.java new file mode 100644 index 0000000..84d8dc8 --- /dev/null +++ b/src/main/java/com/yupi/springbootinit/utils/SseEmitterUtil.java @@ -0,0 +1,136 @@ +package com.yupi.springbootinit.utils; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.MediaType; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +/** + * SSE长链接工具类 + */ +@Slf4j +public class SseEmitterUtil { + + /** + * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面 + */ + private final static Map sseEmitterMap = new ConcurrentHashMap<>(); + + public static SseEmitter connect(String userId) { + // 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutException + SseEmitter sseEmitter = new SseEmitter(0L); + + // 注册回调 + sseEmitter.onCompletion(completionCallBack(userId)); + sseEmitter.onError(errorCallBack(userId)); + sseEmitter.onTimeout(timeoutCallBack(userId)); + sseEmitterMap.put(userId, sseEmitter); + + log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size()); + return sseEmitter; + } + + /** + * 给制定用户发送消息 + * + * @param connectId 指定用户名 + * @param sseMessage 消息体 + */ + public static void sendMessage(String connectId, String sseMessage) { + if (sseEmitterMap.containsKey(connectId)) { + try { + sseEmitterMap.get(connectId).send(sseMessage); + log.info("用户 {} 推送消息 {}", connectId, sseMessage); + } catch (IOException e) { + log.error("用户 {} 推送消息异常", connectId, e); + removeUser(connectId); + } + } else { + log.error("消息推送 用户 {} 不存在,链接总数 {}", connectId, sseEmitterMap.size()); + } + } + + /** + * 群发消息 + */ + public static void batchSendMessage(String message, List ids) { + ids.forEach(userId -> sendMessage(userId, message)); + } + + /** + * 群发所有人 + */ + public static void batchSendMessage(String message) { + sseEmitterMap.forEach((k, v) -> { + try { + v.send(message, MediaType.APPLICATION_JSON); + } catch (IOException e) { + log.error("用户 {} 推送异常", k, e); + removeUser(k); + } + }); + } + + /** + * 移除用户连接 + * + * @param userId 用户 ID + */ + public static void removeUser(String userId) { + if (sseEmitterMap.containsKey(userId)) { + sseEmitterMap.get(userId).complete(); + sseEmitterMap.remove(userId); + log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size()); + } else { + log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size()); + } + } + + /** + * 获取当前连接信息 + * + * @return 所有的连接用户 + */ + public static List getIds() { + return new ArrayList<>(sseEmitterMap.keySet()); + } + + /** + * 获取当前的连接数量 + * + * @return 当前的连接数量 + */ + public static int getUserCount() { + return sseEmitterMap.size(); + } + + public static SseEmitter getSseEmitter(String userId) { + return sseEmitterMap.get(userId); + } + + private static Runnable completionCallBack(String userId) { + return () -> { + log.info("用户 {} 结束连接", userId); + }; + } + + private static Runnable timeoutCallBack(String userId) { + return () -> { + log.error("用户 {} 连接超时", userId); + removeUser(userId); + }; + } + + private static Consumer errorCallBack(String userId) { + return throwable -> { + log.error("用户 {} 连接异常", userId); + removeUser(userId); + }; + } +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ebe3572..bc5bf4f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -19,7 +19,7 @@ spring: name: springboot-init # 默认 dev 环境 profiles: - active: prod + active: dev # 支持 swagger3 mvc: pathmatch: diff --git a/src/main/resources/mapper/NewHostsMapper.xml b/src/main/resources/mapper/NewHostsMapper.xml index 8ce690e..e35b4db 100644 --- a/src/main/resources/mapper/NewHostsMapper.xml +++ b/src/main/resources/mapper/NewHostsMapper.xml @@ -15,15 +15,23 @@ + - + + + + + + + - id, hosts_id, hosts_level, hosts_coins, Invitation_type, online_fans,fans, fllowernum, yesterday_coins, - country, hosts_kind, tenant_id, creator, create_time, updater, update_time,uid + id, hosts_id, hosts_level, hosts_coins, Invitation_type, online_fans, fans, fllowernum, + yesterday_coins, country, hosts_kind, is_assigned, tenant_id, creator, create_time, + updater, update_time, user_id, deleted, `uid`, ai_operation, operation_status