Files
iOSAI/Module/IOSActivator.py
2025-10-23 18:53:22 +08:00

445 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import sys
import socket
import subprocess
from typing import Optional
from Entity.Variables import WdaAppBundleId
class IOSActivator:
"""
轻量 iOS 激活器(仅代码调用) - 打包安全版
1) 启动 `pymobiledevice3 remote tunneld`(子进程常驻)
2) 自动挂载 Developer Disk Image进程内调用 CLI
3) 设备隧道就绪后启动 WDA进程内调用 CLI
- 优先使用 `--rsd <host> <port>`(支持 IPv6
- 失败再用 `PYMOBILEDEVICE3_TUNNEL=127.0.0.1:<port>` 回退(仅 IPv4
"""
def __init__(self, python_executable: Optional[str] = None):
# 仅用于旧逻辑兼容;本版本已不依赖 self.python 去 -m 调 CLI
self.python = python_executable or None
# =========================================================================
# 内部工具:进程内调用 pymobiledevice3 CLI
# =========================================================================
def _pmd3_run(self, args: list[str], udid: str, extra_env: Optional[dict] = None) -> str:
import subprocess
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *args]
print("[pmd3]", " ".join(map(str, cmd)))
try:
return subprocess.check_output(cmd, text=True, stderr=subprocess.STDOUT, env=env) or ""
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}")
def _pmd3_run_subprocess(self, full_args: list[str], extra_env: Optional[dict] = None) -> str:
"""
兜底:通过子进程执行 pymobiledevice3使用 _resolve_pmd3_argv_and_env())。
"""
import subprocess
launcher, env = self._resolve_pmd3_argv_and_env()
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *full_args]
cmd = self._ensure_str_list(cmd)
print("[pmd3-subproc]", " ".join(cmd))
try:
out = subprocess.check_output(cmd, text=True, stderr=subprocess.STDOUT, env=env)
return out or ""
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 子进程执行失败,代码 {exc.returncode}")
# =========================================================================
# 旧版环境依赖的替代方案:仅用于启动 tunneld 的子进程(需要常驻)
# =========================================================================
def _resolve_pmd3_argv_and_env(self):
import os, sys, shutil, subprocess
from pathlib import Path
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
# 1) 明确使用 IOSAI_PYTHON如果上层已设置
prefer_py = env.get("IOSAI_PYTHON")
# 2) 构造一批候选路径(先根目录,再 Scripts
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
base_dir / "python-rt" / py_name, # ✅ 嵌入式/你现在的摆放方式
base_dir / "python-rt" / "Scripts" / py_name, # 兼容虚拟环境式
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
# 若外层已通过 IOSAI_PYTHON 指了路径,也放进候选
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
# 打印探测
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
# 候选解释器存在但没装 pmd3就继续找下一个
pass
# 3) 回退:系统 PATH 里直接找 pymobiledevice3 可执行
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
# 4) 兜底:从系统 Python 里找装了 pmd3 的解释器
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
py_candidates.append(base_exec)
for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
p = shutil.which(name)
if p and p not in py_candidates:
py_candidates.append(p)
for py in py_candidates:
print(f"[IOSAI] 🔎 probing system python: {py}")
try:
out = subprocess.check_output(
[py, "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6
)
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
return ([py, "-u", "-m", "pymobiledevice3"], env)
except Exception:
continue
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
# =========================================================================
# 功能函数-
# =========================================================================
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
print("[mounter] Developer disk image mounted.")
"""
使用进程内 CLIpymobiledevice3 mounter auto-mount带重试
"""
import time
last_err_text = ""
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
if "already mounted" in (out or "").lower():
print("[mounter] Developer disk image already mounted.")
else:
print("[mounter] Developer disk image mounted.")
return
except Exception as e:
last_err_text = str(e)
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
try:
time.sleep(backoff_seconds)
except Exception:
pass
else:
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err_text}")
def _is_ipv4_host(self, host: str) -> bool:
import re as _re
return bool(_re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
"""
探测 RSD 通道是否就绪:直接尝试 TCP 连接。
"""
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try:
with socket.create_connection((rsd_host, port_int), timeout=2):
print("[rsd] ✅ RSD is reachable and ready.")
return True
except (socket.timeout, ConnectionRefusedError, OSError) as e:
print(f"[rsd] Not ready yet ({e}). Retrying...")
import time as _t
_t.sleep(delay)
print("[rsd] ❌ RSD did not become ready after retries.")
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
"""
使用进程内 CLIpymobiledevice3 developer dvt launch <bundle> --rsd <host> <port>
"""
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
out = self._pmd3_run(
["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port],
udid=udid,
)
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
"""
退路:通过 HTTP 网关端口设置 PYMOBILEDEVICE3_TUNNEL仅 IPv4
使用进程内 CLI 启动 WDA。
"""
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
out = self._pmd3_run(
["developer", "dvt", "launch", bundle_id],
udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint},
)
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
def _pick_available_port(self, base=49151, step=10) -> int:
"""挑选一个可用端口(避免重复)"""
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
# =========================================================================
# 对外方法
# =========================================================================
def activate(
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
) -> str:
"""
执行:挂镜像(可选) -> 开隧道 -> (等待 RSD 就绪)-> 启动 WDA
- 优先用 `--rsd` 启动
- 失败再用 HTTP 网关作为退路
"""
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}")
# ⚠️ 检查管理员权限
if os.name == "nt":
import ctypes
try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
is_admin = False
if not is_admin:
print("[⚠️] 当前进程未以管理员身份运行tunneld 可能无法创建 USB 隧道。")
import time as _t
start_ts = _t.time()
# 1) (可选) 先挂载,避免 tidevice 并发 DDI 竞态
if pre_mount_first:
try:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2) # 稳定 DDI
except Exception as e:
print(f"[activate] 预挂载失败(继续尝试开隧道后再挂载一次):{e}")
# 2) 启动 tunneld 子进程
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 使用命令启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
)
captured: list[str] = []
http_host: Optional[str] = None
http_port: Optional[str] = None
rsd_host: Optional[str] = None
rsd_port: Optional[str] = None
device_tunnel_ready = False
wda_started = False
mount_done = pre_mount_first
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
try:
assert proc.stdout is not None
for line in proc.stdout:
captured.append(line)
print(f"[tunneld] {line}", end="")
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口
if http_port is None:
m = HTTP_RE.search(line)
if m:
http_host, http_port = m.group(1), m.group(2)
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# ✅ 捕获 RSD仅识别当前 UDID 的行)
if self._line_is_for_udid(line, udid):
m = RSD_CREATED_RE.search(line) or RSD_FALLBACK_RE.search(line)
if m and not device_tunnel_ready:
rsd_host, rsd_port = m.group(1), m.group(2)
device_tunnel_ready = True
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
else:
continue # 其它设备的隧道日志忽略
# 当设备隧道准备好后启动 WDA
if (not wda_started) and wda_bundle_id and device_tunnel_ready:
try:
if not mount_done:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
_t.sleep(2)
mount_done = True
rsd_ok = False
if rsd_host and rsd_port:
rsd_ok = self._wait_for_rsd_ready(
rsd_host, rsd_port,
retries=rsd_probe_retries,
delay=rsd_probe_delay_sec,
)
if rsd_ok:
self._launch_wda_via_rsd(
bundle_id=wda_bundle_id,
rsd_host=rsd_host,
rsd_port=rsd_port,
udid=udid,
)
else:
if http_host and http_port:
self._launch_wda_via_http_tunnel(
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=http_port,
udid=udid,
)
else:
raise RuntimeError("No valid tunnel endpoint for fallback.")
wda_started = True
except RuntimeError as exc:
print(str(exc), file=sys.stderr)
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
raise
# 超时保护
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
break
# 结束清理
try:
return_code = proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
return_code = proc.returncode or -9
output = "".join(captured)
if return_code != 0 and not wda_started:
raise RuntimeError(f"tunneld exited with code {return_code}.\n{output}")
print("[activate] Done.")
return output
except KeyboardInterrupt:
print("\n[activate] Interrupted, cleaning up ...", file=sys.stderr)
try:
proc.terminate()
proc.wait(timeout=5)
except Exception:
try:
proc.kill()
except Exception:
pass
raise
def _line_is_for_udid(self, line: str, udid: str) -> bool:
"""日志行是否属于目标 UDID。"""
try:
return udid.lower() in (line or "").lower()
except Exception:
return False