剥离主播入库数据接口,删除 rabbitMQ 依赖
This commit is contained in:
5
pom.xml
5
pom.xml
@@ -87,11 +87,6 @@
|
|||||||
<artifactId>hutool-all</artifactId>
|
<artifactId>hutool-all</artifactId>
|
||||||
<version>5.8.8</version>
|
<version>5.8.8</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- rabbit-mq -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.springframework.boot</groupId>
|
|
||||||
<artifactId>spring-boot-starter-amqp</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-devtools</artifactId>
|
<artifactId>spring-boot-devtools</artifactId>
|
||||||
|
|||||||
@@ -1,30 +0,0 @@
|
|||||||
package com.yupi.springbootinit.config;
|
|
||||||
|
|
||||||
import org.springframework.amqp.core.Queue;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
||||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
|
||||||
import org.springframework.amqp.support.converter.MessageConverter;
|
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
|
||||||
import org.springframework.context.annotation.Bean;
|
|
||||||
import org.springframework.context.annotation.Configuration;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
|
|
||||||
@Configuration
|
|
||||||
public class RabbitMQConfig {
|
|
||||||
private static final String QUEUE = "HOST_INFO_QUEUE";
|
|
||||||
|
|
||||||
//创建队列
|
|
||||||
//true:表示持久化
|
|
||||||
//队列在默认情况下放到内存,rabbitmq重启后就丢失了,如果希望重启后,队列
|
|
||||||
//数据还能使用,就需要持久化
|
|
||||||
@Bean
|
|
||||||
public Queue hostInfoQueue(){
|
|
||||||
return new Queue(QUEUE,true);
|
|
||||||
}
|
|
||||||
@Bean
|
|
||||||
public MessageConverter messageConverter(){
|
|
||||||
return new Jackson2JsonMessageConverter();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -4,9 +4,7 @@ import cn.dev33.satoken.stp.StpUtil;
|
|||||||
import com.yupi.springbootinit.common.BaseResponse;
|
import com.yupi.springbootinit.common.BaseResponse;
|
||||||
import com.yupi.springbootinit.common.ResultUtils;
|
import com.yupi.springbootinit.common.ResultUtils;
|
||||||
import com.yupi.springbootinit.model.dto.host.HostInfoDTO;
|
import com.yupi.springbootinit.model.dto.host.HostInfoDTO;
|
||||||
import com.yupi.springbootinit.model.entity.NewHosts;
|
|
||||||
import com.yupi.springbootinit.model.vo.hosts.NewHostsVO;
|
import com.yupi.springbootinit.model.vo.hosts.NewHostsVO;
|
||||||
import com.yupi.springbootinit.rabbitMQ.MQSender;
|
|
||||||
import com.yupi.springbootinit.service.HostInfoService;
|
import com.yupi.springbootinit.service.HostInfoService;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
@@ -24,17 +22,10 @@ import java.util.List;
|
|||||||
@CrossOrigin
|
@CrossOrigin
|
||||||
public class HostInfoController {
|
public class HostInfoController {
|
||||||
|
|
||||||
@Resource
|
|
||||||
private MQSender mqSender;
|
|
||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private HostInfoService hostInfoService;
|
private HostInfoService hostInfoService;
|
||||||
|
|
||||||
@PostMapping("add_host")
|
|
||||||
public BaseResponse<Boolean> addHost(@RequestBody List<NewHosts> newHosts){
|
|
||||||
mqSender.send(newHosts);
|
|
||||||
return ResultUtils.success(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@PostMapping("hosts_info")
|
@PostMapping("hosts_info")
|
||||||
public BaseResponse<List<NewHostsVO>> hostsInfo(@RequestBody HostInfoDTO hostInfoDTO){
|
public BaseResponse<List<NewHostsVO>> hostsInfo(@RequestBody HostInfoDTO hostInfoDTO){
|
||||||
|
|||||||
@@ -1,55 +0,0 @@
|
|||||||
package com.yupi.springbootinit.rabbitMQ;
|
|
||||||
|
|
||||||
|
|
||||||
import cn.hutool.core.date.DateTime;
|
|
||||||
import com.rabbitmq.client.Channel;
|
|
||||||
import com.yupi.springbootinit.common.ErrorCode;
|
|
||||||
import com.yupi.springbootinit.exception.BusinessException;
|
|
||||||
import com.yupi.springbootinit.model.entity.NewHosts;
|
|
||||||
import com.yupi.springbootinit.service.HostInfoService;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.amqp.core.Message;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
@Slf4j
|
|
||||||
public class MQReceiver {
|
|
||||||
@Resource
|
|
||||||
private HostInfoService hostInfoService;
|
|
||||||
|
|
||||||
|
|
||||||
// //方法:接收消息
|
|
||||||
// @RabbitListener(queues = "HOST_INFO_QUEUE")
|
|
||||||
// public void receive(List<NewHosts> hosts, Channel channel, Message message) throws IOException {
|
|
||||||
// long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
|
||||||
// try {
|
|
||||||
// hostInfoService.processHosts(hosts);
|
|
||||||
// channel.basicAck(deliveryTag,false);
|
|
||||||
// log.info("deliveryTag:{}",deliveryTag);
|
|
||||||
// log.info("接收到的消息------->" + hosts.size());
|
|
||||||
// }catch (Exception e){
|
|
||||||
// channel.basicNack(deliveryTag,false,true);
|
|
||||||
// log.error("消息消费失败------->消息内容大小{}",hosts.size() );
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
@RabbitListener(queues = "HOST_INFO_QUEUE")
|
|
||||||
public void receive(List<NewHosts> hosts, Channel channel, Message message) throws IOException {
|
|
||||||
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
|
||||||
try {
|
|
||||||
// 等待所有异步任务完成
|
|
||||||
hostInfoService.processHosts(hosts).join(); // 这里会抛出异常
|
|
||||||
channel.basicAck(deliveryTag, false);
|
|
||||||
log.info("deliveryTag:{}", deliveryTag);
|
|
||||||
log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size());
|
|
||||||
} catch (Exception e) {
|
|
||||||
channel.basicNack(deliveryTag, false, false);
|
|
||||||
log.error("消息消费失败------->消息内容大小{}", hosts.size(), e);
|
|
||||||
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,37 +0,0 @@
|
|||||||
package com.yupi.springbootinit.rabbitMQ;
|
|
||||||
|
|
||||||
import cn.hutool.core.date.DateTime;
|
|
||||||
import com.yupi.springbootinit.common.ErrorCode;
|
|
||||||
import com.yupi.springbootinit.exception.BusinessException;
|
|
||||||
import com.yupi.springbootinit.model.entity.NewHosts;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
||||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
|
||||||
import java.util.Date;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
@Slf4j
|
|
||||||
@Service
|
|
||||||
public class MQSender {
|
|
||||||
|
|
||||||
@Resource
|
|
||||||
private RabbitTemplate rabbitTemplate;
|
|
||||||
|
|
||||||
|
|
||||||
//方法:发送消息
|
|
||||||
public void send(List<NewHosts> list){
|
|
||||||
try {
|
|
||||||
log.info("{} 接收到的消息数量----------->{}", DateTime.now(),list.size());
|
|
||||||
this.rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
|
|
||||||
//指定你队列的名字
|
|
||||||
rabbitTemplate.convertAndSend("HOST_INFO_QUEUE",list);
|
|
||||||
}catch (Exception e){
|
|
||||||
throw new BusinessException(ErrorCode.QUEUE_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -15,9 +15,6 @@ import java.util.concurrent.CompletableFuture;
|
|||||||
*/
|
*/
|
||||||
public interface HostInfoService extends IService<NewHosts> {
|
public interface HostInfoService extends IService<NewHosts> {
|
||||||
|
|
||||||
public CompletableFuture<Void> saveHostInfo(List<NewHosts> newHosts);
|
|
||||||
|
|
||||||
public CompletableFuture<Void> processHosts(List<NewHosts> hosts);
|
|
||||||
|
|
||||||
List<NewHostsVO> getConditionHosts(HostInfoDTO hostInfoDTO);
|
List<NewHostsVO> getConditionHosts(HostInfoDTO hostInfoDTO);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,58 +28,6 @@ import java.util.concurrent.CompletableFuture;
|
|||||||
public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> implements HostInfoService {
|
public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> implements HostInfoService {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@Async("taskExecutor")
|
|
||||||
public CompletableFuture<Void> saveHostInfo(List<NewHosts> newHosts) {
|
|
||||||
try {
|
|
||||||
StopWatch stopWatch = new StopWatch();
|
|
||||||
stopWatch.start();
|
|
||||||
saveBatch(newHosts);
|
|
||||||
stopWatch.stop();
|
|
||||||
long totalTimeMillis = stopWatch.getTotalTimeMillis();
|
|
||||||
log.info("存储花费: {}ms", totalTimeMillis);
|
|
||||||
return CompletableFuture.completedFuture(null);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// 将异常包装到Future,使调用方能处理
|
|
||||||
return CompletableFuture.failedFuture(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// public void processHosts(List<NewHosts> hosts) {
|
|
||||||
// List<CompletableFuture<Void>> futures = new ArrayList<>();
|
|
||||||
// // 分片提交(避免单批次过大)
|
|
||||||
// Lists.partition(hosts, 1500).forEach(batch -> {
|
|
||||||
// CompletableFuture<Void> future = this.saveHostInfo(batch);
|
|
||||||
// futures.add(future);
|
|
||||||
// });
|
|
||||||
// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
|
||||||
// .whenComplete((result, ex) -> {
|
|
||||||
// if (ex != null) {
|
|
||||||
// log.error("部分批次处理失败", ex);
|
|
||||||
// } else {
|
|
||||||
// log.info("所有批次处理完成");
|
|
||||||
// }
|
|
||||||
// // 这里可以触发其他业务逻辑(如发送通知)
|
|
||||||
// });
|
|
||||||
// }
|
|
||||||
public CompletableFuture<Void> processHosts(List<NewHosts> hosts) {
|
|
||||||
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
|
||||||
// 分片提交(避免单批次过大)
|
|
||||||
Lists.partition(hosts, 1500).forEach(batch -> {
|
|
||||||
log.info("当前存储数据量大小 {}", batch.size());
|
|
||||||
CompletableFuture<Void> future = this.saveHostInfo(batch);
|
|
||||||
futures.add(future);
|
|
||||||
});
|
|
||||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
|
||||||
.whenComplete((result, ex) -> {
|
|
||||||
if (ex != null) {
|
|
||||||
log.error("部分批次处理失败", ex);
|
|
||||||
} else {
|
|
||||||
log.info("所有批次处理完成");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<NewHostsVO> getConditionHosts(HostInfoDTO hostInfoDTO) {
|
public List<NewHostsVO> getConditionHosts(HostInfoDTO hostInfoDTO) {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user