feat(mq): 新增 WebAI 消息队列支持
为支持 WebAI 登录场景,新增 web.ai.headers.exchange 交换机及对应发送方法; 在 HostInfoServiceImpl 中增加 Redis 前缀判断与 webAISend 调用逻辑。
This commit is contained in:
@@ -24,6 +24,9 @@ public class RabbitMQConfig {
|
|||||||
|
|
||||||
public static final String AI_CHAT_EXCHANGE_NAME = "ai.chat.headers.exchange";
|
public static final String AI_CHAT_EXCHANGE_NAME = "ai.chat.headers.exchange";
|
||||||
public static final String BIG_BROTHER_EXCHANGE_NAME = "big.brother.headers.exchange";
|
public static final String BIG_BROTHER_EXCHANGE_NAME = "big.brother.headers.exchange";
|
||||||
|
public static final String WEB_AI_EXCHANGE_NAME = "web.ai.headers.exchange";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
//创建队列
|
//创建队列
|
||||||
//true:表示持久化
|
//true:表示持久化
|
||||||
@@ -63,6 +66,13 @@ public class RabbitMQConfig {
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public HeadersExchange webAiHeadersExchange() {
|
||||||
|
return ExchangeBuilder.headersExchange(WEB_AI_EXCHANGE_NAME)
|
||||||
|
.durable(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public RabbitAdmin rabbitAdmin(ConnectionFactory cf) {
|
public RabbitAdmin rabbitAdmin(ConnectionFactory cf) {
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ public class MQSender {
|
|||||||
return m;
|
return m;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void brotherSend(Long tenantId, Object payload) {
|
public void brotherSend(Long tenantId, Object payload) {
|
||||||
// 发送消息,把 userId 放进 header
|
// 发送消息,把 userId 放进 header
|
||||||
rabbitTemplate.convertAndSend(RabbitMQConfig.BIG_BROTHER_EXCHANGE_NAME, "", payload, m -> {
|
rabbitTemplate.convertAndSend(RabbitMQConfig.BIG_BROTHER_EXCHANGE_NAME, "", payload, m -> {
|
||||||
@@ -70,5 +71,13 @@ public class MQSender {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void webAISend(Long tenantId, Object payload) {
|
||||||
|
// 发送消息,把 userId 放进 header
|
||||||
|
rabbitTemplate.convertAndSend(RabbitMQConfig.WEB_AI_EXCHANGE_NAME, "", payload, m -> {
|
||||||
|
m.getMessageProperties().getHeaders().put("tenantId", tenantId);
|
||||||
|
return m;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -79,9 +79,16 @@ public class HostInfoServiceImpl extends ServiceImpl<NewHostsMapper, NewHosts> i
|
|||||||
newHosts.forEach(newHost -> {
|
newHosts.forEach(newHost -> {
|
||||||
mqSender.send(newHost.getTenantId(),newHost);
|
mqSender.send(newHost.getTenantId(),newHost);
|
||||||
});
|
});
|
||||||
|
|
||||||
log.info("发送消息到队列{}, 消息数量: {}", newHosts.get(0).getTenantId(), newHosts.size());
|
log.info("发送消息到队列{}, 消息数量: {}", newHosts.get(0).getTenantId(), newHosts.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (redisUtils.hasKeyByPrefix("webAI_login:"+ newHosts.get(0).getTenantId())) {
|
||||||
|
newHosts.forEach(newHost -> {
|
||||||
|
mqSender.webAISend(newHost.getTenantId(),newHost);
|
||||||
|
});
|
||||||
|
log.info("发送消息到队列WebAI{}, 消息数量: {}", newHosts.get(0).getTenantId(), newHosts.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
stopWatch.stop();
|
stopWatch.stop();
|
||||||
|
|||||||
Reference in New Issue
Block a user