From 704482ae0ecc69d0d7e4d1ef514bd2525f7c7e45 Mon Sep 17 00:00:00 2001 From: Ziin Date: Wed, 27 Aug 2025 16:42:13 +0800 Subject: [PATCH] =?UTF-8?q?1.=E9=80=9A=E8=BF=87=20rabbitmq=20=E5=8F=91?= =?UTF-8?q?=E9=80=81=20AI=20=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../springbootinit/aop/LogInterceptor.java | 8 +++--- .../springbootinit/config/RabbitMQConfig.java | 26 +++++++++++++++-- .../springbootinit/config/RedisConfig.java | 28 +++++++++---------- .../springbootinit/rabbitMQ/MQSender.java | 20 +++++++++++-- .../service/impl/HostInfoServiceImpl.java | 22 +++++++++++---- 5 files changed, 76 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/yupi/springbootinit/aop/LogInterceptor.java b/src/main/java/com/yupi/springbootinit/aop/LogInterceptor.java index 149f0f3..84ae24d 100644 --- a/src/main/java/com/yupi/springbootinit/aop/LogInterceptor.java +++ b/src/main/java/com/yupi/springbootinit/aop/LogInterceptor.java @@ -42,10 +42,10 @@ public class LogInterceptor { Object[] args = point.getArgs(); String reqParam = "[" + StringUtils.join(args, ", ") + "]"; // 输出请求日志 -// log.info("request start,id: {}, path: {}, ip: {}, params: {}", requestId, url, -// httpServletRequest.getRemoteHost(), reqParam); - log.info("request start,id: {}, path: {}, ip: {}", requestId, url, - httpServletRequest.getRemoteHost()); + log.info("request start,id: {}, path: {}, ip: {}, params: {}", requestId, url, + httpServletRequest.getRemoteHost(), reqParam); +// log.info("request start,id: {}, path: {}, ip: {}", requestId, url, +// httpServletRequest.getRemoteHost()); // 执行原方法 Object result = point.proceed(); // 输出响应日志 diff --git a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java b/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java index 23b149d..9840dee 100644 --- a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java +++ b/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java @@ -5,9 +5,11 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.rabbitmq.client.ConnectionFactory; +import org.springframework.amqp.core.ExchangeBuilder; +import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; -import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; @@ -15,8 +17,11 @@ import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { + private static final String QUEUE = "HOST_INFO_QUEUE"; - + + public static final String EXCHANGE_NAME = "user.headers.exchange"; + //创建队列 //true:表示持久化 //队列在默认情况下放到内存,rabbitmq重启后就丢失了,如果希望重启后,队列 @@ -32,6 +37,21 @@ public class RabbitMQConfig { // return new Jackson2JsonMessageConverter(); // } + + + @Bean + public HeadersExchange userHeadersExchange() { + return ExchangeBuilder.headersExchange(EXCHANGE_NAME) + .durable(true) + .build(); + } + + @Bean + public RabbitAdmin rabbitAdmin(ConnectionFactory cf) { + return new RabbitAdmin(cf); + } + + @Bean public MessageConverter messageConverter() { ObjectMapper om = new ObjectMapper(); diff --git a/src/main/java/com/yupi/springbootinit/config/RedisConfig.java b/src/main/java/com/yupi/springbootinit/config/RedisConfig.java index 287c1d5..7476850 100644 --- a/src/main/java/com/yupi/springbootinit/config/RedisConfig.java +++ b/src/main/java/com/yupi/springbootinit/config/RedisConfig.java @@ -6,6 +6,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -13,25 +14,24 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; public class RedisConfig { - @Bean(name="redisTemplate") - public RedisTemplate redisTemplate(RedisConnectionFactory factory) { - RedisTemplate template = new RedisTemplate<>(); - RedisSerializer redisSerializer = new StringRedisSerializer(); + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory factory) { + RedisTemplate template = new RedisTemplate<>(); template.setConnectionFactory(factory); - //key序列化方式 - template.setKeySerializer(redisSerializer); - //value序列化 - template.setValueSerializer(redisSerializer); - //value hashmap序列化 - template.setHashValueSerializer(redisSerializer); - //key haspmap序列化 - template.setHashKeySerializer(redisSerializer); - // + + // 使用 JSON 序列化器 + GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer(); + + template.setKeySerializer(new StringRedisSerializer()); + template.setValueSerializer(jsonSerializer); + template.setHashKeySerializer(new StringRedisSerializer()); + template.setHashValueSerializer(jsonSerializer); + + template.afterPropertiesSet(); return template; } - } diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java index 15840c0..106716f 100644 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java +++ b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java @@ -2,13 +2,18 @@ package com.yupi.springbootinit.rabbitMQ; import cn.hutool.core.date.DateTime; import com.yupi.springbootinit.common.ErrorCode; +import com.yupi.springbootinit.config.RabbitMQConfig; import com.yupi.springbootinit.exception.BusinessException; import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.entity.ServerBigBrother; +import io.swagger.models.auth.In; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -16,13 +21,13 @@ import java.time.LocalDateTime; import java.util.List; @Slf4j -@Service +@Component +@RequiredArgsConstructor public class MQSender { @Resource private RabbitTemplate rabbitTemplate; - //方法:发送消息 public void hostsSend(List list){ try { @@ -33,6 +38,7 @@ public class MQSender { throw new BusinessException(ErrorCode.QUEUE_ERROR); } } + //方法:发送消息 public void bigBrotherSend(ServerBigBrother bigBrothers){ try { @@ -45,4 +51,14 @@ public class MQSender { throw new BusinessException(ErrorCode.QUEUE_ERROR); } } + + + public void send(Long tenantId, Object payload) { + // 发送消息,把 userId 放进 header + rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "", payload, m -> { + m.getMessageProperties().getHeaders().put("tenantId", tenantId); + return m; + }); + } + } \ No newline at end of file diff --git a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java index fc9975a..8f02de6 100644 --- a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java +++ b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java @@ -1,27 +1,23 @@ package com.yupi.springbootinit.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.google.common.collect.Lists; import com.yupi.springbootinit.mapper.CountryInfoMapper; import com.yupi.springbootinit.mapper.NewHostsMapper; import com.yupi.springbootinit.model.dto.host.QueryCountDTO; import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.vo.country.CountryInfoVO; +import com.yupi.springbootinit.rabbitMQ.MQSender; import com.yupi.springbootinit.service.HostInfoService; import com.yupi.springbootinit.utils.JsonUtils; import com.yupi.springbootinit.utils.SseEmitterUtil; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StopWatch; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.Resource; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -43,6 +39,11 @@ public class HostInfoServiceImpl extends ServiceImpl i private RedisTemplate redisTemplate; + + @Resource + private MQSender mqSender; + + // // private final RedisTemplate redisTemplate; // @@ -68,12 +69,23 @@ public class HostInfoServiceImpl extends ServiceImpl i JsonUtils.toJsonString(newHost)); }); } + + Boolean o = (Boolean) redisTemplate.opsForValue().get("ai_login:"+newHosts.get(0).getTenantId()); + if (Boolean.TRUE.equals(o)) { + newHosts.forEach(newHost -> { + mqSender.send(newHost.getTenantId(),newHost); + }); + } + + + stopWatch.stop(); long totalTimeMillis = stopWatch.getTotalTimeMillis(); log.info("当前存储数据量大小 {}, 存储花费: {}ms",newHosts.size(),totalTimeMillis); return CompletableFuture.completedFuture(null); } catch (Exception e) { // 将异常包装到Future,使调用方能处理 + log.error(e.getMessage()); return CompletableFuture.failedFuture(e); } }