feat(mq): 新增直播明细消息队列与处理链路

- 新增 ServerLiveHostDetail 实体、Mapper、Service 及实现类
- MQSender 新增 liveHostDetailSend 方法,MQReceiver 新增 liveHostDetailReceive 消费者
- RabbitMQConfig 补充 LIVE_HOST_DETAIL 队列定义
- HostInfoController 新增 add_live_host_detail 接口,支持批量接收直播明细数据
- .gitignore 忽略 CLAUDE.md 日志文件
This commit is contained in:
2025-12-17 21:44:37 +08:00
parent dfccc03df8
commit c108742555
10 changed files with 251 additions and 4 deletions

1
.gitignore vendored
View File

@@ -147,3 +147,4 @@ fabric.properties
!/.xcodemap/
/tk-data-save.log
/CLAUDE.md

View File

@@ -19,6 +19,7 @@ import org.springframework.context.annotation.Configuration;
public class RabbitMQConfig {
private static final String QUEUE = "HOST_INFO_QUEUE";
private static final String LIVE_HOST_DETAIL_QUEUE = "LIVE_HOST_DETAIL";
public static final String EXCHANGE_NAME = "user.headers.exchange";
@@ -37,6 +38,11 @@ public class RabbitMQConfig {
return new Queue(QUEUE,true);
}
@Bean
public Queue liveHostDetailQueue(){
return new Queue(LIVE_HOST_DETAIL_QUEUE,true);
}
//
// @Bean
// public MessageConverter messageConverter(){

View File

@@ -2,15 +2,13 @@ package com.yupi.springbootinit.controller;
import com.yupi.springbootinit.common.BaseResponse;
import com.yupi.springbootinit.common.ResultUtils;
import com.yupi.springbootinit.model.dto.host.HostInfoDTO;
import com.yupi.springbootinit.model.dto.host.QueryCountDTO;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.model.vo.country.CountryInfoVO;
import com.yupi.springbootinit.model.vo.hosts.NewHostsVO;
import com.yupi.springbootinit.rabbitMQ.MQSender;
import com.yupi.springbootinit.service.HostInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Param;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@@ -39,6 +37,12 @@ public class HostInfoController {
return ResultUtils.success(true);
}
@PostMapping("add_live_host_detail")
public BaseResponse<Boolean> addLiveHostDetail(@RequestBody List<ServerLiveHostDetail> liveHostDetails){
mqSender.liveHostDetailSend(liveHostDetails);
return ResultUtils.success(true);
}
@GetMapping("country_info")
public BaseResponse<List<String>> getCountryInfo(@RequestParam(name = "countryName") String countryName){
List<CountryInfoVO> countryInfo = hostInfoService.getCountryInfo(countryName);

View File

@@ -0,0 +1,12 @@
package com.yupi.springbootinit.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
/*
* @author: ziin
* @date: 2025/12/17 20:45
*/
public interface ServerLiveHostDetailMapper extends BaseMapper<ServerLiveHostDetail> {
}

View File

@@ -0,0 +1,114 @@
package com.yupi.springbootinit.model.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
/*
* @author: ziin
* @date: 2025/12/17 20:45
*/
/**
* 主播单场直播明细表
*/
@Data
@TableName(value = "server_live_host_detail")
public class ServerLiveHostDetail {
/**
* 主键
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 当前用户标识
*/
@TableField(value = "user_id")
private Long userId;
/**
* 主播名称
*/
@TableField(value = "hosts_id")
private String hostsId;
/**
* 粉丝团人数
*/
@TableField(value = "fans_club_count")
private Integer fansClubCount;
/**
* 礼物之比
*/
@TableField(value = "lighted_vs_total_gifts")
private String lightedVsTotalGifts;
/**
* 直播开始时间(格式化)
*/
@TableField(value = "start_time_formatted")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date startTimeFormatted;
/**
* 直播结束时间(格式化)
*/
@TableField(value = "end_time_formatted")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTimeFormatted;
/**
* 获得的点赞数量
*/
@TableField(value = "like_count")
private Integer likeCount;
/**
* 持续时间(格式化)
*/
@TableField(value = "duration_formatted")
private String durationFormatted;
/**
* 记录创建时间
*/
@TableField(value = "create_time")
private Date createTime;
/**
* 记录更新时间
*/
@TableField(value = "update_time")
private Date updateTime;
/**
* 租户 Id
*/
@TableField(value = "tenant_id")
private Long tenantId;
/**
* 是否删除
*/
@TableField(value = "deleted")
private Byte deleted;
/**
* 更新人
*/
@TableField(value = "updater")
private String updater;
/**
* 创建人
*/
@TableField(value = "creator")
private String creator;
}

View File

@@ -7,8 +7,10 @@ import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.service.HostInfoService;
import com.yupi.springbootinit.service.ServerBigBrotherService;
import com.yupi.springbootinit.service.ServerLiveHostDetailService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -29,6 +31,9 @@ public class MQReceiver {
@Resource
private ServerBigBrotherService serverBigBrotherService;
@Resource
private ServerLiveHostDetailService serverLiveHostDetailService;
// //方法:接收消息
// @RabbitListener(queues = "HOST_INFO_QUEUE")
@@ -62,7 +67,7 @@ public class MQReceiver {
}
}
@RabbitListener(queues = "BIG_BROTHER_QUEUE",id = "bigbrother", autoStartup = "false")
@RabbitListener(queues = "BIG_BROTHER_QUEUE",id = "bigbrother", autoStartup = "true")
@Async("taskExecutor")
public void bigBrotherReceive(ServerBigBrother bigBrotherList, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
@@ -79,4 +84,18 @@ public class MQReceiver {
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
}
}
@RabbitListener(queues = "LIVE_HOST_DETAIL",id = "liveHostDetail", autoStartup = "true")
@Async("taskExecutor")
public void liveHostDetailReceive(List<ServerLiveHostDetail> details, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
serverLiveHostDetailService.processLiveHostDetails(details);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false);
log.error("直播明细消息消费失败,数量: {}", details.size(), e);
throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE);
}
}
}

