refactor(mq): 拆分交换器并新增brotherSend方法
- 新增 AI_CHAT_EXCHANGE_NAME、BIG_BROTHER_EXCHANGE_NAME 两个HeadersExchange - MQSender 新增 brotherSend 方法,支持按 tenantId 路由到大佬队列 - ServerBigBrotherServiceImpl 在保存/更新后,若 Redis 标记存在则异步发送消息 - 移除 HostInfoServiceImpl 多余空行,保持代码整洁
This commit is contained in:
@@ -22,6 +22,9 @@ public class RabbitMQConfig {
|
||||
|
||||
public static final String EXCHANGE_NAME = "user.headers.exchange";
|
||||
|
||||
public static final String AI_CHAT_EXCHANGE_NAME = "ai.chat.headers.exchange";
|
||||
public static final String BIG_BROTHER_EXCHANGE_NAME = "big.brother.headers.exchange";
|
||||
|
||||
//创建队列
|
||||
//true:表示持久化
|
||||
//队列在默认情况下放到内存,rabbitmq重启后就丢失了,如果希望重启后,队列
|
||||
@@ -46,6 +49,21 @@ public class RabbitMQConfig {
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public HeadersExchange aiChatHeadersExchange() {
|
||||
return ExchangeBuilder.headersExchange(AI_CHAT_EXCHANGE_NAME)
|
||||
.durable(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public HeadersExchange bigBrotherHeadersExchange() {
|
||||
return ExchangeBuilder.headersExchange(BIG_BROTHER_EXCHANGE_NAME)
|
||||
.durable(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public RabbitAdmin rabbitAdmin(ConnectionFactory cf) {
|
||||
return new RabbitAdmin(cf);
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.yupi.springbootinit.model.entity.ServerBigBrother;
|
||||
import io.swagger.models.auth.In;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.HeadersExchange;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
@@ -28,6 +29,7 @@ public class MQSender {
|
||||
@Resource
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
|
||||
//方法:发送消息
|
||||
public void hostsSend(List<NewHosts> list){
|
||||
try {
|
||||
@@ -55,10 +57,18 @@ public class MQSender {
|
||||
|
||||
public void send(Long tenantId, Object payload) {
|
||||
// 发送消息,把 userId 放进 header
|
||||
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "", payload, m -> {
|
||||
rabbitTemplate.convertAndSend(RabbitMQConfig.AI_CHAT_EXCHANGE_NAME, "", payload, m -> {
|
||||
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
|
||||
return m;
|
||||
});
|
||||
}
|
||||
public void brotherSend(Long tenantId, Object payload) {
|
||||
// 发送消息,把 userId 放进 header
|
||||
rabbitTemplate.convertAndSend(RabbitMQConfig.BIG_BROTHER_EXCHANGE_NAME, "", payload, m -> {
|
||||
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
|
||||
return m;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -141,7 +141,6 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
|
||||
|
||||
@Override
|
||||
public void queryCount(QueryCountDTO queryCountDTO) {
|
||||
|
||||
redisTemplate.opsForValue().increment( "tkaccount:" + queryCountDTO.getTkAccount(),1);
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ package com.yupi.springbootinit.service.impl;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
||||
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
|
||||
import com.yupi.springbootinit.rabbitMQ.MQSender;
|
||||
import com.yupi.springbootinit.utils.RedisUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -14,6 +16,8 @@ import com.yupi.springbootinit.model.entity.ServerBigBrother;
|
||||
import com.yupi.springbootinit.service.ServerBigBrotherService;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.StopWatch;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
/*
|
||||
* @author: ziin
|
||||
* @date: 2025/6/24 16:19
|
||||
@@ -23,6 +27,12 @@ import org.springframework.util.StopWatch;
|
||||
@Slf4j
|
||||
public class ServerBigBrotherServiceImpl extends ServiceImpl<ServerBigBrotherMapper, ServerBigBrother> implements ServerBigBrotherService{
|
||||
|
||||
@Resource
|
||||
private RedisUtils redisUtils;
|
||||
|
||||
@Resource
|
||||
private MQSender mqSender;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void saveData(ServerBigBrother bigBrother) {
|
||||
@@ -34,6 +44,10 @@ public class ServerBigBrotherServiceImpl extends ServiceImpl<ServerBigBrotherMap
|
||||
ServerBigBrother serverBigBrother = baseMapper.selectOne(queryWrapper);
|
||||
if(serverBigBrother == null){
|
||||
save(bigBrother);
|
||||
if (redisUtils.hasKeyByPrefix("bigbrother_login:"+ bigBrother.getTenantId())) {
|
||||
mqSender.brotherSend(bigBrother.getTenantId(),bigBrother);
|
||||
log.info("发送消息到队列{}, 大哥 Id: {}", bigBrother.getTenantId(),bigBrother.getDisplayId());
|
||||
}
|
||||
stopWatch.stop();
|
||||
long totalTimeMillis = stopWatch.getTotalTimeMillis();
|
||||
log.info("当前存储花费: {}ms",totalTimeMillis);
|
||||
@@ -44,6 +58,10 @@ public class ServerBigBrotherServiceImpl extends ServiceImpl<ServerBigBrotherMap
|
||||
}
|
||||
bigBrother.setTotalGiftCoins(bigBrother.getHistoricHighCoins()+serverBigBrother.getTotalGiftCoins());
|
||||
updateById(bigBrother);
|
||||
if (redisUtils.hasKeyByPrefix("bigbrother_login:"+ bigBrother.getTenantId())) {
|
||||
mqSender.brotherSend(bigBrother.getTenantId(),bigBrother);
|
||||
log.info("发送消息到队列{}, 大哥 Id: {}", bigBrother.getTenantId(),bigBrother.getDisplayId());
|
||||
}
|
||||
stopWatch.stop();
|
||||
long totalTimeMillis = stopWatch.getTotalTimeMillis();
|
||||
log.info("当前更新花费: {}ms",totalTimeMillis);
|
||||
|
||||
Reference in New Issue
Block a user