feat(rabbitmq): 新增主播收益统计队列及处理逻辑

新增 HOSTS_REVENUE_STATS 队列、发送/接收方法与业务实现,统一处理 ServerHostsRevenueStats 数据流
This commit is contained in:
2026-01-13 18:13:40 +08:00
parent f4ceb56fd4
commit 7e2fc54030
6 changed files with 73 additions and 3 deletions

View File

@@ -20,6 +20,7 @@ public class RabbitMQConfig {
private static final String QUEUE = "HOST_INFO_QUEUE";
private static final String LIVE_HOST_DETAIL_QUEUE = "LIVE_HOST_DETAIL";
private static final String HOSTS_REVENUE_STATS_QUEUE = "HOSTS_REVENUE_STATS";
public static final String EXCHANGE_NAME = "user.headers.exchange";
@@ -43,6 +44,11 @@ public class RabbitMQConfig {
return new Queue(LIVE_HOST_DETAIL_QUEUE,true);
}
@Bean
public Queue hostsRevenueStatsQueue(){
return new Queue(HOSTS_REVENUE_STATS_QUEUE,true);
}
//
// @Bean
// public MessageConverter messageConverter(){

View File

@@ -4,6 +4,7 @@ import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.model.dto.host.QueryCountDTO;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.model.vo.country.CountryInfoVO;
import com.yupi.springbootinit.rabbitMQ.MQSender;
@@ -43,6 +44,12 @@ public class HostInfoController {
return ResultUtils.success(true);
}
@PostMapping("add_hosts_revenue_stats")
public BaseResponse<Boolean> addHostsRevenueStats(@RequestBody List<ServerHostsRevenueStats> hostsRevenueStats){
mqSender.hostsRevenueStatsSend(hostsRevenueStats);
return ResultUtils.success(true);
}
@GetMapping("country_info")
public BaseResponse<List<String>> getCountryInfo(@RequestParam(name = "countryName") String countryName){
List<CountryInfoVO> countryInfo = hostInfoService.getCountryInfo(countryName);

View File

@@ -7,9 +7,11 @@ import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.service.HostInfoService;
import com.yupi.springbootinit.service.ServerBigBrotherService;
import com.yupi.springbootinit.service.ServerHostsRevenueStatsService;
import com.yupi.springbootinit.service.ServerLiveHostDetailService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
@@ -34,6 +36,9 @@ public class MQReceiver {
@Resource
private ServerLiveHostDetailService serverLiveHostDetailService;
@Resource
private ServerHostsRevenueStatsService serverHostsRevenueStatsService;
// //方法:接收消息
// @RabbitListener(queues = "HOST_INFO_QUEUE")
@@ -98,4 +103,18 @@ public class MQReceiver {
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
}
}
@RabbitListener(queues = "HOSTS_REVENUE_STATS",id = "hostsRevenueStats", autoStartup = "true")
@Async("taskExecutor")
public void hostsRevenueStatsReceive(List<ServerHostsRevenueStats> statsList, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
serverHostsRevenueStatsService.processHostsRevenueStats(statsList);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
log.error("主播收益统计消息消费失败,数量: {}", statsList.size(), e);
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
}
}
}

View File

@@ -6,6 +6,7 @@ import com.yupi.springbootinit.config.RabbitMQConfig;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import io.swagger.models.auth.In;
import lombok.RequiredArgsConstructor;
@@ -51,6 +52,15 @@ public class MQSender {
}
}
//方法:发送主播收益统计消息
public void hostsRevenueStatsSend(List<ServerHostsRevenueStats> list){
try {
rabbitTemplate.convertAndSend("HOSTS_REVENUE_STATS",list);
}catch (Exception e){
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
//方法:发送消息
public void bigBrotherSend(ServerBigBrother bigBrothers){
try {

View File

@@ -2,12 +2,17 @@ package com.yupi.springbootinit.service;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/*
* @author: ziin
* @date: 2026/1/9 19:06
*/
public interface ServerHostsRevenueStatsService extends IService<ServerHostsRevenueStats>{
CompletableFuture<Void> processHostsRevenueStats(List<ServerHostsRevenueStats> statsList);
}

View File

@@ -1,8 +1,11 @@
package com.yupi.springbootinit.service.impl;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.mapper.ServerHostsRevenueStatsMapper;
@@ -11,8 +14,28 @@ import com.yupi.springbootinit.service.ServerHostsRevenueStatsService;
* @author: ziin
* @date: 2026/1/9 19:06
*/
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class ServerHostsRevenueStatsServiceImpl extends ServiceImpl<ServerHostsRevenueStatsMapper, ServerHostsRevenueStats> implements ServerHostsRevenueStatsService{
@Override
public CompletableFuture<Void> processHostsRevenueStats(List<ServerHostsRevenueStats> statsList) {
try {
// 处理 history 字段,确保为有效的 JSON
for (ServerHostsRevenueStats stats : statsList) {
if (StrUtil.isBlank(stats.getHistory())) {
stats.setHistory("[]");
}
}
saveBatch(statsList);
log.info("主播收益统计数据存储成功,数量: {}", statsList.size());
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("主播收益统计数据存储失败", e);
return CompletableFuture.failedFuture(e);
}
}
}