diff --git a/src/main/java/com/yolo/keyborad/config/AppConfig.java b/src/main/java/com/yolo/keyborad/config/AppConfig.java index f24783c..f6c2d2c 100644 --- a/src/main/java/com/yolo/keyborad/config/AppConfig.java +++ b/src/main/java/com/yolo/keyborad/config/AppConfig.java @@ -15,6 +15,8 @@ public class AppConfig { private QdrantConfig qdrantConfig = new QdrantConfig(); + private LLmConfig llmConfig = new LLmConfig(); + @Data public static class UserRegisterProperties { @@ -31,4 +33,19 @@ public class AppConfig { //向量搜索时的返回数量限制 private Integer vectorSearchLimit = 1; } + + @Data + public static class LLmConfig { + //LLM系统提示语 + private String systemPrompt = """ + Format rules: + - Return EXACTLY 3 replies. + - Use "" as the separator. + - reply1reply2reply3 + """; + + //聊天消息最大长度 + private Integer maxMessageLength = 1000; + } + } diff --git a/src/main/java/com/yolo/keyborad/controller/ChatController.java b/src/main/java/com/yolo/keyborad/controller/ChatController.java index 2c78dc1..644645a 100644 --- a/src/main/java/com/yolo/keyborad/controller/ChatController.java +++ b/src/main/java/com/yolo/keyborad/controller/ChatController.java @@ -1,6 +1,5 @@ package com.yolo.keyborad.controller; -import cn.dev33.satoken.context.mock.SaTokenContextMockUtil; import cn.dev33.satoken.stp.StpUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; @@ -12,29 +11,19 @@ import com.yolo.keyborad.mapper.QdrantPayloadMapper; import com.yolo.keyborad.model.dto.chat.ChatReq; import com.yolo.keyborad.model.dto.chat.ChatSaveReq; import com.yolo.keyborad.model.dto.chat.ChatStreamMessage; -import com.yolo.keyborad.model.entity.KeyboardCharacter; -import com.yolo.keyborad.model.entity.KeyboardUserCallLog; -import com.yolo.keyborad.service.KeyboardCharacterService; -import com.yolo.keyborad.service.KeyboardUserCallLogService; +import com.yolo.keyborad.service.ChatService; import com.yolo.keyborad.service.impl.QdrantVectorService; import io.qdrant.client.grpc.JsonWithInt; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.springframework.ai.chat.client.ChatClient; -import org.springframework.ai.openai.OpenAiChatOptions; import org.springframework.ai.openai.OpenAiEmbeddingModel; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import java.math.BigDecimal; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; /* * @author: ziin @@ -46,12 +35,6 @@ import java.util.concurrent.atomic.AtomicReference; @Tag(name = "聊天", description = "聊天接口") public class ChatController { - // 最大消息长度限制 - private static final int MAX_MESSAGE_LENGTH = 1000; - - @Resource - private ChatClient client; - @Resource private OpenAiEmbeddingModel embeddingModel; @@ -59,174 +42,13 @@ public class ChatController { private QdrantVectorService qdrantVectorService; @Resource - private KeyboardCharacterService keyboardCharacterService; - - @Resource - private KeyboardUserCallLogService callLogService; + private ChatService chatService; @PostMapping("/talk") @Operation(summary = "聊天润色接口", description = "聊天润色接口") public Flux> talk(@RequestBody ChatReq chatReq){ - // 1. 参数校验 - if (chatReq == null) { - log.error("聊天请求参数为空"); - throw new BusinessException(ErrorCode.PARAMS_ERROR); - } - - if (chatReq.getCharacterId() == null) { - log.error("键盘人设ID为空"); - throw new BusinessException(ErrorCode.CHAT_CHARACTER_ID_EMPTY); - } - - if (StrUtil.isBlank(chatReq.getMessage())) { - log.error("聊天消息为空"); - throw new BusinessException(ErrorCode.CHAT_MESSAGE_EMPTY); - } - - if (chatReq.getMessage().length() > MAX_MESSAGE_LENGTH) { - log.error("聊天消息过长,长度: {}", chatReq.getMessage().length()); - throw new BusinessException(ErrorCode.CHAT_MESSAGE_TOO_LONG); - } - - // 2. 验证键盘人设是否存在 - KeyboardCharacter character = keyboardCharacterService.getById(chatReq.getCharacterId()); - if (character == null) { - log.error("键盘人设不存在,ID: {}", chatReq.getCharacterId()); - throw new BusinessException(ErrorCode.CHAT_CHARACTER_NOT_FOUND); - } - - // 初始化调用日志 - String requestId = IdUtil.fastSimpleUUID(); - long startTime = System.currentTimeMillis(); - AtomicReference modelRef = new AtomicReference<>(); - AtomicInteger inputTokens = new AtomicInteger(0); - AtomicInteger outputTokens = new AtomicInteger(0); - AtomicReference errorCodeRef = new AtomicReference<>(); - - // 3. LLM 流式输出 - Flux llmFlux = client - .prompt(character.getPrompt()) - .system(""" - Format rules: - - Return EXACTLY 3 replies. - - Use "" as the separator. - - reply1reply2reply3 - """) - .user(chatReq.getMessage()) - .options(OpenAiChatOptions.builder() - .user(StpUtil.getLoginIdAsString()) - .build()) - .stream() - .chatResponse() - .concatMap(response -> { - // 提取 metadata - if (response.getMetadata() != null) { - var metadata = response.getMetadata(); - if (metadata.getModel() != null) { - modelRef.set(metadata.getModel()); - } - if (metadata.getUsage() != null) { - var usage = metadata.getUsage(); - if (usage.getPromptTokens() != null) { - inputTokens.set(usage.getPromptTokens()); - } - if (usage.getCompletionTokens() != null) { - outputTokens.set(usage.getCompletionTokens()); - } - } - - - } - - // 获取内容 - String content = response.getResult().getOutput().getText(); - if (content == null || content.isEmpty()) { - return Flux.empty(); - } - - // 拆成单字符 - List chars = content.codePoints() - .mapToObj(cp -> new String(Character.toChars(cp))) - .toList(); - - // 按 3 个字符批量发送 - List batched = new ArrayList<>(); - StringBuilder sb = new StringBuilder(); - for (String ch : chars) { - sb.append(ch); - if (sb.length() >= 3) { - batched.add(sb.toString()); - sb.setLength(0); - } - } - if (!sb.isEmpty()) { - batched.add(sb.toString()); - } - - return Flux.fromIterable(batched) - .map(s -> new ChatStreamMessage("llm_chunk", s)); - }) - .doOnError(error -> { - log.error("LLM调用失败", error); - errorCodeRef.set("LLM_ERROR"); - }) - .onErrorResume(error -> - Flux.just(new ChatStreamMessage("error", "LLM服务暂时不可用,请稍后重试")) - ); - - // 4. 向量搜索Flux(一次性发送搜索结果) - Flux searchFlux = Mono - .fromCallable(() -> qdrantVectorService.searchText(chatReq.getMessage())) - .subscribeOn(Schedulers.boundedElastic()) // 避免阻塞 event-loop - .map(list -> new ChatStreamMessage("search_result", list)) - .doOnError(error -> log.error("向量搜索失败", error)) - .onErrorResume(error -> - Mono.just(new ChatStreamMessage("search_result", new ArrayList<>())) - ) - .flux(); - - // 5. 结束标记 - Flux doneFlux = - Flux.just(new ChatStreamMessage("done", null)); - - // 6. 合并所有Flux - Flux merged = - Flux.merge(llmFlux, searchFlux) - .concatWith(doneFlux); - String tokenValue = StpUtil.getTokenValue(); - // 7. SSE 包装并记录调用日志 - return merged - .doFinally(signalType -> { - // 异步保存调用日志 - Mono.fromRunnable(() -> { - try { - KeyboardUserCallLog callLog = new KeyboardUserCallLog(); - SaTokenContextMockUtil.setMockContext(()->{ - StpUtil.setTokenValueToStorage(tokenValue); - callLog.setUserId(StpUtil.getLoginIdAsLong()); - }); - callLog.setRequestId(requestId); - callLog.setFeature("chat_talk"); - callLog.setModel(modelRef.get()); - callLog.setInputTokens(inputTokens.get()); - callLog.setOutputTokens(outputTokens.get()); - callLog.setTotalTokens(inputTokens.get() + outputTokens.get()); - callLog.setSuccess(errorCodeRef.get() == null); - callLog.setLatencyMs((int) (System.currentTimeMillis() - startTime)); - callLog.setErrorCode(errorCodeRef.get()); - callLog.setCreatedAt(new Date()); - callLogService.save(callLog); - } catch (Exception e) { - log.error("保存调用日志失败", e); - } - }).subscribeOn(Schedulers.boundedElastic()).subscribe(); - }) - .map(msg -> - ServerSentEvent.builder(msg) - .event(msg.getType()) - .build() - ); + return chatService.talk(chatReq); } diff --git a/src/main/java/com/yolo/keyborad/service/ChatService.java b/src/main/java/com/yolo/keyborad/service/ChatService.java index 8c589ca..95e0f1f 100644 --- a/src/main/java/com/yolo/keyborad/service/ChatService.java +++ b/src/main/java/com/yolo/keyborad/service/ChatService.java @@ -1,8 +1,14 @@ package com.yolo.keyborad.service; +import com.yolo.keyborad.model.dto.chat.ChatReq; +import com.yolo.keyborad.model.dto.chat.ChatStreamMessage; +import org.springframework.http.codec.ServerSentEvent; +import reactor.core.publisher.Flux; + /* * @author: ziin * @date: 2025/12/8 15:16 */ public interface ChatService { + Flux> talk(ChatReq chatReq); } diff --git a/src/main/java/com/yolo/keyborad/service/impl/ChatServiceImpl.java b/src/main/java/com/yolo/keyborad/service/impl/ChatServiceImpl.java index a4b64f1..00a4812 100644 --- a/src/main/java/com/yolo/keyborad/service/impl/ChatServiceImpl.java +++ b/src/main/java/com/yolo/keyborad/service/impl/ChatServiceImpl.java @@ -1,12 +1,275 @@ package com.yolo.keyborad.service.impl; +import cn.dev33.satoken.context.mock.SaTokenContextMockUtil; +import cn.dev33.satoken.stp.StpUtil; +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; +import com.yolo.keyborad.common.ErrorCode; +import com.yolo.keyborad.config.AppConfig; +import com.yolo.keyborad.config.NacosAppConfigCenter; +import com.yolo.keyborad.exception.BusinessException; +import com.yolo.keyborad.model.dto.chat.ChatReq; +import com.yolo.keyborad.model.dto.chat.ChatStreamMessage; +import com.yolo.keyborad.model.entity.KeyboardCharacter; +import com.yolo.keyborad.model.entity.KeyboardUserCallLog; import com.yolo.keyborad.service.ChatService; +import com.yolo.keyborad.service.KeyboardCharacterService; +import com.yolo.keyborad.service.KeyboardUserCallLogService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.ai.chat.client.ChatClient; +import org.springframework.ai.openai.OpenAiChatOptions; +import org.springframework.http.codec.ServerSentEvent; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /* * @author: ziin * @date: 2025/12/8 15:17 */ @Service +@Slf4j public class ChatServiceImpl implements ChatService { + + // 最大消息长度限制 + private static final int MAX_MESSAGE_LENGTH = 1000; + + @Resource + private ChatClient client; + + @Resource + private QdrantVectorService qdrantVectorService; + + @Resource + private KeyboardCharacterService keyboardCharacterService; + + @Resource + private KeyboardUserCallLogService callLogService; + + private final NacosAppConfigCenter.DynamicAppConfig cfgHolder; + + public ChatServiceImpl(NacosAppConfigCenter.DynamicAppConfig cfgHolder) { + this.cfgHolder = cfgHolder; + } + + + /** + * 处理聊天对话,返回流式响应 + * + * @param chatReq 聊天请求对象,包含角色ID和消息内容 + * @return 返回SSE流式事件,包含LLM响应、向量搜索结果和结束标记 + * @throws BusinessException 当参数校验失败或角色不存在时抛出 + */ + @Override + public Flux> talk(ChatReq chatReq) { + AppConfig appConfig = cfgHolder.getRef().get(); + + // ============ 1. 参数校验 ============ + // 验证请求对象非空 + if (chatReq == null) { + log.error("聊天请求参数为空"); + throw new BusinessException(ErrorCode.PARAMS_ERROR); + } + + // 验证键盘人设ID非空 + if (chatReq.getCharacterId() == null) { + log.error("键盘人设ID为空"); + throw new BusinessException(ErrorCode.CHAT_CHARACTER_ID_EMPTY); + } + + // 验证消息内容非空 + if (StrUtil.isBlank(chatReq.getMessage())) { + log.error("聊天消息为空"); + throw new BusinessException(ErrorCode.CHAT_MESSAGE_EMPTY); + } + + // 验证消息长度不超过限制 + if (chatReq.getMessage().length() > appConfig.getLlmConfig().getMaxMessageLength()) { + log.error("聊天消息过长,长度: {}", chatReq.getMessage().length()); + throw new BusinessException(ErrorCode.CHAT_MESSAGE_TOO_LONG); + } + + // ============ 2. 验证键盘人设是否存在 ============ + KeyboardCharacter character = keyboardCharacterService.getById(chatReq.getCharacterId()); + if (character == null) { + log.error("键盘人设不存在,ID: {}", chatReq.getCharacterId()); + throw new BusinessException(ErrorCode.CHAT_CHARACTER_NOT_FOUND); + } + + // 获取应用配置 + + // ============ 初始化调用日志相关变量 ============ + // 生成唯一请求ID用于日志追踪 + String requestId = IdUtil.fastSimpleUUID(); + // 记录开始时间用于计算延迟 + long startTime = System.currentTimeMillis(); + // 原子引用保存模型信息 + AtomicReference modelRef = new AtomicReference<>(); + // 原子计数器保存输入token数 + AtomicInteger inputTokens = new AtomicInteger(0); + // 原子计数器保存输出token数 + AtomicInteger outputTokens = new AtomicInteger(0); + // 原子引用保存错误代码 + AtomicReference errorCodeRef = new AtomicReference<>(); + + // ============ 3. 构建LLM流式输出 ============ + Flux llmFlux = client + // 设置角色提示词 + .prompt(character.getPrompt()) + // 设置系统提示词 + .system(appConfig.getLlmConfig().getSystemPrompt()) + // 设置用户消息 + .user(chatReq.getMessage()) + // 配置OpenAI选项 + .options(OpenAiChatOptions.builder() + .user(StpUtil.getLoginIdAsString()) + .build()) + // 启用流式响应 + .stream() + .chatResponse() + // 处理每个响应块 + .concatMap(response -> { + // ===== 提取并保存元数据信息 ===== + if (response.getMetadata() != null) { + var metadata = response.getMetadata(); + // 保存模型名称 + if (metadata.getModel() != null) { + modelRef.set(metadata.getModel()); + } + // 保存token使用情况 + if (metadata.getUsage() != null) { + var usage = metadata.getUsage(); + if (usage.getPromptTokens() != null) { + inputTokens.set(usage.getPromptTokens()); + } + if (usage.getCompletionTokens() != null) { + outputTokens.set(usage.getCompletionTokens()); + } + } + } + + // ===== 获取响应内容 ===== + String content = response.getResult().getOutput().getText(); + // 如果内容为空则跳过 + if (content == null || content.isEmpty()) { + return Flux.empty(); + } + + // ===== 将内容拆分为单个字符(支持Unicode) ===== + List chars = content.codePoints() + .mapToObj(cp -> new String(Character.toChars(cp))) + .toList(); + + // ===== 按3个字符批量发送(优化传输效率) ===== + List batched = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + for (String ch : chars) { + sb.append(ch); + // 每累积3个字符就添加到批次列表 + if (sb.length() >= 3) { + batched.add(sb.toString()); + sb.setLength(0); + } + } + // 添加剩余字符 + if (!sb.isEmpty()) { + batched.add(sb.toString()); + } + + // ===== 将批次转换为消息流 ===== + return Flux.fromIterable(batched) + .map(s -> new ChatStreamMessage("llm_chunk", s)); + }) + // 记录LLM调用错误 + .doOnError(error -> { + log.error("LLM调用失败", error); + errorCodeRef.set("LLM_ERROR"); + }) + // 错误恢复:返回友好提示消息 + .onErrorResume(error -> + Flux.just(new ChatStreamMessage("error", "LLM服务暂时不可用,请稍后重试")) + ); + + // ============ 4. 构建向量搜索Flux(一次性发送搜索结果) ============ + Flux searchFlux = Mono + // 异步执行向量搜索 + .fromCallable(() -> qdrantVectorService.searchText(chatReq.getMessage())) + // 使用独立线程池避免阻塞事件循环 + .subscribeOn(Schedulers.boundedElastic()) + // 将搜索结果包装为消息 + .map(list -> new ChatStreamMessage("search_result", list)) + // 记录搜索失败日志 + .doOnError(error -> log.error("向量搜索失败", error)) + // 错误恢复:返回空列表 + .onErrorResume(error -> + Mono.just(new ChatStreamMessage("search_result", new ArrayList<>())) + ) + // 转换为Flux流 + .flux(); + + // ============ 5. 构建结束标记 ============ + Flux doneFlux = + Flux.just(new ChatStreamMessage("done", null)); + + // ============ 6. 合并所有Flux流 ============ + // merge: LLM响应和向量搜索并行执行 + // concatWith: 在流结束后添加结束标记 + Flux merged = + Flux.merge(llmFlux, searchFlux) + .concatWith(doneFlux); + + // 保存当前用户token值,用于异步日志记录 + String tokenValue = StpUtil.getTokenValue(); + + // ============ 7. SSE包装并记录调用日志 ============ + return merged + // 在流完成/取消/出错时执行 + .doFinally(signalType -> { + // ===== 异步保存调用日志 ===== + Mono.fromRunnable(() -> { + try { + // 构建调用日志对象 + KeyboardUserCallLog callLog = new KeyboardUserCallLog(); + // 设置Mock上下文以获取用户ID + SaTokenContextMockUtil.setMockContext(() -> { + StpUtil.setTokenValueToStorage(tokenValue); + callLog.setUserId(StpUtil.getLoginIdAsLong()); + }); + // 设置日志基本信息 + callLog.setRequestId(requestId); + callLog.setFeature("chat_talk"); + callLog.setModel(modelRef.get()); + // 设置token使用情况 + callLog.setInputTokens(inputTokens.get()); + callLog.setOutputTokens(outputTokens.get()); + callLog.setTotalTokens(inputTokens.get() + outputTokens.get()); + // 设置调用结果(无错误码表示成功) + callLog.setSuccess(errorCodeRef.get() == null); + // 计算并设置延迟时间(毫秒) + callLog.setLatencyMs((int) (System.currentTimeMillis() - startTime)); + callLog.setErrorCode(errorCodeRef.get()); + callLog.setCreatedAt(new Date()); + // 保存日志到数据库 + callLogService.save(callLog); + } catch (Exception e) { + log.error("保存调用日志失败", e); + } + }).subscribeOn(Schedulers.boundedElastic()).subscribe(); + }) + // 将消息包装为SSE事件 + .map(msg -> + ServerSentEvent.builder(msg) + .event(msg.getType()) + .build() + ); + } }