diff --git a/Module/FlaskService.py b/Module/FlaskService.py index 02bf6aa..fb9cd99 100644 --- a/Module/FlaskService.py +++ b/Module/FlaskService.py @@ -1,7 +1,9 @@ +# -*- coding: utf-8 -*- import json import os import socket import threading +import time from pathlib import Path from queue import Queue from typing import Any, Dict @@ -27,140 +29,223 @@ CORS(app) app.config['JSON_AS_ASCII'] = False # Flask jsonify 不转义中文/emoji app.config['JSONIFY_MIMETYPE'] = "application/json; charset=utf-8" +# ============ 设备状态内存表 ============ listData = [] listLock = threading.Lock() +# 历史遗留:不再消费队列,改为socket线程直接落地 dataQueue = Queue() +# ---- 黏性快照(避免瞬时空) ---- +_last_nonempty_snapshot = [] +_last_snapshot_ts = 0.0 +_STICKY_TTL_SEC = 10.0 # 在瞬时空时,回退到上一份非空快照10秒 +_empty_logged = False +_recovered_logged = False -def start_socket_listener(): - port = int(os.getenv('FLASK_COMM_PORT', 0)) - LogManager.info(f"Received port from environment: {port}") - print(f"Received port from environment: {port}") - if port <= 0: - LogManager.info("未获取到通信端口,跳过Socket监听") - print("未获取到通信端口,跳过Socket监听") - return +# ===== 设备集合变化跟踪 ===== +change_version = 0 +_device_ids_snapshot = set() +_last_device_count = 0 + +def _log_device_changes(action: str): + """记录设备集合增删变化""" + global _device_ids_snapshot, change_version + curr_ids = {d.get("deviceId") for d in listData if _is_online(d)} + added = curr_ids - _device_ids_snapshot + removed = _device_ids_snapshot - curr_ids + if added or removed: + change_version += 1 + try: + LogManager.info(f"[DEVICE][CHANGED][{action}] rev={change_version} count={len(curr_ids)} added={list(added)} removed={list(removed)}") + except Exception: + print(f"[DEVICE][CHANGED][{action}] rev={change_version} count={len(curr_ids)} added={list(added)} removed={list(removed)}") + _device_ids_snapshot = curr_ids + +def _normalize_type(v) -> int: + """把各种表示在线/离线的值,规范成 1/0""" + if isinstance(v, bool): + return 1 if v else 0 + if isinstance(v, (int, float)): + return 1 if int(v) == 1 else 0 + if isinstance(v, str): + s = v.strip().lower() + if s.isdigit(): + return 1 if int(s) == 1 else 0 + if s in ("true", "online", "on", "yes"): + return 1 + return 0 + return 1 if v else 0 + +def _is_online(d: Dict[str, Any]) -> bool: + return _normalize_type(d.get("type", 1)) == 1 + +def _apply_device_event(obj: Dict[str, Any]): + """把单条设备上线/下线事件落到 listData,并打印关键日志""" try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - # 设置端口复用,避免端口被占用时无法绑定 - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - # 尝试绑定端口 - try: - s.bind(('127.0.0.1', port)) - print(f"[INFO] Socket successfully bound to port {port}") - LogManager.info(f"[INFO] Socket successfully bound to port {port}") - except Exception as bind_error: - print(f"[ERROR]端口绑定失败: {bind_error}") - LogManager.info(f"[ERROR]端口绑定失败: {bind_error}") - return - - # 开始监听 - s.listen() - LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...") - print(f"[INFO] Socket listener started on port {port}, waiting for connections...") - while True: - try: - conn, addr = s.accept() - except Exception as e: - LogManager.error(f"[ERROR] accept 失败: {e}") - continue - - # 独立线程处理单条连接,避免单客户端异常拖垮监听线程 - threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start() - # while True: - # try: - # LogManager.info(f"[INFO] Waiting for a new connection on port {port}...") - # print(f"[INFO] Waiting for a new connection on port {port}...") - # conn, addr = s.accept() - # LogManager.info(f"[INFO] Connection accepted from: {addr}") - # print(f"[INFO] Connection accepted from: {addr}") - # - # raw_data = conn.recv(1024).decode('utf-8').strip() - # LogManager.info(f"[INFO] Raw data received: {raw_data}") - # print(f"[INFO] Raw data received: {raw_data}") - # - # data = json.loads(raw_data) - # LogManager.info(f"[INFO] Parsed data: {data}") - # print(f"[INFO] Parsed data: {data}") - # dataQueue.put(data) - # except Exception as conn_error: - # LogManager.error(f"[ERROR]连接处理失败: {conn_error}") - # print(f"[ERROR]连接处理失败: {conn_error}") + dev_id = obj.get("deviceId") + typ = _normalize_type(obj.get("type", 1)) + obj["type"] = typ # 写回规范后的值,避免后续被误判 + if dev_id is None: + LogManager.warning(f"[DEVICE][WARN] missing deviceId in obj={obj}") + return + with listLock: + before = len(listData) + # 删除同 udid 旧记录 + listData[:] = [d for d in listData if d.get("deviceId") != dev_id] + if typ == 1: + listData.append(obj) # 上线 + LogManager.info(f"[DEVICE][UPSERT] id={dev_id} type={typ} size={len(listData)} (replaced={before - (len(listData)-1)})") + _log_device_changes("UPSERT") + else: + LogManager.warning(f"[DEVICE][REMOVE] id={dev_id} type={typ} size={len(listData)} (removed_prev={before - len(listData)})") + _log_device_changes("REMOVE") except Exception as e: - LogManager.error(f"[ERROR]Socket服务启动失败: {e}") - print(f"[ERROR]Socket服务启动失败: {e}") - + LogManager.error(f"[DEVICE][APPLY_EVT][ERROR] {e}") +# ============ 设备事件 socket 监听 ============ def _handle_conn(conn: socket.socket, addr): + """统一的连接处理函数(外部全局,避免内嵌函数覆盖)""" try: + LogManager.info(f"[SOCKET][ACCEPT] from={addr}") with conn: - # 1. 循环收包直到拿到完整 JSON buffer = "" while True: data = conn.recv(1024) if not data: # 对端关闭 break buffer += data.decode('utf-8', errors='ignore') - # 2. 尝试切出完整 JSON(简单按行,也可按长度头、分隔符) + # 按行切 JSON;发送端每条以 '\n' 结尾 while True: line, sep, buffer = buffer.partition('\n') - if not sep: # 没找到完整行 + if not sep: break line = line.strip() - if not line: # 空行跳过 + if not line: continue try: obj = json.loads(line) except json.JSONDecodeError as e: - LogManager.warning(f"[WARN] 非法 JSON 丢弃: {line[:100]} {e}") + LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}") continue - # 3. 收到合法数据,塞进队列 - dataQueue.put(obj) - LogManager.info(f"[INFO] 收到合法消息: {obj}") + dev_id = obj.get("deviceId") + typ = _normalize_type(obj.get("type", 1)) + obj["type"] = typ + LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}") + _apply_device_event(obj) + LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}") except Exception as e: - LogManager.error(f"[ERROR] 连接处理异常: {e}") + LogManager.error(f"[SOCKET][ERROR] 连接处理异常: {e}") +def start_socket_listener(): + """启动设备事件监听(与 HTTP 端口无关,走 FLASK_COMM_PORT)""" + # 统一使用 FLASK_COMM_PORT,默认 34566 + port = int(os.getenv('FLASK_COMM_PORT', 34566)) + LogManager.info(f"Received port from environment: {port}") + print(f"Received port from environment: {port}") -# 在独立线程中启动Socket服务 + if port <= 0: + LogManager.info("未获取到通信端口,跳过Socket监听") + print("未获取到通信端口,跳过Socket监听") + return + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + try: + s.bind(('127.0.0.1', port)) + print(f"[INFO] Socket successfully bound to port {port}") + LogManager.info(f"[INFO] Socket successfully bound to port {port}") + except Exception as bind_error: + print(f"[ERROR]端口绑定失败: {bind_error}") + LogManager.info(f"[ERROR]端口绑定失败: {bind_error}") + return + + s.listen() + LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...") + print(f"[INFO] Socket listener started on port {port}, waiting for connections...") + + while True: + try: + conn, addr = s.accept() + except Exception as e: + LogManager.error(f"[ERROR] accept 失败: {e}") + continue + threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start() + +# 监听存活看门狗:端口不可连就拉起 +def _listener_watchdog(): + import socket as _s + port = int(os.getenv('FLASK_COMM_PORT', 34566)) + while True: + try: + ok = False + try: + with _s.socket(_s.AF_INET, _s.SOCK_STREAM) as c: + c.settimeout(1.0) + c.connect(('127.0.0.1', port)) + ok = True + except Exception: + ok = False + if not ok: + LogManager.warning(f"[SOCKET][WATCHDOG] listener not reachable on {port}, restarting...") + try: + threading.Thread(target=start_socket_listener, daemon=True).start() + except Exception as e: + LogManager.error(f"[SOCKET][WATCHDOG] restart failed: {e}") + except Exception as e: + LogManager.error(f"[SOCKET][WATCHDOG] error: {e}") + time.sleep(5) + +# 独立线程启动 Socket 服务 + 看门狗 listener_thread = threading.Thread(target=start_socket_listener, daemon=True) listener_thread.start() +watchdog_thread = threading.Thread(target=_listener_watchdog, daemon=True) +watchdog_thread.start() - -# 获取设备列表 +# ============ API 路由 ============ @app.route('/deviceList', methods=['GET']) def deviceList(): + global _last_device_count, change_version + global _last_nonempty_snapshot, _last_snapshot_ts, _STICKY_TTL_SEC + global _empty_logged, _recovered_logged try: with listLock: - # 1. 消费完队列 - while not dataQueue.empty(): - obj = dataQueue.get() - if obj["type"] == 1: - # 上线:先踢掉同 deviceId 的旧记录(端口可能变) - listData[:] = [d for d in listData if d.get("deviceId") != obj.get("deviceId")] - listData.append(obj) - else: - # 下线:只要同 deviceId 就删,不管端口 - listData[:] = [d for d in listData if d.get("deviceId") != obj.get("deviceId")] + # 宽容判定在线(字符串'1'/'true'/True 都算) + data = [d for d in listData if _is_online(d)] + now = time.time() - # 2. 兜底:只保留在线 - listData[:] = [d for d in listData if d.get('type') == 1] + # 记录最近一次非空快照 + if data: + _last_nonempty_snapshot = data.copy() + _last_snapshot_ts = now + if _recovered_logged: + LogManager.info(f"[API][deviceList][RECOVERED] count={len(data)} rev={change_version}") + _recovered_logged = False + _empty_logged = False + else: + # 瞬时空:在 TTL 内回退上一份非空快照 + if _last_nonempty_snapshot and (now - _last_snapshot_ts) <= _STICKY_TTL_SEC: + LogManager.warning(f"[API][deviceList][STICKY] serving last non-empty snapshot count={len(_last_nonempty_snapshot)} age={now - _last_snapshot_ts:.1f}s rev={change_version}") + return ResultData(data=_last_nonempty_snapshot).toJson() + if not _empty_logged: + LogManager.error(f"[API][deviceList][DROP_TO_EMPTY] last_count={_last_device_count} rev={change_version}") + _empty_logged = True + _recovered_logged = True - return ResultData(data=listData.copy()).toJson() + _last_device_count = len(data) + LogManager.info(f"[API][deviceList] return_count={len(data)} rev={change_version}") + return ResultData(data=data).toJson() except Exception as e: - LogManager.error("获取设备列表失败:", e) + LogManager.error(f"[API][deviceList] error={e}") return ResultData(data=[]).toJson() - -# 传递token @app.route('/passToken', methods=['POST']) def passToken(): data = request.get_json() print(data) return ResultData(data="").toJson() - # 获取设备应用列表 @app.route('/deviceAppList', methods=['POST']) def deviceAppList(): @@ -169,7 +254,6 @@ def deviceAppList(): apps = ControlUtils.getDeviceAppList(udid) return ResultData(data=apps).toJson() - # 打开指定app @app.route('/launchApp', methods=['POST']) def launchApp(): @@ -205,7 +289,6 @@ def tapAction(): session.tap(x, y) return ResultData(data="").toJson() - # 拖拽事件 @app.route('/swipeAction', methods=['POST']) def swipeAction(): @@ -224,7 +307,6 @@ def swipeAction(): session.swipe(sx, sy, ex, ey, duration) return ResultData(data="").toJson() - # 长按事件 @app.route('/longPressAction', methods=['POST']) def longPressAction(): @@ -238,7 +320,6 @@ def longPressAction(): session.tap_hold(x, y, 1.0) return ResultData(data="").toJson() - # 养号 @app.route('/growAccount', methods=['POST']) def growAccount(): @@ -254,7 +335,6 @@ def growAccount(): code, msg = ThreadManager.add(udid, thread, event) return ResultData(data="", code=code, message=msg).toJson() - # 观看直播 @app.route("/watchLiveForGrowth", methods=['POST']) def watchLiveForGrowth(): @@ -267,7 +347,6 @@ def watchLiveForGrowth(): ThreadManager.add(udid, thread, event) return ResultData(data="").toJson() - # 停止脚本 @app.route("/stopScript", methods=['POST']) def stopScript(): @@ -277,7 +356,6 @@ def stopScript(): code, msg = ThreadManager.stop(udid) return ResultData(code=code, data=[], message=msg).toJson() - # 关注打招呼 @app.route('/passAnchorData', methods=['POST']) def passAnchorData(): @@ -292,7 +370,6 @@ def passAnchorData(): acList = data.get("anchorList", []) Variables.commentList = data.get("comment") - LogManager.info(f"[INFO] 获取数据: {idList} {acList}") AiUtils.save_aclist_flat_append(acList) @@ -322,7 +399,6 @@ def passAnchorData(): LogManager.error(e) return ResultData(data="", code=1001).toJson() - @app.route('/followAndGreetUnion', methods=['POST']) def followAndGreetUnion(): try: @@ -361,14 +437,12 @@ def followAndGreetUnion(): LogManager.error(e) return ResultData(data="", code=1001).toJson() - # 获取私信数据 @app.route("/getPrologueList", methods=['GET']) def getPrologueList(): import Entity.Variables as Variables return ResultData(data=Variables.prologueList).toJson() - # 添加临时数据 # 批量追加主播到 JSON 文件 @app.route("/addTempAnchorData", methods=['POST']) @@ -385,7 +459,6 @@ def addTempAnchorData(): AiUtils.save_aclist_flat_append(data, "log/acList.json") return ResultData(data="ok").toJson() - # 获取当前屏幕上的聊天信息 @app.route("/getChatTextInfo", methods=['POST']) def getChatTextInfo(): @@ -432,7 +505,6 @@ def getChatTextInfo(): ] return ResultData(data=data, message="解析失败").toJson() - # 监控消息 @app.route("/replyMessages", methods=['POST']) def monitorMessages(): @@ -449,7 +521,6 @@ def monitorMessages(): ThreadManager.add(udid, thread, event) return ResultData(data="").toJson() - # 上传日志 @app.route("/setLoginInfo", methods=['POST']) def upLoadLogLogs(): @@ -463,7 +534,6 @@ def upLoadLogLogs(): else: return ResultData(data="", message="日志上传失败").toJson() - # 获取当前的主播列表数据 @app.route("/anchorList", methods=['POST']) def queryAnchorList(): @@ -481,7 +551,6 @@ def queryAnchorList(): data = [] return ResultData(data=data).toJson() - # 修改当前的主播列表数据 @app.route("/updateAnchorList", methods=['POST']) def updateAnchorList(): @@ -537,7 +606,6 @@ def updateAnchorList(): else: return ResultData(data=0, message="未找到符合条件的记录").toJson() - # 删除主播 @app.route("/deleteAnchorWithIds", methods=['POST']) def deleteAnchorWithIds(): @@ -546,7 +614,6 @@ def deleteAnchorWithIds(): deleted = AiUtils.delete_anchors_by_ids(ids) return ResultData(data={"deleted": deleted}).toJson() - # 配置ai人设 @app.route("/aiConfig", methods=['POST']) def aiConfig(): @@ -585,14 +652,12 @@ def aiConfig(): IOSAIStorage.overwrite(dict, "aiConfig.json") return ResultData(data="").toJson() - # 查询主播聊天发送的最后一条信息 @app.route("/select_last_message", methods=['GET']) def select_last_message(): data = JsonUtils.query_all_json_items() return ResultData(data=data).toJson() - # 修改消息(已读改成未读) @app.route("/update_last_message", methods=['POST']) def update_last_message(): @@ -611,7 +676,6 @@ def update_last_message(): return ResultData(data=updated_count, message="修改成功").toJson() return ResultData(data=updated_count, message="修改失败").toJson() - # 删除已读消息 @app.route("/delete_last_message", methods=['POST']) def delete_last_message(): @@ -630,7 +694,6 @@ def delete_last_message(): return ResultData(data=updated_count, message="修改失败").toJson() - # 停止所有任务 @app.route("/stopAllTask", methods=['POST']) def stopAllTask(): @@ -638,7 +701,6 @@ def stopAllTask(): code, msg = ThreadManager.batch_stop(idList) return ResultData(code, [], msg).toJson() - # 切换账号 @app.route('/changeAccount', methods=['POST']) def changeAccount(): @@ -657,7 +719,6 @@ def changeAccount(): # thread.start() return ResultData(data="", code=code, message=msg).toJson() - @app.route('/test', methods=['POST']) def test(): import wda @@ -723,4 +784,5 @@ def getAiConfig(): return ResultData(data=data).toJson() if __name__ == '__main__': - app.run("0.0.0.0", port=5000, debug=True, use_reloader=False) + # 注意:这里建议 debug=False,避免未来有人改成 use_reloader=True 导致多进程 + app.run("0.0.0.0", port=5000, debug=False, use_reloader=False, threaded=True) \ No newline at end of file diff --git a/Module/__pycache__/DeviceInfo.cpython-312.pyc b/Module/__pycache__/DeviceInfo.cpython-312.pyc index 3b0eedc..fd0ed3e 100644 Binary files a/Module/__pycache__/DeviceInfo.cpython-312.pyc and b/Module/__pycache__/DeviceInfo.cpython-312.pyc differ diff --git a/Module/__pycache__/FlaskService.cpython-312.pyc b/Module/__pycache__/FlaskService.cpython-312.pyc index 58ce35b..4008149 100644 Binary files a/Module/__pycache__/FlaskService.cpython-312.pyc and b/Module/__pycache__/FlaskService.cpython-312.pyc differ diff --git a/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc b/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc index f993302..2d5c1bf 100644 Binary files a/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc and b/Module/__pycache__/FlaskSubprocessManager.cpython-312.pyc differ diff --git a/Module/__pycache__/Main.cpython-312.pyc b/Module/__pycache__/Main.cpython-312.pyc index 9c0f17f..9e23e1d 100644 Binary files a/Module/__pycache__/Main.cpython-312.pyc and b/Module/__pycache__/Main.cpython-312.pyc differ diff --git a/Utils/__pycache__/ThreadManager.cpython-312.pyc b/Utils/__pycache__/ThreadManager.cpython-312.pyc index c2b09b7..c6d0596 100644 Binary files a/Utils/__pycache__/ThreadManager.cpython-312.pyc and b/Utils/__pycache__/ThreadManager.cpython-312.pyc differ diff --git a/script/ScriptManager.py b/script/ScriptManager.py index 5bfe692..cf20d3a 100644 --- a/script/ScriptManager.py +++ b/script/ScriptManager.py @@ -1111,23 +1111,18 @@ class ScriptManager(): " or @value='Inbox' or @label='Inbox' or @name='Inbox']" ) - print("11111111111111") # 查找所有收件箱节点 inbox_nodes = session.xpath(xpath_query).find_elements() - print("222222222222222") if len(inbox_nodes) < 2: LogManager.method_error(f"当前页面不再收件箱页面,重启", "检测消息", udid) raise Exception("当前页面不再收件箱页面,重启") - print("33333333333333") m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串 count = int(m.group(1)) if m else 0 - print("444444444444444444") if not count: LogManager.method_info(f"当前收件箱的总数量{count}", "检测消息", udid) break - # print("5555555555555555555555") # 新粉丝 xp_new_fan_badge = ( @@ -1170,7 +1165,6 @@ class ScriptManager(): "[@value and translate(@value,'0123456789','')='']" "/ancestor::XCUIElementTypeCell[1]" ) - print("6666666666666666") try: # 如果 2 秒内找不到,会抛异常 @@ -1184,7 +1178,6 @@ class ScriptManager(): print("当前屏幕没有找到 用户消息 未读徽标数字", udid) user_text = None info_count += 1 - print("777777777777777777777") if user_text: @@ -1218,11 +1211,11 @@ class ScriptManager(): last_in = item['text'] if last_out is None and item['dir'] == 'out': last_out = item['text'] - if last_in or last_out: # 任一条拿到就提前停 + if last_in and last_out: # 任一条拿到就提前停 break # 2. 只有两条都空才重试 - if not last_in and not last_out: + if not last_in or not last_out: attempt += 1 if attempt == 3: break # 三次用完,放弃 diff --git a/script/__pycache__/ScriptManager.cpython-312.pyc b/script/__pycache__/ScriptManager.cpython-312.pyc index cc81b14..78f3251 100644 Binary files a/script/__pycache__/ScriptManager.cpython-312.pyc and b/script/__pycache__/ScriptManager.cpython-312.pyc differ