1.添加Sse接口推送数据

This commit is contained in:
2025-07-18 15:38:20 +08:00
parent 2912a29884
commit 4cd6ea0c3f
7 changed files with 228 additions and 33 deletions

View File

@@ -0,0 +1,42 @@
package com.yupi.springbootinit.controller;
import cn.hutool.json.JSONUtil;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.utils.SseEmitterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {
/**
* 用于创建连接
*/
@GetMapping("/connect/{tenantID}/{userId}")
public SseEmitter connect(@PathVariable String tenantID,@PathVariable String userId) {
return SseEmitterUtil.connect(tenantID+"-"+userId);
}
/**
* 关闭连接
*/
@GetMapping("/close/{tenantID}/{userId}")
public void close(@PathVariable String tenantID,@PathVariable String userId) {
SseEmitterUtil.removeUser(tenantID+"-"+userId);
}
@GetMapping("/sse")
public void sse(String connectId,String message){
// 构建推送消息体
NewHosts newHosts = new NewHosts();
newHosts.setId(123123213L);
newHosts.setHostsId(message);
SseEmitterUtil.sendMessage(connectId, JSONUtil.toJsonStr(newHosts));
}
}

View File

@@ -86,4 +86,6 @@ public class HostInfoDTO {
@ApiModelProperty(value = "入库人", example = "1001")
private Integer creator;
}

View File

