Compare commits

...

2 Commits

Author SHA1 Message Date
7e2fc54030 feat(rabbitmq): 新增主播收益统计队列及处理逻辑
新增 HOSTS_REVENUE_STATS 队列、发送/接收方法与业务实现,统一处理 ServerHostsRevenueStats 数据流
2026-01-13 18:13:40 +08:00
f4ceb56fd4 feat(core): 新增主播收益统计模块 2026-01-09 19:08:59 +08:00
9 changed files with 210 additions and 0 deletions

View File

@@ -20,6 +20,7 @@ public class RabbitMQConfig {
private static final String QUEUE = "HOST_INFO_QUEUE";
private static final String LIVE_HOST_DETAIL_QUEUE = "LIVE_HOST_DETAIL";
private static final String HOSTS_REVENUE_STATS_QUEUE = "HOSTS_REVENUE_STATS";
public static final String EXCHANGE_NAME = "user.headers.exchange";
@@ -43,6 +44,11 @@ public class RabbitMQConfig {
return new Queue(LIVE_HOST_DETAIL_QUEUE,true);
}
@Bean
public Queue hostsRevenueStatsQueue(){
return new Queue(HOSTS_REVENUE_STATS_QUEUE,true);
}
//
// @Bean
// public MessageConverter messageConverter(){

View File

@@ -4,6 +4,7 @@ import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.model.dto.host.QueryCountDTO;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.model.vo.country.CountryInfoVO;
import com.yupi.springbootinit.rabbitMQ.MQSender;
@@ -43,6 +44,12 @@ public class HostInfoController {
return ResultUtils.success(true);
}
@PostMapping("add_hosts_revenue_stats")
public BaseResponse<Boolean> addHostsRevenueStats(@RequestBody List<ServerHostsRevenueStats> hostsRevenueStats){
mqSender.hostsRevenueStatsSend(hostsRevenueStats);
return ResultUtils.success(true);
}
@GetMapping("country_info")
public BaseResponse<List<String>> getCountryInfo(@RequestParam(name = "countryName") String countryName){
List<CountryInfoVO> countryInfo = hostInfoService.getCountryInfo(countryName);

View File

@@ -0,0 +1,12 @@
package com.yupi.springbootinit.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
/*
* @author: ziin
* @date: 2026/1/9 19:06
*/
public interface ServerHostsRevenueStatsMapper extends BaseMapper<ServerHostsRevenueStats> {
}

View File

@@ -0,0 +1,75 @@
package com.yupi.springbootinit.model.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.util.Date;
import lombok.Data;
/*
* @author: ziin
* @date: 2026/1/9 19:06
*/
/**
* 用户收入数据统计表
*/
@Data
@TableName(value = "server_hosts_revenue_stats")
public class ServerHostsRevenueStats {
/**
* 自增主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 租户Id
*/
@TableField(value = "tenant_id")
private Long tenantId;
/**
* 历史收入数据列表 (数组格式)
*/
@TableField(value = "history")
private String history;
/**
* 展示ID/用户标识
*/
@TableField(value = "display_id")
private String displayId;
/**
* 今日收入
*/
@TableField(value = "today_revenue")
private BigDecimal todayRevenue;
/**
* 累计总收入
*/
@TableField(value = "total_revenue")
private BigDecimal totalRevenue;
/**
* 统计的天数/周期数
*/
@TableField(value = "last_days_count")
private Integer lastDaysCount;
/**
* 最后更新时间
*/
@TableField(value = "updated_at")
private Date updatedAt;
/**
* 创建时间
*/
@TableField(value = "created_at")
private Date createdAt;
}

View File

@@ -7,9 +7,11 @@ import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.service.HostInfoService;
import com.yupi.springbootinit.service.ServerBigBrotherService;
import com.yupi.springbootinit.service.ServerHostsRevenueStatsService;
import com.yupi.springbootinit.service.ServerLiveHostDetailService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
@@ -34,6 +36,9 @@ public class MQReceiver {
@Resource
private ServerLiveHostDetailService serverLiveHostDetailService;
@Resource
private ServerHostsRevenueStatsService serverHostsRevenueStatsService;
// //方法:接收消息
// @RabbitListener(queues = "HOST_INFO_QUEUE")
@@ -98,4 +103,18 @@ public class MQReceiver {
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
}
}
@RabbitListener(queues = "HOSTS_REVENUE_STATS",id = "hostsRevenueStats", autoStartup = "true")
@Async("taskExecutor")
public void hostsRevenueStatsReceive(List<ServerHostsRevenueStats> statsList, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
serverHostsRevenueStatsService.processHostsRevenueStats(statsList);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
log.error("主播收益统计消息消费失败,数量: {}", statsList.size(), e);
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
}
}
}

