diff --git a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java b/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java index da620a1..489a4d9 100644 --- a/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java +++ b/src/main/java/com/yupi/springbootinit/config/RabbitMQConfig.java @@ -20,6 +20,7 @@ public class RabbitMQConfig { private static final String QUEUE = "HOST_INFO_QUEUE"; private static final String LIVE_HOST_DETAIL_QUEUE = "LIVE_HOST_DETAIL"; + private static final String HOSTS_REVENUE_STATS_QUEUE = "HOSTS_REVENUE_STATS"; public static final String EXCHANGE_NAME = "user.headers.exchange"; @@ -43,6 +44,11 @@ public class RabbitMQConfig { return new Queue(LIVE_HOST_DETAIL_QUEUE,true); } + @Bean + public Queue hostsRevenueStatsQueue(){ + return new Queue(HOSTS_REVENUE_STATS_QUEUE,true); + } + // // @Bean // public MessageConverter messageConverter(){ diff --git a/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java b/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java index 6cc4c66..d0bf744 100644 --- a/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java +++ b/src/main/java/com/yupi/springbootinit/controller/HostInfoController.java @@ -4,6 +4,7 @@ import com.yupi.springbootinit.common.BaseResponse; import com.yupi.springbootinit.common.ResultUtils; import com.yupi.springbootinit.model.dto.host.QueryCountDTO; import com.yupi.springbootinit.model.entity.NewHosts; +import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats; import com.yupi.springbootinit.model.entity.ServerLiveHostDetail; import com.yupi.springbootinit.model.vo.country.CountryInfoVO; import com.yupi.springbootinit.rabbitMQ.MQSender; @@ -43,6 +44,12 @@ public class HostInfoController { return ResultUtils.success(true); } + @PostMapping("add_hosts_revenue_stats") + public BaseResponse addHostsRevenueStats(@RequestBody List hostsRevenueStats){ + mqSender.hostsRevenueStatsSend(hostsRevenueStats); + return ResultUtils.success(true); + } + @GetMapping("country_info") public BaseResponse> getCountryInfo(@RequestParam(name = "countryName") String countryName){ List countryInfo = hostInfoService.getCountryInfo(countryName); diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java index 022a654..b633e35 100644 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java +++ b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQReceiver.java @@ -7,9 +7,11 @@ import com.yupi.springbootinit.common.ErrorCode; import com.yupi.springbootinit.exception.BusinessException; import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.entity.ServerBigBrother; +import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats; import com.yupi.springbootinit.model.entity.ServerLiveHostDetail; import com.yupi.springbootinit.service.HostInfoService; import com.yupi.springbootinit.service.ServerBigBrotherService; +import com.yupi.springbootinit.service.ServerHostsRevenueStatsService; import com.yupi.springbootinit.service.ServerLiveHostDetailService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; @@ -34,6 +36,9 @@ public class MQReceiver { @Resource private ServerLiveHostDetailService serverLiveHostDetailService; + @Resource + private ServerHostsRevenueStatsService serverHostsRevenueStatsService; + // //方法:接收消息 // @RabbitListener(queues = "HOST_INFO_QUEUE") @@ -98,4 +103,18 @@ public class MQReceiver { throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE); } } + + @RabbitListener(queues = "HOSTS_REVENUE_STATS",id = "hostsRevenueStats", autoStartup = "true") + @Async("taskExecutor") + public void hostsRevenueStatsReceive(List statsList, Channel channel, Message message) throws IOException { + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + serverHostsRevenueStatsService.processHostsRevenueStats(statsList); + channel.basicAck(deliveryTag, false); + } catch (Exception e) { + channel.basicNack(deliveryTag, false, false); + log.error("主播收益统计消息消费失败,数量: {}", statsList.size(), e); + throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE); + } + } } diff --git a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java index 044dbd9..12d1d60 100644 --- a/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java +++ b/src/main/java/com/yupi/springbootinit/rabbitMQ/MQSender.java @@ -6,6 +6,7 @@ import com.yupi.springbootinit.config.RabbitMQConfig; import com.yupi.springbootinit.exception.BusinessException; import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.entity.ServerBigBrother; +import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats; import com.yupi.springbootinit.model.entity.ServerLiveHostDetail; import io.swagger.models.auth.In; import lombok.RequiredArgsConstructor; @@ -51,6 +52,15 @@ public class MQSender { } } + //方法:发送主播收益统计消息 + public void hostsRevenueStatsSend(List list){ + try { + rabbitTemplate.convertAndSend("HOSTS_REVENUE_STATS",list); + }catch (Exception e){ + throw new BusinessException(ErrorCode.QUEUE_ERROR); + } + } + //方法:发送消息 public void bigBrotherSend(ServerBigBrother bigBrothers){ try { diff --git a/src/main/java/com/yupi/springbootinit/service/ServerHostsRevenueStatsService.java b/src/main/java/com/yupi/springbootinit/service/ServerHostsRevenueStatsService.java index 9703436..3cc1fb9 100644 --- a/src/main/java/com/yupi/springbootinit/service/ServerHostsRevenueStatsService.java +++ b/src/main/java/com/yupi/springbootinit/service/ServerHostsRevenueStatsService.java @@ -2,12 +2,17 @@ package com.yupi.springbootinit.service; import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats; import com.baomidou.mybatisplus.extension.service.IService; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + /* * @author: ziin * @date: 2026/1/9 19:06 */ - + public interface ServerHostsRevenueStatsService extends IService{ + CompletableFuture processHostsRevenueStats(List statsList); } diff --git a/src/main/java/com/yupi/springbootinit/service/impl/ServerHostsRevenueStatsServiceImpl.java b/src/main/java/com/yupi/springbootinit/service/impl/ServerHostsRevenueStatsServiceImpl.java index fed70ba..545468f 100644 --- a/src/main/java/com/yupi/springbootinit/service/impl/ServerHostsRevenueStatsServiceImpl.java +++ b/src/main/java/com/yupi/springbootinit/service/impl/ServerHostsRevenueStatsServiceImpl.java @@ -1,8 +1,11 @@ package com.yupi.springbootinit.service.impl; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; import java.util.List; +import java.util.concurrent.CompletableFuture; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats; import com.yupi.springbootinit.mapper.ServerHostsRevenueStatsMapper; @@ -11,8 +14,28 @@ import com.yupi.springbootinit.service.ServerHostsRevenueStatsService; * @author: ziin * @date: 2026/1/9 19:06 */ - + @Service +@Slf4j +@Transactional(rollbackFor = Exception.class) public class ServerHostsRevenueStatsServiceImpl extends ServiceImpl implements ServerHostsRevenueStatsService{ + @Override + public CompletableFuture processHostsRevenueStats(List statsList) { + try { + // 处理 history 字段,确保为有效的 JSON + for (ServerHostsRevenueStats stats : statsList) { + if (StrUtil.isBlank(stats.getHistory())) { + stats.setHistory("[]"); + } + } + saveBatch(statsList); + log.info("主播收益统计数据存储成功,数量: {}", statsList.size()); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + log.error("主播收益统计数据存储失败", e); + return CompletableFuture.failedFuture(e); + } + } + }