Compare commits

...

10 Commits

Author SHA1 Message Date
dfccc03df8 fix(entity): 为ServerBigBrother实体增加secUid字段及映射 2025-12-15 20:13:09 +08:00
cf15298968 feat(mq): 新增 WebAI 消息队列支持
为支持 WebAI 登录场景,新增 web.ai.headers.exchange 交换机及对应发送方法;
在 HostInfoServiceImpl 中增加 Redis 前缀判断与 webAISend 调用逻辑。
2025-12-05 13:57:34 +08:00
9f062492c4 refactor(controller): 移除未使用的 List 导入并整理代码格式 2025-11-20 17:42:59 +08:00
4d20448b82 refactor(mq): 拆分交换器并新增brotherSend方法
- 新增 AI_CHAT_EXCHANGE_NAME、BIG_BROTHER_EXCHANGE_NAME 两个HeadersExchange
- MQSender 新增 brotherSend 方法,支持按 tenantId 路由到大佬队列
- ServerBigBrotherServiceImpl 在保存/更新后,若 Redis 标记存在则异步发送消息
- 移除 HostInfoServiceImpl 多余空行,保持代码整洁
2025-11-10 20:43:43 +08:00
b82dbacfee 1.添加英文国家名字段 2025-10-13 17:56:03 +08:00
a4020b9176 1.添加爬主播 发送到AI消息队列 2025-09-25 21:52:43 +08:00
ca86db8f66 1.大哥添加粉丝团字段 2025-09-23 15:57:06 +08:00
417c2186c2 1.修改判断用户是否登录 AI 逻辑 2025-08-27 21:40:38 +08:00
85d62459a6 1.修改判断发送AI消息队列逻辑 2025-08-27 21:27:16 +08:00
704482ae0e 1.通过 rabbitmq 发送 AI 消息 2025-08-27 19:28:48 +08:00
14 changed files with 209 additions and 40 deletions

View File

