// // KBStreamFetcher.m // #import "KBStreamFetcher.h" @interface KBStreamFetcher () @property (nonatomic, strong) NSURLSession *session; @property (nonatomic, strong) NSURLSessionDataTask *task; @property (nonatomic, strong) NSMutableData *buffer; // 网络原始字节累加 @property (nonatomic, assign) NSStringEncoding textEncoding; // 推断得到的文本编码(默认 UTF-8) @property (nonatomic, assign) BOOL isSSE; // 是否为 SSE 响应 @property (nonatomic, strong) NSMutableString *sseTextBuffer; // SSE 文本缓冲(已解码) @property (nonatomic, assign) NSInteger decodedPrefixBytes; // 已解码并写入 sseTextBuffer 的字节数(SSE) @property (nonatomic, assign) NSInteger deliveredCharCount; // 已回传的字符数(非 SSE,用于做增量) @property (nonatomic, assign) BOOL hasEmitted; // 是否已经输出过正文(用于“首段删 1 个 \t”) @property (nonatomic, strong) NSMutableArray *pendingQueue; // 待回调的分片(节流输出) @property (nonatomic, strong) NSTimer *flushTimer; // 定时从队列取出一条回调 @property (nonatomic, strong, nullable) NSError *finishError; // 结束时的错误(需要等队列清空再回调) @end // 计算数据中以 UTF-8 编码可完整解码的“前缀字节长度”,避免切断多字节字符 static NSUInteger kb_validUTF8PrefixLen(NSData *data) { const unsigned char *bytes = (const unsigned char *)data.bytes; NSUInteger n = data.length; if (n == 0) return 0; NSInteger i = (NSInteger)n - 1; while (i >= 0 && (bytes[i] & 0xC0) == 0x80) { i--; } // 10xxxxxx 续字节 if (i < 0) return 0; // 全是续字节,等下次 unsigned char b = bytes[i]; NSUInteger expected = 1; if ((b & 0x80) == 0x00) expected = 1; // 0xxxxxxx else if ((b & 0xE0) == 0xC0) expected = 2; // 110xxxxx else if ((b & 0xF0) == 0xE0) expected = 3; // 1110xxxx else if ((b & 0xF8) == 0xF0) expected = 4; // 11110xxx else return (NSUInteger)i; // 非法起始,截到 i 之前 NSUInteger remain = n - (NSUInteger)i; return (remain >= expected) ? n : (NSUInteger)i; } @implementation KBStreamFetcher + (instancetype)fetcherWithURL:(NSURL *)url { KBStreamFetcher *f = [[self alloc] init]; f.url = url; return f; } - (instancetype)init { if (self = [super init]) { _acceptEventStream = NO; _disableCompression = YES; _treatSlashTAsTab = YES; _trimLeadingTabOnce = YES; _requestTimeout = 30.0; _textEncoding = NSUTF8StringEncoding; _buffer = [NSMutableData data]; _sseTextBuffer = [NSMutableString string]; _pendingQueue = [NSMutableArray array]; _flushInterval = 0.10; _splitLargeDeltasOnWhitespace = YES; } return self; } - (void)start { if (!self.url) return; [self cancel]; NSURLSessionConfiguration *cfg = [NSURLSessionConfiguration defaultSessionConfiguration]; cfg.requestCachePolicy = NSURLRequestReloadIgnoringLocalCacheData; cfg.timeoutIntervalForRequest = self.requestTimeout; cfg.timeoutIntervalForResource = MAX(self.requestTimeout, 60.0); self.session = [NSURLSession sessionWithConfiguration:cfg delegate:self delegateQueue:[NSOperationQueue mainQueue]]; NSMutableURLRequest *req = [NSMutableURLRequest requestWithURL:self.url]; req.HTTPMethod = @"GET"; if (self.disableCompression) { [req setValue:@"identity" forHTTPHeaderField:@"Accept-Encoding"]; } if (self.acceptEventStream) { [req setValue:@"text/event-stream" forHTTPHeaderField:@"Accept"]; } [req setValue:@"no-cache" forHTTPHeaderField:@"Cache-Control"]; [req setValue:@"keep-alive" forHTTPHeaderField:@"Connection"]; [self.extraHeaders enumerateKeysAndObjectsUsingBlock:^(NSString *k, NSString *v, BOOL *stop){ [req setValue:v forHTTPHeaderField:k]; }]; // 状态复位 [self.buffer setLength:0]; [self.sseTextBuffer setString:@""]; self.isSSE = NO; self.textEncoding = NSUTF8StringEncoding; self.decodedPrefixBytes = 0; self.deliveredCharCount = 0; self.hasEmitted = NO; [self.pendingQueue removeAllObjects]; [self.flushTimer invalidate]; self.flushTimer = nil; self.finishError = nil; self.task = [self.session dataTaskWithRequest:req]; [self.task resume]; } - (void)cancel { [self.task cancel]; self.task = nil; [self.session invalidateAndCancel]; self.session = nil; [self.buffer setLength:0]; [self.sseTextBuffer setString:@""]; self.decodedPrefixBytes = 0; self.deliveredCharCount = 0; self.hasEmitted = NO; [self.pendingQueue removeAllObjects]; [self.flushTimer invalidate]; self.flushTimer = nil; self.finishError = nil; } #pragma mark - NSURLSessionDataDelegate - (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveResponse:(NSURLResponse *)response completionHandler:(void (^)(NSURLSessionResponseDisposition))completionHandler { self.isSSE = NO; self.textEncoding = NSUTF8StringEncoding; if ([response isKindOfClass:[NSHTTPURLResponse class]]) { NSHTTPURLResponse *r = (NSHTTPURLResponse *)response; NSString *ct = r.allHeaderFields[@"Content-Type"] ?: r.allHeaderFields[@"content-type"]; if ([ct isKindOfClass:[NSString class]]) { NSString *lower = [ct lowercaseString]; if ([lower containsString:@"text/event-stream"]) self.isSSE = YES; NSRange pos = [lower rangeOfString:@"charset="]; if (pos.location != NSNotFound) { NSString *charset = [[lower substringFromIndex:pos.location + pos.length] componentsSeparatedByString:@";"][0]; if ([charset containsString:@"utf-8"] || [charset containsString:@"utf8"]) { self.textEncoding = NSUTF8StringEncoding; } else if ([charset containsString:@"iso-8859-1"] || [charset containsString:@"latin1"]) { self.textEncoding = NSISOLatin1StringEncoding; } } } } [self.sseTextBuffer setString:@""]; self.decodedPrefixBytes = 0; if (completionHandler) completionHandler(NSURLSessionResponseAllow); } - (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveData:(NSData *)data { if (data.length == 0) return; [self.buffer appendData:data]; NSUInteger validLen = (self.textEncoding == NSUTF8StringEncoding) ? kb_validUTF8PrefixLen(self.buffer) : self.buffer.length; if (validLen == 0) return; // 末尾可能卡着半个字符 if (self.isSSE) { if ((NSUInteger)self.decodedPrefixBytes < validLen) { NSRange rng = NSMakeRange((NSUInteger)self.decodedPrefixBytes, validLen - (NSUInteger)self.decodedPrefixBytes); NSString *piece = [[NSString alloc] initWithBytes:(const char *)self.buffer.bytes + rng.location length:rng.length encoding:self.textEncoding]; if (piece.length > 0) { [self.sseTextBuffer appendString:piece]; self.decodedPrefixBytes = (NSInteger)validLen; } } // 统一换行并按 SSE 事件 \n\n 切开 if (self.sseTextBuffer.length > 0) { NSString *normalized = [self.sseTextBuffer stringByReplacingOccurrencesOfString:@"\r\n" withString:@"\n"]; [self.sseTextBuffer setString:normalized]; while (1) { NSRange sep = [self.sseTextBuffer rangeOfString:@"\n\n"]; // 完整事件 if (sep.location == NSNotFound) break; NSString *event = [self.sseTextBuffer substringToIndex:sep.location]; [self.sseTextBuffer deleteCharactersInRange:NSMakeRange(0, sep.location + sep.length)]; // 合并 data: 行为正文 NSArray *lines = [event componentsSeparatedByString:@"\n"]; NSMutableString *payload = [NSMutableString string]; for (NSString *ln in lines) { if ([ln hasPrefix:@"data:"]) { NSString *v = [ln substringFromIndex:5]; if (v.length > 0 && [v hasPrefix:@" "]) v = [v substringFromIndex:1]; [payload appendString:v ?: @""]; [payload appendString:@"\n"]; // 多 data 行合并 } } if (payload.length > 0 && [payload hasSuffix:@"\n"]) { [payload deleteCharactersInRange:NSMakeRange(payload.length - 1, 1)]; } if (payload.length > 0) { [self enqueueChunk:payload]; } } } return; } // 非 SSE:直接对“可解码前缀”做增量输出 NSString *prefix = [[NSString alloc] initWithBytes:self.buffer.bytes length:validLen encoding:self.textEncoding]; if (!prefix) return; if (self.deliveredCharCount < (NSInteger)prefix.length) { NSString *delta = [prefix substringFromIndex:self.deliveredCharCount]; self.deliveredCharCount = prefix.length; if (self.splitLargeDeltasOnWhitespace && delta.length > 16) { // 按空格切词逐条回调(保留空格,使观感更自然) NSArray *parts = [delta componentsSeparatedByString:@" "]; for (NSUInteger i = 0; i < parts.count; i++) { NSString *w = parts[i]; if (w.length == 0) { [self enqueueChunk:@" "]; continue; } if (i + 1 < parts.count) { [self enqueueChunk:[w stringByAppendingString:@" "]]; } else { [self enqueueChunk:w]; } } } else { [self enqueueChunk:delta]; } } } - (void)URLSession:(NSURLSession *)session task:(NSURLSessionTask *)task didCompleteWithError:(NSError *)error { if (!error && self.isSSE && self.sseTextBuffer.length > 0) { // 处理最后一条未以 \n\n 结束的事件 NSString *normalized = [self.sseTextBuffer stringByReplacingOccurrencesOfString:@"\r\n" withString:@"\n"]; NSArray *lines = [normalized componentsSeparatedByString:@"\n"]; NSMutableString *payload = [NSMutableString string]; for (NSString *ln in lines) { if ([ln hasPrefix:@"data:"]) { NSString *v = [ln substringFromIndex:5]; if (v.length > 0 && [v hasPrefix:@" "]) v = [v substringFromIndex:1]; [payload appendString:v ?: @""]; [payload appendString:@"\n"]; } } if (payload.length > 0 && [payload hasSuffix:@"\n"]) { [payload deleteCharactersInRange:NSMakeRange(payload.length - 1, 1)]; } if (payload.length > 0) { [self emitChunk:payload]; } } // 若队列还有待输出内容,等队列清空再回调 finish if (self.pendingQueue.count > 0) { self.finishError = error; [self startFlushTimerIfNeeded]; } else { if (self.onFinish) dispatch_async(dispatch_get_main_queue(), ^{ self.onFinish(error); }); [self cancel]; } } #pragma mark - Helpers - (void)emitChunk:(NSString *)rawText { if (rawText.length == 0) return; NSString *text = rawText; if (self.treatSlashTAsTab) { text = [text stringByReplacingOccurrencesOfString:@"/t" withString:@"\t"]; } if (!self.hasEmitted && self.trimLeadingTabOnce) { // 跳过前导空白,只删除“一个”起始的 \t NSUInteger i = 0; while (i < text.length) { unichar c = [text characterAtIndex:i]; if (c == ' ' || c == '\r' || c == '\n') { i++; continue; } break; } if (i < text.length && [text characterAtIndex:i] == '\t') { NSMutableString *m = [text mutableCopy]; [m deleteCharactersInRange:NSMakeRange(i, 1)]; text = m; } } if (text.length == 0) return; if (self.onChunk) dispatch_async(dispatch_get_main_queue(), ^{ self.onChunk(text); }); self.hasEmitted = YES; } #pragma mark - Queue/Flush - (void)enqueueChunk:(NSString *)s { if (s.length == 0) return; [self.pendingQueue addObject:s]; [self startFlushTimerIfNeeded]; } - (void)startFlushTimerIfNeeded { if (self.flushTimer) return; __weak typeof(self) weakSelf = self; self.flushTimer = [NSTimer scheduledTimerWithTimeInterval:MAX(0.01, self.flushInterval) repeats:YES block:^(NSTimer * _Nonnull t) { __strong typeof(weakSelf) self = weakSelf; if (!self) { [t invalidate]; return; } if (self.pendingQueue.count == 0) { [t invalidate]; self.flushTimer = nil; if (self.finishError || self.finishError == nil) { NSError *err = self.finishError; self.finishError = nil; if (self.onFinish) dispatch_async(dispatch_get_main_queue(), ^{ self.onFinish(err); }); [self cancel]; } return; } NSString *first = self.pendingQueue.firstObject; [self.pendingQueue removeObjectAtIndex:0]; [self emitChunk:first]; }]; } @end