# -*- coding: utf-8 -*- import json import logging import os import socket import threading import time from pathlib import Path from queue import Queue from typing import Any, Dict, List import anyio import wda from quart import Quart, request, g from quart_cors import cors import Entity.Variables as ev from Entity import Variables from Entity.ResultData import ResultData from Entity.Variables import addModelToAnchorList, wdaFunctionPort from Utils.AiUtils import AiUtils from Utils.ControlUtils import ControlUtils from Utils.IOSAIStorage import IOSAIStorage from Utils.JsonUtils import JsonUtils from Utils.LogManager import LogManager from Utils.ThreadManager import ThreadManager from script.ScriptManager import ScriptManager for name in ('werkzeug', 'werkzeug.serving'): log = logging.getLogger(name) log.disabled = True log.propagate = False log.handlers.clear() app = Quart(__name__) # ⭐ 这里用 Quart,而不是 Flask app = cors(app, allow_origin="*") # 允许所有来源跨域 app.config['JSON_AS_ASCII'] = False # Flask jsonify 不转义中文/emoji app.config['JSONIFY_MIMETYPE'] = "application/json; charset=utf-8" # ============ 设备状态内存表 ============ listData = [] listLock = threading.Lock() # 历史遗留:不再消费队列,改为socket线程直接落地 dataQueue = Queue() # ---- 黏性快照(避免瞬时空) ---- _last_nonempty_snapshot = [] _last_snapshot_ts = 0.0 _STICKY_TTL_SEC = 10.0 # 在瞬时空时,回退到上一份非空快照10秒 _empty_logged = False _recovered_logged = False # ===== 设备集合变化跟踪 ===== change_version = 0 _device_ids_snapshot = set() _last_device_count = 0 def _log_device_changes(action: str): """记录设备集合增删变化""" global _device_ids_snapshot, change_version curr_ids = {d.get("deviceId") for d in listData if _is_online(d)} added = curr_ids - _device_ids_snapshot removed = _device_ids_snapshot - curr_ids if added or removed: change_version += 1 try: LogManager.info(f"[DEVICE][CHANGED][{action}] rev={change_version} count={len(curr_ids)} added={list(added)} removed={list(removed)}") except Exception: print(f"[DEVICE][CHANGED][{action}] rev={change_version} count={len(curr_ids)} added={list(added)} removed={list(removed)}") _device_ids_snapshot = curr_ids def _normalize_type(v) -> int: """把各种表示在线/离线的值,规范成 1/0""" if isinstance(v, bool): return 1 if v else 0 if isinstance(v, (int, float)): return 1 if int(v) == 1 else 0 if isinstance(v, str): s = v.strip().lower() if s.isdigit(): return 1 if int(s) == 1 else 0 if s in ("true", "online", "on", "yes"): return 1 return 0 return 1 if v else 0 def _is_online(d: Dict[str, Any]) -> bool: return _normalize_type(d.get("type", 1)) == 1 def _apply_device_snapshot(devices: List[Dict[str, Any]]): """接收 DeviceInfo 送来的全量设备列表,直接覆盖 listData""" global listData try: normed = [] for d in devices: # 拷贝一份,避免引用共享 d = dict(d) d["type"] = _normalize_type(d.get("type", 1)) # 规范成 0/1 normed.append(d) with listLock: before = len(listData) listData[:] = normed # 全量覆盖 _log_device_changes("SNAPSHOT") try: LogManager.info(f"[DEVICE][SNAPSHOT] size={len(normed)} (was={before})") except Exception: print(f"[DEVICE][SNAPSHOT] size={len(normed)} (was={before})") except Exception as e: LogManager.error(f"[DEVICE][SNAPSHOT][ERROR] {e}") def _apply_device_event(obj: Dict[str, Any]): """把单条设备上线/下线事件落到 listData,并打印关键日志""" try: dev_id = obj.get("deviceId") typ = _normalize_type(obj.get("type", 1)) obj["type"] = typ # 写回规范后的值,避免后续被误判 if dev_id is None: LogManager.warning(f"[DEVICE][WARN] missing deviceId in obj={obj}") return with listLock: before = len(listData) # 删除同 udid 旧记录 listData[:] = [d for d in listData if d.get("deviceId") != dev_id] if typ == 1: listData.append(obj) # 上线 LogManager.info(f"[DEVICE][UPSERT] id={dev_id} type={typ} size={len(listData)} (replaced={before - (len(listData)-1)})") _log_device_changes("UPSERT") else: LogManager.warning(f"[DEVICE][REMOVE] id={dev_id} type={typ} size={len(listData)} (removed_prev={before - len(listData)})") _log_device_changes("REMOVE") except Exception as e: LogManager.error(f"[DEVICE][APPLY_EVT][ERROR] {e}") # ============ 设备事件 socket 监听 ============ def _handle_conn(conn: socket.socket, addr): """统一的连接处理函数(拆 JSON 行 → 正常化 type → 应用到 listData)""" try: with conn: try: conn.settimeout(3.0) # 避免永久阻塞 except Exception: pass buffer = "" while True: try: data = conn.recv(1024) if not data: # 对端关闭 break buffer += data.decode('utf-8', errors='ignore') # 按行切 JSON;发送端每条以 '\n' 结尾 while True: line, sep, buffer = buffer.partition('\n') if not sep: break line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError as e: LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}") continue # === 新增:如果是全量快照(包含 devices 字段) === if "devices" in obj: devs = obj.get("devices") or [] LogManager.info(f"[SOCKET][RECV][SNAPSHOT] size={len(devs)} keys={list(obj.keys())}") try: _apply_device_snapshot(devs) LogManager.info(f"[SOCKET][APPLY][SNAPSHOT] size={len(devs)}") except Exception as e: LogManager.error(f"[DEVICE][APPLY_SNAPSHOT][ERROR] {e}") continue # 处理完这一条,继续下一条 JSON # === 否则按原来的单条设备事件处理(兼容旧逻辑) === dev_id = obj.get("deviceId") typ = _normalize_type(obj.get("type", 1)) obj["type"] = typ # 规范 1/0 LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}") try: _apply_device_event(obj) # 保持你原来的增删逻辑 LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}") except Exception as e: # 单条业务异常不让线程死 LogManager.error(f"[DEVICE][APPLY_EVT][ERROR] {e}") except (socket.timeout, ConnectionResetError, BrokenPipeError): # 连接级异常:关闭该连接,回到 accept break except Exception as e: LogManager.warning(f"[SOCKET][WARN] recv error: {e}") break except Exception as e: LogManager.error(f"[SOCKET][ERROR] 连接处理异常: {e}") def start_socket_listener(): """启动设备事件监听(仅走 FLASK_COMM_PORT;增强健壮性,不改业务)""" # 统一使用 FLASK_COMM_PORT,默认 34566 port = int(os.getenv('FLASK_COMM_PORT', 34566)) LogManager.info(f"Received port from environment: {port}") print(f"Received port from environment: {port}") if port <= 0: LogManager.info("未获取到通信端口,跳过Socket监听") print("未获取到通信端口,跳过Socket监听") return backoff = 0.5 # 自愈退避,起于 0.5s,上限 8s while True: s = None try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except Exception as e: LogManager.warning(f"[SOCKET][WARN] setsockopt SO_REUSEADDR failed: {e}") try: s.bind(('127.0.0.1', port)) print(f"[INFO] Socket successfully bound to port {port}") LogManager.info(f"[INFO] Socket successfully bound to port {port}") except Exception as bind_error: print(f"[ERROR]端口绑定失败: {bind_error}") LogManager.info(f"[ERROR]端口绑定失败: {bind_error}") # 绑定失败通常是端口未释放/竞争,退避后重试 time.sleep(backoff) backoff = min(backoff * 2, 8.0) continue s.listen(256) try: s.settimeout(1.5) # accept 超时,便于检查自愈循环 except Exception: pass LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...") print(f"[INFO] Socket listener started on port {port}, waiting for connections...") # 监听成功 → 退避复位 backoff = 0.5 while True: try: conn, addr = s.accept() except socket.timeout: # 定期“透气”,避免永久卡死;继续等待 continue except Exception as e: # 发生 accept 级错误:重建 socket(进入外层 while 自愈) LogManager.error(f"[ERROR] accept 失败: {e}") break # 每个连接独立线程处理,保持你原来的做法 threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start() except Exception as e: # 任意未兜住的异常,记录并进入退避自愈 LogManager.error(f"[SOCKET][ERROR] 监听主循环异常: {e}") time.sleep(backoff) backoff = min(backoff * 2, 8.0) finally: try: if s: s.close() except Exception: pass # 独立线程启动 Socket 服务 + 看门狗 def bootstrap_server_side_effects(): # 仅在真正的 Flask 进程里启动副作用(监听、定时器、MQ 等) listener_thread = threading.Thread(target=start_socket_listener, daemon=True) listener_thread.start() # 获取app def get_app(): return app @app.before_request def _log_request_start(): g._start_ts = time.time() LogManager.info( text=f"[HTTP] START {request.method} {request.path}", udid="flask" ) @app.after_request def _log_request_end(response): cost = time.time() - getattr(g, "_start_ts", time.time()) LogManager.info( text=f"[HTTP] END {request.method} {request.path} {response.status_code} in {cost:.3f}s", udid="flask" ) return response # ============ API 路由 ============ @app.route('/deviceList', methods=['GET']) async def deviceList(): global _last_device_count, change_version global _last_nonempty_snapshot, _last_snapshot_ts, _STICKY_TTL_SEC global _empty_logged, _recovered_logged try: with listLock: # 宽容判定在线(字符串'1'/'true'/True 都算) data = [d for d in listData if _is_online(d)] now = time.time() # 记录最近一次非空快照 if data: _last_nonempty_snapshot = data.copy() _last_snapshot_ts = now if _recovered_logged: LogManager.info(f"[API][deviceList][RECOVERED] count={len(data)} rev={change_version}") _recovered_logged = False _empty_logged = False else: # 瞬时空:在 TTL 内回退上一份非空快照 if _last_nonempty_snapshot and (now - _last_snapshot_ts) <= _STICKY_TTL_SEC: LogManager.warning(f"[API][deviceList][STICKY] serving last non-empty snapshot count={len(_last_nonempty_snapshot)} age={now - _last_snapshot_ts:.1f}s rev={change_version}") return ResultData(data=_last_nonempty_snapshot).toJson() if not _empty_logged: LogManager.error(f"[API][deviceList][DROP_TO_EMPTY] last_count={_last_device_count} rev={change_version}") _empty_logged = True _recovered_logged = True _last_device_count = len(data) LogManager.info(f"[API][deviceList] return_count={len(data)} rev={change_version}") return ResultData(data=data).toJson() except Exception as e: LogManager.error(f"[API][deviceList] error={e}") return ResultData(data=[]).toJson() @app.route('/passToken', methods=['POST']) async def passToken(): data = await request.get_json() print(json.dumps(data)) return ResultData(data="").toJson() # 获取设备应用列表 @app.route('/deviceAppList', methods=['POST']) async def deviceAppList(): param = await request.get_json() udid = param["udid"] apps = ControlUtils.getDeviceAppList(udid) return ResultData(data=apps).toJson() # 打开指定app @app.route('/launchApp', methods=['POST']) async def launchApp(): body = await request.get_json() udid = body.get("udid") bundleId = body.get("bundleId") t = wda.USBClient(udid, wdaFunctionPort) t.session().app_start(bundleId) return ResultData(data="").toJson() # 回到首页 @app.route('/toHome', methods=['POST']) async def toHome(): body = await request.get_json() udid = body.get("udid") client = wda.USBClient(udid, wdaFunctionPort) client.home() return ResultData(data="").toJson() # 点击事件 @app.route('/tapAction', methods=['POST']) async def tapAction(): body = await request.get_json() udid = body.get("udid") client = wda.USBClient(udid, wdaFunctionPort) print("-----------------------") print(client) print("-----------------------") session = client.session() session.appium_settings({"snapshotMaxDepth": 0}) x = body.get("x") y = body.get("y") session.tap(x, y) return ResultData(data="").toJson() # 拖拽事件 @app.route('/swipeAction', methods=['POST']) async def swipeAction(): body = await request.get_json() udid = body.get("udid") duration = body.get("duration") # 时长 sx = body.get("sx") # 起始X点 sy = body.get("sy") # 起始Y点 ex = body.get("ex") # 结束X点 ey = body.get("ey") # 结束Y点 client = wda.USBClient(udid, wdaFunctionPort) session = client.session() session.appium_settings({"snapshotMaxDepth": 0}) session.swipe(sx, sy, ex, ey, duration) return ResultData(data="").toJson() # 长按事件 @app.route('/longPressAction', methods=['POST']) async def longPressAction(): body = await request.get_json() udid = body.get("udid") x = body.get("x") y = body.get("y") client = wda.USBClient(udid, wdaFunctionPort) session = client.session() session.appium_settings({"snapshotMaxDepth": 5}) session.tap_hold(x, y, 1.0) return ResultData(data="").toJson() # 养号 @app.route('/growAccount', methods=['POST']) async def growAccount(): body = await request.get_json() udid = body.get("udid") Variables.commentList = body.get("comment") isComment = body.get("isComment") manager = ScriptManager() event = threading.Event() # 启动脚本 thread = threading.Thread(target=manager.growAccount, args=(udid, isComment, event,)) # 添加到线程管理 code, msg = ThreadManager.add(udid, thread, event) return ResultData(data="", code=code, message=msg).toJson() # 观看直播 @app.route("/watchLiveForGrowth", methods=['POST']) async def watchLiveForGrowth(): body = await request.get_json() udid = body.get("udid") manager = ScriptManager() event = threading.Event() thread = threading.Thread(target=manager.watchLiveForGrowth, args=(udid, event)) # 添加到线程管理 ThreadManager.add(udid, thread, event) return ResultData(data="").toJson() # 停止脚本 @app.route("/stopScript", methods=['POST']) async def stopScript(): body = await request.get_json() udid = body.get("udid") LogManager.method_info(f"接口收到 /stopScript udid={udid}", method="task") code, msg = ThreadManager.stop(udid) return ResultData(code=code, data=[], message=msg).toJson() # 关注打招呼 @app.route('/passAnchorData', methods=['POST']) async def passAnchorData(): try: LogManager.method_info("关注打招呼", "关注打招呼") data: Dict[str, Any] = await request.get_json() # 设备列表 idList = data.get("deviceList", []) # 主播列表 acList = data.get("anchorList", []) Variables.commentList = data.get("comment") isComment = data.get("isComment") LogManager.info(f"[INFO] 获取数据: {idList} {acList}") AiUtils.save_aclist_flat_append(acList) # 是否需要回复 needReply = data.get("needReply", False) # 获取打招呼数据 ev.prologueList = data.get("prologueList", []) needTranslate = data.get("needTranslate", False) # 添加主播数据 addModelToAnchorList(acList) failed_ids = [] # 启动线程,执行脚本(单个设备异常不影响其它设备) for udid in idList: try: manager = ScriptManager() event = threading.Event() thread = threading.Thread( target=manager.safe_greetNewFollowers, args=(udid, needReply, isComment, needTranslate, event,), ) ThreadManager.add(udid, thread, event) except Exception as e: failed_ids.append(udid) LogManager.error(f"[passAnchorData] 设备 {udid} 启动脚本失败: {e}") # 如果所有设备都失败,可以考虑返回错误码 if failed_ids and len(failed_ids) == len(idList): return ResultData( data="", code=1001, message=f"所有设备启动失败: {failed_ids}" ).toJson() # 部分失败也算整体成功,只是记录一下 if failed_ids: LogManager.warning(f"[passAnchorData] 部分设备启动失败: {failed_ids}") return ResultData(data="").toJson() except Exception as e: LogManager.error(e) return ResultData(data="", code=1001).toJson() @app.route('/followAndGreetUnion', methods=['POST']) async def followAndGreetUnion(): try: LogManager.method_info("关注打招呼", "关注打招呼(联盟号)") data: Dict[str, Any] = await request.get_json() # 设备列表 idList = data.get("deviceList", []) # 主播列表 acList = data.get("anchorList", []) LogManager.info(f"[INFO] 获取数据: {idList} {acList}") AiUtils.save_aclist_flat_append(acList) # 是否需要回复 needReply = data.get("needReply", True) needTranslate = data.get("needTranslate", False) # 获取打招呼数据 ev.prologueList = data.get("prologueList", []) # 添加主播数据 addModelToAnchorList(acList) failed_ids = [] # 启动线程,执行脚本(单个设备异常不影响其它设备) for udid in idList: try: manager = ScriptManager() event = threading.Event() thread = threading.Thread( target=manager.safe_followAndGreetUnion, args=(udid, needReply, needTranslate, event), ) ThreadManager.add(udid, thread, event) except Exception as e: failed_ids.append(udid) LogManager.error(f"[followAndGreetUnion] 设备 {udid} 启动脚本失败: {e}") # 如果所有设备都失败,可以返回错误码 if failed_ids and len(failed_ids) == len(idList): return ResultData( data="", code=1001, message=f"所有设备启动失败: {failed_ids}", ).toJson() # 部分失败也算整体成功,只是记录一下 if failed_ids: LogManager.warning(f"[followAndGreetUnion] 部分设备启动失败: {failed_ids}") return ResultData(data="").toJson() except Exception as e: LogManager.error(f"[followAndGreetUnion] 接口级异常: {e}") return ResultData(data="", code=1001).toJson() # 获取私信数据 @app.route("/getPrologueList", methods=['GET']) async def getPrologueList(): import Entity.Variables as Variables return ResultData(data=Variables.prologueList).toJson() # 添加临时数据 # 批量追加主播到 JSON 文件 @app.route("/addTempAnchorData", methods=['POST']) async def addTempAnchorData(): """ 请求体支持: - 单个对象:{"anchorId": "xxx", "country": "CN"} - 对象数组:[{"anchorId": "xxx", "country": "CN"}, {"anchorId": "yyy", "country": "US"}] """ data = await request.get_json() if not data: return ResultData(code=400, message="请求数据为空").toJson() # 追加到 JSON 文件 AiUtils.save_aclist_flat_append(data, "data/acList.json") return ResultData(data="ok").toJson() # 获取当前屏幕上的聊天信息 @app.route("/getChatTextInfo", methods=['POST']) async def getChatTextInfo(): data = await request.get_json() udid = data.get("udid") client = wda.USBClient(udid,wdaFunctionPort) session = client.session() xml = session.source() try: result = AiUtils.extract_messages_from_xml(xml) last_in = None last_out = None for item in reversed(result): # 从后往前找 if item.get('type') != 'msg': continue if last_in is None and item['dir'] == 'in': last_in = item['text'] if last_out is None and item['dir'] == 'out': last_out = item['text'] if last_in is not None and last_out is not None: break print(f"检测出对方的最后一条数据:{last_in},{type(last_in)}") print(f"检测出我的最后一条数据:{last_out},{type(last_out)}") return ResultData(data=result).toJson() except Exception as e: LogManager.error(f"获取屏幕翻译出现错误:{e}", "获取屏幕翻译") data = [ { 'type': 'massage', 'dir': 'in', 'text': '当前页面无法获取聊天记录,请在tiktok聊天页面进行获取!!!' }, { 'type': 'massage', 'dir': 'in', 'text': 'Unable to retrieve chat messages on the current screen. Please navigate to the TikTok chat page and try again!!!' } ] return ResultData(data=data, message="解析失败").toJson() # 监控消息 @app.route("/replyMessages", methods=['POST']) async def monitorMessages(): LogManager.method_info("开始监控消息,监控消息脚本启动", "监控消息") body = await request.get_json() udid = body.get("udid") # Variables.commentList = body.get("comment") manager = ScriptManager() event = threading.Event() thread = threading.Thread(target=manager.replyMessages, args=(udid, event)) LogManager.method_info("创建监控消息脚本线程成功", "监控消息") # 添加到线程管理 ThreadManager.add(udid, thread, event) return ResultData(data="").toJson() # 上传日志 @app.route("/setLoginInfo", methods=['POST']) async def upLoadLogLogs(): data = await request.get_json() # 解析 JSON token = data.get("token") userId = data.get("userId") tenantId = data.get("tenantId") ok = LogManager.upload_all_logs("http://47.79.98.113:8101/api/log/upload", token, userId, tenantId) if ok: return ResultData(data="日志上传成功").toJson() else: return ResultData(data="", message="日志上传失败").toJson() # 获取当前的主播列表数据 @app.route("/anchorList", methods=['POST']) async def queryAnchorList(): # 项目根目录(当前文件在 infos 下,回退两层到根目录) root_dir = Path(__file__).resolve().parent.parent file_path = root_dir / "data" / "acList.json" data = [] if file_path.exists(): try: with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) except Exception as e: LogManager.error(f"[anchorList] 读取失败: {e}") data = [] return ResultData(data=data).toJson() # 修改当前的主播列表数据 @app.route("/updateAnchorList", methods=['POST']) async def updateAnchorList(): """ invitationType: 1 普票 2 金票 state: 1 通行(True) / 0 不通行(False) """ data = await request.get_json(force=True, silent=True) or {} invitationType = data.get("invitationType") state = bool(data.get("state")) # 转成布尔 # 要更新成的值 new_status = 1 if state else 0 # 用工具类解析路径,避免 cwd 影响 file_path = AiUtils._resolve_path("data/acList.json") # 加载 JSON try: doc = json.loads(file_path.read_text(encoding="utf-8-sig")) except Exception as e: LogManager.error(f"[updateAnchorList] 读取失败: {e}") return ResultData(code=1001, message=f"暂无数据").toJson() # 定位 anchorList if isinstance(doc, list): acList = doc wrapper = None elif isinstance(doc, dict) and isinstance(doc.get("anchorList"), list): acList = doc["anchorList"] wrapper = doc else: return ResultData(code=500, message="文件格式不合法").toJson() # 遍历并更新 updated = 0 for item in acList: if isinstance(item, dict) and item.get("invitationType") == invitationType: item["state"] = new_status updated += 1 # 写回(保持原始结构) try: file_path.parent.mkdir(parents=True, exist_ok=True) to_write = wrapper if wrapper is not None else acList file_path.write_text(json.dumps(to_write, ensure_ascii=False, indent=2), encoding="utf-8") except Exception as e: LogManager.error(f"[updateAnchorList] 写入失败: {e}") return ResultData(code=500, message=f"写入失败: {e}").toJson() if updated: return ResultData(data=updated, message=f"已更新 {updated} 条记录").toJson() else: return ResultData(data=0, message="未找到符合条件的记录").toJson() # 删除主播 @app.route("/deleteAnchorWithIds", methods=['POST']) async def deleteAnchorWithIds(): ls: list[dict] = await request.get_json() # [{"anchorId": "xxx"}, ...] ids = [d.get("anchorId") for d in ls if d.get("anchorId")] deleted = AiUtils.delete_anchors_by_ids(ids) return ResultData(data={"deleted": deleted}).toJson() # 配置ai人设 @app.route("/aiConfig", methods=['POST']) async def aiConfig(): data = await request.get_json() agentName = data.get("agentName") guildName = data.get("guildName") contactTool = data.get("contactTool") contact = data.get("contact") age = data.get("age") sex = data.get("sex") height = data.get("height") weight = data.get("weight") body_features = data.get("body_features") nationality = data.get("nationality") personality = data.get("personality") strengths = data.get("strengths") dict = { "agentName": agentName, "guildName": guildName, "contactTool": contactTool, "contact": contact, "age": age, "sex": sex, "height": height, "weight": weight, "body_features": body_features, "nationality": nationality, "personality": personality, "strengths": strengths, "api-key": "app-sdRfZy2by9Kq7uJg7JdOSVr8" } # JsonUtils.write_json("aiConfig", dict) IOSAIStorage.overwrite(dict, "aiConfig.json") return ResultData(data="").toJson() # 查询主播聊天发送的最后一条信息 @app.route("/select_last_message", methods=['GET']) async def select_last_message(): data = JsonUtils.query_all_json_items() return ResultData(data=data).toJson() # 修改消息(已读改成未读) @app.route("/update_last_message", methods=['POST']) async def update_last_message(): data = await request.get_json() # 解析 JSON sender = data.get("sender") udid = data.get("device") text = data.get("text") updated_count = JsonUtils.update_json_items( match={"sender": sender, "text": text}, # 匹配条件 patch={"status": 1}, # 修改内容 filename="log/last_message.json", # 要修改的文件 multi=True # 只改第一条匹配的 ) if updated_count > 0: return ResultData(data=updated_count, message="修改成功").toJson() return ResultData(data=updated_count, message="修改失败").toJson() # 删除已读消息 @app.route("/delete_last_message", methods=['POST']) async def delete_last_message(): data = await request.get_json() # 解析 JSON sender = data.get("sender") udid = data.get("device") text = data.get("text") updated_count = JsonUtils.delete_json_items( match={"sender": sender, "text": text}, # 匹配条件 filename="log/last_message.json", # 要修改的文件 multi=True # 只改第一条匹配的 ) if updated_count > 0: return ResultData(data=updated_count, message="修改成功").toJson() return ResultData(data=updated_count, message="修改失败").toJson() # 停止所有任务 @app.route("/stopAllTask", methods=['POST']) async def stopAllTask(): idList = await request.get_json() code, msg, data = ThreadManager.batch_stop(idList) return ResultData(code, data, msg).toJson() # 切换账号 @app.route('/changeAccount', methods=['POST']) async def changeAccount(): body = await request.get_json() udid = body.get("udid") if not udid: return ResultData(data="", code=400, message="缺少 udid").toJson() manager = ScriptManager() threading.Event() # 启动脚本 code, msg = manager.changeAccount(udid) # thread = threading.Thread(target=, args=(udid,)) # # 添加到线程管理 # thread.start() return ResultData(data="", code=code, message=msg).toJson() # 查看设备网络状态 @app.route('/getDeviceNetStatus', methods=['POST']) async def getDeviceNetStatus(): body = await request.get_json() udid = body.get("udid") # 同步且超级慢的逻辑 → 丢到线程池,不阻塞事件循环 def _work(): client = wda.USBClient(udid, wdaFunctionPort) r = client.getNetWorkStatus() return r.get("value") value = await anyio.to_thread.run_sync(_work) return ResultData(data=value, code=200).toJson() # 获取ai配置 @app.route("/getAiConfig", methods=['GET']) def getAiConfig(): data = IOSAIStorage.load("aiConfig.json") return ResultData(data=data).toJson() # 重新开启tiktok @app.route("/restartTikTok", methods=['POST']) async def restartTikTok(): json = await request.get_json() udid = json.get("udid") client = wda.USBClient(udid, wdaFunctionPort) session = client.session() ControlUtils.closeTikTok(session, udid) time.sleep(1) ControlUtils.openTikTok(session, udid) return ResultData(data="").toJson() # 健康检查 @app.get("/health") async def health(): return {"status": "ok"} if __name__ == '__main__': # 只有“明确是 Flask 进程”才跑副作用(通过 APP_ROLE 控制) app.run("0.0.0.0", port=5000, debug=False, use_reloader=False, threaded=True)