@@ -88,7 +88,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r07</version>
<version>30.1-jre</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -42,10 +42,10 @@ public class LogInterceptor {
Object[] args = point.getArgs();
String reqParam = "[" + StringUtils.join(args, ", ") + "]";
// 输出请求日志
// log.info("request startid: {}, path: {}, ip: {}, params: {}", requestId, url,
// httpServletRequest.getRemoteHost(), reqParam);
log.info("request startid: {}, path: {}, ip: {}", requestId, url,
httpServletRequest.getRemoteHost());
log.info("request startid: {}, path: {}, ip: {}, params: {}", requestId, url,
httpServletRequest.getRemoteHost(), reqParam);
// log.info("request startid: {}, path: {}, ip: {}", requestId, url,
// httpServletRequest.getRemoteHost());
// 执行原方法
Object result = point.proceed();
// 输出响应日志

View File

@@ -5,9 +5,11 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
@@ -15,8 +17,17 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
private static final String QUEUE = "HOST_INFO_QUEUE";
public static final String EXCHANGE_NAME = "user.headers.exchange";
public static final String AI_CHAT_EXCHANGE_NAME = "ai.chat.headers.exchange";
public static final String BIG_BROTHER_EXCHANGE_NAME = "big.brother.headers.exchange";
public static final String WEB_AI_EXCHANGE_NAME = "web.ai.headers.exchange";
//创建队列
//true:表示持久化
//队列在默认情况下放到内存rabbitmq重启后就丢失了如果希望重启后队列
@@ -32,6 +43,43 @@ public class RabbitMQConfig {
// return new Jackson2JsonMessageConverter();
// }
@Bean
public HeadersExchange userHeadersExchange() {
return ExchangeBuilder.headersExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
@Bean
public HeadersExchange aiChatHeadersExchange() {
return ExchangeBuilder.headersExchange(AI_CHAT_EXCHANGE_NAME)
.durable(true)
.build();
}
@Bean
public HeadersExchange bigBrotherHeadersExchange() {
return ExchangeBuilder.headersExchange(BIG_BROTHER_EXCHANGE_NAME)
.durable(true)
.build();
}
@Bean
public HeadersExchange webAiHeadersExchange() {
return ExchangeBuilder.headersExchange(WEB_AI_EXCHANGE_NAME)
.durable(true)
.build();
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public MessageConverter messageConverter() {
ObjectMapper om = new ObjectMapper();

View File

@@ -6,6 +6,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -13,25 +14,24 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisConfig {
@Bean(name="redisTemplate")
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(redisSerializer);
//value hashmap序列化
template.setHashValueSerializer(redisSerializer);
//key haspmap序列化
template.setHashKeySerializer(redisSerializer);
//
// 使用 JSON 序列化器
GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(jsonSerializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jsonSerializer);
template.afterPropertiesSet();
return template;
}
}

View File

@@ -8,7 +8,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
/*
* @author: ziin

View File

@@ -75,6 +75,9 @@ public class NewHosts {
@ApiModelProperty(value = "主播国家", example = "中国")
private String country;
@ApiModelProperty(value = "主播国家英文", example = "China")
private String countryEng;
/**
* 直播类型 娱乐,游戏
*/

View File

@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import io.swagger.models.auth.In;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
@@ -109,7 +110,11 @@ public class ServerBigBrother {
private Long tenantId;
@TableField(value = "create_time")
private LocalDateTime createTime;
@TableField(value = "fans_level")
private Integer fansLevel;
@TableField(value = "secUid")
private String secUid;
}

View File

@@ -2,13 +2,19 @@ package com.yupi.springbootinit.rabbitMQ;
import cn.hutool.core.date.DateTime;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.config.RabbitMQConfig;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import io.swagger.models.auth.In;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@@ -16,7 +22,8 @@ import java.time.LocalDateTime;
import java.util.List;
@Slf4j
@Service
@Component
@RequiredArgsConstructor
public class MQSender {
@Resource
@@ -33,6 +40,7 @@ public class MQSender {
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
//方法:发送消息
public void bigBrotherSend(ServerBigBrother bigBrothers){
try {
@@ -45,4 +53,31 @@ public class MQSender {
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
public void send(Long tenantId, Object payload) {
// 发送消息,把 userId 放进 header
rabbitTemplate.convertAndSend(RabbitMQConfig.AI_CHAT_EXCHANGE_NAME, "", payload, m -> {
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
return m;
});
}
public void brotherSend(Long tenantId, Object payload) {
// 发送消息,把 userId 放进 header
rabbitTemplate.convertAndSend(RabbitMQConfig.BIG_BROTHER_EXCHANGE_NAME, "", payload, m -> {
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
return m;
});
}
public void webAISend(Long tenantId, Object payload) {
// 发送消息,把 userId 放进 header
rabbitTemplate.convertAndSend(RabbitMQConfig.WEB_AI_EXCHANGE_NAME, "", payload, m -> {
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
return m;
});
}
}

View File

@@ -1,28 +1,26 @@
package com.yupi.springbootinit.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.mapper.CountryInfoMapper;
import com.yupi.springbootinit.mapper.NewHostsMapper;
import com.yupi.springbootinit.model.dto.host.QueryCountDTO;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.vo.country.CountryInfoVO;
import com.yupi.springbootinit.rabbitMQ.MQSender;
import com.yupi.springbootinit.service.HostInfoService;
import com.yupi.springbootinit.utils.JsonUtils;
import com.yupi.springbootinit.utils.RedisUtils;
import com.yupi.springbootinit.utils.SseEmitterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StopWatch;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +41,13 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
private RedisTemplate redisTemplate;
@Resource
private RedisUtils redisUtils;
@Resource
private MQSender mqSender;
//
// private final RedisTemplate<String,Object> redisTemplate;
//
@@ -68,12 +73,31 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
JsonUtils.toJsonString(newHost));
});
}
// Boolean o = (Boolean) redisTemplate.opsForValue().get("ai_login:"+newHosts.get(0).getTenantId());
if (redisUtils.hasKeyByPrefix("ai_login:"+ newHosts.get(0).getTenantId())) {
newHosts.forEach(newHost -> {
mqSender.send(newHost.getTenantId(),newHost);
});
log.info("发送消息到队列{}, 消息数量: {}", newHosts.get(0).getTenantId(), newHosts.size());
}
if (redisUtils.hasKeyByPrefix("webAI_login:"+ newHosts.get(0).getTenantId())) {
newHosts.forEach(newHost -> {
mqSender.webAISend(newHost.getTenantId(),newHost);
});
log.info("发送消息到队列WebAI{}, 消息数量: {}", newHosts.get(0).getTenantId(), newHosts.size());
}
stopWatch.stop();
long totalTimeMillis = stopWatch.getTotalTimeMillis();
log.info("当前存储数据量大小 {}, 存储花费: {}ms",newHosts.size(),totalTimeMillis);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
// 将异常包装到Future使调用方能处理
log.error(e.getMessage());
return CompletableFuture.failedFuture(e);
}
}
@@ -124,7 +148,6 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
@Override
public void queryCount(QueryCountDTO queryCountDTO) {
redisTemplate.opsForValue().increment( "tkaccount:" + queryCountDTO.getTkAccount(),1);
}

View File

@@ -3,6 +3,8 @@ package com.yupi.springbootinit.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.yupi.springbootinit.rabbitMQ.MQSender;
import com.yupi.springbootinit.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@@ -14,6 +16,8 @@ import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.service.ServerBigBrotherService;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StopWatch;
import javax.annotation.Resource;
/*
* @author: ziin
* @date: 2025/6/24 16:19
@@ -23,6 +27,12 @@ import org.springframework.util.StopWatch;
@Slf4j
public class ServerBigBrotherServiceImpl extends ServiceImpl<ServerBigBrotherMapper, ServerBigBrother> implements ServerBigBrotherService{
@Resource
private RedisUtils redisUtils;
@Resource
private MQSender mqSender;
@Override
@Transactional(rollbackFor = Exception.class)
public void saveData(ServerBigBrother bigBrother) {
@@ -34,6 +44,10 @@ public class ServerBigBrotherServiceImpl extends ServiceImpl<ServerBigBrotherMap
ServerBigBrother serverBigBrother = baseMapper.selectOne(queryWrapper);
if(serverBigBrother == null){
save(bigBrother);
if (redisUtils.hasKeyByPrefix("bigbrother_login:"+ bigBrother.getTenantId())) {
mqSender.brotherSend(bigBrother.getTenantId(),bigBrother);
log.info("发送消息到队列{}, 大哥 Id: {}", bigBrother.getTenantId(),bigBrother.getDisplayId());
}
stopWatch.stop();
long totalTimeMillis = stopWatch.getTotalTimeMillis();
log.info("当前存储花费: {}ms",totalTimeMillis);
@@ -44,6 +58,10 @@ public class ServerBigBrotherServiceImpl extends ServiceImpl<ServerBigBrotherMap
}
bigBrother.setTotalGiftCoins(bigBrother.getHistoricHighCoins()+serverBigBrother.getTotalGiftCoins());
updateById(bigBrother);
if (redisUtils.hasKeyByPrefix("bigbrother_login:"+ bigBrother.getTenantId())) {
mqSender.brotherSend(bigBrother.getTenantId(),bigBrother);
log.info("发送消息到队列{}, 大哥 Id: {}", bigBrother.getTenantId(),bigBrother.getDisplayId());
}
stopWatch.stop();
long totalTimeMillis = stopWatch.getTotalTimeMillis();
log.info("当前更新花费: {}ms",totalTimeMillis);

View File

@@ -0,0 +1,38 @@
package com.yupi.springbootinit.utils;
/*
* @author: ziin
* @date: 2025/8/27 20:35
*/
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Set;
@Component
public class RedisUtils {
@Resource
private RedisTemplate<String,Object> redisTemplate;
public boolean hasKeyByPrefix(String prefix) {
return Boolean.TRUE.equals(
redisTemplate.execute((RedisCallback<Boolean>) conn -> {
try (Cursor<byte[]> cursor = conn.scan(
ScanOptions.scanOptions()
.match(prefix + ":*")
.count(100) // 每次返回条数,可调整
.build())) {
return cursor.hasNext(); // 只要存在一条就返回 true
}
})
);
}
}

View File

@@ -4,9 +4,6 @@ import org.apache.commons.lang3.StringUtils;
/**
* SQL 工具
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @from <a href="https://yupi.icu">编程导航知识星球</a>
*/
public class SqlUtils {

View File

@@ -26,12 +26,13 @@
<result column="uid" jdbcType="VARCHAR" property="uid" />
<result column="ai_operation" jdbcType="TINYINT" property="aiOperation" />
<result column="operation_status" jdbcType="TINYINT" property="operationStatus" />
<result column="country_eng" jdbcType="VARCHAR" property="countryEng" />
</resultMap>
<sql id="Base_Column_List">
<!--@mbg.generated-->
id, hosts_id, hosts_level, hosts_coins, Invitation_type, online_fans, fans, fllowernum,
yesterday_coins, country, hosts_kind, is_assigned, tenant_id, creator, create_time,
updater, update_time, user_id, deleted, `uid`, ai_operation, operation_status
updater, update_time, user_id, deleted, `uid`, ai_operation, operation_status,country_eng
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="BaseResultMap">
<!--@mbg.generated-->
@@ -50,12 +51,12 @@
insert into server_new_hosts (hosts_id, hosts_level, hosts_coins,
Invitation_type, online_fans,fans, fllowernum,
yesterday_coins, country, hosts_kind,
tenant_id, creator,create_time,uid
tenant_id, creator,create_time,uid,country_eng
)
values (#{hostsId,jdbcType=VARCHAR}, #{hostsLevel,jdbcType=VARCHAR}, #{hostsCoins,jdbcType=INTEGER},
#{invitationType,jdbcType=INTEGER}, #{onlineFans,jdbcType=INTEGER},#{fans,jdbcType=INTEGER}, #{fllowernum,jdbcType=INTEGER},
#{yesterdayCoins,jdbcType=INTEGER}, #{country,jdbcType=VARCHAR}, #{hostsKind,jdbcType=VARCHAR},
#{tenantId,jdbcType=BIGINT}, #{creator,jdbcType=BIGINT},#{createTime,jdbcType=TIMESTAMP},#{uid,jdbcType=VARCHAR})
#{tenantId,jdbcType=BIGINT}, #{creator,jdbcType=BIGINT},#{createTime,jdbcType=TIMESTAMP},#{uid,jdbcType=VARCHAR},#{countryEng,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" keyColumn="id" keyProperty="id" parameterType="com.yupi.springbootinit.model.entity.NewHosts" useGeneratedKeys="true">
<!--@mbg.generated-->
@@ -201,14 +202,14 @@
<!--@mbg.generated-->
insert into server_new_hosts
(hosts_id, hosts_level, hosts_coins, Invitation_type, online_fans,fans, fllowernum, yesterday_coins,
country, hosts_kind, tenant_id, creator, user_id,create_time,uid)
country, hosts_kind, tenant_id, creator, user_id,create_time,uid,country_eng)
values
<foreach collection="list" item="item" separator=",">
(#{item.hostsId,jdbcType=VARCHAR}, #{item.hostsLevel,jdbcType=VARCHAR}, #{item.hostsCoins,jdbcType=INTEGER},
#{item.invitationType,jdbcType=INTEGER}, #{item.onlineFans,jdbcType=INTEGER},#{item.fans,jdbcType=INTEGER},
#{item.fllowernum,jdbcType=INTEGER}, #{item.yesterdayCoins,jdbcType=INTEGER}, #{item.country,jdbcType=VARCHAR},
#{item.hostsKind,jdbcType=VARCHAR}, #{item.tenantId,jdbcType=BIGINT}, #{item.creator,jdbcType=INTEGER},#{item.userId,jdbcType=BIGINT},
#{item.createTime,jdbcType=TIMESTAMP},#{item.uid,jdbcType=VARCHAR})
#{item.createTime,jdbcType=TIMESTAMP},#{item.uid,jdbcType=VARCHAR},#{item.countryEng,jdbcType=VARCHAR})
</foreach>
</insert>
</mapper>

View File

@@ -19,11 +19,13 @@
<result column="owner_id" jdbcType="VARCHAR" property="ownerId" />
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
<result column="tenant_id" jdbcType="BIGINT" property="tenantId" />
<result column="fans_level" jdbcType="INTEGER" property="fansLevel" />
<result column="secUid" jdbcType="VARCHAR" property="secUid" />
</resultMap>
<sql id="Base_Column_List">
<!--@mbg.generated-->
id, display_id, user_id_str, nickname, `level`, hostcoins, follower_count, following_count,
region, historic_high_coins, total_gift_coins, host_display_id, owner_id, create_time,
update_time, creator, updater, deleted, tenant_id
update_time, creator, updater, deleted, tenant_id,fans_level,secUid
</sql>
</mapper>