优化底层逻辑

This commit is contained in:
2025-11-21 22:03:35 +08:00
parent d96a19c659
commit af4ab583a5
5 changed files with 456 additions and 1224 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,540 +1,273 @@
# -*- coding: utf-8 -*-
import os
import re
import sys
import atexit
import signal
import socket
import subprocess import subprocess
from typing import Optional, List, Tuple, Dict, Set import threading
import time
import os
import sys
from typing import Tuple, Optional
from Entity.Variables import WdaAppBundleId from Entity.Variables import WdaAppBundleId
import time as _t
class IOSActivator: class IOSActivator:
""" """
轻量 iOS 激活器(仅代码调用) - 进程/网卡可控版 专门给 iOS17+ 设备用的 go-ios 激活器:
1) 启动 `pymobiledevice3 remote tunneld`(可常驻/可一次性) - 外部先探测 WDA不存在时再调用 activate_ios17
2) 自动挂载 DDI - 内部流程tunnel start -> pair(等待成功) -> image auto -> runwda
3) 隧道就绪后启动 WDA
4) 程序退出或 keep_tunnel=False 时,确保 tunneld 进程与虚拟网卡被清理
""" """
# ---------- 正则 ---------- def __init__(
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)") self,
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)") ios_path: Optional[str] = None,
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)") pair_timeout: int = 60, # 配对最多等多久
IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE) pair_retry_interval: int = 3, # 每次重试间隔
):
def __init__(self, python_executable: Optional[str] = None):
self.python = python_executable or None
self._live_procs: Dict[str, subprocess.Popen] = {} # udid -> tunneld proc
self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names}
self._registered = False
# =============== 公共入口 ===============
def activate(
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
keep_tunnel: bool = False, # 默认 FalseWDA 拉起后关闭隧道
broad_cleanup_on_exit: bool = True, # 退出时顺带清理所有 pmd3 残留网卡
) -> str:
""" """
流程:挂镜像(可选) -> 开隧道 -> 等 RSD -> 启动 WDA :param ios_path: ios.exe 的绝对路径,例如 E:\\code\\Python\\iOSAi\\resources\\ios.exe
- keep_tunnel=FalseWDA 启动后关闭隧道并清理 如果为 None则自动从项目的 resources 目录中寻找 ios.exe
- keep_tunnel=True隧道常驻由上层/atexit 清理
""" """
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}") # ==== 统一获取 resources 目录(支持源码运行 + Nuitka EXE ====
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit) if "__compiled__" in globals():
# 被 Nuitka 编译后的 exe 运行时
base_dir = os.path.dirname(sys.executable) # exe 所在目录
else:
# 开发环境,直接跑 .py
cur_file = os.path.abspath(__file__) # 当前 .py 文件所在目录
base_dir = os.path.dirname(os.path.dirname(cur_file)) # 回到项目根 iOSAi
# Windows 管理员检测 resource_dir = os.path.join(base_dir, "resources")
# 如果外部没有显式传 ios_path就用 resources/ios.exe
if ios_path is None or ios_path == "":
ios_path = os.path.join(resource_dir, "ios.exe")
self.ios_path = ios_path
self.pair_timeout = pair_timeout
self.pair_retry_interval = pair_retry_interval
self._lock = threading.Lock()
# go-ios tunnel 的后台进程
self._tunnel_proc: Optional[subprocess.Popen] = None
# Windows: 避免弹黑框
self._creationflags = 0
self._startupinfo = None # ⭐ 新增:统一控制窗口隐藏
if os.name == "nt": if os.name == "nt":
import ctypes
try: try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 self._creationflags = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined]
except Exception: except Exception:
is_admin = False self._creationflags = 0
if not is_admin:
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。")
start_ts = _t.time() # ⭐ 用 STARTUPINFO + STARTF_USESHOWWINDOW 彻底隐藏窗口
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
self._startupinfo = si
# 1) 预挂载(失败不致命) # =======================
if pre_mount_first: # 基础执行封装
try: # =======================
self._auto_mount_developer_disk( def _run(
udid, retries=mount_retries, backoff_seconds=backoff_seconds self,
) args,
_t.sleep(2) desc: str = "",
except Exception as e: timeout: Optional[int] = None,
print(f"[activate] 预挂载失败(稍后再试):{e}") check: bool = True,
) -> Tuple[int, str, str]:
# 2) 启动 tunneld """
http_host: Optional[str] = None 同步执行 ios.exe等它返回。
http_port: Optional[str] = None # ⚠️ 端口以 str 存储 :return: (returncode, stdout, stderr)
rsd_host: Optional[str] = None """
rsd_port: Optional[str] = None # ⚠️ 端口以 str 存储 cmd = [self.ios_path] + list(args)
iface_names: Set[str] = set()
proc, _port_ignored = self._start_tunneld(udid)
self._live_procs[udid] = proc
self._live_ifaces[udid] = iface_names
captured: List[str] = []
out: str = ""
wda_started = False
mount_done = pre_mount_first
try: try:
assert proc.stdout is not None proc = subprocess.run(
for line in proc.stdout:
captured.append(line)
# 日志长度控制,防止常驻时内存涨太多
if len(captured) > 20000:
captured = captured[-10000:]
print(f"[tunneld] {line}", end="")
# 捕获虚拟网卡名
for m in self.IFACE_RE.finditer(line):
iface_names.add(m.group(1))
# 子进程若退出则停止读取
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口(保持为字符串)
if http_port is None:
m = self.HTTP_RE.search(line)
if m:
http_host = m.group(1)
http_port = m.group(2)
# 简单校验
try:
_ = int(http_port)
except Exception:
print(f"[tunneld] bad http port: {http_port}")
http_host, http_port = None, None
else:
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# 只处理当前 UDID 的 RSD 行
if not self._line_is_for_udid(line, udid):
continue
m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line)
if m and rsd_host is None and rsd_port is None:
rsd_host = m.group(1)
rsd_port = m.group(2)
try:
_ = int(rsd_port) # 仅作数字校验
except Exception:
print(f"[tunneld] bad rsd port: {rsd_port}")
rsd_host, rsd_port = None, None
else:
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
# ========= 尝试启动 WDA =========
if (not wda_started) and wda_bundle_id and (rsd_host is not None) and (rsd_port is not None):
if not mount_done:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
_t.sleep(2)
mount_done = True
# RSD 优先;探测时临时转 int启动命令仍传 str 端口
rsd_port_int = int(rsd_port)
if self._wait_for_rsd_ready(
rsd_host, rsd_port_int, retries=rsd_probe_retries, delay=rsd_probe_delay_sec
):
# 这里的实现通常会拼 subprocess 命令行,故端口保持 str
self._launch_wda_via_rsd(
bundle_id=wda_bundle_id,
rsd_host=rsd_host,
rsd_port=rsd_port, # ⚠️ 传入 str避免 subprocess 报错
udid=udid,
)
wda_started = True
elif (http_host is not None) and (http_port is not None):
self._launch_wda_via_http_tunnel(
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=http_port, # ⚠️ 传入 str
udid=udid,
)
wda_started = True
else:
raise RuntimeError("No valid tunnel endpoint for WDA.")
# ✅ WDA 已启动;默认一次性模式直接退出读取循环
if wda_started and not keep_tunnel:
_t.sleep(0.5) # 给隧道多刷几行
print("[activate] WDA launched; exiting reader loop (keep_tunnel=False).")
break
# 超时保护(仅在 WDA 尚未启动时生效)
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
break
print("[activate] 启动 WDA 读取阶段结束")
out = "".join(captured)
except Exception as e:
print(f"[activate] 发生异常:{e}")
raise
finally:
if not keep_tunnel:
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception as ce:
print(f"[activate] stop_tunnel 清理异常:{ce}")
print("[activate] Done.")
return out
# =============== 外部可显式调用的清理 ===============
def stop_tunnel(self, udid: str, broad_cleanup: bool = True):
"""关闭某 UDID 的 tunneld并清理已知/残留的 pmd3 虚拟网卡。"""
proc = self._live_procs.pop(udid, None)
ifaces = self._live_ifaces.pop(udid, set())
# 1) 结束进程
if proc:
try:
proc.terminate()
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
proc.kill()
proc.wait(timeout=3)
except Exception:
pass
# 兜底:杀掉本进程树内可能残留的 tunneld
self._kill_stray_tunneld_children()
# 2) 清理网卡
try:
if os.name == "nt":
# 先按已知名精确删除
for name in sorted(ifaces):
self._win_remove_adapter(name)
# 宽匹配兜底
if broad_cleanup:
self._win_remove_all_pmd3_adapters()
else:
# *nix 基本不需要手动删,若有需要可在此处添加 ip link delete 等
pass
except Exception as e:
print(f"[cleanup] adapter cleanup error: {e}")
# =============== 内部:启动 tunneld ===============
def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]:
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
**self._win_hidden_popen_kwargs()
)
return proc, port
# =============== 退出/信号回收 ===============
def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool):
if self._registered:
return
self._registered = True
def _on_exit():
# 逐个关闭存活隧道
for udid in list(self._live_procs.keys()):
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception:
pass
atexit.register(_on_exit)
def _signal_handler(signum, frame):
_on_exit()
# 默认行为:再次发出信号退出
try:
signal.signal(signum, signal.SIG_DFL)
except Exception:
pass
os.kill(os.getpid(), signum)
for sig in (signal.SIGINT, signal.SIGTERM):
try:
signal.signal(sig, _signal_handler)
except Exception:
pass # 某些环境不允许设置
# =============== Windows 虚拟网卡清理 ===============
def _win_remove_adapter(self, name: str):
"""按名删除一个虚拟网卡。"""
print(f"[cleanup] remove adapter: {name}")
# 先尝试 PowerShell需要管理员
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
f"$a=Get-NetAdapter -Name '{name}' -ErrorAction SilentlyContinue; "
f"if($a){{ Disable-NetAdapter -Name '{name}' -Confirm:$false -PassThru | Out-Null; "
f"Remove-NetAdapter -Name '{name}' -Confirm:$false -ErrorAction SilentlyContinue; }}"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
return
except Exception:
pass
# 兜底netsh 禁用
try:
subprocess.run(
["netsh", "interface", "set", "interface", name, "disable"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()
)
except Exception:
pass
def _win_remove_all_pmd3_adapters(self):
"""宽匹配删除所有 pymobiledevice3-tunnel-* 网卡(防残留)。"""
print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters")
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; "
r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; "
r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
except Exception:
pass
def _kill_stray_tunneld_children(self):
"""在当前进程空间内尽量清理残留的 tunneld 子进程。"""
import psutil
try:
me = psutil.Process(os.getpid())
for ch in me.children(recursive=True):
try:
cmd = " ".join(ch.cmdline()).lower()
except Exception:
cmd = ""
if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd:
try:
ch.terminate()
ch.wait(2)
except Exception:
try:
ch.kill()
except Exception:
pass
except Exception:
pass
# =============== 其它工具 & 你原有的方法(未改动核心逻辑) ===============
def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str:
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
cmd = [*launcher, *args]
print("[pmd3]", " ".join(map(str, cmd)))
try:
return subprocess.check_output(
cmd, cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, text=True,
stderr=subprocess.STDOUT, timeout=timeout,
env=env, creationflags=self._creationflags,
**self._win_hidden_popen_kwargs() startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口
) or "" )
except subprocess.CalledProcessError as exc: except subprocess.TimeoutExpired:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}") if check:
raise
return -1, "", "timeout"
def _ensure_str_list(self, seq): out = proc.stdout or ""
return [str(x) for x in seq] err = proc.stderr or ""
def _win_hidden_popen_kwargs(self): if check and proc.returncode != 0:
if os.name != "nt": raise RuntimeError(f"[ios] 命令失败({desc}), returncode={proc.returncode}")
return {}
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
}
def _resolve_pmd3_argv_and_env(self): return proc.returncode, out, err
import shutil, subprocess
from pathlib import Path
env = os.environ.copy() def _spawn_tunnel(self) -> None:
env["PYTHONUNBUFFERED"] = "1" with self._lock:
env.setdefault("PYTHONIOENCODING", "utf-8") if self._tunnel_proc is not None and self._tunnel_proc.poll() is None:
print("[ios] tunnel 已经在运行,跳过重新启动")
prefer_py = env.get("IOSAI_PYTHON")
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
base_dir / "python-rt" / py_name,
base_dir / "python-rt" / "Scripts" / py_name,
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
pass
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
py_candidates.append(base_exec)
for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"):
p = shutil.which(name)
if p and p not in py_candidates:
py_candidates.append(p)
for py in py_candidates:
print(f"[IOSAI] 🔎 probing system python: {py}")
try:
out = subprocess.check_output(
[py, "-c", "import pymobiledevice3;print('ok')"],
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
return ([py, "-u", "-m", "pymobiledevice3"], env)
except Exception:
continue
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
# -------- DDI / RSD / 启动 WDA (与你原逻辑一致) --------
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
import time as _t
last_err = ""
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
if out:
for line in out.splitlines():
print(f"[mounter] {line}")
if "already mounted" in (out or "").lower():
print("[mounter] Developer disk image already mounted.")
else:
print("[mounter] Developer disk image mounted.")
return return
except Exception as e:
last_err = str(e)
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
_t.sleep(backoff_seconds)
else:
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err}")
def _is_ipv4_host(self, host: str) -> bool: cmd = [self.ios_path, "tunnel", "start"]
return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host)) print("[ios] 启动 go-ios tunnel: %s", " ".join(cmd))
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
try: try:
with socket.create_connection((rsd_host, port_int), timeout=2): proc = subprocess.Popen(
print("[rsd] ✅ RSD is reachable and ready.") cmd,
return True stdout=subprocess.PIPE,
except (socket.timeout, ConnectionRefusedError, OSError) as e: stderr=subprocess.PIPE,
print(f"[rsd] Not ready yet ({e}). Retrying...") text=True,
import time as _t creationflags=self._creationflags,
_t.sleep(delay) startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口
print("[rsd] ❌ RSD did not become ready after retries.") )
return False except Exception as e:
# 这里改成 warning并且直接返回不往外抛
print("[ios] 启动 tunnel 失败(忽略): %s", e)
return
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None: self._tunnel_proc = proc
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}") print("[ios] tunnel 启动成功, PID=%s", proc.pid)
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid)
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None: threading.Thread(
if not self._is_ipv4_host(http_host): target=self._drain_process_output,
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}") args=(proc, "tunnel"),
tunnel_endpoint = f"{http_host}:{http_port}" daemon=True,
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}") ).start()
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint})
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
# -------- 端口挑选 -------- def _drain_process_output(self, proc: subprocess.Popen, name: str):
def _pick_available_port(self, base=49151, step=10) -> int: """简单把后台进程的输出打到日志里,避免缓冲区阻塞。"""
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", port)) != 0:
return port
raise RuntimeError("No free port found for tunneld")
# -------- UDID 过滤 --------
def _line_is_for_udid(self, line: str, udid: str) -> bool:
try: try:
return udid.lower() in (line or "").lower() if proc.stdout:
except Exception: for line in proc.stdout:
return False line = line.rstrip()
print(line)
except Exception as e:
print("[ios][%s] 读取 stdout 异常: %s", name, e)
try:
if proc.stderr:
for line in proc.stderr:
line = line.rstrip()
if line:
print("[ios][%s][stderr] %s", name, line)
except Exception as e:
print("[ios][%s] 读取 stderr 异常: %s", name, e)
# =======================
# 具体步骤封装
# =======================
def _pair_until_success(self, udid: str) -> None:
"""
调用 `ios --udid <udid> pair`,直到成功或者超时。
成功条件stdout 中出现 "Successfully paired"
"""
deadline = time.time() + self.pair_timeout
attempt = 0
while True:
attempt += 1
print("[ios] 开始配对设备(%s),第 %d 次尝试", udid, attempt)
rc, out, err = self._run(
["--udid", udid, "pair"],
desc=f"pair({udid})",
timeout=20,
check=False,
)
text = (out or "") + "\n" + (err or "")
if "Successfully paired" in text:
print("[ios] 设备 %s 配对成功", udid)
return
if time.time() >= deadline:
raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败")
time.sleep(self.pair_retry_interval)
def _mount_dev_image(self, udid: str) -> None:
"""
`ios --udid <udid> image auto` 挂载开发者镜像。
成功条件:输出里出现 "success mounting image"
"there is already a developer image mounted" 之类的提示。
挂载失败现在只打 warning不再抛异常避免阻断后续 runwda。
"""
print("[ios] 开始为设备 %s 挂载开发者镜像 (image auto)", udid)
rc, out, err = self._run(
["--udid", udid, "image", "auto"],
desc=f"image auto({udid})",
timeout=300,
check=False,
)
text = (out or "") + "\n" + (err or "")
text_lower = text.lower()
# 这些都当成功处理
success_keywords = [
"success mounting image",
"there is already a developer image mounted",
]
if any(k in text_lower for k in success_keywords):
print("[ios] 设备 %s 开发者镜像挂载完成", udid)
if text.strip():
print("[ios][image auto] output:\n%s", text.strip())
return
# 到这里说明没找到成功关键字,当成“不可靠但非致命”
print(
"[ios] 设备 %s 挂载开发者镜像可能失败(rc=%s),输出:\n%s",
udid, rc, text.strip()
)
# 关键:不再 raise直接 return让后续 runwda 继续试
return
def _run_wda(self, udid: str) -> None:
# ⭐ 按你验证的命令构造参数(绝对正确)
args = [
f"--udid={udid}",
"runwda",
f"--bundleid={WdaAppBundleId}",
f"--testrunnerbundleid={WdaAppBundleId}",
"--xctestconfig=yolo.xctest", # ⭐ 你亲自验证成功的值
]
rc, out, err = self._run(
args,
desc=f"runwda({udid})",
timeout=300,
check=False,
)
# =======================
# 对外主流程
# =======================
def activate_ios17(self, udid: str) -> None:
print("[WDA] iOS17+ 激活开始udid=%s", udid)
# 1. 启动 tunnel
self._spawn_tunnel()
# 2. 一直等到 pair 成功pair 不成功就没法玩了,直接返回)
try:
self._pair_until_success(udid)
except Exception as e:
print("[WDA] pair 失败,终止激活流程 udid=%s, err=%s", udid, e)
return
# 3. 挂载开发者镜像(现在是非致命错误)
try:
self._mount_dev_image(udid)
except Exception as e:
# 理论上不会再进到这里,但为了稳妥,多一层保护
print("[WDA] 挂载开发者镜像出现异常,忽略继续 udid=%s, err=%s", udid, e)
# 4. 尝试启动 WDA
try:
self._run_wda(udid)
except Exception as e:
print("[WDA] runwda 调用异常 udid=%s, err=%s", udid, e)
print("[WDA] iOS17+ 激活流程结束不代表一定成功udid=%s", udid)

View File

@@ -4,6 +4,7 @@ import os
import sys import sys
from pathlib import Path from pathlib import Path
import tidevice
from hypercorn.asyncio import serve from hypercorn.asyncio import serve
from hypercorn.config import Config from hypercorn.config import Config

BIN
resources/ios.exe Normal file

Binary file not shown.

BIN
resources/wintun.dll Normal file

Binary file not shown.