# -*- coding: utf-8 -*-
import os
import re
import sys
import atexit
import signal
import socket
import subprocess
from typing import Optional, List, Tuple, Dict, Set, Union

from Entity.Variables import WdaAppBundleId
import time as _t


class IOSActivator:
    """
    Lightweight iOS activator (code-only API) with controlled process and
    virtual-adapter lifetime.

    1) Starts ``pymobiledevice3 remote tunneld`` (resident or one-shot).
    2) Auto-mounts the Developer Disk Image (DDI).
    3) Launches WDA once the tunnel is ready.
    4) On program exit, or when ``keep_tunnel=False``, makes sure the tunneld
       process and its virtual network adapters are cleaned up.
    """

    # ---------- regexes applied to tunneld output ----------
    HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
    RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
    RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
    IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE)

    def __init__(self, python_executable: Optional[str] = None):
        """
        Args:
            python_executable: Optional interpreter to prefer when locating
                pymobiledevice3; it is probed before any other candidate.
        """
        self.python = python_executable or None
        self._live_procs: Dict[str, subprocess.Popen] = {}  # udid -> tunneld proc
        self._live_ifaces: Dict[str, Set[str]] = {}  # udid -> {iface names}
        self._registered = False  # exit hooks are installed at most once

    # =============== public entry point ===============
    def activate(
        self,
        udid: str,
        wda_bundle_id: Optional[str] = WdaAppBundleId,
        ready_timeout_sec: float = 120.0,
        mount_retries: int = 3,
        backoff_seconds: float = 2.0,
        rsd_probe_retries: int = 5,
        rsd_probe_delay_sec: float = 3.0,
        pre_mount_first: bool = True,
        keep_tunnel: bool = False,  # default False: close the tunnel once WDA is up
        broad_cleanup_on_exit: bool = True,  # also sweep every leftover pmd3 adapter
    ) -> str:
        """
        Mount the DDI (optional) -> start the tunnel -> wait for RSD -> launch WDA.

        - ``keep_tunnel=False``: the tunnel is closed and cleaned up after WDA
          has been launched.
        - ``keep_tunnel=True``: the tunnel stays up; this call keeps reading
          tunneld output until the process ends, and cleanup is left to the
          caller / the atexit hook.

        Args:
            udid: Device UDID (non-empty string).
            wda_bundle_id: Bundle id of the WDA app; falsy skips the launch.
            ready_timeout_sec: Give up waiting for the tunnel after this many
                seconds (<= 0 disables). It is evaluated whenever tunneld
                prints a line, so a completely silent tunneld is not
                interrupted.
            mount_retries: Attempts for the DDI auto-mount.
            backoff_seconds: Delay between mount attempts.
            rsd_probe_retries: TCP probe attempts against the RSD endpoint.
            rsd_probe_delay_sec: Delay between RSD probes.
            pre_mount_first: Try to mount the DDI before starting the tunnel.
            keep_tunnel: Keep tunneld running after WDA has been launched.
            broad_cleanup_on_exit: Sweep all pmd3 adapters during cleanup.

        Returns:
            The captured tunneld output (bounded to the most recent lines).

        Raises:
            ValueError: ``udid`` is empty or not a string.
            RuntimeError: no usable tunnel endpoint, or a pmd3 command failed.
        """
        if not udid or not isinstance(udid, str):
            raise ValueError("udid is required and must be a non-empty string")
        print(f"[activate] UDID = {udid}")

        self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit)

        # Windows: removing virtual adapters needs administrator rights.
        if os.name == "nt":
            import ctypes
            try:
                is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
            except Exception:
                is_admin = False
            if not is_admin:
                print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。")

        start_ts = _t.time()

        # 1) Pre-mount (failure is not fatal). mount_done only becomes True on
        #    success so that a failed pre-mount really is retried later.
        mount_done = False
        if pre_mount_first:
            try:
                self._auto_mount_developer_disk(
                    udid, retries=mount_retries, backoff_seconds=backoff_seconds
                )
                _t.sleep(2)
                mount_done = True
            except Exception as e:
                print(f"[activate] 预挂载失败(稍后再试):{e}")

        # 2) Start tunneld. Ports are kept as str because they end up on a
        #    subprocess command line.
        http_host: Optional[str] = None
        http_port: Optional[str] = None
        rsd_host: Optional[str] = None
        rsd_port: Optional[str] = None
        iface_names: Set[str] = set()

        proc, _port_ignored = self._start_tunneld(udid)
        self._live_procs[udid] = proc
        self._live_ifaces[udid] = iface_names

        captured: List[str] = []
        out: str = ""
        wda_started = False

        try:
            if proc.stdout is None:
                raise RuntimeError("tunneld was started without a stdout pipe")

            for line in proc.stdout:
                captured.append(line)
                # Bound the log so a resident tunnel does not grow memory forever.
                if len(captured) > 20000:
                    captured = captured[-10000:]
                print(f"[tunneld] {line}", end="")

                # Remember virtual adapter names for later cleanup.
                for m in self.IFACE_RE.finditer(line):
                    iface_names.add(m.group(1))

                # Stop reading once the child has exited.
                if proc.poll() is not None:
                    break

                # Capture the HTTP gateway endpoint (port kept as str).
                if http_port is None:
                    m = self.HTTP_RE.search(line)
                    if m:
                        http_host = m.group(1)
                        http_port = m.group(2)
                        try:
                            _ = int(http_port)
                        except Exception:
                            print(f"[tunneld] bad http port: {http_port}")
                            http_host, http_port = None, None
                        else:
                            print(f"[tunneld] Tunnel API: {http_host}:{http_port}")

                # RSD lines are only meaningful for the requested UDID.
                if self._line_is_for_udid(line, udid):
                    m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line)
                    if m and rsd_host is None and rsd_port is None:
                        rsd_host = m.group(1)
                        rsd_port = m.group(2)
                        try:
                            _ = int(rsd_port)  # numeric sanity check only
                        except Exception:
                            print(f"[tunneld] bad rsd port: {rsd_port}")
                            rsd_host, rsd_port = None, None
                        else:
                            print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")

                    # ========= try to launch WDA =========
                    if (not wda_started) and wda_bundle_id and (rsd_host is not None) and (rsd_port is not None):
                        if not mount_done:
                            try:
                                self._auto_mount_developer_disk(
                                    udid, retries=mount_retries, backoff_seconds=backoff_seconds
                                )
                                _t.sleep(2)
                            except Exception as e:
                                # Without a pre-mount attempt this mount is
                                # mandatory; after a failed pre-mount it stays
                                # best-effort, exactly like the pre-mount was.
                                if not pre_mount_first:
                                    raise
                                print(f"[activate] mount retry failed, launching WDA anyway: {e}")
                            mount_done = True

                        self._probe_and_launch_wda(
                            udid=udid,
                            bundle_id=wda_bundle_id,
                            rsd_host=rsd_host,
                            rsd_port=rsd_port,
                            http_host=http_host,
                            http_port=http_port,
                            rsd_probe_retries=rsd_probe_retries,
                            rsd_probe_delay_sec=rsd_probe_delay_sec,
                        )
                        wda_started = True

                    # WDA is up; one-shot mode leaves the reader loop here.
                    if wda_started and not keep_tunnel:
                        _t.sleep(0.5)  # let the tunnel flush a few more lines
                        print("[activate] WDA launched; exiting reader loop (keep_tunnel=False).")
                        break

                # Timeout guard (only while WDA has not been started). Checked
                # for every line, including lines that belong to other devices.
                if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
                    print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
                    break

            print("[activate] 启动 WDA 读取阶段结束")
            out = "".join(captured)

        except Exception as e:
            print(f"[activate] 发生异常:{e}")
            raise
        finally:
            if not keep_tunnel:
                try:
                    self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
                except Exception as ce:
                    print(f"[activate] stop_tunnel 清理异常:{ce}")

        print("[activate] Done.")
        return out

    def _probe_and_launch_wda(
        self,
        udid: str,
        bundle_id: str,
        rsd_host: str,
        rsd_port: str,
        http_host: Optional[str],
        http_port: Optional[str],
        rsd_probe_retries: int,
        rsd_probe_delay_sec: float,
    ) -> None:
        """Launch WDA over RSD when it is reachable, else over the HTTP tunnel.

        Raises:
            RuntimeError: neither endpoint is usable, or the launch failed.
        """
        if self._wait_for_rsd_ready(
            rsd_host, rsd_port, retries=rsd_probe_retries, delay=rsd_probe_delay_sec
        ):
            # The port stays a str: it is placed on a subprocess command line.
            self._launch_wda_via_rsd(
                bundle_id=bundle_id,
                rsd_host=rsd_host,
                rsd_port=rsd_port,
                udid=udid,
            )
        elif (http_host is not None) and (http_port is not None):
            self._launch_wda_via_http_tunnel(
                bundle_id=bundle_id,
                http_host=http_host,
                http_port=http_port,
                udid=udid,
            )
        else:
            raise RuntimeError("No valid tunnel endpoint for WDA.")

    # =============== cleanup callable from outside ===============
    def stop_tunnel(self, udid: str, broad_cleanup: bool = True):
        """Stop the tunneld of ``udid`` and remove known / leftover pmd3 adapters.

        Every step is best-effort: a failure while stopping processes never
        prevents the adapter cleanup from running.
        """
        proc = self._live_procs.pop(udid, None)
        ifaces = self._live_ifaces.pop(udid, set())

        # 1) Terminate the process, escalating to kill() on timeout.
        if proc:
            try:
                proc.terminate()
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                try:
                    proc.kill()
                    proc.wait(timeout=3)
                except Exception:
                    pass
            except OSError as e:
                print(f"[cleanup] terminate tunneld failed: {e}")

        # Fallback: kill stray tunneld processes left in our process tree.
        try:
            self._kill_stray_tunneld_children()
        except Exception as e:
            print(f"[cleanup] stray tunneld cleanup error: {e}")

        # 2) Remove adapters.
        try:
            if os.name == "nt":
                # Exact removal of the names we saw first ...
                for name in sorted(ifaces):
                    self._win_remove_adapter(name)
                # ... then the wildcard sweep as a safety net.
                if broad_cleanup:
                    self._win_remove_all_pmd3_adapters()
            else:
                # *nix normally needs no manual removal; add `ip link delete`
                # or similar here if that ever becomes necessary.
                pass
        except Exception as e:
            print(f"[cleanup] adapter cleanup error: {e}")

    # =============== internal: start tunneld ===============
    def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]:
        """Spawn ``pymobiledevice3 remote tunneld`` with a line-buffered text pipe.

        Returns:
            ``(process, port)`` where ``port`` is the value passed via ``--port``.
        """
        port = self._pick_available_port()
        launcher, env2 = self._resolve_pmd3_argv_and_env()
        env2["PYTHONUNBUFFERED"] = "1"
        env2.setdefault("PYTHONIOENCODING", "utf-8")
        env2["PYMOBILEDEVICE3_UDID"] = udid

        cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
        print("[activate] 启动隧道:", " ".join(cmd))

        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # Decode leniently: an undecodable byte in the child's output must
            # not raise UnicodeDecodeError inside the reader loop.
            encoding="utf-8",
            errors="replace",
            bufsize=1,
            env=env2,
            **self._win_hidden_popen_kwargs()
        )
        return proc, port

    # =============== exit / signal hooks ===============
    def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool):
        """Install the atexit and SIGINT/SIGTERM cleanup hooks once per instance."""
        if self._registered:
            return
        self._registered = True

        def _on_exit():
            # Close every tunnel that is still alive.
            for udid in list(self._live_procs.keys()):
                try:
                    self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
                except Exception:
                    pass

        atexit.register(_on_exit)

        def _signal_handler(signum, frame):
            _on_exit()
            # Restore the default disposition and re-send the signal so the
            # process still terminates the way the signal normally would.
            try:
                signal.signal(signum, signal.SIG_DFL)
            except Exception:
                pass
            os.kill(os.getpid(), signum)

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                signal.signal(sig, _signal_handler)
            except Exception:
                pass  # some environments do not allow installing handlers

    # =============== Windows virtual adapter cleanup ===============
    def _win_remove_adapter(self, name: str):
        """Remove one virtual adapter by name (PowerShell first, netsh fallback)."""
        print(f"[cleanup] remove adapter: {name}")
        # The name is parsed from process output; double any single quote so
        # it cannot break out of the single-quoted PowerShell literal.
        quoted = name.replace("'", "''")
        # PowerShell first (needs administrator rights).
        ps = [
            "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass",
            "-Command",
            f"$a=Get-NetAdapter -Name '{quoted}' -ErrorAction SilentlyContinue; "
            f"if($a){{ Disable-NetAdapter -Name '{quoted}' -Confirm:$false -PassThru | Out-Null; "
            f"Remove-NetAdapter -Name '{quoted}' -Confirm:$false -ErrorAction SilentlyContinue; }}"
        ]
        try:
            subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                           **self._win_hidden_popen_kwargs())
            return
        except Exception:
            pass
        # Fallback when PowerShell could not be started: disable via netsh.
        try:
            subprocess.run(
                ["netsh", "interface", "set", "interface", name, "disable"],
                check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                **self._win_hidden_popen_kwargs()
            )
        except Exception:
            pass

    def _win_remove_all_pmd3_adapters(self):
        """Wildcard-remove every ``pymobiledevice3-tunnel-*`` adapter (anti-leftover)."""
        print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters")
        ps = [
            "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass",
            "-Command",
            r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; "
            r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; "
            r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }"
        ]
        try:
            subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                           **self._win_hidden_popen_kwargs())
        except Exception:
            pass

    def _kill_stray_tunneld_children(self):
        """Best-effort kill of leftover tunneld processes in our process tree.

        Processes still tracked in ``self._live_procs`` (tunnels of other
        UDIDs) and their descendants are left alone. Does nothing when psutil
        is not installed.
        """
        try:
            import psutil
        except ImportError:
            print("[cleanup] psutil not available; skipping stray tunneld sweep")
            return

        try:
            # Tunnels that are still registered are not "stray".
            protected: Set[int] = set()
            for live in self._live_procs.values():
                protected.add(live.pid)
                try:
                    protected.update(
                        c.pid for c in psutil.Process(live.pid).children(recursive=True)
                    )
                except Exception:
                    pass

            me = psutil.Process(os.getpid())
            for ch in me.children(recursive=True):
                if ch.pid in protected:
                    continue
                try:
                    cmd = " ".join(ch.cmdline()).lower()
                except Exception:
                    cmd = ""
                if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd:
                    try:
                        ch.terminate()
                        ch.wait(2)
                    except Exception:
                        try:
                            ch.kill()
                        except Exception:
                            pass
        except Exception:
            pass

    # =============== misc helpers ===============
    def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str:
        """Run a pymobiledevice3 sub-command and return its combined output.

        Args:
            args: Arguments appended to the resolved pmd3 launcher.
            udid: Exported to the child as ``PYMOBILEDEVICE3_UDID``.
            extra_env: Extra environment entries; a ``None`` value removes the key.

        Raises:
            RuntimeError: the command exited with a non-zero status.
        """
        launcher, env = self._resolve_pmd3_argv_and_env()
        env["PYMOBILEDEVICE3_UDID"] = udid
        env["PYTHONUNBUFFERED"] = "1"
        env.setdefault("PYTHONIOENCODING", "utf-8")
        if extra_env:
            for k, v in extra_env.items():
                if v is None:
                    env.pop(k, None)
                else:
                    env[k] = str(v)

        cmd = [*launcher, *args]
        print("[pmd3]", " ".join(map(str, cmd)))
        try:
            return subprocess.check_output(
                cmd, text=True, stderr=subprocess.STDOUT, env=env,
                **self._win_hidden_popen_kwargs()
            ) or ""
        except subprocess.CalledProcessError as exc:
            raise RuntimeError(
                exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}"
            ) from exc

    def _ensure_str_list(self, seq):
        """Return ``seq`` as a list with every element converted to ``str``."""
        return [str(x) for x in seq]

    def _win_hidden_popen_kwargs(self):
        """Popen kwargs that hide the console window on Windows; ``{}`` elsewhere."""
        if os.name != "nt":
            return {}
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        si.wShowWindow = 0  # SW_HIDE
        return {
            "startupinfo": si,
            "creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
        }

    def _python_has_pmd3(self, python_path: str, env: dict) -> bool:
        """Return True when ``python_path`` can import pymobiledevice3 within 6 s."""
        try:
            out = subprocess.check_output(
                [python_path, "-c", "import pymobiledevice3;print('ok')"],
                text=True, stderr=subprocess.STDOUT, env=env, timeout=6,
                **self._win_hidden_popen_kwargs()
            )
        except Exception:
            return False
        return "ok" in out

    def _resolve_pmd3_argv_and_env(self) -> Tuple[List[str], Dict[str, str]]:
        """Locate a way to run pymobiledevice3.

        Probe order: the constructor's ``python_executable``, ``$IOSAI_PYTHON``,
        a ``python-rt`` sidecar next to (or one level above) the entry script,
        a ``pymobiledevice3`` executable on PATH, and finally system Pythons.

        Returns:
            ``(argv_prefix, env)``: the launcher argv and a copy of the
            environment prepared for the child.

        Raises:
            RuntimeError: no usable pymobiledevice3 was found.
        """
        import shutil
        from pathlib import Path

        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        env.setdefault("PYTHONIOENCODING", "utf-8")

        prefer_py = env.get("IOSAI_PYTHON")
        base = Path(sys.argv[0]).resolve()
        base_dir = base.parent if base.is_file() else base
        py_name = "python.exe" if os.name == "nt" else "python"

        sidecar_candidates = [
            base_dir / "python-rt" / py_name,
            base_dir / "python-rt" / "Scripts" / py_name,
            base_dir.parent / "python-rt" / py_name,
            base_dir.parent / "python-rt" / "Scripts" / py_name,
        ]
        if prefer_py:
            sidecar_candidates.insert(0, Path(prefer_py))
        # An interpreter passed to the constructor wins over everything else;
        # a bare command name is resolved through PATH first.
        explicit_py = getattr(self, "python", None)
        if explicit_py:
            sidecar_candidates.insert(0, Path(shutil.which(explicit_py) or explicit_py))

        for cand in sidecar_candidates:
            print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
            if cand.is_file() and self._python_has_pmd3(str(cand), env):
                print(f"[IOSAI] ✅ sidecar selected: {cand}")
                return ([str(cand), "-u", "-m", "pymobiledevice3"], env)

        exe = shutil.which("pymobiledevice3")
        if exe:
            print(f"[IOSAI] ✅ use PATH executable: {exe}")
            return ([exe], env)

        py_candidates = []
        base_exec = getattr(sys, "_base_executable", None)
        if base_exec and os.path.isfile(base_exec):
            py_candidates.append(base_exec)
        for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
            p = shutil.which(name)
            if p and p not in py_candidates:
                py_candidates.append(p)

        for py in py_candidates:
            print(f"[IOSAI] 🔎 probing system python: {py}")
            if self._python_has_pmd3(py, env):
                print(f"[IOSAI] ✅ system python selected: {py}")
                return ([py, "-u", "-m", "pymobiledevice3"], env)

        raise RuntimeError("未检测到可用的 pymobiledevice3(建议携带 python-rt 或安装系统 Python+pmd3)。")

    # -------- DDI / RSD / WDA launch --------
    def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
        """Run ``mounter auto-mount`` with retries.

        At least one attempt is always made, even when ``retries`` < 1.

        Raises:
            RuntimeError: every attempt failed; the last error text is included.
        """
        attempts = max(1, retries)
        for i in range(attempts):
            try:
                out = self._pmd3_run(["mounter", "auto-mount"], udid)
                if out:
                    for line in out.splitlines():
                        print(f"[mounter] {line}")
                if "already mounted" in (out or "").lower():
                    print("[mounter] Developer disk image already mounted.")
                else:
                    print("[mounter] Developer disk image mounted.")
                return
            except Exception as e:
                if i < attempts - 1:
                    print(f"[mounter] attempt {i+1}/{attempts} failed, retrying in {backoff_seconds}s ...")
                    _t.sleep(backoff_seconds)
                else:
                    raise RuntimeError(
                        f"Auto-mount failed after {attempts} attempts.\n{e}"
                    ) from e

    def _is_ipv4_host(self, host: str) -> bool:
        """Return True when ``host`` looks like a dotted-quad IPv4 address.

        Only the shape is checked (four groups of 1-3 digits), not the range.
        """
        return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))

    def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: Union[str, int],
                            retries: int = 5, delay: float = 3.0) -> bool:
        """Probe the RSD endpoint with TCP connects until one succeeds.

        Args:
            rsd_host: Host of the RSD endpoint.
            rsd_port: Port as ``str`` or ``int``.
            retries: Maximum number of connection attempts.
            delay: Seconds to wait between two attempts.

        Returns:
            True as soon as a connection succeeds, False after all attempts.
        """
        port_int = int(rsd_port)
        for i in range(1, retries + 1):
            print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
            try:
                with socket.create_connection((rsd_host, port_int), timeout=2):
                    print("[rsd] ✅ RSD is reachable and ready.")
                    return True
            except OSError as e:  # covers timeouts and refused connections
                print(f"[rsd] Not ready yet ({e}). Retrying...")
                # No point in waiting once the last attempt has failed.
                if i < retries:
                    _t.sleep(delay)
        print("[rsd] ❌ RSD did not become ready after retries.")
        return False

    def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
        """Launch ``bundle_id`` with ``developer dvt launch --rsd HOST PORT``."""
        print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
        out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid)
        if out:
            for line in out.splitlines():
                print(f"[wda] {line}")
        print("[wda] Launch via RSD completed.")

    def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
        """Launch ``bundle_id`` with ``PYMOBILEDEVICE3_TUNNEL`` set to ``host:port``.

        Raises:
            RuntimeError: ``http_host`` is not an IPv4 address.
        """
        if not self._is_ipv4_host(http_host):
            raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
        tunnel_endpoint = f"{http_host}:{http_port}"
        print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
        out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid,
                             extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint})
        if out:
            for line in out.splitlines():
                print(f"[wda] {line}")
        print("[wda] Launch via HTTP tunnel completed.")

    # -------- port selection --------
    def _pick_available_port(self, base=49151, step=10) -> int:
        """Return the first port in ``base, base+step, ...`` (offsets below 1000)
        on which nothing accepts a connection on 127.0.0.1.

        Raises:
            RuntimeError: every probed port accepted a connection.
        """
        for i in range(0, 1000, step):
            port = base + i
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                # A failed connect means nothing is listening on that port.
                if s.connect_ex(("127.0.0.1", port)) != 0:
                    return port
        raise RuntimeError("No free port found for tunneld")

    # -------- UDID filter --------
    def _line_is_for_udid(self, line: str, udid: str) -> bool:
        """Case-insensitive check that ``line`` mentions ``udid``."""
        try:
            return udid.lower() in (line or "").lower()
        except Exception:
            return False