diff --git a/pom.xml b/pom.xml
index 6ec9144..72c4d15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,11 +87,6 @@
hutool-all
5.8.8
-
-
- org.springframework.boot
- spring-boot-starter-amqp
-
org.springframework.boot
spring-boot-devtools
diff --git a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java b/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java
deleted file mode 100644
index 4837a08..0000000
--- a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.yupi.springbootinit.config;
-
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
-import org.springframework.amqp.support.converter.MessageConverter;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import javax.annotation.Resource;
-
-@Configuration
-public class RabbitMQConfig {
- private static final String QUEUE = "HOST_INFO_QUEUE";
-
- //创建队列
- //true:表示持久化
- //队列在默认情况下放到内存,rabbitmq重启后就丢失了,如果希望重启后,队列
- //数据还能使用,就需要持久化
- @Bean
- public Queue hostInfoQueue(){
- return new Queue(QUEUE,true);
- }
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java b/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java
index 22fce97..d041fab 100644
--- a/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java
+++ b/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java
@@ -4,9 +4,7 @@ import cn.dev33.satoken.stp.StpUtil;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.model.dto.host.HostInfoDTO;
-import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.vo.hosts.NewHostsVO;
-import com.yupi.springbootinit.rabbitMQ.MQSender;
import com.yupi.springbootinit.service.HostInfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
@@ -24,17 +22,10 @@ import java.util.List;
@CrossOrigin
public class HostInfoController {
- @Resource
- private MQSender mqSender;
@Resource
private HostInfoService hostInfoService;
- @PostMapping("add_host")
- public BaseResponse addHost(@RequestBody List newHosts){
- mqSender.send(newHosts);
- return ResultUtils.success(true);
- }
@PostMapping("hosts_info")
public BaseResponse> hostsInfo(@RequestBody HostInfoDTO hostInfoDTO){
diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java
deleted file mode 100644
index 3973e4a..0000000
--- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.yupi.springbootinit.rabbitMQ;
-
-
-import cn.hutool.core.date.DateTime;
-import com.rabbitmq.client.Channel;
-import com.yupi.springbootinit.common.ErrorCode;
-import com.yupi.springbootinit.exception.BusinessException;
-import com.yupi.springbootinit.model.entity.NewHosts;
-import com.yupi.springbootinit.service.HostInfoService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.stereotype.Service;
-import javax.annotation.Resource;
-import java.io.IOException;
-import java.util.List;
-
-@Service
-@Slf4j
-public class MQReceiver {
- @Resource
- private HostInfoService hostInfoService;
-
-
-// //方法:接收消息
-// @RabbitListener(queues = "HOST_INFO_QUEUE")
-// public void receive(List hosts, Channel channel, Message message) throws IOException {
-// long deliveryTag = message.getMessageProperties().getDeliveryTag();
-// try {
-// hostInfoService.processHosts(hosts);
-// channel.basicAck(deliveryTag,false);
-// log.info("deliveryTag:{}",deliveryTag);
-// log.info("接收到的消息------->" + hosts.size());
-// }catch (Exception e){
-// channel.basicNack(deliveryTag,false,true);
-// log.error("消息消费失败------->消息内容大小{}",hosts.size() );
-// }
-// }
-
- @RabbitListener(queues = "HOST_INFO_QUEUE")
- public void receive(List hosts, Channel channel, Message message) throws IOException {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- // 等待所有异步任务完成
- hostInfoService.processHosts(hosts).join(); // 这里会抛出异常
- channel.basicAck(deliveryTag, false);
- log.info("deliveryTag:{}", deliveryTag);
- log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size());
- } catch (Exception e) {
- channel.basicNack(deliveryTag, false, false);
- log.error("消息消费失败------->消息内容大小{}", hosts.size(), e);
- throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
- }
- }
- }
diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java
deleted file mode 100644
index 24547ea..0000000
--- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.yupi.springbootinit.rabbitMQ;
-
-import cn.hutool.core.date.DateTime;
-import com.yupi.springbootinit.common.ErrorCode;
-import com.yupi.springbootinit.exception.BusinessException;
-import com.yupi.springbootinit.model.entity.NewHosts;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.util.Date;
-import java.util.List;
-
-@Slf4j
-@Service
-public class MQSender {
-
- @Resource
- private RabbitTemplate rabbitTemplate;
-
-
- //方法:发送消息
- public void send(List list){
- try {
- log.info("{} 接收到的消息数量----------->{}", DateTime.now(),list.size());
- this.rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
- //指定你队列的名字
- rabbitTemplate.convertAndSend("HOST_INFO_QUEUE",list);
- }catch (Exception e){
- throw new BusinessException(ErrorCode.QUEUE_ERROR);
- }
-
- }
-}
\ No newline at end of file
diff --git a/src/main/java/com/yupi/springbootinit/service/HostInfoService.java b/src/main/java/com/yupi/springbootinit/service/HostInfoService.java
index 70a82b8..402cccb 100644
--- a/src/main/java/com/yupi/springbootinit/service/HostInfoService.java
+++ b/src/main/java/com/yupi/springbootinit/service/HostInfoService.java
@@ -15,9 +15,6 @@ import java.util.concurrent.CompletableFuture;
*/
public interface HostInfoService extends IService {
- public CompletableFuture saveHostInfo(List newHosts);
-
- public CompletableFuture processHosts(List hosts);
List getConditionHosts(HostInfoDTO hostInfoDTO);
}
diff --git a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java
index c625f6a..69a1a7b 100644
--- a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java
+++ b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java
@@ -28,58 +28,6 @@ import java.util.concurrent.CompletableFuture;
public class HostInfoServiceImpl extends ServiceImpl implements HostInfoService {
- @Override
- @Async("taskExecutor")
- public CompletableFuture saveHostInfo(List newHosts) {
- try {
- StopWatch stopWatch = new StopWatch();
- stopWatch.start();
- saveBatch(newHosts);
- stopWatch.stop();
- long totalTimeMillis = stopWatch.getTotalTimeMillis();
- log.info("存储花费: {}ms", totalTimeMillis);
- return CompletableFuture.completedFuture(null);
- } catch (Exception e) {
- // 将异常包装到Future,使调用方能处理
- return CompletableFuture.failedFuture(e);
- }
- }
-
- // public void processHosts(List hosts) {
-// List> futures = new ArrayList<>();
-// // 分片提交(避免单批次过大)
-// Lists.partition(hosts, 1500).forEach(batch -> {
-// CompletableFuture future = this.saveHostInfo(batch);
-// futures.add(future);
-// });
-// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
-// .whenComplete((result, ex) -> {
-// if (ex != null) {
-// log.error("部分批次处理失败", ex);
-// } else {
-// log.info("所有批次处理完成");
-// }
-// // 这里可以触发其他业务逻辑(如发送通知)
-// });
-// }
- public CompletableFuture processHosts(List hosts) {
- List> futures = new ArrayList<>();
- // 分片提交(避免单批次过大)
- Lists.partition(hosts, 1500).forEach(batch -> {
- log.info("当前存储数据量大小 {}", batch.size());
- CompletableFuture future = this.saveHostInfo(batch);
- futures.add(future);
- });
- return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .whenComplete((result, ex) -> {
- if (ex != null) {
- log.error("部分批次处理失败", ex);
- } else {
- log.info("所有批次处理完成");
- }
- });
- }
-
@Override
public List getConditionHosts(HostInfoDTO hostInfoDTO) {