1.通过 rabbitmq 发送 AI 消息

This commit is contained in:
2025-08-27 16:42:13 +08:00
parent a7a17667e4
commit 704482ae0e
5 changed files with 76 additions and 28 deletions

View File

@@ -42,10 +42,10 @@ public class LogInterceptor {
Object[] args = point.getArgs();
String reqParam = "[" + StringUtils.join(args, ", ") + "]";
// 输出请求日志
// log.info("request startid: {}, path: {}, ip: {}, params: {}", requestId, url,
// httpServletRequest.getRemoteHost(), reqParam);
log.info("request startid: {}, path: {}, ip: {}", requestId, url,
httpServletRequest.getRemoteHost());
log.info("request startid: {}, path: {}, ip: {}, params: {}", requestId, url,
httpServletRequest.getRemoteHost(), reqParam);
// log.info("request startid: {}, path: {}, ip: {}", requestId, url,
// httpServletRequest.getRemoteHost());
// 执行原方法
Object result = point.proceed();
// 输出响应日志

View File

@@ -5,9 +5,11 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
@@ -15,8 +17,11 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
private static final String QUEUE = "HOST_INFO_QUEUE";
public static final String EXCHANGE_NAME = "user.headers.exchange";
//创建队列
//true:表示持久化
//队列在默认情况下放到内存rabbitmq重启后就丢失了如果希望重启后队列
@@ -32,6 +37,21 @@ public class RabbitMQConfig {
// return new Jackson2JsonMessageConverter();
// }
@Bean
public HeadersExchange userHeadersExchange() {
return ExchangeBuilder.headersExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public MessageConverter messageConverter() {
ObjectMapper om = new ObjectMapper();

View File

@@ -6,6 +6,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -13,25 +14,24 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisConfig {
@Bean(name="redisTemplate")
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
//key序列化方式
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(redisSerializer);
//value hashmap序列化
template.setHashValueSerializer(redisSerializer);
//key haspmap序列化
template.setHashKeySerializer(redisSerializer);
//
// 使用 JSON 序列化器
GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(jsonSerializer);
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jsonSerializer);
template.afterPropertiesSet();
return template;
}
}

View File

@@ -2,13 +2,18 @@ package com.yupi.springbootinit.rabbitMQ;
import cn.hutool.core.date.DateTime;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.config.RabbitMQConfig;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import io.swagger.models.auth.In;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@@ -16,13 +21,13 @@ import java.time.LocalDateTime;
import java.util.List;
@Slf4j
@Service
@Component
@RequiredArgsConstructor
public class MQSender {
@Resource
private RabbitTemplate rabbitTemplate;
//方法:发送消息
public void hostsSend(List<NewHosts> list){
try {
@@ -33,6 +38,7 @@ public class MQSender {
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
//方法:发送消息
public void bigBrotherSend(ServerBigBrother bigBrothers){
try {
@@ -45,4 +51,14 @@ public class MQSender {
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
public void send(Long tenantId, Object payload) {
// 发送消息,把 userId 放进 header
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "", payload, m -> {
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
return m;
});
}
}

View File

@@ -1,27 +1,23 @@
package com.yupi.springbootinit.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.yupi.springbootinit.mapper.CountryInfoMapper;
import com.yupi.springbootinit.mapper.NewHostsMapper;
import com.yupi.springbootinit.model.dto.host.QueryCountDTO;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.vo.country.CountryInfoVO;
import com.yupi.springbootinit.rabbitMQ.MQSender;
import com.yupi.springbootinit.service.HostInfoService;
import com.yupi.springbootinit.utils.JsonUtils;
import com.yupi.springbootinit.utils.SseEmitterUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StopWatch;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +39,11 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
private RedisTemplate redisTemplate;
@Resource
private MQSender mqSender;
//
// private final RedisTemplate<String,Object> redisTemplate;
//
@@ -68,12 +69,23 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
JsonUtils.toJsonString(newHost));
});
}
Boolean o = (Boolean) redisTemplate.opsForValue().get("ai_login:"+newHosts.get(0).getTenantId());
if (Boolean.TRUE.equals(o)) {
newHosts.forEach(newHost -> {
mqSender.send(newHost.getTenantId(),newHost);
});
}
stopWatch.stop();
long totalTimeMillis = stopWatch.getTotalTimeMillis();
log.info("当前存储数据量大小 {}, 存储花费: {}ms",newHosts.size(),totalTimeMillis);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
// 将异常包装到Future使调用方能处理
log.error(e.getMessage());
return CompletableFuture.failedFuture(e);
}
}