Files
keyboard/CustomKeyboard/Network/KBStreamFetcher.m
2025-11-12 17:55:59 +08:00

381 lines
17 KiB
Objective-C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//
// KBStreamFetcher.m
//
#import "KBStreamFetcher.h"
@interface KBStreamFetcher () <NSURLSessionDataDelegate>
@property (nonatomic, strong) NSURLSession *session;
@property (nonatomic, strong) NSURLSessionDataTask *task;
@property (nonatomic, strong) NSMutableData *buffer; // 网络原始字节累加
@property (nonatomic, assign) NSStringEncoding textEncoding; // 推断得到的文本编码(默认 UTF-8
@property (nonatomic, assign) BOOL isSSE; // 是否为 SSE 响应
@property (nonatomic, strong) NSMutableString *sseTextBuffer; // SSE 文本缓冲(已解码)
@property (nonatomic, assign) NSInteger decodedPrefixBytes; // 已解码并写入 sseTextBuffer 的字节数SSE
@property (nonatomic, assign) NSInteger deliveredCharCount; // 已回传的字符数(非 SSE用于做增量
@property (nonatomic, assign) BOOL hasEmitted; // 是否已经输出过正文(用于“首段删 1 个 \t”
@property (nonatomic, assign) BOOL lastChunkEndedWithTab; // 上一个已输出分片是否以 "\t" 结尾(用于跨分片去除“\t 后空格”)
@property (nonatomic, strong) NSMutableArray<NSString *> *pendingQueue; // 待回调的分片(节流输出)
@property (nonatomic, strong) NSTimer *flushTimer; // 定时从队列取出一条回调
@property (nonatomic, strong, nullable) NSError *finishError; // 结束时的错误(需要等队列清空再回调)
// Metrics
@property (nonatomic, assign) CFAbsoluteTime tStart; // start() 被调用的时刻
@property (nonatomic, assign) CFAbsoluteTime tFirstByte; // 第一次拿到可解码内容
@property (nonatomic, assign) CFAbsoluteTime tFinish; // 完成/失败时刻
@property (nonatomic, assign) NSInteger emittedChunkCount; // 已输出分片数量
@end
// 计算数据中以 UTF-8 编码可完整解码的“前缀字节长度”,避免切断多字节字符
static NSUInteger kb_validUTF8PrefixLen(NSData *data) {
const unsigned char *bytes = (const unsigned char *)data.bytes;
NSUInteger n = data.length;
if (n == 0) return 0;
NSInteger i = (NSInteger)n - 1;
while (i >= 0 && (bytes[i] & 0xC0) == 0x80) { i--; } // 10xxxxxx 续字节
if (i < 0) return 0; // 全是续字节,等下次
unsigned char b = bytes[i];
NSUInteger expected = 1;
if ((b & 0x80) == 0x00) expected = 1; // 0xxxxxxx
else if ((b & 0xE0) == 0xC0) expected = 2; // 110xxxxx
else if ((b & 0xF0) == 0xE0) expected = 3; // 1110xxxx
else if ((b & 0xF8) == 0xF0) expected = 4; // 11110xxx
else return (NSUInteger)i; // 非法起始,截到 i 之前
NSUInteger remain = n - (NSUInteger)i;
return (remain >= expected) ? n : (NSUInteger)i;
}
@implementation KBStreamFetcher
+ (instancetype)fetcherWithURL:(NSURL *)url {
KBStreamFetcher *f = [[self alloc] init];
f.url = url;
return f;
}
- (instancetype)init {
if (self = [super init]) {
_acceptEventStream = NO;
_disableCompression = YES;
_treatSlashTAsTab = YES;
_trimLeadingTabOnce = YES;
_requestTimeout = 30.0;
_textEncoding = NSUTF8StringEncoding;
_buffer = [NSMutableData data];
_sseTextBuffer = [NSMutableString string];
_pendingQueue = [NSMutableArray array];
_flushInterval = 0.1;
_splitLargeDeltasOnWhitespace = YES;
_loggingEnabled = YES;
}
return self;
}
- (void)start {
if (!self.url) return;
[self cancel];
NSURLSessionConfiguration *cfg = [NSURLSessionConfiguration defaultSessionConfiguration];
cfg.requestCachePolicy = NSURLRequestReloadIgnoringLocalCacheData;
cfg.timeoutIntervalForRequest = self.requestTimeout;
cfg.timeoutIntervalForResource = MAX(self.requestTimeout, 60.0);
self.session = [NSURLSession sessionWithConfiguration:cfg delegate:self delegateQueue:[NSOperationQueue mainQueue]];
NSMutableURLRequest *req = [NSMutableURLRequest requestWithURL:self.url];
req.HTTPMethod = @"GET";
if (self.disableCompression) { [req setValue:@"identity" forHTTPHeaderField:@"Accept-Encoding"]; }
if (self.acceptEventStream) { [req setValue:@"text/event-stream" forHTTPHeaderField:@"Accept"]; }
[req setValue:@"no-cache" forHTTPHeaderField:@"Cache-Control"];
[req setValue:@"keep-alive" forHTTPHeaderField:@"Connection"];
[self.extraHeaders enumerateKeysAndObjectsUsingBlock:^(NSString *k, NSString *v, BOOL *stop){ [req setValue:v forHTTPHeaderField:k]; }];
// 状态复位
[self.buffer setLength:0];
[self.sseTextBuffer setString:@""];
self.isSSE = NO;
self.textEncoding = NSUTF8StringEncoding;
self.decodedPrefixBytes = 0;
self.deliveredCharCount = 0;
self.hasEmitted = NO;
self.lastChunkEndedWithTab = NO;
[self.pendingQueue removeAllObjects];
[self.flushTimer invalidate]; self.flushTimer = nil;
self.finishError = nil;
self.tStart = CFAbsoluteTimeGetCurrent();
self.tFirstByte = 0;
self.tFinish = 0;
self.emittedChunkCount = 0;
if (self.loggingEnabled) {
NSLog(@"[KBStream] start url=%@ acceptSSE=%@ disableCompression=%@ flush=%.0fms splitWords=%@",
self.url.absoluteString,
self.acceptEventStream?@"YES":@"NO",
self.disableCompression?@"YES":@"NO",
self.flushInterval*1000.0,
self.splitLargeDeltasOnWhitespace?@"YES":@"NO");
}
self.task = [self.session dataTaskWithRequest:req];
[self.task resume];
}
- (void)cancel {
[self.task cancel];
self.task = nil;
[self.session invalidateAndCancel];
self.session = nil;
[self.buffer setLength:0];
[self.sseTextBuffer setString:@""];
self.decodedPrefixBytes = 0;
self.deliveredCharCount = 0;
self.hasEmitted = NO;
self.lastChunkEndedWithTab = NO;
[self.pendingQueue removeAllObjects];
[self.flushTimer invalidate]; self.flushTimer = nil;
self.finishError = nil;
}
#pragma mark - NSURLSessionDataDelegate
- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveResponse:(NSURLResponse *)response completionHandler:(void (^)(NSURLSessionResponseDisposition))completionHandler {
self.isSSE = NO;
self.textEncoding = NSUTF8StringEncoding;
if ([response isKindOfClass:[NSHTTPURLResponse class]]) {
NSHTTPURLResponse *r = (NSHTTPURLResponse *)response;
NSString *ct = r.allHeaderFields[@"Content-Type"] ?: r.allHeaderFields[@"content-type"];
if ([ct isKindOfClass:[NSString class]]) {
NSString *lower = [ct lowercaseString];
if ([lower containsString:@"text/event-stream"]) self.isSSE = YES;
NSRange pos = [lower rangeOfString:@"charset="];
if (pos.location != NSNotFound) {
NSString *charset = [[lower substringFromIndex:pos.location + pos.length] componentsSeparatedByString:@";"][0];
if ([charset containsString:@"utf-8"] || [charset containsString:@"utf8"]) {
self.textEncoding = NSUTF8StringEncoding;
} else if ([charset containsString:@"iso-8859-1"] || [charset containsString:@"latin1"]) {
self.textEncoding = NSISOLatin1StringEncoding;
}
}
}
}
[self.sseTextBuffer setString:@""];
self.decodedPrefixBytes = 0;
if (completionHandler) completionHandler(NSURLSessionResponseAllow);
}
- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveData:(NSData *)data {
if (data.length == 0) return;
[self.buffer appendData:data];
NSUInteger validLen = (self.textEncoding == NSUTF8StringEncoding)
? kb_validUTF8PrefixLen(self.buffer)
: self.buffer.length;
if (validLen > 0 && self.tFirstByte == 0) {
self.tFirstByte = CFAbsoluteTimeGetCurrent();
if (self.loggingEnabled) {
NSLog(@"[KBStream] first-bytes after %.0fms (encoding=%@, SSE=%@)",
(self.tFirstByte - self.tStart)*1000.0,
(self.textEncoding==NSUTF8StringEncoding?@"UTF-8":@"Other"),
self.isSSE?@"YES":@"NO");
}
}
if (validLen == 0) return; // 末尾可能卡着半个字符
if (self.isSSE) {
if ((NSUInteger)self.decodedPrefixBytes < validLen) {
NSRange rng = NSMakeRange((NSUInteger)self.decodedPrefixBytes, validLen - (NSUInteger)self.decodedPrefixBytes);
NSString *piece = [[NSString alloc] initWithBytes:(const char *)self.buffer.bytes + rng.location
length:rng.length
encoding:self.textEncoding];
if (piece.length > 0) {
[self.sseTextBuffer appendString:piece];
self.decodedPrefixBytes = (NSInteger)validLen;
}
}
// 统一换行并按 SSE 事件 \n\n 切开
if (self.sseTextBuffer.length > 0) {
NSString *normalized = [self.sseTextBuffer stringByReplacingOccurrencesOfString:@"\r\n" withString:@"\n"];
[self.sseTextBuffer setString:normalized];
while (1) {
NSRange sep = [self.sseTextBuffer rangeOfString:@"\n\n"]; // 完整事件
if (sep.location == NSNotFound) break;
NSString *event = [self.sseTextBuffer substringToIndex:sep.location];
[self.sseTextBuffer deleteCharactersInRange:NSMakeRange(0, sep.location + sep.length)];
// 合并 data: 行为正文
NSArray<NSString *> *lines = [event componentsSeparatedByString:@"\n"];
NSMutableString *payload = [NSMutableString string];
for (NSString *ln in lines) {
if ([ln hasPrefix:@"data:"]) {
NSString *v = [ln substringFromIndex:5];
if (v.length > 0 && [v hasPrefix:@" "]) v = [v substringFromIndex:1];
[payload appendString:v ?: @""];
}
}
if (payload.length > 0) { [self enqueueChunk:payload]; }
}
}
return;
}
// 非 SSE直接对“可解码前缀”做增量输出
NSString *prefix = [[NSString alloc] initWithBytes:self.buffer.bytes length:validLen encoding:self.textEncoding];
if (!prefix) return;
if (self.deliveredCharCount < (NSInteger)prefix.length) {
NSString *delta = [prefix substringFromIndex:self.deliveredCharCount];
self.deliveredCharCount = prefix.length;
if (self.splitLargeDeltasOnWhitespace && delta.length > 16) {
// 按空格切词逐条回调(保留空格,使观感更自然)
NSArray<NSString *> *parts = [delta componentsSeparatedByString:@" "];
for (NSUInteger i = 0; i < parts.count; i++) {
NSString *w = parts[i];
if (w.length == 0) { [self enqueueChunk:@" "]; continue; }
if (i + 1 < parts.count) {
[self enqueueChunk:[w stringByAppendingString:@" "]];
} else {
[self enqueueChunk:w];
}
}
} else {
[self enqueueChunk:delta];
}
}
}
- (void)URLSession:(NSURLSession *)session task:(NSURLSessionTask *)task didCompleteWithError:(NSError *)error {
if (!error && self.isSSE && self.sseTextBuffer.length > 0) {
// 处理最后一条未以 \n\n 结束的事件
NSString *normalized = [self.sseTextBuffer stringByReplacingOccurrencesOfString:@"\r\n" withString:@"\n"];
NSArray<NSString *> *lines = [normalized componentsSeparatedByString:@"\n"];
NSMutableString *payload = [NSMutableString string];
for (NSString *ln in lines) {
if ([ln hasPrefix:@"data:"]) {
NSString *v = [ln substringFromIndex:5];
if (v.length > 0 && [v hasPrefix:@" "]) v = [v substringFromIndex:1];
[payload appendString:v ?: @""];
}
}
if (payload.length > 0) {
NSString *delta = nil;
if ((NSInteger)payload.length >= self.deliveredCharCount) {
delta = [payload substringFromIndex:self.deliveredCharCount];
} else {
delta = payload;
}
self.deliveredCharCount = payload.length;
if (delta.length > 0) { [self emitChunk:delta]; }
}
}
self.tFinish = CFAbsoluteTimeGetCurrent();
if (self.loggingEnabled) {
double t0 = (self.tFirstByte>0? (self.tFirstByte - self.tStart)*1000.0 : -1);
double t1 = (self.tFirstByte>0? (self.tFinish - self.tFirstByte)*1000.0 : -1);
double tt = (self.tFinish - self.tStart)*1000.0;
NSLog(@"[KBStream] finish chunks=%ld firstByte=%.0fms after start, tail=%.0fms, total=%.0fms error=%@",
(long)self.emittedChunkCount, t0, t1, tt, error);
}
// 若队列还有待输出内容,等队列清空再回调 finish
if (self.pendingQueue.count > 0) {
self.finishError = error;
[self startFlushTimerIfNeeded];
} else {
if (self.onFinish) dispatch_async(dispatch_get_main_queue(), ^{ self.onFinish(error); });
[self cancel];
}
}
#pragma mark - Helpers
- (void)emitChunk:(NSString *)rawText {
if (rawText.length == 0) return;
NSString *text = rawText;
// 0) 规范化换行与段起始:去掉位于片段开头的 \r/\n将 "\n\t"、"\r\n\t"、"\r\t" 归一为 "\t"
text = [text stringByReplacingOccurrencesOfString:@"\r\n\t" withString:@"\t"];
text = [text stringByReplacingOccurrencesOfString:@"\n\t" withString:@"\t"];
text = [text stringByReplacingOccurrencesOfString:@"\r\t" withString:@"\t"];
while (text.length > 0) {
unichar c0 = [text characterAtIndex:0];
if (c0 == '\n' || c0 == '\r') { text = [text substringFromIndex:1]; continue; }
break;
}
// 1) 统一处理 “/t” -> “\t”
if (self.treatSlashTAsTab) {
text = [text stringByReplacingOccurrencesOfString:@"/t" withString:@"\t"];
}
// 2) 仅在整体首段:去掉一个起始的 "\t",以及其后紧邻的一个空格(若存在)
if (!self.hasEmitted && self.trimLeadingTabOnce) {
if (text.length > 0 && [text characterAtIndex:0] == '\t') {
NSUInteger start = 1;
if (start < text.length && [text characterAtIndex:start] == ' ') start++;
text = [text substringFromIndex:start];
}
}
// 3) 从第二段开始:去掉每个段首的一个空格(即 “\t ” -> “\t”跨分片也处理
if (text.length > 0) {
// 跨分片:若上个分片以 \t 结尾,本分片起始的一个或多个空格去掉一个
if (self.lastChunkEndedWithTab) {
NSUInteger j = 0;
while (j < text.length && [text characterAtIndex:j] == ' ') { j++; }
if (j > 0) {
text = [text substringFromIndex:1]; // 仅去一个空格
}
}
// 同一分片内:将 “\t ” 规范化为 “\t”仅去一个空格
text = [text stringByReplacingOccurrencesOfString:@"\t " withString:@"\t"];
}
if (text.length == 0) { self.lastChunkEndedWithTab = NO; return; }
self.emittedChunkCount += 1;
if (self.loggingEnabled) {
NSLog(@"[KBStream] chunk#%ld len=%lu text=\"%@\"",
(long)self.emittedChunkCount, (unsigned long)text.length, KBPrintableSnippet(text, 160));
}
if (self.onChunk) dispatch_async(dispatch_get_main_queue(), ^{ self.onChunk(text); });
self.hasEmitted = YES;
// 记录末尾是否为分段分隔符 \t用于跨分片处理
unichar lastc = [text characterAtIndex:text.length - 1];
self.lastChunkEndedWithTab = (lastc == '\t');
}
#pragma mark - Queue/Flush
- (void)enqueueChunk:(NSString *)s {
if (s.length == 0) return;
[self.pendingQueue addObject:s];
[self startFlushTimerIfNeeded];
}
- (void)startFlushTimerIfNeeded {
if (self.flushTimer) return;
__weak typeof(self) weakSelf = self;
self.flushTimer = [NSTimer scheduledTimerWithTimeInterval:MAX(0.01, self.flushInterval)
repeats:YES
block:^(NSTimer * _Nonnull t) {
__strong typeof(weakSelf) self = weakSelf; if (!self) { [t invalidate]; return; }
if (self.pendingQueue.count == 0) {
[t invalidate]; self.flushTimer = nil;
if (self.finishError || self.finishError == nil) {
NSError *err = self.finishError; self.finishError = nil;
if (self.onFinish) dispatch_async(dispatch_get_main_queue(), ^{ self.onFinish(err); });
[self cancel];
}
return;
}
NSString *first = self.pendingQueue.firstObject;
[self.pendingQueue removeObjectAtIndex:0];
[self emitChunk:first];
}];
}
#pragma mark - Logging helpers
static NSString *KBPrintableSnippet(NSString *s, NSUInteger maxLen) {
if (!s) return @"";
NSString *x = [s stringByReplacingOccurrencesOfString:@"\n" withString:@"\\n"];
if (x.length > maxLen) {
x = [[x substringToIndex:maxLen] stringByAppendingString:@""];
}
return x;
}
@end