From 167c1ec29e9f239be217fae9e86b5c9fcdd36b58 Mon Sep 17 00:00:00 2001 From: Ziin Date: Thu, 19 Jun 2025 21:51:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=A5=E5=BA=93=E6=96=B9=E6=B3=95=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E4=B8=BA=E5=A4=9A=E7=BA=BF=E7=A8=8B=EF=BC=8C=E5=A4=A7?= =?UTF-8?q?=E5=B9=85=E6=8F=90=E9=AB=98=E5=85=A5=E5=BA=93=E6=95=88=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yupi/springbootinit/MainApplication.java | 2 ++ .../springbootinit/rabbitMQ/MQReceiver.java | 8 +++-- .../springbootinit/rabbitMQ/MQSender.java | 2 +- .../service/impl/HostInfoServiceImpl.java | 34 +++++++++++-------- src/main/resources/application.yml | 27 +++++++++++++-- 5 files changed, 52 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/yupi/springbootinit/MainApplication.java b/src/main/java/com/yupi/springbootinit/MainApplication.java index 0438b56..653a9c2 100644 --- a/src/main/java/com/yupi/springbootinit/MainApplication.java +++ b/src/main/java/com/yupi/springbootinit/MainApplication.java @@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.context.annotation.EnableAspectJAutoProxy; +import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; /** @@ -17,6 +18,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication() @MapperScan("com.yupi.springbootinit.mapper") @EnableScheduling +@EnableAsync @EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true) public class MainApplication { diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java index 0a34676..2f26184 100644 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java +++ b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java @@ -10,6 +10,7 @@ import com.yupi.springbootinit.service.HostInfoService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; @@ -39,14 +40,15 @@ public class MQReceiver { // } @RabbitListener(queues = "HOST_INFO_QUEUE") + @Async public void receive(List hosts, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 等待所有异步任务完成 - hostInfoService.processHosts(hosts).join(); // 这里会抛出异常 + hostInfoService.processHosts(hosts); // 这里会抛出异常 channel.basicAck(deliveryTag, false); - log.info("deliveryTag:{}", deliveryTag); - log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size()); +// log.info("deliveryTag:{}", deliveryTag); +// log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size()); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); log.error("消息消费失败------->消息内容大小{}", hosts.size(), e); diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java index 7947532..13e0565 100644 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java +++ b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java @@ -23,7 +23,7 @@ public class MQSender { //方法:发送消息 public void send(List list){ try { - log.info("{} 接收到的消息数量----------->{}", DateTime.now(),list.size()); +// log.info("{} 接收到的消息数量----------->{}", DateTime.now(),list.size()); this.rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //指定你队列的名字 rabbitTemplate.convertAndSend("HOST_INFO_QUEUE",list); diff --git a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java index 18fb957..028d121 100644 --- a/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java +++ b/src/main/java/com/yupi/springbootinit/service/impl/HostInfoServiceImpl.java @@ -56,7 +56,7 @@ public class HostInfoServiceImpl extends ServiceImpl i saveBatch(newHosts); stopWatch.stop(); long totalTimeMillis = stopWatch.getTotalTimeMillis(); - log.info("存储花费: {}ms", totalTimeMillis); + log.info("当前存储数据量大小 {}, 存储花费: {}ms",newHosts.size(),totalTimeMillis); return CompletableFuture.completedFuture(null); } catch (Exception e) { // 将异常包装到Future,使调用方能处理 @@ -82,21 +82,25 @@ public class HostInfoServiceImpl extends ServiceImpl i // }); // } public CompletableFuture processHosts(List hosts) { - List> futures = new ArrayList<>(); + this.saveHostInfo(hosts); + +// log.info("当前存储数据量大小 {}",hosts.size()); +// List> futures = new ArrayList<>(); // 分片提交(避免单批次过大) - Lists.partition(hosts, 1500).forEach(batch -> { - log.info("当前存储数据量大小 {}", batch.size()); - CompletableFuture future = this.saveHostInfo(batch); - futures.add(future); - }); - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .whenComplete((result, ex) -> { - if (ex != null) { - log.error("部分批次处理失败", ex); - } else { - log.info("所有批次处理完成"); - } - }); +// Lists.partition(hosts, 1500).forEach(batch -> { +// log.info("当前存储数据量大小 {}", batch.size()); +// CompletableFuture future = this.saveHostInfo(batch); +// futures.add(future); +// }); +// return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) +// .whenComplete((result, ex) -> { +// if (ex != null) { +// log.error("部分批次处理失败", ex); +// } else { +// log.info("所有批次处理完成"); +// } +// }); + return CompletableFuture.completedFuture(null); } @Override diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4bea75a..9e5d91c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,11 +2,24 @@ # @author 程序员鱼皮 # @from 编程导航知识星球 spring: + task: + # Spring 执行器配置,对应 TaskExecutionProperties 配置类。对于 Spring 异步任务,会使用该执行器。 + execution: + thread-name-prefix: save-task # 线程池的线程名的前缀。默认为 task- ,建议根据自己应用来设置 + pool: # 线程池相关 + core-size: 10 # 核心线程数,线程池创建时候初始化的线程数。默认为 8 。 + max-size: 20 # 最大线程数,线程池最大的线程数,只有在缓冲队列满了之后,才会申请超过核心线程数的线程。默认为 Integer.MAX_VALUE + keep-alive: 60s # 允许线程的空闲时间,当超过了核心线程之外的线程,在空闲时间到达之后会被销毁。默认为 60 秒 + queue-capacity: 200 # 缓冲队列大小,用来缓冲执行任务的队列的大小。默认为 Integer.MAX_VALUE 。 + allow-core-thread-timeout: true # 是否允许核心线程超时,即开启线程池的动态增长和缩小。默认为 true 。 + shutdown: + await-termination: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true + await-termination-period: 60 # 等待任务完成的最大时长,单位为秒。默认为 0 ,根据自己应用来设置 application: name: springboot-init # 默认 dev 环境 profiles: - active: prod + active: dev # 支持 swagger3 mvc: pathmatch: @@ -43,15 +56,16 @@ mybatis-plus: configuration: map-underscore-to-camel-case: false log-impl: org.apache.ibatis.logging.nologging.NoLoggingImpl - log-sql: default-executor-type: batch global-config: + banner: false db-config: logic-delete-field: isDelete # 全局逻辑删除的实体字段名 logic-delete-value: 1 # 逻辑已删除值(默认为 1) logic-not-delete-value: 0 # 逻辑未删除值(默认为 0) + # 接口文档配置 knife4j: enable: true @@ -80,3 +94,12 @@ sa-token: token-style: random-128 # 是否输出操作日志 is-log: true + + +logging: + level: + org.mybatis: off + com.baomidou.mybatisplus: off + java.sql: off + org.apache.ibatis: off + com.yupi.springbootinit.mapper.NewHostsMapper: off # 替换成你的 Mapper 包名