@@ -10,40 +10,40 @@ import java.time.LocalDateTime;
import java.util.Date;
/*
* @author: ziin
* @date: 2025/6/10 18:54
*/
* @author: ziin
* @date: 2025/6/10 18:54
*/
@Data
@ApiModel("主播信息DTO")
public class NewHosts {
/**
* 主键
*/
* 主键
*/
@ApiModelProperty(value = "主键", example = "1")
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 主播id
*/
* 主播id
*/
@ApiModelProperty(value = "主播id", example = "host123")
private String hostsId;
/**
* 主播等级
*/
* 主播等级
*/
@ApiModelProperty(value = "主播等级", example = "A")
private String hostsLevel;
/**
* 主播金币
*/
* 主播金币
*/
@ApiModelProperty(value = "主播金币", example = "1000")
private Integer hostsCoins;
/**
* 邀请类型
*/
* 邀请类型
*/
@ApiModelProperty(value = "邀请类型", example = "1")
private Integer invitationType;
@@ -52,40 +52,40 @@ public class NewHosts {
private Integer onlineFans;
/**
* 粉丝数量
*/
* 粉丝数量
*/
@ApiModelProperty(value = "粉丝数量", example = "5000")
private Integer fans;
/**
* 关注数量
*/
* 关注数量
*/
@ApiModelProperty(value = "关注数量", example = "200")
private Integer fllowernum;
/**
* 昨日金币
*/
* 昨日金币
*/
@ApiModelProperty(value = "昨日金币", example = "800")
private Integer yesterdayCoins;
/**
* 主播国家
*/
* 主播国家
*/
@ApiModelProperty(value = "主播国家", example = "中国")
private String country;
/**
* 直播类型 娱乐,游戏
*/
* 直播类型 娱乐,游戏
*/
@ApiModelProperty(value = "直播类型 娱乐,游戏", example = "娱乐")
private String hostsKind;
/**
* 租户 Id
*/
* 租户 Id
*/
@ApiModelProperty(value = "租户 Id", example = "1")
private Long tenantId;
@@ -96,15 +96,15 @@ public class NewHosts {
@ApiModelProperty(value = "租户 Id", example = "1")
private Long userId;
/**
* 入库人
*/
* 入库人
*/
@ApiModelProperty(value = "入库人", example = "1001")
private Long creator;
/**
* 数据插入时间
*/
* 数据插入时间
*/
private LocalDateTime createTime;

View File

@@ -8,6 +8,8 @@ import com.yupi.springbootinit.model.dto.host.QueryCountDTO;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.vo.country.CountryInfoVO;
import com.yupi.springbootinit.service.HostInfoService;
import com.yupi.springbootinit.utils.JsonUtils;
import com.yupi.springbootinit.utils.SseEmitterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
@@ -49,10 +51,15 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
@Override
public CompletableFuture<Void> saveHostInfo(List<NewHosts> newHosts) {
try {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
saveBatch(newHosts);
newHosts.forEach(newHost -> {
SseEmitterUtil.sendMessage(newHost.getTenantId().toString()+"-"+newHost.getUserId().toString(),
JsonUtils.toJsonString(newHost));
});
stopWatch.stop();
long totalTimeMillis = stopWatch.getTotalTimeMillis();
log.info("当前存储数据量大小 {}, 存储花费: {}ms",newHosts.size(),totalTimeMillis);

View File

@@ -0,0 +1,136 @@
package com.yupi.springbootinit.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
* SSE长链接工具类
*/
@Slf4j
public class SseEmitterUtil {
/**
* 使用map对象便于根据userId来获取对应的SseEmitter或者放redis里面
*/
private final static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(String userId) {
// 设置超时时间0表示不过期。默认30S超时时间未完成会抛出异常AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
log.info("创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());
return sseEmitter;
}
/**
* 给制定用户发送消息
*
* @param connectId 指定用户名
* @param sseMessage 消息体
*/
public static void sendMessage(String connectId, String sseMessage) {
if (sseEmitterMap.containsKey(connectId)) {
try {
sseEmitterMap.get(connectId).send(sseMessage);
log.info("用户 {} 推送消息 {}", connectId, sseMessage);
} catch (IOException e) {
log.error("用户 {} 推送消息异常", connectId, e);
removeUser(connectId);
}
} else {
log.error("消息推送 用户 {} 不存在,链接总数 {}", connectId, sseEmitterMap.size());
}
}
/**
* 群发消息
*/
public static void batchSendMessage(String message, List<String> ids) {
ids.forEach(userId -> sendMessage(userId, message));
}
/**
* 群发所有人
*/
public static void batchSendMessage(String message) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("用户 {} 推送异常", k, e);
removeUser(k);
}
});
}
/**
* 移除用户连接
*
* @param userId 用户 ID
*/
public static void removeUser(String userId) {
if (sseEmitterMap.containsKey(userId)) {
sseEmitterMap.get(userId).complete();
sseEmitterMap.remove(userId);
log.info("移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());
} else {
log.error("消息推送 用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());
}
}
/**
* 获取当前连接信息
*
* @return 所有的连接用户
*/
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 获取当前的连接数量
*
* @return 当前的连接数量
*/
public static int getUserCount() {
return sseEmitterMap.size();
}
public static SseEmitter getSseEmitter(String userId) {
return sseEmitterMap.get(userId);
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("用户 {} 结束连接", userId);
};
}
private static Runnable timeoutCallBack(String userId) {
return () -> {
log.error("用户 {} 连接超时", userId);
removeUser(userId);
};
}
private static Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
log.error("用户 {} 连接异常", userId);
removeUser(userId);
};
}
}

View File

@@ -19,7 +19,7 @@ spring:
name: springboot-init
# 默认 dev 环境
profiles:
active: prod
active: dev
# 支持 swagger3
mvc:
pathmatch:

View File

@@ -15,15 +15,23 @@
<result column="yesterday_coins" jdbcType="INTEGER" property="yesterdayCoins" />
<result column="country" jdbcType="VARCHAR" property="country" />
<result column="hosts_kind" jdbcType="VARCHAR" property="hostsKind" />
<result column="is_assigned" jdbcType="TINYINT" property="isAssigned" />
<result column="tenant_id" jdbcType="BIGINT" property="tenantId" />
<result column="creator" jdbcType="BIGINT" property="creator" />
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
<result column="uid" jdbcType="TIMESTAMP" property="uid" />
<result column="updater" jdbcType="VARCHAR" property="updater" />
<result column="update_time" jdbcType="TIMESTAMP" property="updateTime" />
<result column="user_id" jdbcType="BIGINT" property="userId" />
<result column="deleted" jdbcType="TINYINT" property="deleted" />
<result column="uid" jdbcType="VARCHAR" property="uid" />
<result column="ai_operation" jdbcType="TINYINT" property="aiOperation" />
<result column="operation_status" jdbcType="TINYINT" property="operationStatus" />
</resultMap>
<sql id="Base_Column_List">
<!--@mbg.generated-->
id, hosts_id, hosts_level, hosts_coins, Invitation_type, online_fans,fans, fllowernum, yesterday_coins,
country, hosts_kind, tenant_id, creator, create_time, updater, update_time,uid
id, hosts_id, hosts_level, hosts_coins, Invitation_type, online_fans, fans, fllowernum,
yesterday_coins, country, hosts_kind, is_assigned, tenant_id, creator, create_time,
updater, update_time, user_id, deleted, `uid`, ai_operation, operation_status
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
<!--@mbg.generated-->