diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index 301cde0..4df70e7 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -4,14 +4,16 @@ import signal import subprocess import threading import time -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError from pathlib import Path from typing import Dict, Optional, List import random import socket import http.client import psutil -import hashlib # 仍保留,如需后续扩展 +import hashlib # 保留扩展 +import platform + import tidevice import wda from tidevice import Usbmux, ConnectionType @@ -24,13 +26,34 @@ from Module.IOSActivator import IOSActivator from Utils.LogManager import LogManager +def _monotonic() -> float: + """统一用 monotonic 计时,避免系统时钟跳变影响定时/退避。""" + return time.monotonic() + + class DeviceInfo: - # --- 时序参数(更稳) --- - REMOVE_GRACE_SEC = 8.0 # 设备离线宽限期(秒) - ADD_STABLE_SEC = 2.5 # 设备上线稳定期(秒) - ORPHAN_COOLDOWN = 8.0 # 拓扑变更后暂停孤儿清理(秒) - HEAL_INTERVAL = 5.0 # 健康巡检间隔(秒) + # --- 时序参数(支持环境变量覆盖) --- + REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "8.0")) # 设备离线宽限期 + ADD_STABLE_SEC = float(os.getenv("ADD_STABLE_SEC", "2.5")) # 设备上线稳定期 + ORPHAN_COOLDOWN = float(os.getenv("ORPHAN_COOLDOWN", "8.0")) # 拓扑变更后暂停孤儿清理 + HEAL_INTERVAL = float(os.getenv("HEAL_INTERVAL", "5.0")) # 健康巡检间隔 + + # 端口策略(支持环境变量覆盖) + PORT_RAND_LOW_1 = int(os.getenv("PORT_RAND_LOW_1", "9111")) + PORT_RAND_HIGH_1 = int(os.getenv("PORT_RAND_HIGH_1", "9499")) + PORT_RAND_LOW_2 = int(os.getenv("PORT_RAND_LOW_2", "20000")) + PORT_RAND_HIGH_2 = int(os.getenv("PORT_RAND_HIGH_2", "48000")) + PORT_SCAN_START = int(os.getenv("PORT_SCAN_START", "49152")) + PORT_SCAN_LIMIT = int(os.getenv("PORT_SCAN_LIMIT", "10000")) + + # 自愈退避 + BACKOFF_MAX_SEC = float(os.getenv("BACKOFF_MAX_SEC", "15.0")) + BACKOFF_MIN_SEC = float(os.getenv("BACKOFF_MIN_SEC", "1.5")) + BACKOFF_GROWTH = float(os.getenv("BACKOFF_GROWTH", "1.7")) + + # WDA Ready 等待(HTTP 轮询方式,不触发 xctest) + WDA_READY_TIMEOUT = float(os.getenv("WDA_READY_TIMEOUT", "35.0")) def __init__(self): # 自增端口游标仅作兜底扫描使用 @@ -54,6 +77,10 @@ class DeviceInfo: self._first_seen: Dict[str, float] = {} # udid -> ts(首次在线) self._last_topology_change_ts = 0.0 + # 短缓存:设备信任、WDA运行态(仅作节流) + self._trusted_cache: Dict[str, float] = {} # udid -> expire_ts + self._wda_ok_cache: Dict[str, float] = {} # udid -> expire_ts + LogManager.info("DeviceInfo init 完成;日志已启用", udid="system") # ---------------- 主循环 ---------------- @@ -62,7 +89,7 @@ class DeviceInfo: LogManager.method_info("进入主循环", method, udid="system") orphan_gc_tick = 0 while True: - now = time.time() + now = _monotonic() try: usb = Usbmux().device_list() online_now = {d.udid for d in usb if d.conn_type == ConnectionType.USB} @@ -95,12 +122,18 @@ class DeviceInfo: if to_add: LogManager.info(f"新增设备稳定上线:{to_add}", udid="system") futures = {self._pool.submit(self._add_device, u): u for u in to_add} - for f in as_completed(futures, timeout=45): - try: - f.result() - self._last_topology_change_ts = time.time() - except Exception as e: - LogManager.error(f"异步连接失败:{e}", udid="system") + try: + for f in as_completed(futures, timeout=45): + try: + f.result() + self._last_topology_change_ts = _monotonic() + except Exception as e: + LogManager.error(f"异步连接失败:{e}", udid="system") + except TimeoutError: + for fut, u in futures.items(): + if not fut.done(): + fut.cancel() + LogManager.method_warning("新增设备任务超时已取消", method, udid=u) # 定期健康检查 + 自愈 self._check_and_heal_tunnels(interval=self.HEAL_INTERVAL) @@ -109,7 +142,7 @@ class DeviceInfo: orphan_gc_tick += 1 if orphan_gc_tick >= 10: orphan_gc_tick = 0 - if (time.time() - self._last_topology_change_ts) >= self.ORPHAN_COOLDOWN: + if (_monotonic() - self._last_topology_change_ts) >= self.ORPHAN_COOLDOWN: self._cleanup_orphan_iproxy() time.sleep(1) @@ -123,20 +156,48 @@ class DeviceInfo: LogManager.method_warning("未信任设备,跳过", method, udid=udid) return - r = self.startWda(udid) - if r is False: - LogManager.method_error("启动 WDA 失败,放弃新增", method, udid=udid) - return + # 获取系统主版本 + try: + dev = tidevice.Device(udid) + system_version_major = int(dev.product_version.split(".")[0]) + except Exception as e: + LogManager.method_warning(f"读取系统版本失败:{e}", method, udid=udid) + system_version_major = 0 # 保底 - # iOS 17+ 激活/信任阶段更抖,稍等更稳 - time.sleep(5) + # === iOS>17:先“被动探测”WDA,未运行则交给 IOSActivator,并通过 HTTP 轮询等待 === + if system_version_major > 17: + if self._wda_is_running(udid): + LogManager.method_info("检测到 WDA 已运行,直接映射", method, udid=udid) + else: + LogManager.method_info("WDA 未运行,调用 IOSActivator(pymobiledevice3 自动挂载)", method, udid=udid) + try: + ios = IOSActivator() + threading.Thread(target=ios.activate, args=(udid,), daemon=True).start() + except Exception as e: + LogManager.method_error(f"IOSActivator 启动异常:{e}", method, udid=udid) + return + # 关键:HTTP 轮询等待 WDA Ready(默认最多 35s),不会触发 xctest + if not self._wait_wda_ready_http(udid, total_timeout_sec=self.WDA_READY_TIMEOUT): + LogManager.method_error("WDA 未在超时内就绪(iOS>17 分支)", method, udid=udid) + return + else: + # iOS <= 17:保持原逻辑(app_start + 简单等待) + try: + dev = tidevice.Device(udid) + LogManager.method_info(f"app_start WDA: {WdaAppBundleId}", method, udid=udid) + dev.app_start(WdaAppBundleId) + time.sleep(3) + except Exception as e: + LogManager.method_error(f"WDA 启动异常:{e}", method, udid=udid) + return + # 获取屏幕信息 w, h, s = self._screen_info(udid) if w == 0 or h == 0 or s == 0: LogManager.method_warning("未获取到屏幕信息,放弃新增", method, udid=udid) return - # 不复用端口:直接起一个新端口 + # 启动 iproxy(不复用端口:直接新端口) proc = self._start_iproxy(udid, port=None) if not proc: LogManager.method_error("启动 iproxy 失败,放弃新增", method, udid=udid) @@ -152,7 +213,7 @@ class DeviceInfo: LogManager.method_info(f"设备添加完成,port={port}, {w}x{h}@{s}", method, udid=udid) self._manager_send(model) - # ---------------- 移除设备 ---------------- + # ---------------- 移除设备(修复:总是发送离线通知) ---------------- def _remove_device(self, udid: str): method = "_remove_device" LogManager.method_info("开始移除设备", method, udid=udid) @@ -161,45 +222,118 @@ class DeviceInfo: proc = self._procs.pop(udid, None) pid = self._pid_by_udid.pop(udid, None) self._port_by_udid.pop(udid, None) + # 清缓存,防止误判 + self._trusted_cache.pop(udid, None) + self._wda_ok_cache.pop(udid, None) + self._last_seen.pop(udid, None) + self._first_seen.pop(udid, None) - if not model: - LogManager.method_warning("未找到设备模型,可能重复移除", method, udid=udid) - return - - model.type = 2 self._kill(proc) if pid: self._kill_pid_gracefully(pid) - self._manager_send(model) - LogManager.method_info("设备移除完毕", method, udid=udid) + + if model is None: + # 构造一个最小的“离线模型”通知前端 + try: + offline = DeviceModel(deviceId=udid, screenPort=-1, width=0, height=0, scale=0.0, type=2) + offline.ready = False + self._manager_send(offline) + LogManager.method_info("设备移除完毕(无原模型,已发送离线通知)", method, udid=udid) + except Exception as e: + LogManager.method_warning(f"离线通知(构造模型)异常:{e}", method, udid=udid) + return + + model.type = 2 + model.ready = False + model.screenPort = -1 + try: + self._manager_send(model) + finally: + LogManager.method_info("设备移除完毕(已发送离线通知)", method, udid=udid) # ---------------- 工具函数 ---------------- def _trusted(self, udid: str) -> bool: + # 30s 短缓存,减少 IO + now = _monotonic() + exp = self._trusted_cache.get(udid, 0.0) + if exp > now: + return True try: BaseDevice(udid).get_value("DeviceName") + self._trusted_cache[udid] = now + 30.0 return True except Exception: return False - def startWda(self, udid): - method = "startWda" - LogManager.method_info("进入启动流程", method, udid=udid) + # ======= WDA 探测/等待(仅走 iproxy+HTTP,不触发 xctest) ======= + def _wda_http_status_ok(self, udid: str, timeout_sec: float = 1.2) -> bool: + """临时 iproxy 转发到 wdaFunctionPort,GET /status 成功视为 OK。""" + method = "_wda_http_status_ok" + tmp_port = self._pick_new_port() + proc = None try: - dev = tidevice.Device(udid) - systemVersion = int(dev.product_version.split(".")[0]) - if systemVersion > 17: - LogManager.method_info(f"iOS 主版本 {systemVersion},使用 IOSActivator", method, udid=udid) - ios = IOSActivator() - threading.Thread(target=ios.activate, args=(udid,), daemon=True).start() - else: - LogManager.method_info(f"app_start WDA: {WdaAppBundleId}", method, udid=udid) - dev.app_start(WdaAppBundleId) - LogManager.method_info("WDA 启动完成,等待稳定...", method, udid=udid) - time.sleep(3) + cmd = [self._iproxy_path, "-u", udid, str(tmp_port), str(wdaFunctionPort)] + proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if not self._wait_until_listening(tmp_port, initial_timeout=0.8): + LogManager.method_info(f"WDA探测:临时端口未监听({tmp_port})", method, udid=udid) + return False + + conn = http.client.HTTPConnection("127.0.0.1", tmp_port, timeout=timeout_sec) + try: + conn.request("GET", "/status") + resp = conn.getresponse() + _ = resp.read(256) + code = getattr(resp, "status", 0) + ok = 200 <= code < 400 + LogManager.method_info(f"WDA探测:/status code={code}, ok={ok}", method, udid=udid) + return ok + except Exception as e: + LogManager.method_info(f"WDA探测异常:{e}", method, udid=udid) + return False + finally: + try: + conn.close() + except Exception: + pass + finally: + if proc: + try: + p = psutil.Process(proc.pid) + p.terminate() + p.wait(timeout=0.6) + except Exception: + try: + p.kill() + except Exception: + pass + + def _wait_wda_ready_http(self, udid: str, total_timeout_sec: float = None, interval_sec: float = 0.6) -> bool: + """ + 通过 _wda_http_status_ok 轮询等待 WDA Ready。 + total_timeout_sec 默认取环境变量 WDA_READY_TIMEOUT(默认 35s)。 + """ + method = "_wait_wda_ready_http" + if total_timeout_sec is None: + total_timeout_sec = self.WDA_READY_TIMEOUT + deadline = _monotonic() + total_timeout_sec + while _monotonic() < deadline: + if self._wda_http_status_ok(udid, timeout_sec=1.2): + LogManager.method_info("WDA 就绪(HTTP轮询)", method, udid=udid) + return True + time.sleep(interval_sec) + LogManager.method_warning(f"WDA 等待超时(HTTP轮询,{total_timeout_sec}s)", method, udid=udid) + return False + + def _wda_is_running(self, udid: str, cache_sec: float = 2.0) -> bool: + """轻量速查,走 HTTP /status(短缓存节流),避免触发 xctest。""" + now = _monotonic() + exp = self._wda_ok_cache.get(udid, 0.0) + if exp > now: return True - except Exception as e: - LogManager.method_error(f"WDA 启动异常:{e}", method, udid=udid) - return False + ok = self._wda_http_status_ok(udid, timeout_sec=1.2) + if ok: + self._wda_ok_cache[udid] = now + cache_sec + return ok def _screen_info(self, udid: str): method = "_screen_info" @@ -215,7 +349,7 @@ class DeviceInfo: return 0, 0, 0 # ---------------- 端口/进程:不复用端口 ---------------- - def _is_port_bindable(self, port: int, host: str = "127.0.0.1") -> bool: + def _is_port_free(self, port: int, host: str = "127.0.0.1") -> bool: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -228,36 +362,38 @@ class DeviceInfo: def _pick_new_port(self, tries: int = 40) -> int: method = "_pick_new_port" - # 先在 9111~9499 随机尝试 - for _ in range(tries // 2): - p = random.randint(9111, 9499) - if self._is_port_bindable(p): - LogManager.method_info(f"端口候选可用(9k段):{p}", method, udid="system") + for _ in range(max(1, tries // 2)): + p = random.randint(self.PORT_RAND_LOW_1, self.PORT_RAND_HIGH_1) + if self._is_port_free(p): + LogManager.method_info(f"端口候选可用(首段):{p}", method, udid="system") return p else: - LogManager.method_info(f"端口候选占用(9k段):{p}", method, udid="system") - # 再在 20000~48000 随机尝试 + LogManager.method_info(f"端口候选占用(首段):{p}", method, udid="system") for _ in range(tries): - p = random.randint(20000, 48000) - if self._is_port_bindable(p): - LogManager.method_info(f"端口候选可用(20k-48k):{p}", method, udid="system") + p = random.randint(self.PORT_RAND_LOW_2, self.PORT_RAND_HIGH_2) + if self._is_port_free(p): + LogManager.method_info(f"端口候选可用(次段):{p}", method, udid="system") return p else: - LogManager.method_info(f"端口候选占用(20k-48k):{p}", method, udid="system") + LogManager.method_info(f"端口候选占用(次段):{p}", method, udid="system") LogManager.method_warning("随机端口尝试耗尽,改顺序扫描", method, udid="system") - return self._pick_free_port(start=49152, limit=10000) + return self._pick_free_port(start=self.PORT_SCAN_START, limit=self.PORT_SCAN_LIMIT) - def _wait_until_listening(self, port: int, timeout: float = 2.0) -> bool: + def _wait_until_listening(self, port: int, initial_timeout: float = 2.0) -> bool: + """自适应等待端口监听:2s -> 3s -> 5s(最多约10s)。""" method = "_wait_until_listening" - deadline = time.time() + timeout - while time.time() < deadline: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(0.2) - if s.connect_ex(("127.0.0.1", port)) == 0: - LogManager.method_info(f"端口已开始监听:{port}", method, udid="system") - return True - time.sleep(0.05) - LogManager.method_warning(f"监听验收超时:{port}", method, udid="system") + timeouts = [initial_timeout, 3.0, 5.0] + for to in timeouts: + deadline = _monotonic() + to + while _monotonic() < deadline: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(0.2) + if s.connect_ex(("127.0.0.1", port)) == 0: + LogManager.method_info(f"端口已开始监听:{port}", method, udid="system") + return True + time.sleep(0.05) + LogManager.method_info(f"监听验收阶段超时:{port},扩展等待", method, udid="system") + LogManager.method_warning(f"监听验收最终超时:{port}", method, udid="system") return False def _start_iproxy(self, udid: str, port: Optional[int] = None) -> Optional[subprocess.Popen]: @@ -269,13 +405,12 @@ class DeviceInfo: LogManager.method_info(f"发现旧 iproxy,准备结束:pid={old_pid}", method, udid=udid) self._kill_pid_gracefully(old_pid) self._pid_by_udid.pop(udid, None) - time.sleep(0.2) attempts = 0 while attempts < 3: attempts += 1 local_port = port if (attempts == 1 and port is not None) else self._pick_new_port() - if not self._is_port_bindable(local_port): + if not self._is_port_free(local_port): LogManager.method_info(f"[attempt {attempts}] 端口竞争,换候选:{local_port}", method, udid=udid) continue @@ -301,10 +436,10 @@ class DeviceInfo: startupinfo=startupinfo ) except Exception as e: - LogManager.method_warning(f"创建进程失败:{e}", method, udid=udid) + LogManager.method_warning(f"创建 iproxy 进程失败:{e}", method, udid=udid) continue - if not self._wait_until_listening(local_port, timeout=2.0): + if not self._wait_until_listening(local_port, initial_timeout=2.0): LogManager.method_warning(f"[attempt {attempts}] iproxy 未监听,重试换端口", method, udid=udid) self._kill(proc) continue @@ -329,20 +464,26 @@ class DeviceInfo: if not proc: return try: - proc.terminate() - proc.wait(timeout=2) - LogManager.method_info("进程已正常终止", method, udid="system") - except Exception: + p = psutil.Process(proc.pid) + p.terminate() try: - os.kill(proc.pid, signal.SIGKILL) + p.wait(timeout=2.0) + LogManager.method_info("进程已正常终止", method, udid="system") + except psutil.TimeoutExpired: + p.kill() LogManager.method_warning("进程被强制杀死", method, udid="system") - except Exception as e: - LogManager.method_warning(f"强杀失败:{e}", method, udid="system") + except Exception as e: + LogManager.method_warning(f"结束进程异常:{e}", method, udid="system") # ---------------- 自愈:直接换新端口重启 + 指数退避 ---------------- + def _next_backoff(self, prev_backoff: float) -> float: + if prev_backoff <= 0: + return self.BACKOFF_MIN_SEC + return min(prev_backoff * self.BACKOFF_GROWTH, self.BACKOFF_MAX_SEC) + def _restart_iproxy(self, udid: str): method = "_restart_iproxy" - now = time.time() + now = _monotonic() next_allowed = self._heal_backoff.get(udid, 0.0) if now < next_allowed: delta = round(next_allowed - now, 2) @@ -354,7 +495,6 @@ class DeviceInfo: if proc: LogManager.method_info(f"为重启准备清理旧 iproxy,pid={proc.pid}", method, udid=udid) self._kill(proc) - time.sleep(0.2) model = self._models.get(udid) if not model: LogManager.method_warning("模型不存在,取消自愈", method, udid=udid) @@ -362,13 +502,13 @@ class DeviceInfo: proc2 = self._start_iproxy(udid, port=None) if not proc2: - backoff_old = max(1.5, next_allowed - now + 1.0) if next_allowed > now else 1.5 - backoff = min(backoff_old * 1.7, 15.0) + prev = max(0.0, next_allowed - now) + backoff = self._next_backoff(prev) self._heal_backoff[udid] = now + backoff LogManager.method_warning(f"重启失败,扩展退避 {round(backoff,2)}s", method, udid=udid) return - # 成功后短退避 + # 成功后短退避(抑制频繁重启) self._heal_backoff[udid] = now + 1.2 # 通知前端新端口 @@ -388,23 +528,19 @@ class DeviceInfo: conn.request("HEAD", "/") resp = conn.getresponse() _ = resp.read(128) + code = getattr(resp, "status", 0) conn.close() - return True + return 200 <= code < 400 except Exception: return False def _health_check_wda(self, udid: str) -> bool: - method = "_health_check_wda" - try: - c = wda.USBClient(udid, wdaFunctionPort) - st = c.status() - return bool(st) - except Exception: - return False + # 使用 HTTP 探测(带短缓存),避免触发 xctest + return self._wda_is_running(udid, cache_sec=1.0) def _check_and_heal_tunnels(self, interval: float = 5.0): method = "_check_and_heal_tunnels" - now = time.time() + now = _monotonic() if now - self._last_heal_check_ts < interval: return self._last_heal_check_ts = now @@ -429,16 +565,20 @@ class DeviceInfo: LogManager.method_warning(f"检测到不健康,触发重启;port={port}", method, udid=udid) self._restart_iproxy(udid) - # ---------------- Windows 专用:列出所有 iproxy 命令行 ---------------- + # ---------------- Windows/*nix:列出所有 iproxy 命令行 ---------------- def _get_all_iproxy_cmdlines(self) -> List[str]: method = "_get_all_iproxy_cmdlines" lines: List[str] = [] with self._lock: live_pids = set(self._pid_by_udid.values()) + + is_windows = os.name == "nt" + target_name = "iproxy.exe" if is_windows else "iproxy" + for p in psutil.process_iter(attrs=["name", "cmdline", "pid"]): try: name = (p.info.get("name") or "").lower() - if name != "iproxy.exe": + if name != target_name: continue if p.info["pid"] in live_pids: continue @@ -464,12 +604,14 @@ class DeviceInfo: for ln in self._get_all_iproxy_cmdlines(): parts = ln.split() try: + if "-u" not in parts: + continue udid = parts[parts.index('-u') + 1] pid = int(parts[-1]) if pid not in live_pids and udid not in live_udids: self._kill_pid_gracefully(pid) cleaned += 1 - LogManager.method_warning(f"孤儿 iproxy 已清理:udid={udid}, pid={pid}", method, udid="system") + LogManager.method_warning(f"孤儿 iproxy 已清理:udid={udid}, pid={pid}", method, udid=udid) except (ValueError, IndexError): continue @@ -492,16 +634,6 @@ class DeviceInfo: LogManager.method_warning(f"kill 进程异常:pid={pid}, err={e}", method, udid="system") # ---------------- 端口工具(兜底) ---------------- - def _is_port_free(self, port: int) -> bool: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.settimeout(0.2) - try: - s.bind(("127.0.0.1", port)) - return True - except OSError: - return False - def _pick_free_port(self, start: int = None, limit: int = 2000) -> int: method = "_pick_free_port" p = self._port if start is None else start @@ -525,13 +657,22 @@ class DeviceInfo: LogManager.method_warning(f"通知管理器异常:{e}", method, udid=model.deviceId) def _find_iproxy(self) -> str: + """优先环境变量 IPROXY_PATH;否则按平台在 resources/iproxy 查找。""" method = "_find_iproxy" + + env_path = os.getenv("IPROXY_PATH") + if env_path and Path(env_path).is_file(): + LogManager.method_info(f"使用环境变量指定的 iproxy 路径:{env_path}", method, udid="system") + return env_path + base = Path(__file__).resolve().parent.parent - name = "iproxy.exe" + is_windows = os.name == "nt" + name = "iproxy.exe" if is_windows else "iproxy" path = base / "resources" / "iproxy" / name LogManager.method_info(f"查找 iproxy 路径:{path}", method, udid="system") if path.is_file(): return str(path) + err = f"iproxy 不存在: {path}" LogManager.method_error(err, method, udid="system") raise FileNotFoundError(err) \ No newline at end of file diff --git a/Module/__pycache__/DeviceInfo.cpython-312.pyc b/Module/__pycache__/DeviceInfo.cpython-312.pyc index 5b87331..fb33a67 100644 Binary files a/Module/__pycache__/DeviceInfo.cpython-312.pyc and b/Module/__pycache__/DeviceInfo.cpython-312.pyc differ