Compare commits

...

2 Commits

Author SHA1 Message Date
3b2b6ce741 合并代码 2025-11-24 20:39:05 +08:00
22da742532 优化逻辑 2025-11-24 20:38:50 +08:00
2 changed files with 323 additions and 222 deletions

View File

@@ -7,11 +7,11 @@ import subprocess
from typing import Dict, Optional
import tidevice
# import wda # 目前先不用 wda 获取屏幕,避免触发 tidevice 的那套 WDA 启动
import wda
from tidevice import Usbmux, ConnectionType
from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort
from Entity.Variables import WdaAppBundleId, wdaFunctionPort
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Module.IOSActivator import IOSActivator
from Utils.LogManager import LogManager
@@ -21,6 +21,9 @@ class DeviceInfo:
_instance = None
_instance_lock = threading.Lock()
# 离线宽限期
REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0"))
def __new__(cls, *args, **kwargs):
if not cls._instance:
with cls._instance_lock:
@@ -28,8 +31,6 @@ class DeviceInfo:
cls._instance = super().__new__(cls)
return cls._instance
REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0"))
def __init__(self) -> None:
if getattr(self, "_initialized", False):
return
@@ -55,7 +56,7 @@ class DeviceInfo:
self._creationflags = 0
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore[attr-defined]
si.wShowWindow = 0 # SW_HIDE
self._startupinfo = si
@@ -63,9 +64,9 @@ class DeviceInfo:
print("[Init] DeviceInfo 初始化完成")
self._initialized = True
# ---------------------------
# ==========================
# 主循环
# ---------------------------
# ==========================
def listen(self):
LogManager.method_info("进入主循环", "listen", udid="system")
print("[Listen] 开始监听设备上下线...")
@@ -82,18 +83,18 @@ class DeviceInfo:
with self._lock:
known = set(self._models.keys())
# ---------- 1. 新设备 ----------
# 1. 新设备
for udid in online:
self._last_seen[udid] = time.time()
if udid not in known:
try:
self._add_device(udid)
except Exception as e:
# 关键:单设备异常不能干掉整个循环
# 单设备异常不能干掉整个循环
LogManager.warning(f"[Add] 处理设备 {udid} 异常: {e}", udid=udid)
print(f"[Add] 处理设备 {udid} 异常: {e}")
# ---------- 2. 可能离线设备 ----------
# 2. 可能离线设备
now = time.time()
for udid in list(known):
if udid not in online:
@@ -107,9 +108,9 @@ class DeviceInfo:
time.sleep(1)
# ---------------------------
# ==========================
# 添加设备
# ---------------------------
# ==========================
def _add_device(self, udid: str):
with self._lock:
if udid in self._models:
@@ -125,20 +126,7 @@ class DeviceInfo:
print(f"[Add] 获取系统版本失败 {udid}: {e}")
version_major = 0
# 启动 WDA
if version_major >= 17.0:
threading.Thread(
target=IOSActivator().activate_ios17,
args=(udid,),
daemon=True,
).start()
else:
try:
tidevice.Device(udid).app_start(WdaAppBundleId)
except Exception as e:
print(f"[Add] 使用 tidevice 启动 WDA 失败 {udid}: {e}")
# 分配投屏端口 & 写入模型
# 分配投屏端口 & 写入模型先插入width/height=0后面再异步更新
with self._lock:
self.screenPort += 1
screen_port = self.screenPort
@@ -149,7 +137,7 @@ class DeviceInfo:
width=0,
height=0,
scale=0,
type=1
type=1,
)
self._models[udid] = model
@@ -162,16 +150,106 @@ class DeviceInfo:
except Exception as e:
print(f"[iproxy] 启动失败 {udid}: {e}")
# 异步等待 WDA + 更新屏幕尺寸(后面你要真用尺寸,再把 _screen_info 换成 wda 版本即可)
# 启动 WDA
if version_major >= 17.0:
# iOS17+ 走 go-ios传入回调WDA 启动后再拿屏幕尺寸
threading.Thread(
target=IOSActivator().activate_ios17,
args=(udid, self._on_wda_ready),
daemon=True,
).start()
else:
# 旧版本直接用 tidevice 启动 WDA然后异步获取屏幕尺寸
try:
tidevice.Device(udid).app_start(WdaAppBundleId)
except Exception as e:
print(f"[Add] 使用 tidevice 启动 WDA 失败 {udid}: {e}")
else:
threading.Thread(
target=self._fetch_screen_and_notify,
args=(udid,),
daemon=True,
).start()
# ==========================
# WDA 启动回调iOS17+
# ==========================
def _on_wda_ready(self, udid: str):
print(f"[WDA] 回调触发,准备获取屏幕信息 udid={udid}")
# 稍微等一下再连,避免刚启动时不稳定
time.sleep(1)
threading.Thread(
target=self._async_wait_wda_and_update_screen,
target=self._fetch_screen_and_notify,
args=(udid,),
daemon=True
daemon=True,
).start()
# ---------------------------
# 启动 iproxy投屏
# ---------------------------
# ==========================
# 通过 WDA 获取屏幕信息
# ==========================
def _screen_info(self, udid: str):
try:
# 用 USBClient通过 WDA 功能端口访问
c = wda.USBClient(udid, wdaFunctionPort)
size = c.window_size()
w = int(size.width)
h = int(size.height)
s = float(c.scale) # facebook-wda 的 scale 挂在 client 上
print(f"[Screen] 成功获取屏幕 {w}x{h} scale={s} {udid}")
return w, h, s
except Exception as e:
print(f"[Screen] 获取屏幕失败: {e} udid={udid}")
return 0, 0, 0.0
# ==========================
# 异步获取屏幕尺寸并通知 Flask
# ==========================
def _fetch_screen_and_notify(self, udid: str):
"""
后台线程里多次尝试通过 WDA 获取屏幕尺寸,
成功后更新 model 并发一次 snapshot。
"""
max_retry = 15
interval = 1.0
# 给 WDA 一点启动缓冲时间
time.sleep(2.0)
for _ in range(max_retry):
# 设备已移除就不再尝试
with self._lock:
if udid not in self._models:
print(f"[Screen] 设备已移除,停止获取屏幕信息 udid={udid}")
return
w, h, s = self._screen_info(udid)
if w > 0 and h > 0:
# 更新模型
with self._lock:
m = self._models.get(udid)
if not m:
print(f"[Screen] 模型已不存在,无法更新 udid={udid}")
return
m.width = w
m.height = h
m.scale = s
print(f"[Screen] 屏幕信息更新完成,准备推送到 Flask udid={udid}")
try:
self._manager_send()
except Exception as e:
print(f"[Screen] 发送屏幕更新到 Flask 失败 udid={udid}, err={e}")
return
time.sleep(interval)
print(f"[Screen] 多次尝试仍未获取到屏幕信息 udid={udid}")
# ==========================
# iproxy 管理
# ==========================
def _start_iproxy(self, udid: str, local_port: int):
iproxy_path = self._find_iproxy()
@@ -183,14 +261,14 @@ class DeviceInfo:
args = [
iproxy_path,
"-u", udid,
str(local_port), # 本地端口(投屏)
"9567" # 手机端口(go-ios screencast
"-u",
udid,
str(local_port), # 本地端口(投屏
"9567", # 手机端口go-ios screencast
]
print(f"[iproxy] 启动进程: {args}")
# 不用 PIPE防止没人读导致缓冲爆掉窗口用前面配置隐藏
proc = subprocess.Popen(
args,
stdout=subprocess.DEVNULL,
@@ -198,12 +276,8 @@ class DeviceInfo:
creationflags=self._creationflags,
startupinfo=self._startupinfo,
)
self._iproxy_process[udid] = proc
# ---------------------------
# 停止 iproxy
# ---------------------------
def _stop_iproxy(self, udid: str):
p = self._iproxy_process.get(udid)
if not p:
@@ -219,39 +293,13 @@ class DeviceInfo:
self._iproxy_process.pop(udid, None)
print(f"[iproxy] 已停止 {udid}")
# ---------------------------
# 异步等待 WDA 然后更新屏幕大小
# ---------------------------
def _async_wait_wda_and_update_screen(self, udid: str):
print(f"[WDA] 等待 WDA 就绪 {udid}")
try:
# 最长等待 20 秒(你后面要真用屏幕尺寸,再把 _screen_info 换回 wda 的实现)
for _ in range(20):
w, h, s = self._screen_info(udid)
if w > 0:
print(f"[WDA] 屏幕信息成功 {udid} {w}x{h} scale={s}")
with self._lock:
m = self._models.get(udid)
if m:
m.width = w
m.height = h
m.scale = s
self._manager_send()
return
time.sleep(1)
except Exception as e:
print(f"[WDA] 获取屏幕信息异常 {udid}: {e}")
print(f"[WDA] 屏幕信息获取失败(超时) {udid}")
# ---------------------------
# ==========================
# 移除设备
# ---------------------------
# ==========================
def _remove_device(self, udid: str):
print(f"[Remove] 移除设备 {udid}")
# 先停 iproxy(只管自己的,不影响其他设备)
# 先停 iproxy
self._stop_iproxy(udid)
with self._lock:
@@ -260,34 +308,17 @@ class DeviceInfo:
self._manager_send()
# ---------------------------
# WDA 屏幕查询(当前保持空实现)
# ---------------------------
def _screen_info(self, udid: str):
# 现在先不通过 wda 取屏幕,避免触发 tidevice 那套 WDA 启动逻辑
# 你如果确认 go-ios 跑起来后用 facebook-wda 取尺寸是安全的,可以改回下面这种:
#
# try:
# c = wda.USBClient(udid, wdaFunctionPort)
# size = c.window_size()
# return int(size.width), int(size.height), float(c.scale)
# except Exception as e:
# print(f"[Screen] 获取屏幕信息异常: {e} {udid}")
# return 0, 0, 0.0
return 0, 0, 0.0
# ---------------------------
# 找 iproxy
# ---------------------------
# ==========================
# 工具方法
# ==========================
def _find_iproxy(self) -> str:
base = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
name = "iproxy.exe" if os.name == "nt" else "iproxy"
path = os.path.join(base, "resources", "iproxy", name)
return path
return os.path.join(base, "resources", "iproxy", name)
# ---------------------------
# 数据同步到 Flask
# ---------------------------
# ==========================
# 同步数据到 Flask
# ==========================
def _manager_send(self):
try:
self._send_snapshot_to_flask()
@@ -306,8 +337,8 @@ class DeviceInfo:
devices = [m.toDict() for m in self._models.values()]
payload = json.dumps({"devices": devices}, ensure_ascii=False)
port = int(os.getenv("FLASK_COMM_PORT", "34566"))
with socket.create_connection(("127.0.0.1", port), timeout=1.5) as s:
s.sendall(payload.encode() + b"\n")

