diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index 3c0b351..699c641 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -1,12 +1,4 @@ -# -*- coding: utf-8 -*- -""" -极简稳定版设备监督器(DeviceInfo):加详细 print 日志 - - 每个关键节点都会 print,便于人工观察执行到哪一步 - - 保留核心逻辑:监听上下线 / 启动 WDA / 起 iproxy / 通知前端 - - 并发提速:_add_device 异步化(受控并发) - - iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除 -""" -import datetime + import http.client import json import os @@ -18,13 +10,11 @@ import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import Dict, Optional - import psutil import tidevice import wda from tidevice import Usbmux, ConnectionType from tidevice._device import BaseDevice - from Entity.DeviceModel import DeviceModel from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort from Module.FlaskSubprocessManager import FlaskSubprocessManager @@ -119,7 +109,6 @@ class DeviceInfo: ADD_STABLE_SEC = float(os.getenv("ADD_STABLE_SEC", "2.0")) REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0")) - WDA_READY_TIMEOUT = float(os.getenv("WDA_READY_TIMEOUT", "35.0")) def __init__(self) -> None: # 防止多次初始化(因为 __init__ 每次调用 DeviceInfo() 都会执行) @@ -170,7 +159,7 @@ class DeviceInfo: if getattr(self, "_add_executor", None) is None: from concurrent.futures import ThreadPoolExecutor import os - max_workers = int(os.getenv("DEVICE_ADD_WORKERS", "6")) + max_workers = 6 self._add_executor = ThreadPoolExecutor( max_workers=max_workers, thread_name_prefix="dev-add" @@ -231,59 +220,29 @@ class DeviceInfo: except Exception: pass - # =============== iproxy 健康检查 / 自愈 =============== - def _iproxy_tcp_probe(self, port: int, timeout: float = 0.6) -> bool: - """快速 TCP 探测:能建立连接即认为本地监听正常。""" + def _iproxy_health_ok(self, port: int) -> bool: try: - with socket.create_connection(("127.0.0.1", int(port)), timeout=timeout): - return True - except Exception: - return False - - def _iproxy_http_status_ok_quick(self, port: int, timeout: float = 1.2) -> bool: - """ - 轻量 HTTP 探测:GET /status - - 成功返回 2xx/3xx 视为 OK - - 4xx/5xx 也说明链路畅通(服务可交互),这里统一认为 OK(避免误判) - """ - try: - conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout) - conn.request("GET", "/status") + conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=1.5) + conn.request("GET", "/") resp = conn.getresponse() - _ = resp.read(128) - code = getattr(resp, "status", 0) + + status = getattr(resp, "status", 0) + ctype = resp.getheader("Content-Type", "") or "" + conn.close() - # 任何能返回 HTTP 的,都说明“有服务可交互” - return 100 <= code <= 599 - except Exception: + + ok = (200 <= status < 400) and ("multipart" in ctype.lower()) + + if not ok: + LogManager.error( + f"[iproxy] 健康检查失败: status={status}, ctype={ctype!r}, port={port}" + ) + return ok + + except Exception as e: + LogManager.error(f"[iproxy] 健康检查异常 port={port}: {e}") return False - def _video_stream_ok(self, udid: str, port: int) -> bool: - try: - conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=1.2) - conn.request("GET", "/screen") # 你们实际的视频流接口 - resp = conn.getresponse() - _ = resp.read(64) - conn.close() - return 200 <= resp.status < 500 # 任何可交互状态都算活着 - except: - return False - - def _iproxy_health_ok(self, udid: str, port: int) -> bool: - # 1) 监听检测:不通直接 False - if not self._iproxy_tcp_probe(port, timeout=0.6): - LogManager.error("检测到有设备视频流异常") - return False - # 2) 业务探测:/status 慢可能是 WDA 卡顿;失败不等同于“端口坏” - if not self._iproxy_http_status_ok_quick(port, timeout=1.2): - LogManager.error("检测到有设备视频流异常") - return False - # 3) 视频流接口健康才算真健康 - if not self._video_stream_ok(udid, port): - LogManager.error("检测到有设备视频流异常") - return False - - return True def _restart_iproxy(self, udid: str, port: int) -> bool: """干净重启 iproxy:先杀旧的,再启动新的,并等待监听。""" @@ -366,8 +325,7 @@ class DeviceInfo: print(f"[WDA-guard] 已有现成端口 {port},等待 WDA 在该端口就绪 {udid}") ok = self._wait_wda_ready_on_port( udid, - local_port=port, - total_timeout_sec=self.WDA_READY_TIMEOUT, + local_port=wdaFunctionPort, ) if not ok: print(f"[WDA-guard] WDA 在端口 {port} 未在超时内就绪 {udid}") @@ -384,204 +342,75 @@ class DeviceInfo: # =============== 一轮检查:先自愈,仍失败才考虑移除 ================= def check_iproxy_ports(self): - """ - 后台守护 iproxy 健康状态的看门狗线程: - - 定期遍历当前所有设备对应的 iproxy 进程 + 端口 - - 先做健康检查(TCP + HTTP) - - 如不健康: - 1)先尝试重启 iproxy(第一层自愈) - 2)连续多次失败后,尝试重启 WDA(第二层自愈) - 3)两层自愈都失败,多次以后将设备标记为:ready=False, streamBroken=True - 但不移除设备,避免“列表里忽隐忽现”的抖动 - """ - import os - import time - - # 失败计数达到多少次,触发 WDA 重启(第二级自愈) - WDA_RESTART_THRESHOLD = int(os.getenv("WDA_RESTART_THRESHOLD", "2")) - # 总失败次数达到多少次,认为这台设备当前整体不可用,降级 ready/streamBroken - FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3")) - - # 给整个系统一点启动缓冲时间 - time.sleep(int(os.getenv("IPROXY_CHECK_START_DELAY", "10"))) - print("[iproxy-check] iproxy 守护线程已启动") + print("[Guard] iproxy+WDA 守护线程启动") while True: try: - # 复制一份当前快照,避免遍历过程中被修改 with self._lock: - items = list(self._iproxy.items()) - # 同时需要端口映射 - ports_map = dict(self._port_by_udid) + udids = list(self._models.keys()) + + for udid in udids: + with self._lock: + port = self._port_by_udid.get(udid) + fail = self._iproxy_fail_count.get(udid, 0) - for device_id, proc in items: - # 没端口信息,说明这台设备还没完成初始化/已经被清理,跳过 - port = ports_map.get(device_id) if not port: continue - # 进程对象可能已经被 kill/None,做个防御 - if proc is None: - print(f"[iproxy-check] 发现 {device_id} 没有 iproxy 进程对象,记录一次失败") - fails = self._iproxy_fail_count.get(device_id, 0) + 1 - self._iproxy_fail_count[device_id] = fails - continue + # ==== 第一层:视频流健康检查 ==== + ok_stream = self._iproxy_health_ok(port) - # ---------- 第一步:健康探测 ---------- - ok = False - try: - ok = self._iproxy_health_ok(device_id, port) - except Exception as e: - ok = False - print(f"[iproxy-check] 健康检查异常 deviceId={device_id} port={port}: {e}") - - if ok: - # 健康 → 失败计数清零,如之前标记过 streamBroken/ready=False,可考虑恢复 - if self._iproxy_fail_count.get(device_id): - print(f"[iproxy-check] 设备恢复健康,清零失败计数 deviceId={device_id} port={port}") - self._iproxy_fail_count[device_id] = 0 - - # 如果之前降级过,这里顺便恢复 ready/streamBroken + if ok_stream: + # 成功 → 清零失败计数 with self._lock: - m = self._models.get(device_id) - need_report = False - if m: - prev_ready = getattr(m, "ready", True) - prev_broken = getattr(m, "streamBroken", False) - if (not prev_ready) or prev_broken: - m.ready = True - if prev_broken: - try: - delattr(m, "streamBroken") - except Exception: - setattr(m, "streamBroken", False) - need_report = True - if need_report: - try: - self._manager_send() - except Exception as e: - print(f"[iproxy-check] 上报设备恢复异常 deviceId={device_id}: {e}") - # 这台设备没事了,检查下一台 + self._iproxy_fail_count[udid] = 0 continue - # ---------- 第二步:不健康 → 尝试第一层自愈(重启 iproxy) ---------- - print(f"[iproxy-check] 探活失败,准备自愈重启 iproxy deviceId={device_id} port={port}") - healed = False - try: - healed = self._restart_iproxy(device_id, port) - except Exception as e: - healed = False - print(f"[iproxy-check] _restart_iproxy 调用异常 deviceId={device_id} port={port}: {e}") + # ------ 以下为失败处理 ------ + fail += 1 + with self._lock: + self._iproxy_fail_count[udid] = fail - ok2 = False - if healed: - try: - ok2 = self._iproxy_health_ok(device_id, port) - except Exception as e: - ok2 = False - print(f"[iproxy-check] 自愈后健康检查异常 deviceId={device_id} port={port}: {e}") + print(f"[Guard] 第 {fail} 次失败 udid={udid}, port={port}") - if ok2: - print(f"[iproxy-check] iproxy 自愈成功 deviceId={device_id} port={port}") - self._iproxy_fail_count[device_id] = 0 - # 有可能之前是降级状态,这里也做一次恢复 - with self._lock: - m = self._models.get(device_id) - need_report = False - if m: - prev_ready = getattr(m, "ready", True) - prev_broken = getattr(m, "streamBroken", False) - if (not prev_ready) or prev_broken: - m.ready = True - if prev_broken: - try: - delattr(m, "streamBroken") - except Exception: - setattr(m, "streamBroken", False) - need_report = True - if need_report: - try: - self._manager_send() - except Exception as e: - print(f"[iproxy-check] 上报 iproxy 自愈恢复异常 deviceId={device_id}: {e}") - continue + # ==== 第 1~2 次失败 → 优先重启 iproxy ==== + if fail in (1, 2): + print(f"[Guard] 尝试重启 iproxy(第 {fail} 次){udid}") + if self._restart_iproxy(udid, port): + time.sleep(1.5) + continue - # ---------- 第三步:iporxy 自愈失败,累计失败计数 ---------- - fails = self._iproxy_fail_count.get(device_id, 0) + 1 - self._iproxy_fail_count[device_id] = fails - print(f"[iproxy-check] iproxy 自愈失败 ×{fails} deviceId={device_id} port={port}") + # ==== 第 3~4 次失败 → 检查 WDA 状态 ==== + if fail in (3, 4): + print(f"[Guard] 检查 WDA 状态 {udid}") + wda_ok = self._wait_wda_ready_on_port(udid, wdaFunctionPort) - # ---------- 第四步:第二级自愈 → 重启 WDA ---------- - if fails >= WDA_RESTART_THRESHOLD: - print(f"[iproxy-check] 连续失败 {fails} 次,尝试重启 WDA deviceId={device_id}") - - wda_ok = False - try: - wda_ok = self._restart_wda(device_id) - except Exception as e: - wda_ok = False - print(f"[iproxy-check] 调用 _restart_wda 异常 deviceId={device_id}: {e}") - - if wda_ok: - # WDA 重启成功后,再做一次健康检查 - ok3 = False - try: - ok3 = self._iproxy_health_ok(device_id, port) - except Exception as e: - ok3 = False - print(f"[iproxy-check] WDA 重启后健康检查异常 deviceId={device_id} port={port}: {e}") - - if ok3: - print(f"[iproxy-check] WDA 重启后恢复正常 deviceId={device_id} port={port}") - self._iproxy_fail_count[device_id] = 0 - with self._lock: - m = self._models.get(device_id) - need_report = False - if m: - prev_ready = getattr(m, "ready", True) - prev_broken = getattr(m, "streamBroken", False) - if (not prev_ready) or prev_broken: - m.ready = True - if prev_broken: - try: - delattr(m, "streamBroken") - except Exception: - setattr(m, "streamBroken", False) - need_report = True - if need_report: - try: - self._manager_send() - except Exception as e: - print(f"[iproxy-check] 上报 WDA 自愈恢复异常 deviceId={device_id}: {e}") - # 这台设备已经恢复,继续下一台 + if not wda_ok: + print(f"[Guard] WDA 异常 → 尝试重启 WDA(第 {fail} 次){udid}") + if self._restart_wda(udid): + time.sleep(1.2) + continue + else: + print(f"[Guard] WDA 正常,但视频流挂了 → 再重启 iproxy") + if self._restart_iproxy(udid, port): + time.sleep(1.5) continue - else: - print(f"[iproxy-check] WDA 重启后仍不健康 deviceId={device_id} port={port}") - # ---------- 第五步:自愈 + 重启 WDA 都不行 → 按 FAIL_THRESHOLD 降级 ---------- - if fails >= FAIL_THRESHOLD: - with self._lock: - m = self._models.get(device_id) - if m: - m.ready = False - setattr(m, "streamBroken", True) + # ==== 第 5 次失败 → 移除设备 ==== + if fail >= 5: + print(f"[Guard] 连续 5 次失败,移除设备 {udid}") try: - if m: - print( - f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}" - ) - self._manager_send() + self._remove_device(udid) except Exception as e: - print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}") - - # 每轮检查间隔,可按需调整 - time.sleep(int(os.getenv("IPROXY_CHECK_INTERVAL", "5"))) + print(f"[Guard] 移除异常: {e}") + continue except Exception as e: - # 整个循环防御,避免线程因为异常退出 - print(f"[iproxy-check] 守护线程异常: {e}") - time.sleep(5) + print(f"[Guard] 守护线程异常: {e}") + + time.sleep(2.0) + def listen(self): LogManager.method_info("进入主循环", "listen", udid="system") @@ -606,8 +435,6 @@ class DeviceInfo: for udid in online - known: if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC: - print(datetime.datetime.now().strftime("%H:%M:%S")) - print(f"[Add] 检测到新设备: {udid}") try: self._add_device(udid) # ← 并发包装器 except Exception as e: @@ -628,30 +455,16 @@ class DeviceInfo: time.sleep(1) - def _wait_wda_ready_on_port(self, udid: str, local_port: int, total_timeout_sec: float = None) -> bool: - """在给定的本地映射端口上等待 /status 就绪。""" - import http.client, time - if total_timeout_sec is None: - total_timeout_sec = self.WDA_READY_TIMEOUT - deadline = _monotonic() + total_timeout_sec - attempt = 0 - while _monotonic() < deadline: - attempt += 1 - try: - conn = http.client.HTTPConnection("127.0.0.1", local_port, timeout=1.8) - conn.request("GET", "/status") - resp = conn.getresponse() - _ = resp.read(128) - code = getattr(resp, "status", 0) - ok = 200 <= code < 400 - print(f"[WDA] /status@{local_port} 第{attempt}次 code={code}, ok={ok} {udid}") - if ok: - return True - except Exception as e: - print(f"[WDA] /status@{local_port} 异常({attempt}): {e}") - time.sleep(0.5) - print(f"[WDA] /status@{local_port} 等待超时 {udid}") - return False + # 检测设备wda状态 + def _wait_wda_ready_on_port(self, udid: str, local_port: int) -> bool: + try: + dev = wda.USBClient(udid, local_port) + info = dev.status() # 调用成功即可说明 WDA 正常 + return info["ready"] + except Exception as e: + print(f"[WDA] status 异常({udid}): {e}") + return False + def _send_snapshot_to_flask(self): """把当前 _models 的全量快照发送给 Flask 进程""" @@ -755,7 +568,7 @@ class DeviceInfo: major = 0 # 直接用“正式端口”探测 /status,避免再启一次临时 iproxy - if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=3.0): + if not self._wait_wda_ready_on_port(udid, local_port=wdaFunctionPort): # 如果还没起来,按你原逻辑拉起 WDA 再等 if major >= 17: print("进入 iOS17+ 设备的分支") @@ -782,7 +595,7 @@ class DeviceInfo: except Exception as e: print(f"[WDA] app_start 异常: {e}") - if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=self.WDA_READY_TIMEOUT): + if not self._wait_wda_ready_on_port(udid, local_port=wdaFunctionPort): print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}") # 清理已起的正式 iproxy try: @@ -819,7 +632,6 @@ class DeviceInfo: print(f"[Manager] 准备发送设备数据到前端 {udid}") self._manager_send() - print(datetime.datetime.now().strftime("%H:%M:%S")) print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}") def _remove_device(self, udid: str): @@ -873,61 +685,6 @@ class DeviceInfo: print(f"[Trust] 设备 {udid} 未信任") return False - def _wda_http_status_ok_once(self, udid: str, timeout_sec: float = 1.8) -> bool: - """只做一次 /status 探测。任何异常都返回 False,不让外层炸掉。""" - tmp_port = None - proc = None - try: - tmp_port = self._alloc_port() # 这里可能抛异常 - print(f"[WDA] 启动临时 iproxy 以检测 /status {udid}") - proc = self._spawn_iproxy(udid, local_port=tmp_port, remote_port=wdaScreenPort) - if not proc: - print("[WDA] 启动临时 iproxy 失败") - return False - if not self._wait_until_listening(tmp_port, 3.0): - print(f"[WDA] 临时端口未监听 {tmp_port}") - return False - - # 最多两次快速探测 - for i in (1, 2): - try: - import http.client - conn = http.client.HTTPConnection("127.0.0.1", tmp_port, timeout=timeout_sec) - conn.request("GET", "/status") - resp = conn.getresponse() - _ = resp.read(128) - code = getattr(resp, "status", 0) - ok = 200 <= code < 400 - print(f"[WDA] /status 第{i}次 code={code}, ok={ok}") - if ok: - return True - except Exception as e: - print(f"[WDA] /status 异常({i}): {e}") - time.sleep(0.25) - return False - - except Exception as e: - import traceback - print(f"[WDA][probe] 异常:{e}\n{traceback.format_exc()}") - return False - - finally: - if proc: - self._kill(proc) - if tmp_port is not None: - self._release_port(tmp_port) - - def _wait_wda_ready_http(self, udid: str, total_timeout_sec: float) -> bool: - print(f"[WDA] 等待 WDA Ready (超时 {total_timeout_sec}s) {udid}") - deadline = _monotonic() + total_timeout_sec - while _monotonic() < deadline: - if self._wda_http_status_ok_once(udid): - print(f"[WDA] WDA 就绪 {udid}") - return True - time.sleep(0.6) - print(f"[WDA] WDA 等待超时 {udid}") - return False - def _screen_info(self, udid: str): try: # 避免 c.home() 可能触发的阻塞,直接取 window_size @@ -1022,21 +779,6 @@ class DeviceInfo: print(f"[Proc] 结束进程异常: {e}") def _manager_send(self): - # try: - # if self._manager.send(model.toDict()): - # print(f"[Manager] 已发送前端数据 {model.deviceId}") - # return - # except Exception as e: - # print(f"[Manager] 首次发送异常: {e}") - # - # # 自愈:拉起一次并重试一次(不要用 and 连接) - # try: - # self._manager.start() # 不关心返回值 - # if self._manager.send(model.toDict()): - # print(f"[Manager] 重试发送成功 {model.deviceId}") - # return - # except Exception as e: - # print(f"[Manager] 重试发送异常: {e}") """对外统一的“通知 Flask 有设备变动”的入口(无参数)。 作用:把当前所有设备的全量快照发给 Flask。 """