View File

@@ -6,6 +6,7 @@ import com.yupi.springbootinit.config.RabbitMQConfig;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import io.swagger.models.auth.In;
import lombok.RequiredArgsConstructor;
@@ -51,6 +52,15 @@ public class MQSender {
}
}
//方法:发送主播收益统计消息
public void hostsRevenueStatsSend(List<ServerHostsRevenueStats> list){
try {
rabbitTemplate.convertAndSend("HOSTS_REVENUE_STATS",list);
}catch (Exception e){
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
//方法:发送消息
public void bigBrotherSend(ServerBigBrother bigBrothers){
try {

View File

@@ -0,0 +1,18 @@
package com.yupi.springbootinit.service;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/*
* @author: ziin
* @date: 2026/1/9 19:06
*/
public interface ServerHostsRevenueStatsService extends IService<ServerHostsRevenueStats>{
CompletableFuture<Void> processHostsRevenueStats(List<ServerHostsRevenueStats> statsList);
}

View File

@@ -0,0 +1,41 @@
package com.yupi.springbootinit.service.impl;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.springbootinit.model.entity.ServerHostsRevenueStats;
import com.yupi.springbootinit.mapper.ServerHostsRevenueStatsMapper;
import com.yupi.springbootinit.service.ServerHostsRevenueStatsService;
/*
* @author: ziin
* @date: 2026/1/9 19:06
*/
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class ServerHostsRevenueStatsServiceImpl extends ServiceImpl<ServerHostsRevenueStatsMapper, ServerHostsRevenueStats> implements ServerHostsRevenueStatsService{
@Override
public CompletableFuture<Void> processHostsRevenueStats(List<ServerHostsRevenueStats> statsList) {
try {
// 处理 history 字段,确保为有效的 JSON
for (ServerHostsRevenueStats stats : statsList) {
if (StrUtil.isBlank(stats.getHistory())) {
stats.setHistory("[]");
}
}
saveBatch(statsList);
log.info("主播收益统计数据存储成功,数量: {}", statsList.size());
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("主播收益统计数据存储失败", e);
return CompletableFuture.failedFuture(e);
}
}
}

View File

@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yupi.springbootinit.mapper.ServerHostsRevenueStatsMapper">
<resultMap id="BaseResultMap" type="com.yupi.springbootinit.model.entity.ServerHostsRevenueStats">
<!--@mbg.generated-->
<!--@Table server_hosts_revenue_stats-->
<id column="id" jdbcType="BIGINT" property="id" />
<result column="tenant_id" jdbcType="BIGINT" property="tenantId" />
<result column="history" jdbcType="VARCHAR" property="history" />
<result column="display_id" jdbcType="VARCHAR" property="displayId" />
<result column="today_revenue" jdbcType="DECIMAL" property="todayRevenue" />
<result column="total_revenue" jdbcType="DECIMAL" property="totalRevenue" />
<result column="last_days_count" jdbcType="INTEGER" property="lastDaysCount" />
<result column="updated_at" jdbcType="TIMESTAMP" property="updatedAt" />
<result column="created_at" jdbcType="TIMESTAMP" property="createdAt" />
</resultMap>
<sql id="Base_Column_List">
<!--@mbg.generated-->
id, tenant_id, history, display_id, today_revenue, total_revenue, last_days_count,
updated_at, created_at
</sql>
</mapper>