修复掉画面的bug

This commit is contained in:
2025-10-24 22:04:28 +08:00
parent fe3c19fb21
commit 23f63e42c8
12 changed files with 796 additions and 470 deletions

View File

@@ -1,32 +1,307 @@
# -*- coding: utf-8 -*-
import os
import re
import sys
import atexit
import signal
import socket
import subprocess
from typing import Optional
from typing import Optional, List, Tuple, Dict, Set
from Entity.Variables import WdaAppBundleId
class IOSActivator:
"""
轻量 iOS 激活器(仅代码调用) - 打包安全
1) 启动 `pymobiledevice3 remote tunneld`子进程常驻
2) 自动挂载 Developer Disk Image进程内调用 CLI
3) 设备隧道就绪后启动 WDA(进程内调用 CLI
- 优先使用 `--rsd <host> <port>`(支持 IPv6
- 失败再用 `PYMOBILEDEVICE3_TUNNEL=127.0.0.1:<port>` 回退(仅 IPv4
轻量 iOS 激活器(仅代码调用) - 进程/网卡可控
1) 启动 `pymobiledevice3 remote tunneld`可常驻/可一次性
2) 自动挂载 DDI
3) 隧道就绪后启动 WDA
4) 程序退出或 keep_tunnel=False 时,确保 tunneld 进程与虚拟网卡被清理
"""
def __init__(self, python_executable: Optional[str] = None):
# 仅用于旧逻辑兼容;本版本已不依赖 self.python 去 -m 调 CLI
self.python = python_executable or None
# ---------- 正则 ----------
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE)
# =========================================================================
# 内部工具:进程内调用 pymobiledevice3 CLI
# =========================================================================
def _pmd3_run(self, args: list[str], udid: str, extra_env: Optional[dict] = None) -> str:
import subprocess
def __init__(self, python_executable: Optional[str] = None):
self.python = python_executable or None
self._live_procs: Dict[str, subprocess.Popen] = {} # udid -> tunneld proc
self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names}
self._registered = False
# =============== 公共入口 ===============
def activate(
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
keep_tunnel: bool = False, # << 新增:默认 False一次性用完就关
broad_cleanup_on_exit: bool = True, # << 退出时顺带清理所有 pmd3 残留网卡
) -> str:
"""
执行:挂镜像(可选) -> 开隧道 -> (等待 RSD 就绪)-> 启动 WDA
- 默认 keep_tunnel=FalseWDA 启动后关闭隧道(避免虚拟网卡常驻)
- keep_tunnel=True让隧道常驻交由 atexit/signal 或上层调用 stop_tunnel() 清理
"""
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}")
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit)
# 管理员检测Windows 清理网卡需要)
if os.name == "nt":
import ctypes
try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
is_admin = False
if not is_admin:
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。")
import time as _t
start_ts = _t.time()
# 1) 预挂载
if pre_mount_first:
try:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2)
except Exception as e:
print(f"[activate] 预挂载失败(继续尝试开隧道后再挂载一次):{e}")
# 2) 启动 tunneld
http_host = http_port = rsd_host = rsd_port = None
iface_names: Set[str] = set()
proc, port = self._start_tunneld(udid)
self._live_procs[udid] = proc
self._live_ifaces[udid] = iface_names
captured: List[str] = []
wda_started = False
mount_done = pre_mount_first
try:
assert proc.stdout is not None
for line in proc.stdout:
captured.append(line)
print(f"[tunneld] {line}", end="")
# 捕获虚拟网卡名
for m in self.IFACE_RE.finditer(line):
iface_names.add(m.group(1))
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口
if http_port is None:
m = self.HTTP_RE.search(line)
if m:
http_host, http_port = m.group(1), m.group(2)
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# 捕获 RSD仅识别当前 UDID 的行)
if not self._line_is_for_udid(line, udid):
continue
m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line)
if m and not rsd_host and not rsd_port:
rsd_host, rsd_port = m.group(1), m.group(2)
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
# 启动 WDA
if (not wda_started) and wda_bundle_id and (rsd_host and rsd_port):
if not mount_done:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2)
mount_done = True
if self._wait_for_rsd_ready(rsd_host, rsd_port, retries=rsd_probe_retries, delay=rsd_probe_delay_sec):
self._launch_wda_via_rsd(bundle_id=wda_bundle_id, rsd_host=rsd_host, rsd_port=rsd_port, udid=udid)
wda_started = True
elif http_host and http_port:
self._launch_wda_via_http_tunnel(bundle_id=wda_bundle_id, http_host=http_host, http_port=http_port, udid=udid)
wda_started = True
else:
raise RuntimeError("No valid tunnel endpoint for WDA.")
# 超时保护
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
break
# 结束/收尾
out = "".join(captured)
finally:
if not keep_tunnel:
# 一次性模式WDA 已启动后就关闭隧道并清理网卡
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
print("[activate] Done.")
return out
# =============== 外部可显式调用的清理 ===============
def stop_tunnel(self, udid: str, broad_cleanup: bool = True):
"""关闭某 UDID 的 tunneld并清理已知/残留的 pmd3 虚拟网卡。"""
proc = self._live_procs.pop(udid, None)
ifaces = self._live_ifaces.pop(udid, set())
# 1) 结束进程
if proc:
try:
proc.terminate()
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
try:
proc.kill()
proc.wait(timeout=3)
except Exception:
pass
# 兜底:杀掉本进程树内可能残留的 tunneld
self._kill_stray_tunneld_children()
# 2) 清理网卡
try:
if os.name == "nt":
# 先按已知名精确删除
for name in sorted(ifaces):
self._win_remove_adapter(name)
# 宽匹配兜底
if broad_cleanup:
self._win_remove_all_pmd3_adapters()
else:
# *nix 基本不需要手动删,若有需要可在此处添加 ip link delete 等
pass
except Exception as e:
print(f"[cleanup] adapter cleanup error: {e}")
# =============== 内部:启动 tunneld ===============
def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]:
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
**self._win_hidden_popen_kwargs()
)
return proc, port
# =============== 退出/信号回收 ===============
def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool):
if self._registered:
return
self._registered = True
def _on_exit():
# 逐个关闭存活隧道
for udid in list(self._live_procs.keys()):
try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception:
pass
atexit.register(_on_exit)
def _signal_handler(signum, frame):
_on_exit()
# 默认行为:再次发出信号退出
try:
signal.signal(signum, signal.SIG_DFL)
except Exception:
pass
os.kill(os.getpid(), signum)
for sig in (signal.SIGINT, signal.SIGTERM):
try:
signal.signal(sig, _signal_handler)
except Exception:
pass # 某些环境不允许设置
# =============== Windows 虚拟网卡清理 ===============
def _win_remove_adapter(self, name: str):
"""按名删除一个虚拟网卡。"""
print(f"[cleanup] remove adapter: {name}")
# 先尝试 PowerShell需要管理员
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
f"$a=Get-NetAdapter -Name '{name}' -ErrorAction SilentlyContinue; "
f"if($a){{ Disable-NetAdapter -Name '{name}' -Confirm:$false -PassThru | Out-Null; "
f"Remove-NetAdapter -Name '{name}' -Confirm:$false -ErrorAction SilentlyContinue; }}"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
return
except Exception:
pass
# 兜底netsh 禁用
try:
subprocess.run(
["netsh", "interface", "set", "interface", name, "disable"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()
)
except Exception:
pass
def _win_remove_all_pmd3_adapters(self):
"""宽匹配删除所有 pymobiledevice3-tunnel-* 网卡(防残留)。"""
print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters")
ps = [
"powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command",
r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; "
r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; "
r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }"
]
try:
subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs())
except Exception:
pass
def _kill_stray_tunneld_children(self):
"""在当前进程空间内尽量清理残留的 tunneld 子进程。"""
import psutil
try:
me = psutil.Process(os.getpid())
for ch in me.children(recursive=True):
try:
cmd = " ".join(ch.cmdline()).lower()
except Exception:
cmd = ""
if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd:
try:
ch.terminate()
ch.wait(2)
except Exception:
try:
ch.kill()
except Exception:
pass
except Exception:
pass
# =============== 其它工具 & 你原有的方法(未改动核心逻辑) ===============
def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str:
launcher, env = self._resolve_pmd3_argv_and_env()
env["PYMOBILEDEVICE3_UDID"] = udid
env["PYTHONUNBUFFERED"] = "1"
@@ -50,92 +325,61 @@ class IOSActivator:
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}")
def _pmd3_run_subprocess(self, full_args: list[str], extra_env: Optional[dict] = None) -> str:
"""
兜底:通过子进程执行 pymobiledevice3使用 _resolve_pmd3_argv_and_env())。
"""
import subprocess
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
launcher, env = self._resolve_pmd3_argv_and_env()
if extra_env:
for k, v in extra_env.items():
if v is None:
env.pop(k, None)
else:
env[k] = str(v)
def _win_hidden_popen_kwargs(self):
if os.name != "nt":
return {}
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000),
}
cmd = [*launcher, *full_args]
cmd = self._ensure_str_list(cmd)
print("[pmd3-subproc]", " ".join(cmd))
try:
out = subprocess.check_output(
cmd,
text=True,
stderr=subprocess.STDOUT,
env=env,
**self._win_hidden_popen_kwargs()
)
return out or ""
except subprocess.CalledProcessError as exc:
raise RuntimeError(exc.output or f"pymobiledevice3 子进程执行失败,代码 {exc.returncode}")
# =========================================================================
# 旧版环境依赖的替代方案:仅用于启动 tunneld 的子进程(需要常驻)
# =========================================================================
def _resolve_pmd3_argv_and_env(self):
import os, sys, shutil, subprocess
import shutil, subprocess
from pathlib import Path
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env.setdefault("PYTHONIOENCODING", "utf-8")
# 1) 明确使用 IOSAI_PYTHON如果上层已设置
prefer_py = env.get("IOSAI_PYTHON")
# 2) 构造一批候选路径(先根目录,再 Scripts
base = Path(sys.argv[0]).resolve()
base_dir = base.parent if base.is_file() else base
py_name = "python.exe" if os.name == "nt" else "python"
sidecar_candidates = [
base_dir / "python-rt" / py_name, # ✅ 嵌入式/你现在的摆放方式
base_dir / "python-rt" / "Scripts" / py_name, # 兼容虚拟环境式
base_dir / "python-rt" / py_name,
base_dir / "python-rt" / "Scripts" / py_name,
base_dir.parent / "python-rt" / py_name,
base_dir.parent / "python-rt" / "Scripts" / py_name,
]
# 若外层已通过 IOSAI_PYTHON 指了路径,也放进候选
if prefer_py:
sidecar_candidates.insert(0, Path(prefer_py))
# 打印探测
for cand in sidecar_candidates:
print(f"[IOSAI] 🔎 probing sidecar at: {cand}")
if cand.is_file():
try:
out = subprocess.check_output(
[str(cand), "-c", "import pymobiledevice3;print('ok')"],
text=True,
stderr=subprocess.STDOUT,
env=env,
timeout=6,
**self._win_hidden_popen_kwargs()
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ sidecar selected: {cand}")
return ([str(cand), "-u", "-m", "pymobiledevice3"], env)
except Exception:
# 候选解释器存在但没装 pmd3就继续找下一个
pass
# 3) 回退:系统 PATH 里直接找 pymobiledevice3 可执行
exe = shutil.which("pymobiledevice3")
if exe:
print(f"[IOSAI] ✅ use PATH executable: {exe}")
return ([exe], env)
# 4) 兜底:从系统 Python 里找装了 pmd3 的解释器
py_candidates = []
base_exec = getattr(sys, "_base_executable", None)
if base_exec and os.path.isfile(base_exec):
@@ -150,11 +394,7 @@ class IOSActivator:
try:
out = subprocess.check_output(
[py, "-c", "import pymobiledevice3;print('ok')"],
text=True,
stderr=subprocess.STDOUT,
env=env,
timeout=6,
**self._win_hidden_popen_kwargs()
text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs()
)
if "ok" in out:
print(f"[IOSAI] ✅ system python selected: {py}")
@@ -164,31 +404,10 @@ class IOSActivator:
raise RuntimeError("未检测到可用的 pymobiledevice3建议携带 python-rt 或安装系统 Python+pmd3")
def _ensure_str_list(self, seq):
return [str(x) for x in seq]
def _win_hidden_popen_kwargs(self):
"""在 Windows 上隐藏子进程窗口;非 Windows 返回空参数。"""
if os.name != "nt":
return {}
import subprocess as _sp
si = _sp.STARTUPINFO()
si.dwFlags |= _sp.STARTF_USESHOWWINDOW
si.wShowWindow = 0 # SW_HIDE
return {
"startupinfo": si,
"creationflags": getattr(_sp, "CREATE_NO_WINDOW", 0x08000000),
}
# =========================================================================
# 功能函数-
# =========================================================================
# -------- DDI / RSD / 启动 WDA (与你原逻辑一致) --------
def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None:
"""
使用进程内 CLIpymobiledevice3 mounter auto-mount带重试
"""
import time
last_err_text = ""
import time as _t
last_err = ""
for i in range(max(1, retries)):
try:
out = self._pmd3_run(["mounter", "auto-mount"], udid)
@@ -201,24 +420,17 @@ class IOSActivator:
print("[mounter] Developer disk image mounted.")
return
except Exception as e:
last_err_text = str(e)
last_err = str(e)
if i < retries - 1:
print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...")
try:
time.sleep(backoff_seconds)
except Exception:
pass
_t.sleep(backoff_seconds)
else:
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err_text}")
raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err}")
def _is_ipv4_host(self, host: str) -> bool:
import re as _re
return bool(_re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host))
def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool:
"""
探测 RSD 通道是否就绪:直接尝试 TCP 连接。
"""
port_int = int(rsd_port)
for i in range(1, retries + 1):
print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...")
@@ -234,42 +446,27 @@ class IOSActivator:
return False
def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None:
"""
使用进程内 CLIpymobiledevice3 developer dvt launch <bundle> --rsd <host> <port>
"""
print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}")
out = self._pmd3_run(
["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port],
udid=udid,
)
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid)
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via RSD completed.")
def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None:
"""
退路:通过 HTTP 网关端口设置 PYMOBILEDEVICE3_TUNNEL仅 IPv4
使用进程内 CLI 启动 WDA。
"""
if not self._is_ipv4_host(http_host):
raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}")
tunnel_endpoint = f"{http_host}:{http_port}"
print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}")
out = self._pmd3_run(
["developer", "dvt", "launch", bundle_id],
udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint},
)
out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid,
extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint})
if out:
for line in out.splitlines():
print(f"[wda] {line}")
print("[wda] Launch via HTTP tunnel completed.")
# -------- 端口挑选 --------
def _pick_available_port(self, base=49151, step=10) -> int:
"""挑选一个可用端口(避免重复)"""
for i in range(0, 1000, step):
port = base + i
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
@@ -277,197 +474,9 @@ class IOSActivator:
return port
raise RuntimeError("No free port found for tunneld")
# =========================================================================
# 对外方法
# =========================================================================
def activate(
self,
udid: str,
wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True,
) -> str:
"""
执行:挂镜像(可选) -> 开隧道 -> (等待 RSD 就绪)-> 启动 WDA
- 优先用 `--rsd` 启动
- 失败再用 HTTP 网关作为退路
"""
if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID = {udid}")
# ⚠️ 检查管理员权限
if os.name == "nt":
import ctypes
try:
is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
except Exception:
is_admin = False
if not is_admin:
print("[⚠️] 当前进程未以管理员身份运行tunneld 可能无法创建 USB 隧道。")
import time as _t
start_ts = _t.time()
# 1) (可选) 先挂载,避免 tidevice 并发 DDI 竞态
if pre_mount_first:
try:
self._auto_mount_developer_disk(udid, retries=mount_retries, backoff_seconds=backoff_seconds)
_t.sleep(2) # 稳定 DDI
except Exception as e:
print(f"[activate] 预挂载失败(继续尝试开隧道后再挂载一次):{e}")
# 2) 启动 tunneld 子进程
port = self._pick_available_port()
launcher, env2 = self._resolve_pmd3_argv_and_env()
env2["PYTHONUNBUFFERED"] = "1"
env2.setdefault("PYTHONIOENCODING", "utf-8")
env2["PYMOBILEDEVICE3_UDID"] = udid
cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)])
print("[activate] 使用命令启动隧道:", " ".join(cmd))
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True,
env=env2,
**self._win_hidden_popen_kwargs()
)
captured: list[str] = []
http_host: Optional[str] = None
http_port: Optional[str] = None
rsd_host: Optional[str] = None
rsd_port: Optional[str] = None
device_tunnel_ready = False
wda_started = False
mount_done = pre_mount_first
HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)")
RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)")
RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)")
try:
assert proc.stdout is not None
for line in proc.stdout:
captured.append(line)
print(f"[tunneld] {line}", end="")
if proc.poll() is not None:
break
# 捕获 HTTP 网关端口
if http_port is None:
m = HTTP_RE.search(line)
if m:
http_host, http_port = m.group(1), m.group(2)
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# ✅ 捕获 RSD仅识别当前 UDID 的行)
if self._line_is_for_udid(line, udid):
m = RSD_CREATED_RE.search(line) or RSD_FALLBACK_RE.search(line)
if m and not device_tunnel_ready:
rsd_host, rsd_port = m.group(1), m.group(2)
device_tunnel_ready = True
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
else:
continue # 其它设备的隧道日志忽略
# 当设备隧道准备好后启动 WDA
if (not wda_started) and wda_bundle_id and device_tunnel_ready:
try:
if not mount_done:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
_t.sleep(2)
mount_done = True
rsd_ok = False
if rsd_host and rsd_port:
rsd_ok = self._wait_for_rsd_ready(
rsd_host, rsd_port,
retries=rsd_probe_retries,
delay=rsd_probe_delay_sec,
)
if rsd_ok:
self._launch_wda_via_rsd(
bundle_id=wda_bundle_id,
rsd_host=rsd_host,
rsd_port=rsd_port,
udid=udid,
)
else:
if http_host and http_port:
self._launch_wda_via_http_tunnel(
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=http_port,
udid=udid,
)
else:
raise RuntimeError("No valid tunnel endpoint for fallback.")
wda_started = True
except RuntimeError as exc:
print(str(exc), file=sys.stderr)
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
raise
# 超时保护
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
break
# 结束清理
try:
return_code = proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()
return_code = proc.returncode or -9
output = "".join(captured)
if return_code != 0 and not wda_started:
raise RuntimeError(f"tunneld exited with code {return_code}.\n{output}")
print("[activate] Done.")
return output
except KeyboardInterrupt:
print("\n[activate] Interrupted, cleaning up ...", file=sys.stderr)
try:
proc.terminate()
proc.wait(timeout=5)
except Exception:
try:
proc.kill()
except Exception:
pass
raise
# -------- UDID 过滤 --------
def _line_is_for_udid(self, line: str, udid: str) -> bool:
"""日志行是否属于目标 UDID。"""
try:
return udid.lower() in (line or "").lower()
except Exception:
return False
return False