View File

@@ -1,84 +1,96 @@
import subprocess
import threading
import time
import os
import sys
from typing import Tuple, Optional
import time
import threading
import subprocess
from typing import Optional, Callable
from Entity.Variables import WdaAppBundleId
class IOSActivator:
"""
专门给 iOS17+ 设备用的 go-ios 激活器:
- 外部先探测 WDA不存在时再调用 activate_ios17
- 内部流程tunnel start -> pair(等待成功) -> image auto -> runwda
给 iOS17+ 用的 go-ios 激活器(单例)
- 维护一条全局 tunnel 进程
- 流程tunnel start -> pair(重试) -> image auto(不致命) -> runwda(多次重试+日志判定成功)
- WDA 启动成功后触发回调 on_wda_ready(udid)
"""
# ===== 单例 & 全局 tunnel =====
_instance = None
_instance_lock = threading.Lock()
_tunnel_proc: Optional[subprocess.Popen] = None
_tunnel_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._instance_lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(
self,
ios_path: Optional[str] = None,
pair_timeout: int = 60, # 配对最多等多久
pair_retry_interval: int = 3, # 每次重试间隔
pair_timeout: int = 60, # 配对最多等多久
pair_retry_interval: int = 3, # 配对重试间隔
runwda_max_retry: int = 3, # runwda 最大重试次数
runwda_retry_interval: int = 3,# runwda 重试间隔
runwda_wait_timeout: int = 25 # 单次 runwda 等待“成功日志”的超时时间
):
"""
:param ios_path: ios.exe 的绝对路径,例如 E:\\code\\Python\\iOSAi\\resources\\ios.exe
如果为 None则自动从项目的 resources 目录中寻找 ios.exe
"""
if getattr(self, "_inited", False):
return
# ==== 统一获取 resources 目录(支持源码运行 + Nuitka EXE ====
# 运行路径处理(源码 / Nuitka EXE
if "__compiled__" in globals():
# 被 Nuitka 编译后的 exe 运行时
base_dir = os.path.dirname(sys.executable) # exe 所在目录
base_dir = os.path.dirname(sys.executable)
else:
# 开发环境,直接跑 .py
cur_file = os.path.abspath(__file__) # 当前 .py 文件所在目录
base_dir = os.path.dirname(os.path.dirname(cur_file)) # 回到项目根 iOSAi
cur_file = os.path.abspath(__file__)
base_dir = os.path.dirname(os.path.dirname(cur_file))
resource_dir = os.path.join(base_dir, "resources")
# 如果外部没有显式传 ios_path就用 resources/ios.exe
if ios_path is None or ios_path == "":
if not ios_path:
ios_path = os.path.join(resource_dir, "ios.exe")
self.ios_path = ios_path
self.pair_timeout = pair_timeout
self.pair_retry_interval = pair_retry_interval
self.runwda_max_retry = runwda_max_retry
self.runwda_retry_interval = runwda_retry_interval
self.runwda_wait_timeout = runwda_wait_timeout
self._lock = threading.Lock()
# go-ios tunnel 的后台进程
self._tunnel_proc: Optional[subprocess.Popen] = None
# Windows: 避免弹黑框
# Windows 隐藏黑框
self._creationflags = 0
self._startupinfo = None # ⭐ 新增:统一控制窗口隐藏
self._startupinfo = None
if os.name == "nt":
try:
self._creationflags = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined]
except Exception:
self._creationflags = 0
# ⭐ 用 STARTUPINFO + STARTF_USESHOWWINDOW 彻底隐藏窗口
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore[attr-defined]
si.wShowWindow = 0 # SW_HIDE
self._startupinfo = si
# =======================
# 基础执行封装
# =======================
self._inited = True
# ===== 通用同步命令执行 =====
def _run(
self,
args,
desc: str = "",
timeout: Optional[int] = None,
check: bool = True,
) -> Tuple[int, str, str]:
"""
同步执行 ios.exe等它返回。
:return: (returncode, stdout, stderr)
"""
):
cmd = [self.ios_path] + list(args)
cmd_str = " ".join(cmd)
if desc:
print(f"[ios] 执行命令({desc}): {cmd_str}")
else:
print(f"[ios] 执行命令: {cmd_str}")
try:
proc = subprocess.run(
@@ -88,7 +100,7 @@ class IOSActivator:
text=True,
timeout=timeout,
creationflags=self._creationflags,
startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口
startupinfo=self._startupinfo,
)
except subprocess.TimeoutExpired:
if check:
@@ -97,21 +109,42 @@ class IOSActivator:
out = proc.stdout or ""
err = proc.stderr or ""
if check and proc.returncode != 0:
print(f"[ios] 命令失败({desc}), rc={proc.returncode}")
raise RuntimeError(f"[ios] 命令失败({desc}), returncode={proc.returncode}")
return proc.returncode, out, err
def _spawn_tunnel(self) -> None:
with self._lock:
if self._tunnel_proc is not None and self._tunnel_proc.poll() is None:
# ===== tunnel 相关 =====
def _drain_process_output(self, proc: subprocess.Popen, name: str):
"""吃掉后台进程输出,防止缓冲区阻塞"""
try:
if proc.stdout:
for line in proc.stdout:
line = line.rstrip()
if line:
print(f"[ios][{name}] {line}")
except Exception as e:
print(f"[ios][{name}] 读取 stdout 异常: {e}")
try:
if proc.stderr:
for line in proc.stderr:
line = line.rstrip()
if line:
print(f"[ios][{name}][stderr] {line}")
except Exception as e:
print(f"[ios][{name}] 读取 stderr 异常: {e}")
def _spawn_tunnel(self):
"""启动 / 复用全局 tunnel"""
with IOSActivator._tunnel_lock:
if IOSActivator._tunnel_proc is not None and IOSActivator._tunnel_proc.poll() is None:
print("[ios] tunnel 已经在运行,跳过重新启动")
return
cmd = [self.ios_path, "tunnel", "start"]
print("[ios] 启动 go-ios tunnel: %s", " ".join(cmd))
print("[ios] 启动 go-ios tunnel:", " ".join(cmd))
try:
proc = subprocess.Popen(
cmd,
@@ -119,15 +152,14 @@ class IOSActivator:
stderr=subprocess.PIPE,
text=True,
creationflags=self._creationflags,
startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口
startupinfo=self._startupinfo,
)
except Exception as e:
# 这里改成 warning并且直接返回不往外抛
print("[ios] 启动 tunnel 失败(忽略): %s", e)
print("[ios] 启动 tunnel 失败(忽略):", e)
return
self._tunnel_proc = proc
print("[ios] tunnel 启动成功, PID=%s", proc.pid)
IOSActivator._tunnel_proc = proc
print("[ios] tunnel 启动成功, PID=", proc.pid)
threading.Thread(
target=self._drain_process_output,
@@ -135,39 +167,14 @@ class IOSActivator:
daemon=True,
).start()
def _drain_process_output(self, proc: subprocess.Popen, name: str):
"""简单把后台进程的输出打到日志里,避免缓冲区阻塞。"""
try:
if proc.stdout:
for line in proc.stdout:
line = line.rstrip()
print(line)
except Exception as e:
print("[ios][%s] 读取 stdout 异常: %s", name, e)
try:
if proc.stderr:
for line in proc.stderr:
line = line.rstrip()
if line:
print("[ios][%s][stderr] %s", name, line)
except Exception as e:
print("[ios][%s] 读取 stderr 异常: %s", name, e)
# =======================
# 具体步骤封装
# =======================
def _pair_until_success(self, udid: str) -> None:
"""
调用 `ios --udid <udid> pair`,直到成功或者超时。
成功条件stdout 中出现 "Successfully paired"
"""
# ===== pair & image =====
def _pair_until_success(self, udid: str):
deadline = time.time() + self.pair_timeout
attempt = 0
while True:
attempt += 1
print("[ios] 开始配对设备(%s),第 %d 次尝试", udid, attempt)
print(f"[ios] 开始配对设备({udid}),第 {attempt} 次尝试")
rc, out, err = self._run(
["--udid", udid, "pair"],
@@ -178,23 +185,16 @@ class IOSActivator:
text = (out or "") + "\n" + (err or "")
if "Successfully paired" in text:
print("[ios] 设备 %s 配对成功", udid)
print(f"[ios] 设备 {udid} 配对成功")
return
if time.time() >= deadline:
raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败")
raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败(rc={rc})")
time.sleep(self.pair_retry_interval)
def _mount_dev_image(self, udid: str) -> None:
"""
`ios --udid <udid> image auto` 挂载开发者镜像。
成功条件:输出里出现 "success mounting image"
"there is already a developer image mounted" 之类的提示。
挂载失败现在只打 warning不再抛异常避免阻断后续 runwda。
"""
print("[ios] 开始为设备 %s 挂载开发者镜像 (image auto)", udid)
def _mount_dev_image(self, udid: str):
print(f"[ios] 开始为设备 {udid} 挂载开发者镜像 (image auto)")
rc, out, err = self._run(
["--udid", udid, "image", "auto"],
desc=f"image auto({udid})",
@@ -204,70 +204,140 @@ class IOSActivator:
text = (out or "") + "\n" + (err or "")
text_lower = text.lower()
# 这些都当成功处理
success_keywords = [
"success mounting image",
"there is already a developer image mounted",
]
if any(k in text_lower for k in success_keywords):
print("[ios] 设备 %s 开发者镜像挂载完成", udid)
print(f"[ios] 设备 {udid} 开发者镜像挂载完成")
if text.strip():
print("[ios][image auto] output:\n%s", text.strip())
print("[ios][image auto] output:\n", text.strip())
return
# 到这里说明没找到成功关键字,当成“不可靠但非致命”
print(
"[ios] 设备 %s 挂载开发者镜像可能失败(rc=%s),输出:\n%s",
udid, rc, text.strip()
)
# 关键:不再 raise直接 return让后续 runwda 继续试
return
print(f"[ios] 设备 {udid} 挂载开发者镜像可能失败(rc={rc}),输出:\n{text.strip()}")
def _run_wda(self, udid: str) -> None:
# ⭐ 按你验证的命令构造参数(绝对正确)
args = [
# ===== runwda(关键逻辑) =====
def _run_wda_once_async(self, udid: str, on_wda_ready: Optional[Callable[[str], None]]) -> bool:
"""
单次 runwda
- 异步启动 ios.exe
- 实时读 stdout/stderr
- 捕获关键日志got capabilities / authorized true视为成功
- 超时/进程退出且未成功 -> 失败
"""
cmd = [
self.ios_path,
f"--udid={udid}",
"runwda",
f"--bundleid={WdaAppBundleId}",
f"--testrunnerbundleid={WdaAppBundleId}",
"--xctestconfig=yolo.xctest", # ⭐ 你亲自验证成功的值
"--xctestconfig=yolo.xctest",
]
print("[ios] 异步启动 runwda:", " ".join(cmd))
rc, out, err = self._run(
args,
desc=f"runwda({udid})",
timeout=300,
check=False,
)
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
creationflags=self._creationflags,
startupinfo=self._startupinfo,
)
except Exception as e:
print(f"[ios] 启动 runwda 进程失败: {e}")
return False
# =======================
# 对外主流程
# =======================
def activate_ios17(self, udid: str) -> None:
print("[WDA] iOS17+ 激活开始udid=%s", udid)
success_evt = threading.Event()
# 1. 启动 tunnel
def _reader(pipe, tag: str):
try:
for raw in pipe:
line = (raw or "").rstrip()
if not line:
continue
print(f"[WDA-LOG] {line}")
lower = line.lower()
# 这里是你实测的“成功特征”
if "got capabilities" in lower or '"authorized":true' in lower:
success_evt.set()
print(f"[ios] 捕获到 WDA 启动成功日志({tag})udid={udid}")
break
except Exception as e:
print(f"[ios] 读取 {tag} 日志异常: {e}")
# 日志线程
if proc.stdout:
threading.Thread(target=_reader, args=(proc.stdout, "stdout"), daemon=True).start()
if proc.stderr:
threading.Thread(target=_reader, args=(proc.stderr, "stderr"), daemon=True).start()
# 等待成功 / 退出 / 超时
start = time.time()
while True:
if success_evt.is_set():
print(f"[ios] WDA 日志确认已启动udid={udid}")
if on_wda_ready:
try:
on_wda_ready(udid)
except Exception as e:
print(f"[WDA] 回调执行异常: {e}")
# 不主动杀进程,让 WDA 挂在那儿
return True
rc = proc.poll()
if rc is not None:
print(f"[ios] runwda 进程退出 rc={rc}未检测到成功日志udid={udid}")
return False
if time.time() - start > self.runwda_wait_timeout:
print(f"[ios] runwda 等待超时({self.runwda_wait_timeout}s)未确认成功udid={udid}")
try:
proc.terminate()
except Exception:
pass
return False
time.sleep(0.2)
def _run_wda_with_retry(self, udid: str, on_wda_ready: Optional[Callable[[str], None]]) -> bool:
for attempt in range(1, self.runwda_max_retry + 1):
print(f"[ios] runwda 尝试 {attempt}/{self.runwda_max_retry}udid={udid}")
ok = self._run_wda_once_async(udid, on_wda_ready)
if ok:
print(f"[ios] runwda 第 {attempt} 次尝试成功udid={udid}")
return True
print(f"[ios] runwda 第 {attempt} 次尝试失败udid={udid}")
if attempt < self.runwda_max_retry:
time.sleep(self.runwda_retry_interval)
print(f"[ios] runwda 多次失败放弃udid={udid}")
return False
# ===== 对外主流程 =====
def activate_ios17(self, udid: str, on_wda_ready: Optional[Callable[[str], None]] = None) -> None:
print(f"[WDA] iOS17+ 激活开始udid={udid}, 回调={on_wda_ready}")
# 1. tunnel
self._spawn_tunnel()
# 2. 一直等到 pair 成功pair 不成功就没法玩了,直接返回)
# 2. 配对
try:
self._pair_until_success(udid)
except Exception as e:
print("[WDA] pair 失败,终止激活流程 udid=%s, err=%s", udid, e)
print(f"[WDA] pair 失败,终止激活流程 udid={udid}, err={e}")
return
# 3. 挂载开发者镜像(现在是非致命错误
# 3. 挂镜像(非致命)
try:
self._mount_dev_image(udid)
except Exception as e:
# 理论上不会再进到这里,但为了稳妥,多一层保护
print("[WDA] 挂载开发者镜像出现异常,忽略继续 udid=%s, err=%s", udid, e)
print(f"[WDA] 挂载开发者镜像异常(忽略) udid={udid}, err={e}")
# 4. 尝试启动 WDA
try:
self._run_wda(udid)
except Exception as e:
print("[WDA] runwda 调用异常 udid=%s, err=%s", udid, e)
# 4. runwda + 回调
ok = self._run_wda_with_retry(udid, on_wda_ready)
if not ok:
print(f"[WDA] runwda 多次失败可能需要手动检查设备udid={udid}")
print("[WDA] iOS17+ 激活流程结束不代表一定成功udid=%s", udid)
print(f"[WDA] iOS17+ 激活流程结束不代表一定成功udid={udid}")