package com.yupi.springbootinit.rabbitMQ; import cn.hutool.core.date.DateTime; import com.rabbitmq.client.Channel; import com.yupi.springbootinit.common.ErrorCode; import com.yupi.springbootinit.exception.BusinessException; import com.yupi.springbootinit.model.entity.NewHosts; import com.yupi.springbootinit.model.entity.ServerBigBrother; import com.yupi.springbootinit.service.HostInfoService; import com.yupi.springbootinit.service.ServerBigBrotherService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; import java.util.List; @Service @Slf4j public class MQReceiver { @Resource private HostInfoService hostInfoService; @Resource private ServerBigBrotherService serverBigBrotherService; // //方法:接收消息 // @RabbitListener(queues = "HOST_INFO_QUEUE") // public void receive(List hosts, Channel channel, Message message) throws IOException { // long deliveryTag = message.getMessageProperties().getDeliveryTag(); // try { // hostInfoService.processHosts(hosts); // channel.basicAck(deliveryTag,false); // log.info("deliveryTag:{}",deliveryTag); // log.info("接收到的消息------->" + hosts.size()); // }catch (Exception e){ // channel.basicNack(deliveryTag,false,true); // log.error("消息消费失败------->消息内容大小{}",hosts.size() ); // } // } @RabbitListener(queues = "HOST_INFO_QUEUE",id = "hosts", autoStartup = "false") @Async("taskExecutor") public void receive(List hosts, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 等待所有异步任务完成 hostInfoService.processHosts(hosts); // 这里会抛出异常 channel.basicAck(deliveryTag, false); // log.info("deliveryTag:{}", deliveryTag); // log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size()); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); log.error("消息消费失败------->消息内容大小{}", hosts.size(), e); throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE); } } @RabbitListener(queues = "BIG_BROTHER_QUEUE",id = "bigbrother", autoStartup = "false") @Async("taskExecutor") public void bigBrotherReceive(ServerBigBrother bigBrotherList, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 等待所有异步任务完成 serverBigBrotherService.saveData(bigBrotherList); channel.basicAck(deliveryTag, false); // log.info("deliveryTag:{}", deliveryTag); // log.info("{} 消息消费内容大小-------> {}",DateTime.now(),hosts.size()); } catch (Exception e) { channel.basicNack(deliveryTag, false, false); log.error("消息消费失败"); log.error(e.getMessage()); throw new BusinessException(ErrorCode.QUEUE_CONSUMPTION_FAILURE); } } }