From 22da7425321fe267d67cdf19cdeef49b482d5ae5 Mon Sep 17 00:00:00 2001 From: milk <53408947@qq.com> Date: Mon, 24 Nov 2025 20:38:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Module/DeviceInfo.py | 221 ++++++++++++++++------------ Module/IOSActivator.py | 324 +++++++++++++++++++++++++---------------- 2 files changed, 323 insertions(+), 222 deletions(-) diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index 97ebf9b..854e752 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -7,11 +7,11 @@ import subprocess from typing import Dict, Optional import tidevice -# import wda # 目前先不用 wda 获取屏幕,避免触发 tidevice 的那套 WDA 启动 +import wda from tidevice import Usbmux, ConnectionType from Entity.DeviceModel import DeviceModel -from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort +from Entity.Variables import WdaAppBundleId, wdaFunctionPort from Module.FlaskSubprocessManager import FlaskSubprocessManager from Module.IOSActivator import IOSActivator from Utils.LogManager import LogManager @@ -21,6 +21,9 @@ class DeviceInfo: _instance = None _instance_lock = threading.Lock() + # 离线宽限期 + REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0")) + def __new__(cls, *args, **kwargs): if not cls._instance: with cls._instance_lock: @@ -28,8 +31,6 @@ class DeviceInfo: cls._instance = super().__new__(cls) return cls._instance - REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0")) - def __init__(self) -> None: if getattr(self, "_initialized", False): return @@ -55,7 +56,7 @@ class DeviceInfo: self._creationflags = 0 si = subprocess.STARTUPINFO() - si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore[attr-defined] si.wShowWindow = 0 # SW_HIDE self._startupinfo = si @@ -63,9 +64,9 @@ class DeviceInfo: print("[Init] DeviceInfo 初始化完成") self._initialized = True - # --------------------------- + # ========================== # 主循环 - # --------------------------- + # ========================== def listen(self): LogManager.method_info("进入主循环", "listen", udid="system") print("[Listen] 开始监听设备上下线...") @@ -82,18 +83,18 @@ class DeviceInfo: with self._lock: known = set(self._models.keys()) - # ---------- 1. 新设备 ---------- + # 1. 新设备 for udid in online: self._last_seen[udid] = time.time() if udid not in known: try: self._add_device(udid) except Exception as e: - # 关键:单设备异常不能干掉整个循环 + # 单设备异常不能干掉整个循环 LogManager.warning(f"[Add] 处理设备 {udid} 异常: {e}", udid=udid) print(f"[Add] 处理设备 {udid} 异常: {e}") - # ---------- 2. 可能离线设备 ---------- + # 2. 可能离线的设备 now = time.time() for udid in list(known): if udid not in online: @@ -107,9 +108,9 @@ class DeviceInfo: time.sleep(1) - # --------------------------- + # ========================== # 添加设备 - # --------------------------- + # ========================== def _add_device(self, udid: str): with self._lock: if udid in self._models: @@ -125,20 +126,7 @@ class DeviceInfo: print(f"[Add] 获取系统版本失败 {udid}: {e}") version_major = 0 - # 启动 WDA - if version_major >= 17.0: - threading.Thread( - target=IOSActivator().activate_ios17, - args=(udid,), - daemon=True, - ).start() - else: - try: - tidevice.Device(udid).app_start(WdaAppBundleId) - except Exception as e: - print(f"[Add] 使用 tidevice 启动 WDA 失败 {udid}: {e}") - - # 分配投屏端口 & 写入模型 + # 分配投屏端口 & 写入模型(先插入,width/height=0,后面再异步更新) with self._lock: self.screenPort += 1 screen_port = self.screenPort @@ -149,7 +137,7 @@ class DeviceInfo: width=0, height=0, scale=0, - type=1 + type=1, ) self._models[udid] = model @@ -162,16 +150,106 @@ class DeviceInfo: except Exception as e: print(f"[iproxy] 启动失败 {udid}: {e}") - # 异步等待 WDA + 更新屏幕尺寸(后面你要真用尺寸,再把 _screen_info 换成 wda 版本即可) + # 启动 WDA + if version_major >= 17.0: + # iOS17+ 走 go-ios,传入回调:WDA 启动后再拿屏幕尺寸 + threading.Thread( + target=IOSActivator().activate_ios17, + args=(udid, self._on_wda_ready), + daemon=True, + ).start() + else: + # 旧版本直接用 tidevice 启动 WDA,然后异步获取屏幕尺寸 + try: + tidevice.Device(udid).app_start(WdaAppBundleId) + except Exception as e: + print(f"[Add] 使用 tidevice 启动 WDA 失败 {udid}: {e}") + else: + threading.Thread( + target=self._fetch_screen_and_notify, + args=(udid,), + daemon=True, + ).start() + + # ========================== + # WDA 启动回调(iOS17+) + # ========================== + def _on_wda_ready(self, udid: str): + print(f"[WDA] 回调触发,准备获取屏幕信息 udid={udid}") + # 稍微等一下再连,避免刚启动时不稳定 + time.sleep(1) threading.Thread( - target=self._async_wait_wda_and_update_screen, + target=self._fetch_screen_and_notify, args=(udid,), - daemon=True + daemon=True, ).start() - # --------------------------- - # 启动 iproxy(投屏) - # --------------------------- + # ========================== + # 通过 WDA 获取屏幕信息 + # ========================== + def _screen_info(self, udid: str): + try: + # 用 USBClient,通过 WDA 功能端口访问 + c = wda.USBClient(udid, wdaFunctionPort) + size = c.window_size() + + w = int(size.width) + h = int(size.height) + s = float(c.scale) # facebook-wda 的 scale 挂在 client 上 + + print(f"[Screen] 成功获取屏幕 {w}x{h} scale={s} {udid}") + return w, h, s + except Exception as e: + print(f"[Screen] 获取屏幕失败: {e} udid={udid}") + return 0, 0, 0.0 + + # ========================== + # 异步获取屏幕尺寸并通知 Flask + # ========================== + def _fetch_screen_and_notify(self, udid: str): + """ + 后台线程里多次尝试通过 WDA 获取屏幕尺寸, + 成功后更新 model 并发一次 snapshot。 + """ + max_retry = 15 + interval = 1.0 + + # 给 WDA 一点启动缓冲时间 + time.sleep(2.0) + + for _ in range(max_retry): + # 设备已移除就不再尝试 + with self._lock: + if udid not in self._models: + print(f"[Screen] 设备已移除,停止获取屏幕信息 udid={udid}") + return + + w, h, s = self._screen_info(udid) + if w > 0 and h > 0: + # 更新模型 + with self._lock: + m = self._models.get(udid) + if not m: + print(f"[Screen] 模型已不存在,无法更新 udid={udid}") + return + m.width = w + m.height = h + m.scale = s + + print(f"[Screen] 屏幕信息更新完成,准备推送到 Flask udid={udid}") + try: + self._manager_send() + except Exception as e: + print(f"[Screen] 发送屏幕更新到 Flask 失败 udid={udid}, err={e}") + return + + time.sleep(interval) + + print(f"[Screen] 多次尝试仍未获取到屏幕信息 udid={udid}") + + # ========================== + # iproxy 管理 + # ========================== def _start_iproxy(self, udid: str, local_port: int): iproxy_path = self._find_iproxy() @@ -183,14 +261,14 @@ class DeviceInfo: args = [ iproxy_path, - "-u", udid, - str(local_port), # 本地端口(投屏) - "9567" # 手机端口(go-ios screencast) + "-u", + udid, + str(local_port), # 本地端口(投屏) + "9567", # 手机端口(go-ios screencast) ] print(f"[iproxy] 启动进程: {args}") - # 不用 PIPE,防止没人读导致缓冲爆掉;窗口用前面配置隐藏 proc = subprocess.Popen( args, stdout=subprocess.DEVNULL, @@ -198,12 +276,8 @@ class DeviceInfo: creationflags=self._creationflags, startupinfo=self._startupinfo, ) - self._iproxy_process[udid] = proc - # --------------------------- - # 停止 iproxy - # --------------------------- def _stop_iproxy(self, udid: str): p = self._iproxy_process.get(udid) if not p: @@ -219,39 +293,13 @@ class DeviceInfo: self._iproxy_process.pop(udid, None) print(f"[iproxy] 已停止 {udid}") - # --------------------------- - # 异步等待 WDA 然后更新屏幕大小 - # --------------------------- - def _async_wait_wda_and_update_screen(self, udid: str): - print(f"[WDA] 等待 WDA 就绪 {udid}") - - try: - # 最长等待 20 秒(你后面要真用屏幕尺寸,再把 _screen_info 换回 wda 的实现) - for _ in range(20): - w, h, s = self._screen_info(udid) - if w > 0: - print(f"[WDA] 屏幕信息成功 {udid} {w}x{h} scale={s}") - with self._lock: - m = self._models.get(udid) - if m: - m.width = w - m.height = h - m.scale = s - self._manager_send() - return - time.sleep(1) - except Exception as e: - print(f"[WDA] 获取屏幕信息异常 {udid}: {e}") - - print(f"[WDA] 屏幕信息获取失败(超时) {udid}") - - # --------------------------- + # ========================== # 移除设备 - # --------------------------- + # ========================== def _remove_device(self, udid: str): print(f"[Remove] 移除设备 {udid}") - # 先停 iproxy(只管自己的,不影响其他设备) + # 先停 iproxy self._stop_iproxy(udid) with self._lock: @@ -260,34 +308,17 @@ class DeviceInfo: self._manager_send() - # --------------------------- - # WDA 屏幕查询(当前保持空实现) - # --------------------------- - def _screen_info(self, udid: str): - # 现在先不通过 wda 取屏幕,避免触发 tidevice 那套 WDA 启动逻辑 - # 你如果确认 go-ios 跑起来后用 facebook-wda 取尺寸是安全的,可以改回下面这种: - # - # try: - # c = wda.USBClient(udid, wdaFunctionPort) - # size = c.window_size() - # return int(size.width), int(size.height), float(c.scale) - # except Exception as e: - # print(f"[Screen] 获取屏幕信息异常: {e} {udid}") - # return 0, 0, 0.0 - return 0, 0, 0.0 - - # --------------------------- - # 找 iproxy - # --------------------------- + # ========================== + # 工具方法 + # ========================== def _find_iproxy(self) -> str: base = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) name = "iproxy.exe" if os.name == "nt" else "iproxy" - path = os.path.join(base, "resources", "iproxy", name) - return path + return os.path.join(base, "resources", "iproxy", name) - # --------------------------- - # 数据同步到 Flask - # --------------------------- + # ========================== + # 同步数据到 Flask + # ========================== def _manager_send(self): try: self._send_snapshot_to_flask() @@ -306,8 +337,8 @@ class DeviceInfo: devices = [m.toDict() for m in self._models.values()] payload = json.dumps({"devices": devices}, ensure_ascii=False) - port = int(os.getenv("FLASK_COMM_PORT", "34566")) + with socket.create_connection(("127.0.0.1", port), timeout=1.5) as s: s.sendall(payload.encode() + b"\n") diff --git a/Module/IOSActivator.py b/Module/IOSActivator.py index 04a58d4..3fd6baa 100644 --- a/Module/IOSActivator.py +++ b/Module/IOSActivator.py @@ -1,84 +1,96 @@ -import subprocess -import threading -import time import os import sys -from typing import Tuple, Optional +import time +import threading +import subprocess +from typing import Optional, Callable + from Entity.Variables import WdaAppBundleId class IOSActivator: """ - 专门给 iOS17+ 设备用的 go-ios 激活器: - - 外部先探测 WDA,不存在时再调用 activate_ios17 - - 内部流程:tunnel start -> pair(等待成功) -> image auto -> runwda + 给 iOS17+ 用的 go-ios 激活器(单例): + - 维护一条全局 tunnel 进程 + - 流程:tunnel start -> pair(重试) -> image auto(不致命) -> runwda(多次重试+日志判定成功) + - WDA 启动成功后触发回调 on_wda_ready(udid) """ + # ===== 单例 & 全局 tunnel ===== + _instance = None + _instance_lock = threading.Lock() + + _tunnel_proc: Optional[subprocess.Popen] = None + _tunnel_lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + with cls._instance_lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + def __init__( self, ios_path: Optional[str] = None, - pair_timeout: int = 60, # 配对最多等多久 - pair_retry_interval: int = 3, # 每次重试间隔 + pair_timeout: int = 60, # 配对最多等多久 + pair_retry_interval: int = 3, # 配对重试间隔 + runwda_max_retry: int = 3, # runwda 最大重试次数 + runwda_retry_interval: int = 3,# runwda 重试间隔 + runwda_wait_timeout: int = 25 # 单次 runwda 等待“成功日志”的超时时间 ): - """ - :param ios_path: ios.exe 的绝对路径,例如 E:\\code\\Python\\iOSAi\\resources\\ios.exe - 如果为 None,则自动从项目的 resources 目录中寻找 ios.exe - """ + if getattr(self, "_inited", False): + return - # ==== 统一获取 resources 目录(支持源码运行 + Nuitka EXE) ==== + # 运行路径处理(源码 / Nuitka EXE) if "__compiled__" in globals(): - # 被 Nuitka 编译后的 exe 运行时 - base_dir = os.path.dirname(sys.executable) # exe 所在目录 + base_dir = os.path.dirname(sys.executable) else: - # 开发环境,直接跑 .py - cur_file = os.path.abspath(__file__) # 当前 .py 文件所在目录 - base_dir = os.path.dirname(os.path.dirname(cur_file)) # 回到项目根 iOSAi + cur_file = os.path.abspath(__file__) + base_dir = os.path.dirname(os.path.dirname(cur_file)) resource_dir = os.path.join(base_dir, "resources") - - # 如果外部没有显式传 ios_path,就用 resources/ios.exe - if ios_path is None or ios_path == "": + if not ios_path: ios_path = os.path.join(resource_dir, "ios.exe") self.ios_path = ios_path self.pair_timeout = pair_timeout self.pair_retry_interval = pair_retry_interval + self.runwda_max_retry = runwda_max_retry + self.runwda_retry_interval = runwda_retry_interval + self.runwda_wait_timeout = runwda_wait_timeout + self._lock = threading.Lock() - # go-ios tunnel 的后台进程 - self._tunnel_proc: Optional[subprocess.Popen] = None - - # Windows: 避免弹黑框 + # Windows 隐藏黑框 self._creationflags = 0 - self._startupinfo = None # ⭐ 新增:统一控制窗口隐藏 + self._startupinfo = None if os.name == "nt": try: self._creationflags = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined] except Exception: self._creationflags = 0 - - # ⭐ 用 STARTUPINFO + STARTF_USESHOWWINDOW 彻底隐藏窗口 si = subprocess.STARTUPINFO() - si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore[attr-defined] si.wShowWindow = 0 # SW_HIDE self._startupinfo = si - # ======================= - # 基础执行封装 - # ======================= + self._inited = True + + # ===== 通用同步命令执行 ===== def _run( self, args, desc: str = "", timeout: Optional[int] = None, check: bool = True, - ) -> Tuple[int, str, str]: - """ - 同步执行 ios.exe,等它返回。 - :return: (returncode, stdout, stderr) - """ + ): cmd = [self.ios_path] + list(args) + cmd_str = " ".join(cmd) + if desc: + print(f"[ios] 执行命令({desc}): {cmd_str}") + else: + print(f"[ios] 执行命令: {cmd_str}") try: proc = subprocess.run( @@ -88,7 +100,7 @@ class IOSActivator: text=True, timeout=timeout, creationflags=self._creationflags, - startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口 + startupinfo=self._startupinfo, ) except subprocess.TimeoutExpired: if check: @@ -97,21 +109,42 @@ class IOSActivator: out = proc.stdout or "" err = proc.stderr or "" - if check and proc.returncode != 0: + print(f"[ios] 命令失败({desc}), rc={proc.returncode}") raise RuntimeError(f"[ios] 命令失败({desc}), returncode={proc.returncode}") return proc.returncode, out, err - def _spawn_tunnel(self) -> None: - with self._lock: - if self._tunnel_proc is not None and self._tunnel_proc.poll() is None: + # ===== tunnel 相关 ===== + def _drain_process_output(self, proc: subprocess.Popen, name: str): + """吃掉后台进程输出,防止缓冲区阻塞""" + try: + if proc.stdout: + for line in proc.stdout: + line = line.rstrip() + if line: + print(f"[ios][{name}] {line}") + except Exception as e: + print(f"[ios][{name}] 读取 stdout 异常: {e}") + + try: + if proc.stderr: + for line in proc.stderr: + line = line.rstrip() + if line: + print(f"[ios][{name}][stderr] {line}") + except Exception as e: + print(f"[ios][{name}] 读取 stderr 异常: {e}") + + def _spawn_tunnel(self): + """启动 / 复用全局 tunnel""" + with IOSActivator._tunnel_lock: + if IOSActivator._tunnel_proc is not None and IOSActivator._tunnel_proc.poll() is None: print("[ios] tunnel 已经在运行,跳过重新启动") return cmd = [self.ios_path, "tunnel", "start"] - print("[ios] 启动 go-ios tunnel: %s", " ".join(cmd)) - + print("[ios] 启动 go-ios tunnel:", " ".join(cmd)) try: proc = subprocess.Popen( cmd, @@ -119,15 +152,14 @@ class IOSActivator: stderr=subprocess.PIPE, text=True, creationflags=self._creationflags, - startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口 + startupinfo=self._startupinfo, ) except Exception as e: - # 这里改成 warning,并且直接返回,不往外抛 - print("[ios] 启动 tunnel 失败(忽略): %s", e) + print("[ios] 启动 tunnel 失败(忽略):", e) return - self._tunnel_proc = proc - print("[ios] tunnel 启动成功, PID=%s", proc.pid) + IOSActivator._tunnel_proc = proc + print("[ios] tunnel 启动成功, PID=", proc.pid) threading.Thread( target=self._drain_process_output, @@ -135,39 +167,14 @@ class IOSActivator: daemon=True, ).start() - def _drain_process_output(self, proc: subprocess.Popen, name: str): - """简单把后台进程的输出打到日志里,避免缓冲区阻塞。""" - try: - if proc.stdout: - for line in proc.stdout: - line = line.rstrip() - print(line) - except Exception as e: - print("[ios][%s] 读取 stdout 异常: %s", name, e) - - try: - if proc.stderr: - for line in proc.stderr: - line = line.rstrip() - if line: - print("[ios][%s][stderr] %s", name, line) - except Exception as e: - print("[ios][%s] 读取 stderr 异常: %s", name, e) - - # ======================= - # 具体步骤封装 - # ======================= - def _pair_until_success(self, udid: str) -> None: - """ - 调用 `ios --udid pair`,直到成功或者超时。 - 成功条件:stdout 中出现 "Successfully paired" - """ + # ===== pair & image ===== + def _pair_until_success(self, udid: str): deadline = time.time() + self.pair_timeout attempt = 0 while True: attempt += 1 - print("[ios] 开始配对设备(%s),第 %d 次尝试", udid, attempt) + print(f"[ios] 开始配对设备({udid}),第 {attempt} 次尝试") rc, out, err = self._run( ["--udid", udid, "pair"], @@ -178,23 +185,16 @@ class IOSActivator: text = (out or "") + "\n" + (err or "") if "Successfully paired" in text: - print("[ios] 设备 %s 配对成功", udid) + print(f"[ios] 设备 {udid} 配对成功") return if time.time() >= deadline: - raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败") + raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败(rc={rc})") time.sleep(self.pair_retry_interval) - def _mount_dev_image(self, udid: str) -> None: - """ - `ios --udid image auto` 挂载开发者镜像。 - 成功条件:输出里出现 "success mounting image" 或 - "there is already a developer image mounted" 之类的提示。 - 挂载失败现在只打 warning,不再抛异常,避免阻断后续 runwda。 - """ - print("[ios] 开始为设备 %s 挂载开发者镜像 (image auto)", udid) - + def _mount_dev_image(self, udid: str): + print(f"[ios] 开始为设备 {udid} 挂载开发者镜像 (image auto)") rc, out, err = self._run( ["--udid", udid, "image", "auto"], desc=f"image auto({udid})", @@ -204,70 +204,140 @@ class IOSActivator: text = (out or "") + "\n" + (err or "") text_lower = text.lower() - - # 这些都当成功处理 success_keywords = [ "success mounting image", "there is already a developer image mounted", ] if any(k in text_lower for k in success_keywords): - print("[ios] 设备 %s 开发者镜像挂载完成", udid) + print(f"[ios] 设备 {udid} 开发者镜像挂载完成") if text.strip(): - print("[ios][image auto] output:\n%s", text.strip()) + print("[ios][image auto] output:\n", text.strip()) return - # 到这里说明没找到成功关键字,当成“不可靠但非致命” - print( - "[ios] 设备 %s 挂载开发者镜像可能失败(rc=%s),输出:\n%s", - udid, rc, text.strip() - ) - # 关键:不再 raise,直接 return,让后续 runwda 继续试 - return + print(f"[ios] 设备 {udid} 挂载开发者镜像可能失败(rc={rc}),输出:\n{text.strip()}") - def _run_wda(self, udid: str) -> None: - # ⭐ 按你验证的命令构造参数(绝对正确) - args = [ + # ===== runwda(关键逻辑) ===== + def _run_wda_once_async(self, udid: str, on_wda_ready: Optional[Callable[[str], None]]) -> bool: + """ + 单次 runwda: + - 异步启动 ios.exe + - 实时读 stdout/stderr + - 捕获关键日志(got capabilities / authorized true)视为成功 + - 超时/进程退出且未成功 -> 失败 + """ + cmd = [ + self.ios_path, f"--udid={udid}", "runwda", f"--bundleid={WdaAppBundleId}", f"--testrunnerbundleid={WdaAppBundleId}", - "--xctestconfig=yolo.xctest", # ⭐ 你亲自验证成功的值 + "--xctestconfig=yolo.xctest", ] + print("[ios] 异步启动 runwda:", " ".join(cmd)) - rc, out, err = self._run( - args, - desc=f"runwda({udid})", - timeout=300, - check=False, - ) + try: + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + creationflags=self._creationflags, + startupinfo=self._startupinfo, + ) + except Exception as e: + print(f"[ios] 启动 runwda 进程失败: {e}") + return False - # ======================= - # 对外主流程 - # ======================= - def activate_ios17(self, udid: str) -> None: - print("[WDA] iOS17+ 激活开始,udid=%s", udid) + success_evt = threading.Event() - # 1. 启动 tunnel + def _reader(pipe, tag: str): + try: + for raw in pipe: + line = (raw or "").rstrip() + if not line: + continue + print(f"[WDA-LOG] {line}") + lower = line.lower() + # 这里是你实测的“成功特征” + if "got capabilities" in lower or '"authorized":true' in lower: + success_evt.set() + print(f"[ios] 捕获到 WDA 启动成功日志({tag}),udid={udid}") + break + except Exception as e: + print(f"[ios] 读取 {tag} 日志异常: {e}") + + # 日志线程 + if proc.stdout: + threading.Thread(target=_reader, args=(proc.stdout, "stdout"), daemon=True).start() + if proc.stderr: + threading.Thread(target=_reader, args=(proc.stderr, "stderr"), daemon=True).start() + + # 等待成功 / 退出 / 超时 + start = time.time() + while True: + if success_evt.is_set(): + print(f"[ios] WDA 日志确认已启动,udid={udid}") + if on_wda_ready: + try: + on_wda_ready(udid) + except Exception as e: + print(f"[WDA] 回调执行异常: {e}") + # 不主动杀进程,让 WDA 挂在那儿 + return True + + rc = proc.poll() + if rc is not None: + print(f"[ios] runwda 进程退出 rc={rc},未检测到成功日志,udid={udid}") + return False + + if time.time() - start > self.runwda_wait_timeout: + print(f"[ios] runwda 等待超时({self.runwda_wait_timeout}s),未确认成功,udid={udid}") + try: + proc.terminate() + except Exception: + pass + return False + + time.sleep(0.2) + + def _run_wda_with_retry(self, udid: str, on_wda_ready: Optional[Callable[[str], None]]) -> bool: + for attempt in range(1, self.runwda_max_retry + 1): + print(f"[ios] runwda 尝试 {attempt}/{self.runwda_max_retry},udid={udid}") + ok = self._run_wda_once_async(udid, on_wda_ready) + if ok: + print(f"[ios] runwda 第 {attempt} 次尝试成功,udid={udid}") + return True + + print(f"[ios] runwda 第 {attempt} 次尝试失败,udid={udid}") + if attempt < self.runwda_max_retry: + time.sleep(self.runwda_retry_interval) + + print(f"[ios] runwda 多次失败,放弃,udid={udid}") + return False + + # ===== 对外主流程 ===== + def activate_ios17(self, udid: str, on_wda_ready: Optional[Callable[[str], None]] = None) -> None: + print(f"[WDA] iOS17+ 激活开始,udid={udid}, 回调={on_wda_ready}") + + # 1. tunnel self._spawn_tunnel() - # 2. 一直等到 pair 成功(pair 不成功就没法玩了,直接返回) + # 2. 配对 try: self._pair_until_success(udid) except Exception as e: - print("[WDA] pair 失败,终止激活流程 udid=%s, err=%s", udid, e) + print(f"[WDA] pair 失败,终止激活流程 udid={udid}, err={e}") return - # 3. 挂载开发者镜像(现在是非致命错误) + # 3. 挂镜像(非致命) try: self._mount_dev_image(udid) except Exception as e: - # 理论上不会再进到这里,但为了稳妥,多一层保护 - print("[WDA] 挂载开发者镜像出现异常,忽略继续 udid=%s, err=%s", udid, e) + print(f"[WDA] 挂载开发者镜像异常(忽略) udid={udid}, err={e}") - # 4. 尝试启动 WDA - try: - self._run_wda(udid) - except Exception as e: - print("[WDA] runwda 调用异常 udid=%s, err=%s", udid, e) + # 4. runwda + 回调 + ok = self._run_wda_with_retry(udid, on_wda_ready) + if not ok: + print(f"[WDA] runwda 多次失败,可能需要手动检查设备,udid={udid}") - print("[WDA] iOS17+ 激活流程结束(不代表一定成功),udid=%s", udid) \ No newline at end of file + print(f"[WDA] iOS17+ 激活流程结束(不代表一定成功),udid={udid}") \ No newline at end of file