import os
import re
import shutil
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional

from Entity.Variables import WdaAppBundleId


class IOSActivator:
    """
    Lightweight iOS activator (code-only API), packaging-safe variant.

    Workflow driven by :meth:`activate`:

    1) Start ``pymobiledevice3 remote tunneld`` as a long-lived child process.
    2) Auto-mount the Developer Disk Image (``mounter auto-mount``).
    3) Once the device tunnel is reported, launch WDA:
       - preferably with ``--rsd <host> <port>`` (host may be IPv6);
       - otherwise through ``PYMOBILEDEVICE3_TUNNEL=<ipv4>:<port>``.

    Every pymobiledevice3 invocation is a subprocess whose launcher is
    discovered by :meth:`_resolve_pmd3_argv_and_env`.
    """

    # Patterns applied to each tunneld output line.
    _HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
    _RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
    _RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")

    # Snippet used to check whether an interpreter can import pymobiledevice3.
    _PMD3_PROBE_CODE = "import pymobiledevice3;print('ok')"

    def __init__(self, python_executable: Optional[str] = None):
        # Kept only for compatibility with older callers; nothing in this
        # class reads self.python any more.
        self.python = python_executable or None

    # =========================================================================
    # Internal helpers: running the pymobiledevice3 CLI
    # =========================================================================
    @staticmethod
    def _apply_env_overrides(env: dict, overrides: Optional[dict]) -> None:
        """Merge *overrides* into *env* in place; a ``None`` value removes the key."""
        if not overrides:
            return
        for key, value in overrides.items():
            if value is None:
                env.pop(key, None)
            else:
                env[key] = str(value)

    def _run_and_capture(self, cmd: list[str], env: dict, failure_prefix: str) -> str:
        """
        Run *cmd* and return its combined stdout/stderr as text.

        Raises:
            RuntimeError: when the command exits non-zero. The message is the
                captured output, or ``"<failure_prefix> <exit code>"`` when
                nothing was captured.
        """
        try:
            output = subprocess.check_output(
                cmd,
                text=True,
                # Undecodable bytes must not abort the whole activation.
                errors="replace",
                stderr=subprocess.STDOUT,
                env=env,
                **self._win_hidden_popen_kwargs(),
            )
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(exc.output or f"{failure_prefix} {exc.returncode}") from exc
        return output or ""

    def _pmd3_run(self, args: list[str], udid: str, extra_env: Optional[dict] = None) -> str:
        """
        Run ``pymobiledevice3 <args>`` for one device and return its output.

        Args:
            args: CLI arguments placed after the resolved launcher.
            udid: Exported to the child as ``PYMOBILEDEVICE3_UDID``.
            extra_env: Extra environment entries; ``None`` values unset a key.

        Raises:
            RuntimeError: if no launcher is found or the command fails.
        """
        launcher, env = self._resolve_pmd3_argv_and_env()
        env["PYMOBILEDEVICE3_UDID"] = udid
        env["PYTHONUNBUFFERED"] = "1"
        env.setdefault("PYTHONIOENCODING", "utf-8")
        self._apply_env_overrides(env, extra_env)

        cmd = self._ensure_str_list([*launcher, *args])
        print("[pmd3]", " ".join(cmd))
        return self._run_and_capture(cmd, env, "pymobiledevice3 执行失败,退出码")

    def _pmd3_run_subprocess(self, full_args: list[str], extra_env: Optional[dict] = None) -> str:
        """
        Fallback runner: like :meth:`_pmd3_run` but without binding a UDID.

        Args:
            full_args: CLI arguments placed after the resolved launcher.
            extra_env: Extra environment entries; ``None`` values unset a key.

        Raises:
            RuntimeError: if no launcher is found or the command fails.
        """
        launcher, env = self._resolve_pmd3_argv_and_env()
        self._apply_env_overrides(env, extra_env)

        cmd = self._ensure_str_list([*launcher, *full_args])
        print("[pmd3-subproc]", " ".join(cmd))
        return self._run_and_capture(cmd, env, "pymobiledevice3 子进程执行失败,代码")

    # =========================================================================
    # Launcher discovery (replaces the old hard dependency on one environment)
    # =========================================================================
    def _interpreter_has_pmd3(self, python_path: str, env: dict) -> bool:
        """Return True if *python_path* can import pymobiledevice3 within 6 s."""
        try:
            out = subprocess.check_output(
                [python_path, "-c", self._PMD3_PROBE_CODE],
                text=True,
                stderr=subprocess.STDOUT,
                env=env,
                timeout=6,
                **self._win_hidden_popen_kwargs(),
            )
        except (OSError, subprocess.SubprocessError, ValueError):
            # Not runnable, import failed, timed out, or output undecodable:
            # this candidate is unusable, the caller moves on to the next one.
            return False
        return "ok" in out

    def _resolve_pmd3_argv_and_env(self):
        """
        Locate a way to run pymobiledevice3.

        Search order:
            1. ``$IOSAI_PYTHON`` followed by bundled ``python-rt`` interpreters
               next to (or one level above) ``sys.argv[0]``;
            2. a ``pymobiledevice3`` executable on ``PATH``;
            3. ``sys._base_executable`` and Python interpreters on ``PATH``.

        Returns:
            ``(argv_prefix, env)``: the launcher argument list and a fresh copy
            of the process environment with ``PYTHONUNBUFFERED=1`` and a
            default ``PYTHONIOENCODING`` of ``utf-8``.

        Raises:
            RuntimeError: when no candidate provides pymobiledevice3.
        """
        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        env.setdefault("PYTHONIOENCODING", "utf-8")

        # 1) Sidecar interpreters: root of python-rt first, then Scripts/.
        entry = Path(sys.argv[0]).resolve()
        base_dir = entry.parent if entry.is_file() else entry
        py_name = "python.exe" if os.name == "nt" else "python"
        sidecar_candidates = [
            base_dir / "python-rt" / py_name,  # embedded layout
            base_dir / "python-rt" / "Scripts" / py_name,  # venv-style layout
            base_dir.parent / "python-rt" / py_name,
            base_dir.parent / "python-rt" / "Scripts" / py_name,
        ]
        # An interpreter named by the caller via IOSAI_PYTHON is tried first.
        prefer_py = env.get("IOSAI_PYTHON")
        if prefer_py:
            sidecar_candidates.insert(0, Path(prefer_py))

        for cand in sidecar_candidates:
            print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
            if cand.is_file() and self._interpreter_has_pmd3(str(cand), env):
                print(f"[IOSAI] ✅ sidecar selected: {cand}")
                return ([str(cand), "-u", "-m", "pymobiledevice3"], env)

        # 2) A pymobiledevice3 executable on PATH.
        exe = shutil.which("pymobiledevice3")
        if exe:
            print(f"[IOSAI] ✅ use PATH executable: {exe}")
            return ([exe], env)

        # 3) Last resort: any system Python that has pymobiledevice3 installed.
        py_candidates = []
        base_exec = getattr(sys, "_base_executable", None)
        if base_exec and os.path.isfile(base_exec):
            py_candidates.append(base_exec)
        for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
            found = shutil.which(name)
            if found and found not in py_candidates:
                py_candidates.append(found)

        for py in py_candidates:
            print(f"[IOSAI] 🔎 probing system python: {py}")
            if self._interpreter_has_pmd3(py, env):
                print(f"[IOSAI] ✅ system python selected: {py}")
                return ([py, "-u", "-m", "pymobiledevice3"], env)

        raise RuntimeError("未检测到可用的 pymobiledevice3(建议携带 python-rt 或安装系统 Python+pmd3)。")

    def _ensure_str_list(self, seq):
        """Return *seq* as a list with every element converted to ``str``."""
        return [str(x) for x in seq]

    def _win_hidden_popen_kwargs(self):
        """Return Popen kwargs that hide the child window on Windows, else ``{}``."""
        if os.name != "nt":
            return {}
        startup = subprocess.STARTUPINFO()
        startup.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startup.wShowWindow = 0  # SW_HIDE
        return {
            "startupinfo": startup,
            "creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
        }

    @staticmethod
    def _print_lines(tag: str, text: str) -> None:
        """Print every line of *text* prefixed with ``[tag]``."""
        for line in text.splitlines():
            print(f"[{tag}] {line}")

    @staticmethod
    def _stop_process(proc: subprocess.Popen) -> None:
        """Terminate *proc* if it is still running; kill it after a 5 s grace period."""
        if proc.poll() is not None:
            return
        try:
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
        except OSError:
            # The process vanished between poll() and the signal; nothing left to do.
            pass

    # =========================================================================
    # Feature helpers
    # =========================================================================
    def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
        """
        Run ``pymobiledevice3 mounter auto-mount`` with retries.

        Args:
            udid: Target device.
            retries: Number of attempts; values below 1 are treated as 1.
            backoff_seconds: Pause between failed attempts.

        Raises:
            RuntimeError: when every attempt failed (chained to the last error).
        """
        attempts = max(1, retries)
        for attempt in range(1, attempts + 1):
            try:
                out = self._pmd3_run(["mounter", "auto-mount"], udid)
            except (RuntimeError, OSError, ValueError) as exc:
                if attempt >= attempts:
                    raise RuntimeError(
                        f"Auto-mount failed after {attempts} attempts.\n{exc}"
                    ) from exc
                print(f"[mounter] attempt {attempt}/{attempts} failed, retrying in {backoff_seconds}s ...")
                time.sleep(backoff_seconds)
                continue

            self._print_lines("mounter", out)
            if "already mounted" in out.lower():
                print("[mounter] Developer disk image already mounted.")
            else:
                print("[mounter] Developer disk image mounted.")
            return

    def _is_ipv4_host(self, host: str) -> bool:
        """Return True if *host* is a dotted-quad IPv4 address with octets in 0-255."""
        if not isinstance(host, str):
            return False
        octets = host.split(".")
        if len(octets) != 4:
            return False
        return all(
            part.isascii() and part.isdigit() and len(part) <= 3 and int(part) <= 255
            for part in octets
        )

    def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
        """
        Probe the RSD endpoint by opening a TCP connection.

        Args:
            rsd_host: Host reported by tunneld.
            rsd_port: Port reported by tunneld (decimal string).
            retries: Maximum number of connection attempts.
            delay: Seconds to wait between attempts.

        Returns:
            True as soon as a connection succeeds, False if all attempts fail.
        """
        port_int = int(rsd_port)
        for attempt in range(1, retries + 1):
            print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {attempt}/{retries}) ...")
            try:
                with socket.create_connection((rsd_host, port_int), timeout=2):
                    print("[rsd] ✅ RSD is reachable and ready.")
                    return True
            except OSError as exc:  # covers timeouts, refusals and resolution errors
                print(f"[rsd] Not ready yet ({exc}). Retrying...")
            # No point in sleeping once the final attempt has failed.
            if attempt < retries:
                time.sleep(delay)
        print("[rsd] ❌ RSD did not become ready after retries.")
        return False

    def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
        """
        Launch *bundle_id* with ``developer dvt launch <bundle> --rsd <host> <port>``.

        Raises:
            RuntimeError: if the pymobiledevice3 command fails.
        """
        print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
        out = self._pmd3_run(
            ["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port],
            udid=udid,
        )
        self._print_lines("wda", out)
        print("[wda] Launch via RSD completed.")

    def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
        """
        Fallback launch: run ``developer dvt launch <bundle>`` with
        ``PYMOBILEDEVICE3_TUNNEL=<http_host>:<http_port>`` in the environment.

        Raises:
            RuntimeError: if *http_host* is not IPv4 or the command fails.
        """
        if not self._is_ipv4_host(http_host):
            raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")

        tunnel_endpoint = f"{http_host}:{http_port}"
        print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
        out = self._pmd3_run(
            ["developer", "dvt", "launch", bundle_id],
            udid=udid,
            extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint},
        )
        self._print_lines("wda", out)
        print("[wda] Launch via HTTP tunnel completed.")

    def _pick_available_port(self, base=49151, step=10) -> int:
        """
        Return the first port in ``base, base+step, ...`` (offsets below 1000)
        on which a TCP connect to 127.0.0.1 fails, i.e. nothing is listening.

        Raises:
            RuntimeError: if every probed port accepted the connection.
        """
        for offset in range(0, 1000, step):
            port = base + offset
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                if probe.connect_ex(("127.0.0.1", port)) != 0:
                    return port
        raise RuntimeError("No free port found for tunneld")

    @staticmethod
    def _warn_if_not_admin() -> None:
        """On Windows, print a warning when the process is not elevated."""
        if os.name != "nt":
            return
        import ctypes

        try:
            is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
        except (AttributeError, OSError):
            is_admin = False
        if not is_admin:
            print("[⚠️] 当前进程未以管理员身份运行,tunneld 可能无法创建 USB 隧道。")

    def _launch_wda(
        self,
        udid: str,
        bundle_id: str,
        rsd_host: Optional[str],
        rsd_port: Optional[str],
        http_host: Optional[str],
        http_port: Optional[str],
        rsd_probe_retries: int,
        rsd_probe_delay_sec: float,
    ) -> None:
        """
        Launch WDA via RSD when the endpoint is reachable, otherwise via the
        HTTP tunnel endpoint.

        Raises:
            RuntimeError: if the launch fails or no endpoint is usable.
        """
        rsd_ok = False
        if rsd_host and rsd_port:
            rsd_ok = self._wait_for_rsd_ready(
                rsd_host,
                rsd_port,
                retries=rsd_probe_retries,
                delay=rsd_probe_delay_sec,
            )

        if rsd_ok:
            self._launch_wda_via_rsd(
                bundle_id=bundle_id,
                rsd_host=rsd_host,
                rsd_port=rsd_port,
                udid=udid,
            )
        elif http_host and http_port:
            self._launch_wda_via_http_tunnel(
                bundle_id=bundle_id,
                http_host=http_host,
                http_port=http_port,
                udid=udid,
            )
        else:
            raise RuntimeError("No valid tunnel endpoint for fallback.")

    # =========================================================================
    # Public API
    # =========================================================================
    def activate(
        self,
        udid: str,
        wda_bundle_id: Optional[str] = WdaAppBundleId,
        ready_timeout_sec: float = 120.0,
        mount_retries: int = 3,
        backoff_seconds: float = 2.0,
        rsd_probe_retries: int = 5,
        rsd_probe_delay_sec: float = 3.0,
        pre_mount_first: bool = True,
    ) -> str:
        """
        Mount the developer image (optional), start tunneld, wait for the
        device tunnel and launch WDA.

        The call keeps consuming tunneld output and only returns once the
        tunneld process has exited or the ready-timeout fired.

        Args:
            udid: Target device UDID (non-empty string).
            wda_bundle_id: Bundle to launch; a falsy value skips the launch.
            ready_timeout_sec: Maximum time to wait for the device tunnel,
                measured from the start of the call; ``<= 0`` disables it.
            mount_retries: Attempts for each auto-mount run.
            backoff_seconds: Pause between auto-mount attempts.
            rsd_probe_retries: TCP probe attempts against the RSD endpoint.
            rsd_probe_delay_sec: Pause between RSD probes.
            pre_mount_first: Mount before starting tunneld. If that fails the
                mount is retried once the device tunnel is up.

        Returns:
            Everything tunneld printed, concatenated.

        Raises:
            ValueError: if *udid* is empty or not a string.
            RuntimeError: if mounting/launching fails, or tunneld exits with a
                non-zero code before WDA was started.
        """
        if not udid or not isinstance(udid, str):
            raise ValueError("udid is required and must be a non-empty string")

        print(f"[activate] UDID = {udid}")
        self._warn_if_not_admin()

        started_at = time.monotonic()

        # 1) (optional) Mount first; the original author notes this avoids a
        #    DDI race with a concurrently running tidevice.
        mount_done = False
        if pre_mount_first:
            try:
                self._auto_mount_developer_disk(
                    udid, retries=mount_retries, backoff_seconds=backoff_seconds
                )
                time.sleep(2)  # let the mounted image settle
                mount_done = True
            except RuntimeError as exc:
                # mount_done stays False so the mount is retried after the tunnel is up.
                print(f"[activate] 预挂载失败(继续尝试开隧道后再挂载一次):{exc}")

        # 2) Start the tunneld child process.
        port = self._pick_available_port()
        launcher, tunnel_env = self._resolve_pmd3_argv_and_env()
        tunnel_env["PYTHONUNBUFFERED"] = "1"
        tunnel_env.setdefault("PYTHONIOENCODING", "utf-8")
        tunnel_env["PYMOBILEDEVICE3_UDID"] = udid

        cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
        print("[activate] 使用命令启动隧道:", " ".join(cmd))

        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # A stray undecodable byte in the log must not abort activation.
            errors="replace",
            bufsize=1,
            env=tunnel_env,
            **self._win_hidden_popen_kwargs(),
        )

        captured: list[str] = []
        http_host: Optional[str] = None
        http_port: Optional[str] = None
        rsd_host: Optional[str] = None
        rsd_port: Optional[str] = None
        device_tunnel_ready = False
        wda_started = False

        try:
            if proc.stdout is None:
                raise RuntimeError("tunneld stdout pipe is unavailable")

            # NOTE: the read below blocks, so the timeout can only be noticed
            # when tunneld prints a line.
            for line in proc.stdout:
                captured.append(line)
                print(f"[tunneld] {line}", end="")

                if proc.poll() is not None:
                    break

                # Remember the first HTTP gateway endpoint that shows up.
                if http_port is None:
                    match = self._HTTP_RE.search(line)
                    if match:
                        http_host, http_port = match.group(1), match.group(2)
                        print(f"[tunneld] Tunnel API: {http_host}:{http_port}")

                # RSD endpoints are only taken from lines that mention our UDID;
                # tunnel lines of other devices are ignored.
                if not device_tunnel_ready and self._line_is_for_udid(line, udid):
                    match = self._RSD_CREATED_RE.search(line) or self._RSD_FALLBACK_RE.search(line)
                    if match:
                        rsd_host, rsd_port = match.group(1), match.group(2)
                        device_tunnel_ready = True
                        print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")

                # Launch WDA once, as soon as the device tunnel is available.
                if device_tunnel_ready and wda_bundle_id and not wda_started:
                    try:
                        if not mount_done:
                            self._auto_mount_developer_disk(
                                udid, retries=mount_retries, backoff_seconds=backoff_seconds
                            )
                            time.sleep(2)
                            mount_done = True

                        self._launch_wda(
                            udid=udid,
                            bundle_id=wda_bundle_id,
                            rsd_host=rsd_host,
                            rsd_port=rsd_port,
                            http_host=http_host,
                            http_port=http_port,
                            rsd_probe_retries=rsd_probe_retries,
                            rsd_probe_delay_sec=rsd_probe_delay_sec,
                        )
                        wda_started = True
                    except RuntimeError as exc:
                        # tunneld itself is stopped by the finally clause below.
                        print(str(exc), file=sys.stderr)
                        raise

                # Timeout guard: evaluated on every line while the device
                # tunnel has not been reported yet.
                if (
                    not device_tunnel_ready
                    and ready_timeout_sec > 0
                    and time.monotonic() - started_at > ready_timeout_sec
                ):
                    print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
                    self._stop_process(proc)
                    break

            # Final cleanup of the child.
            try:
                return_code = proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                return_code = proc.returncode or -9

            output = "".join(captured)
            if return_code != 0 and not wda_started:
                raise RuntimeError(f"tunneld exited with code {return_code}.\n{output}")

            print("[activate] Done.")
            return output

        except KeyboardInterrupt:
            print("\n[activate] Interrupted, cleaning up ...", file=sys.stderr)
            raise
        finally:
            # Whatever happened, never leave tunneld or its pipe behind.
            self._stop_process(proc)
            if proc.stdout is not None:
                proc.stdout.close()

    def _line_is_for_udid(self, line: str, udid: str) -> bool:
        """Return True if *line* contains *udid* (case-insensitive)."""
        try:
            return udid.lower() in (line or "").lower()
        except (AttributeError, TypeError):
            # Non-string input cannot belong to the device.
            return False