Files
iOSAI/Module/IOSActivator.py
2025-11-25 18:13:02 +08:00

340 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import time
import threading
import subprocess
from typing import Optional, Callable
from Entity.Variables import WdaAppBundleId
class IOSActivator:
    """go-ios based activator for iOS 17+ devices (process-wide singleton).

    - Maintains one global ``tunnel`` process shared by every caller.
    - Flow: tunnel start -> pair (retried) -> image auto (non-fatal) ->
      runwda (several retries, success decided from the log output).
    - Once WDA is confirmed running, ``on_wda_ready(udid)`` is invoked.

    All child-process output is decoded as UTF-8 with ``errors="replace"`` so
    that undecodable bytes can never abort a command or kill a log reader.
    """

    # ===== singleton & global tunnel =====
    _instance = None
    _instance_lock = threading.Lock()
    _tunnel_proc: Optional[subprocess.Popen] = None
    _tunnel_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

    def __init__(
        self,
        ios_path: Optional[str] = None,
        pair_timeout: int = 60,          # max total seconds to keep trying to pair
        pair_retry_interval: int = 3,    # seconds between pair attempts
        runwda_max_retry: int = 10,      # max number of runwda attempts
        runwda_retry_interval: int = 3,  # seconds between runwda attempts
        runwda_wait_timeout: int = 25,   # seconds one runwda attempt waits for the success log
    ):
        """Configure the activator.

        Because the class is a singleton, only the first construction takes
        effect; arguments passed to later constructions are ignored.

        Args:
            ios_path: Path to the go-ios executable. Defaults to
                ``<base_dir>/resources/ios.exe``.
            pair_timeout: Overall pairing deadline in seconds.
            pair_retry_interval: Delay between pairing attempts in seconds.
            runwda_max_retry: Maximum number of ``runwda`` attempts.
            runwda_retry_interval: Delay between ``runwda`` attempts in seconds.
            runwda_wait_timeout: How long a single ``runwda`` attempt waits
                for the success log line, in seconds.
        """
        if getattr(self, "_inited", False):
            return

        # Resolve the base directory (running from source vs. compiled EXE;
        # the original comment attributes the ``__compiled__`` global to Nuitka).
        if "__compiled__" in globals():
            base_dir = os.path.dirname(sys.executable)
        else:
            cur_file = os.path.abspath(__file__)
            base_dir = os.path.dirname(os.path.dirname(cur_file))
        resource_dir = os.path.join(base_dir, "resources")
        if not ios_path:
            ios_path = os.path.join(resource_dir, "ios.exe")

        self.ios_path = ios_path
        self.pair_timeout = pair_timeout
        self.pair_retry_interval = pair_retry_interval
        self.runwda_max_retry = runwda_max_retry
        self.runwda_retry_interval = runwda_retry_interval
        self.runwda_wait_timeout = runwda_wait_timeout
        self._lock = threading.Lock()

        # On Windows, hide the console window of spawned processes.
        self._creationflags = 0
        self._startupinfo = None
        if os.name == "nt":
            try:
                self._creationflags = subprocess.CREATE_NO_WINDOW  # type: ignore[attr-defined]
            except Exception:
                self._creationflags = 0
            si = subprocess.STARTUPINFO()
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW  # type: ignore[attr-defined]
            si.wShowWindow = 0  # SW_HIDE
            self._startupinfo = si

        self._inited = True

    # ===== generic synchronous command execution =====
    def _run(
        self,
        args,
        desc: str = "",
        timeout: Optional[int] = None,
        check: bool = True,
    ):
        """Run ``ios_path`` with ``args`` and wait for it to finish.

        Args:
            args: Iterable of command-line arguments appended to ``ios_path``.
            desc: Optional label used only in log messages.
            timeout: Seconds to wait before giving up (``None`` = no limit).
            check: When true, a timeout or a non-zero exit code raises.

        Returns:
            ``(returncode, stdout, stderr)``; on a timeout with
            ``check=False`` the tuple is ``(-1, "", "timeout")``.

        Raises:
            subprocess.TimeoutExpired: timeout hit and ``check`` is true.
            RuntimeError: non-zero exit code and ``check`` is true.
        """
        cmd = [self.ios_path] + list(args)
        cmd_str = " ".join(cmd)
        if desc:
            print(f"[ios] 执行命令({desc}): {cmd_str}")
        else:
            print(f"[ios] 执行命令: {cmd_str}")

        try:
            proc = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Decode explicitly: the locale codec may reject the tool's bytes.
                encoding="utf-8",
                errors="replace",
                timeout=timeout,
                creationflags=self._creationflags,
                startupinfo=self._startupinfo,
            )
        except subprocess.TimeoutExpired:
            if check:
                raise
            return -1, "", "timeout"

        out = proc.stdout or ""
        err = proc.stderr or ""
        if check and proc.returncode != 0:
            print(f"[ios] 命令失败({desc}), rc={proc.returncode}")
            raise RuntimeError(f"[ios] 命令失败({desc}), returncode={proc.returncode}")
        return proc.returncode, out, err

    @staticmethod
    def _stop_process(proc: subprocess.Popen, grace: float = 5.0) -> None:
        """Terminate ``proc`` and reap it, escalating to ``kill`` if needed.

        Best effort: failures are logged, never raised.
        """
        try:
            proc.terminate()
            proc.wait(timeout=grace)
        except subprocess.TimeoutExpired:
            # The process ignored the polite request; force it down.
            try:
                proc.kill()
                proc.wait(timeout=grace)
            except Exception as e:
                print(f"[ios] 强制结束进程异常: {e}")
        except Exception as e:
            print(f"[ios] 结束进程异常: {e}")

    # ===== tunnel =====
    def _drain_process_output(self, proc: subprocess.Popen, name: str):
        """Consume a background process's output so its pipes never fill up.

        stdout and stderr are drained concurrently: stderr gets its own
        helper thread while stdout is read on the calling thread. Reading
        them one after the other would leave stderr unread for as long as
        stdout stays open, which can block a long-running child.

        Returns once both pipes have reached EOF.
        """

        def _pump(pipe, prefix: str, label: str) -> None:
            try:
                for line in pipe:
                    line = line.rstrip()
                    if line:
                        print(f"{prefix} {line}")
            except Exception as e:
                print(f"[ios][{name}] 读取 {label} 异常: {e}")

        stderr_thread = None
        if proc.stderr:
            stderr_thread = threading.Thread(
                target=_pump,
                args=(proc.stderr, f"[ios][{name}][stderr]", "stderr"),
                daemon=True,
            )
            stderr_thread.start()
        if proc.stdout:
            _pump(proc.stdout, f"[ios][{name}]", "stdout")
        if stderr_thread is not None:
            stderr_thread.join()

    def _spawn_tunnel(self):
        """Start the global go-ios tunnel, or reuse it if it is still alive.

        A failure to launch is logged and ignored.
        """
        with IOSActivator._tunnel_lock:
            existing = IOSActivator._tunnel_proc
            if existing is not None and existing.poll() is None:
                print("[ios] tunnel 已经在运行,跳过重新启动")
                return

            cmd = [self.ios_path, "tunnel", "start"]
            print("[ios] 启动 go-ios tunnel:", " ".join(cmd))
            try:
                proc = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    creationflags=self._creationflags,
                    startupinfo=self._startupinfo,
                )
            except Exception as e:
                print("[ios] 启动 tunnel 失败(忽略):", e)
                return

            IOSActivator._tunnel_proc = proc
            print("[ios] tunnel 启动成功, PID=", proc.pid)
            threading.Thread(
                target=self._drain_process_output,
                args=(proc, "tunnel"),
                daemon=True,
            ).start()

    # ===== pair & image =====
    def _pair_until_success(self, udid: str):
        """Retry ``pair`` until the tool reports success or the deadline passes.

        At least one attempt is always made; the deadline is checked after
        each failed attempt.

        Raises:
            RuntimeError: pairing did not succeed within ``pair_timeout``.
        """
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + self.pair_timeout
        attempt = 0
        while True:
            attempt += 1
            print(f"[ios] 开始配对设备({udid}),第 {attempt} 次尝试")
            rc, out, err = self._run(
                ["--udid", udid, "pair"],
                desc=f"pair({udid})",
                timeout=20,
                check=False,
            )
            text = (out or "") + "\n" + (err or "")
            if "Successfully paired" in text:
                print(f"[ios] 设备 {udid} 配对成功")
                return
            if time.monotonic() >= deadline:
                raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败(rc={rc})")
            time.sleep(self.pair_retry_interval)

    def _mount_dev_image(self, udid: str):
        """Run ``image auto`` for the device and log the outcome.

        Success is decided by known phrases in the combined output; a
        failure is only logged, never raised.
        """
        print(f"[ios] 开始为设备 {udid} 挂载开发者镜像 (image auto)")
        rc, out, err = self._run(
            ["--udid", udid, "image", "auto"],
            desc=f"image auto({udid})",
            timeout=300,
            check=False,
        )
        text = (out or "") + "\n" + (err or "")
        text_lower = text.lower()
        success_keywords = [
            "success mounting image",
            "there is already a developer image mounted",
        ]
        if any(k in text_lower for k in success_keywords):
            print(f"[ios] 设备 {udid} 开发者镜像挂载完成")
            if text.strip():
                print("[ios][image auto] output:\n", text.strip())
            return
        print(f"[ios] 设备 {udid} 挂载开发者镜像可能失败(rc={rc}),输出:\n{text.strip()}")

    # ===== runwda (key logic) =====
    def _run_wda_once_async(self, udid: str, on_wda_ready: Optional[Callable[[str], None]]) -> bool:
        """Make one ``runwda`` attempt.

        - Launches the go-ios executable asynchronously.
        - Streams stdout/stderr on background threads.
        - Treats the log markers ``got capabilities`` / ``"authorized":true``
          as proof that WDA is up.
        - Timeout, or process exit without a marker, counts as failure.

        On success the process is intentionally left running, and the reader
        threads keep draining its output so it cannot stall on a full pipe.

        Returns:
            True if the success marker was seen, otherwise False.
        """
        cmd = [
            self.ios_path,
            f"--udid={udid}",
            "runwda",
            f"--bundleid={WdaAppBundleId}",
            f"--testrunnerbundleid={WdaAppBundleId}",
            "--xctestconfig=yolo.xctest",
        ]
        print("[ios] 异步启动 runwda:", " ".join(cmd))
        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
                creationflags=self._creationflags,
                startupinfo=self._startupinfo,
            )
        except Exception as e:
            print(f"[ios] 启动 runwda 进程失败: {e}")
            return False

        success_evt = threading.Event()

        def _reader(pipe, tag: str):
            try:
                for raw in pipe:
                    line = (raw or "").rstrip()
                    if not line:
                        continue
                    print(f"[WDA-LOG] {line}")
                    if success_evt.is_set():
                        # Already confirmed: keep draining, skip matching.
                        continue
                    lower = line.lower()
                    # Success markers observed in practice.
                    if "got capabilities" in lower or '"authorized":true' in lower:
                        success_evt.set()
                        print(f"[ios] 捕获到 WDA 启动成功日志({tag})udid={udid}")
                        # No break: the process stays alive, so the pipe
                        # must continue to be read.
            except Exception as e:
                print(f"[ios] 读取 {tag} 日志异常: {e}")

        # Log reader threads.
        if proc.stdout:
            threading.Thread(target=_reader, args=(proc.stdout, "stdout"), daemon=True).start()
        if proc.stderr:
            threading.Thread(target=_reader, args=(proc.stderr, "stderr"), daemon=True).start()

        # Wait for success / process exit / timeout.
        deadline = time.monotonic() + self.runwda_wait_timeout
        while True:
            # Doubles as the poll interval and wakes immediately on success.
            if success_evt.wait(0.2):
                print(f"[ios] WDA 日志确认已启动udid={udid}")
                if on_wda_ready:
                    try:
                        on_wda_ready(udid)
                    except Exception as e:
                        print(f"[WDA] 回调执行异常: {e}")
                # Do not kill the process; WDA keeps running in it.
                return True

            rc = proc.poll()
            if rc is not None:
                print(f"[ios] runwda 进程退出 rc={rc}未检测到成功日志udid={udid}")
                return False

            if time.monotonic() > deadline:
                print(f"[ios] runwda 等待超时({self.runwda_wait_timeout}s)未确认成功udid={udid}")
                self._stop_process(proc)
                return False

    def _run_wda_with_retry(self, udid: str, on_wda_ready: Optional[Callable[[str], None]]) -> bool:
        """Call ``_run_wda_once_async`` up to ``runwda_max_retry`` times.

        Returns:
            True as soon as one attempt succeeds, False if all attempts fail.
        """
        for attempt in range(1, self.runwda_max_retry + 1):
            print(f"[ios] runwda 尝试 {attempt}/{self.runwda_max_retry}udid={udid}")
            ok = self._run_wda_once_async(udid, on_wda_ready)
            if ok:
                print(f"[ios] runwda 第 {attempt} 次尝试成功udid={udid}")
                return True
            print(f"[ios] runwda 第 {attempt} 次尝试失败udid={udid}")
            if attempt < self.runwda_max_retry:
                time.sleep(self.runwda_retry_interval)
        print(f"[ios] runwda 多次失败放弃udid={udid}")
        return False

    # ===== public entry point =====
    def activate_ios17(self, udid: str, on_wda_ready: Optional[Callable[[str], None]] = None) -> None:
        """Run the full activation flow for one device.

        Steps: ensure the tunnel, pair (fatal on failure), mount the
        developer image (non-fatal), then start WDA with retries.
        Nothing is raised or returned; outcomes are reported via log
        output and the ``on_wda_ready`` callback.
        """
        print(f"[WDA] iOS17+ 激活开始udid={udid}, 回调={on_wda_ready}")

        # 1. tunnel
        self._spawn_tunnel()

        # 2. pair
        try:
            self._pair_until_success(udid)
        except Exception as e:
            print(f"[WDA] pair 失败,终止激活流程 udid={udid}, err={e}")
            return

        # 3. mount developer image (non-fatal)
        try:
            self._mount_dev_image(udid)
        except Exception as e:
            print(f"[WDA] 挂载开发者镜像异常(忽略) udid={udid}, err={e}")

        # 4. runwda + callback
        ok = self._run_wda_with_retry(udid, on_wda_ready)
        if not ok:
            print(f"[WDA] runwda 多次失败可能需要手动检查设备udid={udid}")
        print(f"[WDA] iOS17+ 激活流程结束不代表一定成功udid={udid}")