Files
iOSAI/Module/IOSActivator.py
2025-10-24 22:04:28 +08:00

482 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import re
import sys
import atexit
import signal
import socket
import subprocess
from typing import Optional, List, Tuple, Dict, Set
from Entity.Variables import WdaAppBundleId
class IOSActivator:
"""
轻量 iOS 激活器(仅代码调用) - 进程/网卡可控版
1) 启动 `pymobiledevice3 remote tunneld`(可常驻/可一次性)
2) 自动挂载 DDI
3) 隧道就绪后启动 WDA
4) 程序退出或 keep_tunnel=False 时,确保 tunneld 进程与虚拟网卡被清理
"""
# ---------- 正则 ----------
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE)
def __init__(self, python_executable: Optional[str] = None):
self.python = python_executable or None
self._live_procs: Dict[str, subprocess.Popen] = {} # udid -> tunneld proc
self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names}
self._registered = False
# =============== 公共入口 ===============
def activate(
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
keep_tunnel: bool = False, # << 新增:默认 False一次性用完就关
broad_cleanup_on_exit: bool = True, # << 退出时顺带清理所有 pmd3 残留网卡
) -> str:
"""
执行:挂镜像(可选) -> 开隧道 -> (等待 RSD 就绪)-> 启动 WDA
- 默认 keep_tunnel=FalseWDA 启动后关闭隧道(避免虚拟网卡常驻)
- keep_tunnel=True让隧道常驻交由 atexit/signal 或上层调用 stop_tunnel() 清理
"""
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}")
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit)
# 管理员检测Windows 清理网卡需要)
if os.name == "nt":
import ctypes
try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
is_admin = False
if not is_admin:
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。")
import time as _t
start_ts = _t.time()
# 1) 预挂载
if pre_mount_first:
try:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2)
except Exception as e:
print(f"[activate] 预挂载失败(继续尝试开隧道后再挂载一次):{e}")
# 2) 启动 tunneld
http_host = http_port = rsd_host = rsd_port = None
iface_names: Set[str] = set()
proc, port = self._start_tunneld(udid)
self._live_procs[udid] = proc
self._live_ifaces[udid] = iface_names
captured: List[str] = []
wda_started = False
mount_done = pre_mount_first
try:
assert proc.stdout is not None
for line in proc.stdout:
captured.append(line)
print(f"[tunneld] {line}", end="")
# 捕获虚拟网卡名
for m in self.IFACE_RE.finditer(line):
iface_names.add(m.group(1))
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口
if http_port is None:
m = self.HTTP_RE.search(line)
if m:
http_host, http_port = m.group(1), m.group(2)
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# 捕获 RSD仅识别当前 UDID 的行)
if not self._line_is_for_udid(line, udid):
continue
m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line)
if m and not rsd_host and not rsd_port:
rsd_host, rsd_port = m.group(1), m.group(2)
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
# 启动 WDA
if (not wda_started) and wda_bundle_id and (rsd_host and rsd_port):
if not mount_done:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2)
mount_done = True
if self._wait_for_rsd_ready(rsd_host, rsd_port, retries=rsd_probe_retries, delay=rsd_probe_delay_sec):
self._launch_wda_via_rsd(bundle_id=wda_bundle_id, rsd_host=rsd_host, rsd_port=rsd_port, udid=udid)
wda_started = True
elif http_host and http_port:
self._launch_wda_via_http_tunnel(bundle_id=wda_bundle_id, http_host=http_host, http_port=http_port, udid=udid)
wda_started = True
else:
raise RuntimeError("No valid tunnel endpoint for WDA.")
# 超时保护
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
break
# 结束/收尾
out = "".join(captured)
finally:
if not keep_tunnel:
# 一次性模式WDA 已启动后就关闭隧道并清理网卡
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
print("[activate] Done.")
return out
# =============== 外部可显式调用的清理 ===============
def stop_tunnel(self, udid: str, broad_cleanup: bool = True):
"""关闭某 UDID 的 tunneld并清理已知/残留的 pmd3 虚拟网卡。"""
proc = self._live_procs.pop(udid, None)
ifaces = self._live_ifaces.pop(udid, set())
# 1) 结束进程
if proc:
try:
proc.terminate()
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
proc.kill()
proc.wait(timeout=3)
except Exception:
pass
# 兜底:杀掉本进程树内可能残留的 tunneld
self._kill_stray_tunneld_children()
# 2) 清理网卡
try:
if os.name == "nt":
# 先按已知名精确删除
for name in sorted(ifaces):
self._win_remove_adapter(name)
# 宽匹配兜底
if broad_cleanup:
self._win_remove_all_pmd3_adapters()
else:
# *nix 基本不需要手动删,若有需要可在此处添加 ip link delete 等
pass
except Exception as e:
print(f"[cleanup] adapter cleanup error: {e}")
# =============== 内部:启动 tunneld ===============
def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]:
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
**self._win_hidden_popen_kwargs()
)
return proc, port
# =============== 退出/信号回收 ===============
def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool):
if self._registered:
return
self._registered = True
def _on_exit():
# 逐个关闭存活隧道
for udid in list(self._live_procs.keys()):
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception:
pass
atexit.register(_on_exit)
def _signal_handler(signum, frame):
_on_exit()
# 默认行为:再次发出信号退出
try:
signal.signal(signum, signal.SIG_DFL)
except Exception:
pass
os.kill(os.getpid(), signum)
for sig in (signal.SIGINT, signal.SIGTERM):
try:
signal.signal(sig, _signal_handler)
except Exception:
pass # 某些环境不允许设置
# =============== Windows 虚拟网卡清理 ===============
def _win_remove_adapter(self, name: str):
"""按名删除一个虚拟网卡。"""
print(f"[cleanup] remove adapter: {name}")
# 先尝试 PowerShell需要管理员
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
f"$a=Get-NetAdapter -Name '{name}' -ErrorAction SilentlyContinue; "
f"if($a){{ Disable-NetAdapter -Name '{name}' -Confirm:$false -PassThru | Out-Null; "
f"Remove-NetAdapter -Name '{name}' -Confirm:$false -ErrorAction SilentlyContinue; }}"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
return
except Exception:
pass
# 兜底netsh 禁用
try:
subprocess.run(
["netsh", "interface", "set", "interface", name, "disable"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()
)
except Exception:
pass
def _win_remove_all_pmd3_adapters(self):
"""宽匹配删除所有 pymobiledevice3-tunnel-* 网卡(防残留)。"""
print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters")
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; "
r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; "
r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
except Exception:
pass
def _kill_stray_tunneld_children(self):
"""在当前进程空间内尽量清理残留的 tunneld 子进程。"""
import psutil
try:
me = psutil.Process(os.getpid())
for ch in me.children(recursive=True):
try:
cmd = " ".join(ch.cmdline()).lower()
except Exception:
cmd = ""
if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd:
try:
ch.terminate()
ch.wait(2)
except Exception:
try:
ch.kill()
except Exception:
pass
except Exception:
pass
# =============== 其它工具 & 你原有的方法(未改动核心逻辑) ===============
def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str:
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *args]
print("[pmd3]", " ".join(map(str, cmd)))
try:
return subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
env=env,
**self._win_hidden_popen_kwargs()
) or ""
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}")
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
def _win_hidden_popen_kwargs(self):
if os.name != "nt":
return {}
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
}
def _resolve_pmd3_argv_and_env(self):
import shutil, subprocess
from pathlib import Path
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
prefer_py = env.get("IOSAI_PYTHON")
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
base_dir / "python-rt" / py_name,
base_dir / "python-rt" / "Scripts" / py_name,
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
pass
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
py_candidates.append(base_exec)
for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
p = shutil.which(name)
if p and p not in py_candidates:
py_candidates.append(p)
for py in py_candidates:
print(f"[IOSAI] 🔎 probing system python: {py}")
try:
out = subprocess.check_output(
[py, "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
return ([py, "-u", "-m", "pymobiledevice3"], env)
except Exception:
continue
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
# -------- DDI / RSD / 启动 WDA (与你原逻辑一致) --------
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
import time as _t
last_err = ""
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
if "already mounted" in (out or "").lower():
print("[mounter] Developer disk image already mounted.")
else:
print("[mounter] Developer disk image mounted.")
return
except Exception as e:
last_err = str(e)
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
_t.sleep(backoff_seconds)
else:
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err}")
def _is_ipv4_host(self, host: str) -> bool:
return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try:
with socket.create_connection((rsd_host, port_int), timeout=2):
print("[rsd] ✅ RSD is reachable and ready.")
return True
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"[rsd] Not ready yet ({e}). Retrying...")
import time as _t
_t.sleep(delay)
print("[rsd] ❌ RSD did not become ready after retries.")
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid)
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint})
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
# -------- 端口挑选 --------
def _pick_available_port(self, base=49151, step=10) -> int:
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
# -------- UDID 过滤 --------
def _line_is_for_udid(self, line: str, udid: str) -> bool:
try:
return udid.lower() in (line or "").lower()
except Exception:
return False