From 97e14273037fa2547c809ad619ba7c8ad2e7a3ca Mon Sep 17 00:00:00 2001 From: Ziin Date: Thu, 12 Jun 2025 15:15:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=89=A5=E7=A6=BB=E4=B8=BB=E6=92=AD=E5=85=A5?= =?UTF-8?q?=E5=BA=93=E6=95=B0=E6=8D=AE=E6=8E=A5=E5=8F=A3=EF=BC=8C=E5=88=A0?= =?UTF-8?q?=E9=99=A4=20rabbitMQ=20=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 -- .../springbootinit/config/RabbitMQConfig.java | 30 ---------- .../controller/HostInfoController.java | 9 --- .../springbootinit/rabbitMQ/MQReceiver.java | 55 ------------------- .../springbootinit/rabbitMQ/MQSender.java | 37 ------------- .../service/HostInfoService.java | 3 - .../service/impl/HostInfoServiceImpl.java | 52 ------------------ 7 files changed, 191 deletions(-) delete mode 100644 src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java delete mode 100644 src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java delete mode 100644 src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java diff --git a/pom.xml b/pom.xml index 6ec9144..72c4d15 100644 --- a/pom.xml +++ b/pom.xml @@ -87,11 +87,6 @@ hutool-all 5.8.8 - - - org.springframework.boot - spring-boot-starter-amqp - org.springframework.boot spring-boot-devtools diff --git a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java b/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java deleted file mode 100644 index 4837a08..0000000 --- a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.yupi.springbootinit.config; - -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import javax.annotation.Resource; - -@Configuration -public class RabbitMQConfig { - private static final String QUEUE = "HOST_INFO_QUEUE"; - - //创建队列 - //true:表示持久化 - //队列在默认情况下放到内存,rabbitmq重启后就丢失了,如果希望重启后,队列 - //数据还能使用,就需要持久化 - @Bean - public Queue hostInfoQueue(){ - return new Queue(QUEUE,true); - } - @Bean - public MessageConverter messageConverter(){ - return new Jackson2JsonMessageConverter(); - } - -} \ No newline at end of file diff --git a/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java b/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java index 22fce97..d041fab 100644 --- a/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java +++ b/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java @@ -4,9 +4,7 @@ import cn.dev33.satoken.stp.StpUtil; import com.yupi.springbootinit.common.BaseResponse; import com.yupi.springbootinit.common.ResultUtils; import com.yupi.springbootinit.model.dto.host.HostInfoDTO; -import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.vo.hosts.NewHostsVO; -import com.yupi.springbootinit.rabbitMQ.MQSender; import com.yupi.springbootinit.service.HostInfoService; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.*; @@ -24,17 +22,10 @@ import java.util.List; @CrossOrigin public class HostInfoController { - @Resource - private MQSender mqSender; @Resource private HostInfoService hostInfoService; - @PostMapping("add_host") - public BaseResponse addHost(@RequestBody List newHosts){ - mqSender.send(newHosts); - return ResultUtils.success(true); - } @PostMapping("hosts_info") public BaseResponse> hostsInfo(@RequestBody HostInfoDTO hostInfoDTO){ diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java deleted file mode 100644 index 3973e4a..0000000 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.yupi.springbootinit.rabbitMQ; - - -import cn.hutool.core.date.DateTime; -import com.rabbitmq.client.Channel; -import com.yupi.springbootinit.common.ErrorCode; -import com.yupi.springbootinit.exception.BusinessException; -import com.yupi.springbootinit.model.entity.NewHosts; -import com.yupi.springbootinit.service.HostInfoService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Service; -import javax.annotation.Resource; -import java.io.IOException; -import java.util.List; - -@Service -@Slf4j -public class MQReceiver { - @Resource - private HostInfoService hostInfoService; - - -// //方法:接收消息 -// @RabbitListener(queues = "HOST_INFO_QUEUE") -// public void receive(List hosts, Channel channel, Message message) throws IOException { -// long deliveryTag = message.getMessageProperties().getDeliveryTag(); -// try { -// hostInfoService.processHosts(hosts); -// channel.basicAck(deliveryTag,false); -// log.info("deliveryTag:{}",deliveryTag); -// log.info("接收到的消息------->" + hosts.size()); -// }catch (Exception e){ -// channel.basicNack(deliveryTag,false,true); -// log.error("消息消费失败------->消息内容大小{}",hosts.size() ); -// } -// } - - @RabbitListener(queues = "HOST_INFO_QUEUE") - public void receive(List hosts, Channel channel, Message message) throws IOException { - long deliveryTag = message.getMessageProperties().getDeliveryTag(); - try { - // 等待所有异步任务完成 - hostInfoService.processHosts(hosts).join(); // 这里会抛出异常 - channel.basicAck(deliveryTag, false); - log.info("deliveryTag:{}", deliveryTag); - log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size()); - } catch (Exception e) { - channel.basicNack(deliveryTag, false, false); - log.error("消息消费失败------->消息内容大小{}", hosts.size(), e); - throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE); - } - } - } diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java deleted file mode 100644 index 24547ea..0000000 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.yupi.springbootinit.rabbitMQ; - -import cn.hutool.core.date.DateTime; -import com.yupi.springbootinit.common.ErrorCode; -import com.yupi.springbootinit.exception.BusinessException; -import com.yupi.springbootinit.model.entity.NewHosts; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.Date; -import java.util.List; - -@Slf4j -@Service -public class MQSender { - - @Resource - private RabbitTemplate rabbitTemplate; - - - //方法:发送消息 - public void send(List list){ - try { - log.info("{} 接收到的消息数量----------->{}", DateTime.now(),list.size()); - this.rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); - //指定你队列的名字 - rabbitTemplate.convertAndSend("HOST_INFO_QUEUE",list); - }catch (Exception e){ - throw new BusinessException(ErrorCode.QUEUE_ERROR); - } - - } -} \ No newline at end of file diff --git a/src/main/java/com/yupi/springbootinit/service/HostInfoService.java b/src/main/java/com/yupi/springbootinit/service/HostInfoService.java index 70a82b8..402cccb 100644 --- a/src/main/java/com/yupi/springbootinit/service/HostInfoService.java +++ b/src/main/java/com/yupi/springbootinit/service/HostInfoService.java @@ -15,9 +15,6 @@ import java.util.concurrent.CompletableFuture; */ public interface HostInfoService extends IService { - public CompletableFuture saveHostInfo(List newHosts); - - public CompletableFuture processHosts(List hosts); List getConditionHosts(HostInfoDTO hostInfoDTO); } diff --git a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java index c625f6a..69a1a7b 100644 --- a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java +++ b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java @@ -28,58 +28,6 @@ import java.util.concurrent.CompletableFuture; public class HostInfoServiceImpl extends ServiceImpl implements HostInfoService { - @Override - @Async("taskExecutor") - public CompletableFuture saveHostInfo(List newHosts) { - try { - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - saveBatch(newHosts); - stopWatch.stop(); - long totalTimeMillis = stopWatch.getTotalTimeMillis(); - log.info("存储花费: {}ms", totalTimeMillis); - return CompletableFuture.completedFuture(null); - } catch (Exception e) { - // 将异常包装到Future,使调用方能处理 - return CompletableFuture.failedFuture(e); - } - } - - // public void processHosts(List hosts) { -// List> futures = new ArrayList<>(); -// // 分片提交(避免单批次过大) -// Lists.partition(hosts, 1500).forEach(batch -> { -// CompletableFuture future = this.saveHostInfo(batch); -// futures.add(future); -// }); -// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) -// .whenComplete((result, ex) -> { -// if (ex != null) { -// log.error("部分批次处理失败", ex); -// } else { -// log.info("所有批次处理完成"); -// } -// // 这里可以触发其他业务逻辑(如发送通知) -// }); -// } - public CompletableFuture processHosts(List hosts) { - List> futures = new ArrayList<>(); - // 分片提交(避免单批次过大) - Lists.partition(hosts, 1500).forEach(batch -> { - log.info("当前存储数据量大小 {}", batch.size()); - CompletableFuture future = this.saveHostInfo(batch); - futures.add(future); - }); - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .whenComplete((result, ex) -> { - if (ex != null) { - log.error("部分批次处理失败", ex); - } else { - log.info("所有批次处理完成"); - } - }); - } - @Override public List getConditionHosts(HostInfoDTO hostInfoDTO) {