View File

@@ -6,6 +6,7 @@ import com.yupi.springbootinit.config.RabbitMQConfig;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.model.entity.NewHosts;
import com.yupi.springbootinit.model.entity.ServerBigBrother;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import io.swagger.models.auth.In;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -41,6 +42,15 @@ public class MQSender {
}
}
//方法:发送直播明细消息
public void liveHostDetailSend(List<ServerLiveHostDetail> list){
try {
rabbitTemplate.convertAndSend("LIVE_HOST_DETAIL",list);
}catch (Exception e){
throw new BusinessException(ErrorCode.QUEUE_ERROR);
}
}
//方法:发送消息
public void bigBrotherSend(ServerBigBrother bigBrothers){
try {

View File

@@ -0,0 +1,18 @@
package com.yupi.springbootinit.service;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.baomidou.mybatisplus.extension.service.IService;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/*
* @author: ziin
* @date: 2025/12/17 20:45
*/
public interface ServerLiveHostDetailService extends IService<ServerLiveHostDetail>{
CompletableFuture<Void> processLiveHostDetails(List<ServerLiveHostDetail> details);
}

View File

@@ -0,0 +1,34 @@
package com.yupi.springbootinit.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yupi.springbootinit.model.entity.ServerLiveHostDetail;
import com.yupi.springbootinit.mapper.ServerLiveHostDetailMapper;
import com.yupi.springbootinit.service.ServerLiveHostDetailService;
/*
* @author: ziin
* @date: 2025/12/17 20:45
*/
@Service
@Slf4j
@Transactional(rollbackFor = Exception.class)
public class ServerLiveHostDetailServiceImpl extends ServiceImpl<ServerLiveHostDetailMapper, ServerLiveHostDetail> implements ServerLiveHostDetailService{
@Override
public CompletableFuture<Void> processLiveHostDetails(List<ServerLiveHostDetail> details) {
try {
saveBatch(details);
log.info("直播明细数据存储成功,数量: {}", details.size());
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("直播明细数据存储失败", e);
return CompletableFuture.failedFuture(e);
}
}
}

View File

@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yupi.springbootinit.mapper.ServerLiveHostDetailMapper">
<resultMap id="BaseResultMap" type="com.yupi.springbootinit.model.entity.ServerLiveHostDetail">
<!--@mbg.generated-->
<!--@Table server_live_host_detail-->
<id column="id" jdbcType="BIGINT" property="id" />
<result column="userId" jdbcType="BIGINT" property="userid" />
<result column="hostsId" jdbcType="VARCHAR" property="hostsid" />
<result column="fans_club_count" jdbcType="INTEGER" property="fansClubCount" />
<result column="lighted_vs_total_gifts" jdbcType="VARCHAR" property="lightedVsTotalGifts" />
<result column="start_time_formatted" jdbcType="VARCHAR" property="startTimeFormatted" />
<result column="end_time_formatted" jdbcType="VARCHAR" property="endTimeFormatted" />
<result column="like_count" jdbcType="INTEGER" property="likeCount" />
<result column="duration_formatted" jdbcType="VARCHAR" property="durationFormatted" />
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
<result column="update_time" jdbcType="TIMESTAMP" property="updateTime" />
<result column="tenant_id" jdbcType="BIGINT" property="tenantId" />
<result column="deleted" jdbcType="TINYINT" property="deleted" />
<result column="updater" jdbcType="VARCHAR" property="updater" />
<result column="creator" jdbcType="VARCHAR" property="creator" />
</resultMap>
<sql id="Base_Column_List">
<!--@mbg.generated-->
id, userId, hostsId, fans_club_count, lighted_vs_total_gifts, start_time_formatted,
end_time_formatted, like_count, duration_formatted, create_time, update_time, tenant_id,
deleted, updater, creator
</sql>
</mapper>