Files
iOSAI/Module/IOSActivator.py
2025-10-28 15:09:36 +08:00

540 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import re
import sys
import atexit
import signal
import socket
import subprocess
from typing import Optional, List, Tuple, Dict, Set
from Entity.Variables import WdaAppBundleId
import time as _t
class IOSActivator:
"""
轻量 iOS 激活器(仅代码调用) - 进程/网卡可控版
1) 启动 `pymobiledevice3 remote tunneld`(可常驻/可一次性)
2) 自动挂载 DDI
3) 隧道就绪后启动 WDA
4) 程序退出或 keep_tunnel=False 时,确保 tunneld 进程与虚拟网卡被清理
"""
# ---------- 正则 ----------
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE)
def __init__(self, python_executable: Optional[str] = None):
self.python = python_executable or None
self._live_procs: Dict[str, subprocess.Popen] = {} # udid -> tunneld proc
self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names}
self._registered = False
# =============== 公共入口 ===============
def activate(
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
keep_tunnel: bool = False, # 默认 FalseWDA 拉起后关闭隧道
broad_cleanup_on_exit: bool = True, # 退出时顺带清理所有 pmd3 残留网卡
) -> str:
"""
流程:挂镜像(可选) -> 开隧道 -> 等 RSD -> 启动 WDA
- keep_tunnel=FalseWDA 启动后关闭隧道并清理
- keep_tunnel=True隧道常驻由上层/atexit 清理
"""
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}")
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit)
# Windows 管理员检测
if os.name == "nt":
import ctypes
try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
is_admin = False
if not is_admin:
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。")
start_ts = _t.time()
# 1) 预挂载(失败不致命)
if pre_mount_first:
try:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
_t.sleep(2)
except Exception as e:
print(f"[activate] 预挂载失败(稍后再试):{e}")
# 2) 启动 tunneld
http_host: Optional[str] = None
http_port: Optional[str] = None # ⚠️ 端口以 str 存储
rsd_host: Optional[str] = None
rsd_port: Optional[str] = None # ⚠️ 端口以 str 存储
iface_names: Set[str] = set()
proc, _port_ignored = self._start_tunneld(udid)
self._live_procs[udid] = proc
self._live_ifaces[udid] = iface_names
captured: List[str] = []
out: str = ""
wda_started = False
mount_done = pre_mount_first
try:
assert proc.stdout is not None
for line in proc.stdout:
captured.append(line)
# 日志长度控制,防止常驻时内存涨太多
if len(captured) > 20000:
captured = captured[-10000:]
print(f"[tunneld] {line}", end="")
# 捕获虚拟网卡名
for m in self.IFACE_RE.finditer(line):
iface_names.add(m.group(1))
# 子进程若退出则停止读取
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口(保持为字符串)
if http_port is None:
m = self.HTTP_RE.search(line)
if m:
http_host = m.group(1)
http_port = m.group(2)
# 简单校验
try:
_ = int(http_port)
except Exception:
print(f"[tunneld] bad http port: {http_port}")
http_host, http_port = None, None
else:
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# 只处理当前 UDID 的 RSD 行
if not self._line_is_for_udid(line, udid):
continue
m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line)
if m and rsd_host is None and rsd_port is None:
rsd_host = m.group(1)
rsd_port = m.group(2)
try:
_ = int(rsd_port) # 仅作数字校验
except Exception:
print(f"[tunneld] bad rsd port: {rsd_port}")
rsd_host, rsd_port = None, None
else:
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
# ========= 尝试启动 WDA =========
if (not wda_started) and wda_bundle_id and (rsd_host is not None) and (rsd_port is not None):
if not mount_done:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
_t.sleep(2)
mount_done = True
# RSD 优先;探测时临时转 int启动命令仍传 str 端口
rsd_port_int = int(rsd_port)
if self._wait_for_rsd_ready(
rsd_host, rsd_port_int, retries=rsd_probe_retries, delay=rsd_probe_delay_sec
):
# 这里的实现通常会拼 subprocess 命令行,故端口保持 str
self._launch_wda_via_rsd(
bundle_id=wda_bundle_id,
rsd_host=rsd_host,
rsd_port=rsd_port, # ⚠️ 传入 str避免 subprocess 报错
udid=udid,
)
wda_started = True
elif (http_host is not None) and (http_port is not None):
self._launch_wda_via_http_tunnel(
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=http_port, # ⚠️ 传入 str
udid=udid,
)
wda_started = True
else:
raise RuntimeError("No valid tunnel endpoint for WDA.")
# ✅ WDA 已启动;默认一次性模式直接退出读取循环
if wda_started and not keep_tunnel:
_t.sleep(0.5) # 给隧道多刷几行
print("[activate] WDA launched; exiting reader loop (keep_tunnel=False).")
break
# 超时保护(仅在 WDA 尚未启动时生效)
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
break
print("[activate] 启动 WDA 读取阶段结束")
out = "".join(captured)
except Exception as e:
print(f"[activate] 发生异常:{e}")
raise
finally:
if not keep_tunnel:
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception as ce:
print(f"[activate] stop_tunnel 清理异常:{ce}")
print("[activate] Done.")
return out
# =============== 外部可显式调用的清理 ===============
def stop_tunnel(self, udid: str, broad_cleanup: bool = True):
"""关闭某 UDID 的 tunneld并清理已知/残留的 pmd3 虚拟网卡。"""
proc = self._live_procs.pop(udid, None)
ifaces = self._live_ifaces.pop(udid, set())
# 1) 结束进程
if proc:
try:
proc.terminate()
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
proc.kill()
proc.wait(timeout=3)
except Exception:
pass
# 兜底:杀掉本进程树内可能残留的 tunneld
self._kill_stray_tunneld_children()
# 2) 清理网卡
try:
if os.name == "nt":
# 先按已知名精确删除
for name in sorted(ifaces):
self._win_remove_adapter(name)
# 宽匹配兜底
if broad_cleanup:
self._win_remove_all_pmd3_adapters()
else:
# *nix 基本不需要手动删,若有需要可在此处添加 ip link delete 等
pass
except Exception as e:
print(f"[cleanup] adapter cleanup error: {e}")
# =============== 内部:启动 tunneld ===============
def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]:
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
**self._win_hidden_popen_kwargs()
)
return proc, port
# =============== 退出/信号回收 ===============
def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool):
if self._registered:
return
self._registered = True
def _on_exit():
# 逐个关闭存活隧道
for udid in list(self._live_procs.keys()):
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception:
pass
atexit.register(_on_exit)
def _signal_handler(signum, frame):
_on_exit()
# 默认行为:再次发出信号退出
try:
signal.signal(signum, signal.SIG_DFL)
except Exception:
pass
os.kill(os.getpid(), signum)
for sig in (signal.SIGINT, signal.SIGTERM):
try:
signal.signal(sig, _signal_handler)
except Exception:
pass # 某些环境不允许设置
# =============== Windows 虚拟网卡清理 ===============
def _win_remove_adapter(self, name: str):
"""按名删除一个虚拟网卡。"""
print(f"[cleanup] remove adapter: {name}")
# 先尝试 PowerShell需要管理员
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
f"$a=Get-NetAdapter -Name '{name}' -ErrorAction SilentlyContinue; "
f"if($a){{ Disable-NetAdapter -Name '{name}' -Confirm:$false -PassThru | Out-Null; "
f"Remove-NetAdapter -Name '{name}' -Confirm:$false -ErrorAction SilentlyContinue; }}"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
return
except Exception:
pass
# 兜底netsh 禁用
try:
subprocess.run(
["netsh", "interface", "set", "interface", name, "disable"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()
)
except Exception:
pass
def _win_remove_all_pmd3_adapters(self):
"""宽匹配删除所有 pymobiledevice3-tunnel-* 网卡(防残留)。"""
print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters")
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; "
r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; "
r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
except Exception:
pass
def _kill_stray_tunneld_children(self):
"""在当前进程空间内尽量清理残留的 tunneld 子进程。"""
import psutil
try:
me = psutil.Process(os.getpid())
for ch in me.children(recursive=True):
try:
cmd = " ".join(ch.cmdline()).lower()
except Exception:
cmd = ""
if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd:
try:
ch.terminate()
ch.wait(2)
except Exception:
try:
ch.kill()
except Exception:
pass
except Exception:
pass
# =============== 其它工具 & 你原有的方法(未改动核心逻辑) ===============
def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str:
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *args]
print("[pmd3]", " ".join(map(str, cmd)))
try:
return subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
env=env,
**self._win_hidden_popen_kwargs()
) or ""
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}")
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
def _win_hidden_popen_kwargs(self):
if os.name != "nt":
return {}
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
}
def _resolve_pmd3_argv_and_env(self):
import shutil, subprocess
from pathlib import Path
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
prefer_py = env.get("IOSAI_PYTHON")
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
base_dir / "python-rt" / py_name,
base_dir / "python-rt" / "Scripts" / py_name,
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
pass
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
py_candidates.append(base_exec)
for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
p = shutil.which(name)
if p and p not in py_candidates:
py_candidates.append(p)
for py in py_candidates:
print(f"[IOSAI] 🔎 probing system python: {py}")
try:
out = subprocess.check_output(
[py, "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
return ([py, "-u", "-m", "pymobiledevice3"], env)
except Exception:
continue
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
# -------- DDI / RSD / 启动 WDA (与你原逻辑一致) --------
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
import time as _t
last_err = ""
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
if "already mounted" in (out or "").lower():
print("[mounter] Developer disk image already mounted.")
else:
print("[mounter] Developer disk image mounted.")
return
except Exception as e:
last_err = str(e)
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
_t.sleep(backoff_seconds)
else:
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err}")
def _is_ipv4_host(self, host: str) -> bool:
return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try:
with socket.create_connection((rsd_host, port_int), timeout=2):
print("[rsd] ✅ RSD is reachable and ready.")
return True
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"[rsd] Not ready yet ({e}). Retrying...")
import time as _t
_t.sleep(delay)
print("[rsd] ❌ RSD did not become ready after retries.")
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid)
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint})
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
# -------- 端口挑选 --------
def _pick_available_port(self, base=49151, step=10) -> int:
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
# -------- UDID 过滤 --------
def _line_is_for_udid(self, line: str, udid: str) -> bool:
try:
return udid.lower() in (line or "").lower()
except Exception:
return False