修复bug

This commit is contained in:
2025-10-28 15:09:36 +08:00
parent 2254284625
commit 2ad53eb1db
7 changed files with 140 additions and 100 deletions

View File

@@ -204,11 +204,10 @@ class DeviceInfo:
if major > 17: if major > 17:
print("进入iOS17设备的分支") print("进入iOS17设备的分支")
print(f"[WDA] iOS>17 调用 IOSActivator (port={wdaScreenPort})") print(f"[WDA] iOS>17 调用 IOSActivator (port={wdaScreenPort})")
try: print("准备启动隧道")
IOSActivator().activate(udid) out = IOSActivator().activate(udid)
print("wda启动完成") print("------------------",out)
except Exception as e: print("wda启动完成")
print("错误信息:",e)
else: else:
print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})") print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})")
dev = tidevice.Device(udid) dev = tidevice.Device(udid)

View File

@@ -9,6 +9,7 @@ import subprocess
from typing import Optional, List, Tuple, Dict, Set from typing import Optional, List, Tuple, Dict, Set
from Entity.Variables import WdaAppBundleId from Entity.Variables import WdaAppBundleId
import time as _t
class IOSActivator: class IOSActivator:
@@ -32,139 +33,178 @@ class IOSActivator:
self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names} self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names}
self._registered = False self._registered = False
# =============== 公共入口 =============== # =============== 公共入口 ===============
def activate( def activate(
self, self,
udid: str, udid: str,
wda_bundle_id: str = WdaAppBundleId, wda_bundle_id: Optional[str] = WdaAppBundleId,
ready_timeout_sec: float = 60.0, ready_timeout_sec: float = 120.0,
mount_retries: int = 3,
backoff_seconds: float = 2.0,
rsd_probe_retries: int = 5,
rsd_probe_delay_sec: float = 3.0,
pre_mount_first: bool = True, pre_mount_first: bool = True,
mount_retries: int = 2, keep_tunnel: bool = False, # 默认 FalseWDA 拉起后关闭隧道
backoff_seconds: float = 1.5, broad_cleanup_on_exit: bool = True, # 退出时顺带清理所有 pmd3 残留网卡
keep_tunnel: bool = False,
broad_cleanup_on_exit: bool = True,
) -> str: ) -> str:
""" """
Windows 简版:不读任何 tunneld 日志,也不做 RSD 解析。 流程:挂镜像(可选) -> 开隧道 -> 等 RSD -> 启动 WDA
逻辑:先探活 -> 开隧道 -> 直接用 HTTP 隧道端口反复尝试启动 WDA -> 探活成功即返回。 - keep_tunnel=FalseWDA 启动后关闭隧道并清理
- keep_tunnel=True隧道常驻由上层/atexit 清理
""" """
import time, ctypes, traceback
if not udid or not isinstance(udid, str): if not udid or not isinstance(udid, str):
raise ValueError("udid is required and must be a non-empty string") raise ValueError("udid is required and must be a non-empty string")
print(f"[activate] UDID={udid}", flush=True) print(f"[activate] UDID = {udid}")
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit)
# —— 管理员提示Windows 清理虚拟网卡常用)—— # Windows 管理员检测
try: if os.name == "nt":
if ctypes.windll.shell32.IsUserAnAdmin() == 0: import ctypes
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。", flush=True)
except Exception:
pass
# —— 退出钩子(可选)——
try:
self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit) # type: ignore[attr-defined]
except Exception as e:
print(f"[activate] _ensure_exit_hooks warn: {e}", flush=True)
# —— 小工具:探活 WDA —— #
def _wda_alive(timeout: float = 2.0) -> bool:
try: try:
if hasattr(self, "_wda_alive_now"): is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0
return bool(self._wda_alive_now(udid, timeout=timeout)) # type: ignore[attr-defined]
if hasattr(self, "_wda_client"):
cli = self._wda_client(udid) # type: ignore[attr-defined]
if hasattr(cli, "wait_ready"):
return bool(cli.wait_ready(timeout=timeout))
except Exception: except Exception:
return False is_admin = False
return False if not is_admin:
print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。")
# 0) 快路径WDA 已活 start_ts = _t.time()
if _wda_alive(2.0):
print("[activate] WDA already alive, skip launching.", flush=True)
return "WDA already alive"
# 1) 预挂载(失败不致命) # 1) 预挂载(失败不致命)
if pre_mount_first and hasattr(self, "_auto_mount_developer_disk"): if pre_mount_first:
try: try:
self._auto_mount_developer_disk(udid, retries=mount_retries, self._auto_mount_developer_disk(
backoff_seconds=backoff_seconds) # type: ignore[attr-defined] udid, retries=mount_retries, backoff_seconds=backoff_seconds
time.sleep(1.5) )
_t.sleep(2)
except Exception as e: except Exception as e:
print(f"[activate] 预挂载失败(继续{e}", flush=True) print(f"[activate] 预挂载失败(稍后再试{e}")
# 2) 开隧道(关键:拿到 HTTP 端口即可;不读取任何 stdout/stderr # 2) 启动 tunneld
proc = None http_host: Optional[str] = None
http_host, http_port = "127.0.0.1", None http_port: Optional[str] = None # ⚠️ 端口以 str 存储
try: rsd_host: Optional[str] = None
ret = self._start_tunneld(udid) # type: ignore[attr-defined] rsd_port: Optional[str] = None # ⚠️ 端口以 str 存储
if isinstance(ret, tuple): iface_names: Set[str] = set()
proc, http_port = ret[0], ret[1]
else:
proc = ret
if http_port is None:
# 若你的 _start_tunneld 固定端口,可在这里写死(例如 8100/某自定义端口)
raise RuntimeError("未获取到 HTTP 隧道端口_start_tunneld 未返回端口)")
except Exception:
# 即便开隧道失败,也再探活一次(可能本来就活)
if _wda_alive(2.0):
print("[activate] WDA already alive (tunnel start failed but OK).", flush=True)
return "WDA already alive"
raise
print(f"[tunneld] HTTP tunnel at {http_host}:{http_port}", flush=True) proc, _port_ignored = self._start_tunneld(udid)
self._live_procs[udid] = proc
self._live_ifaces[udid] = iface_names
# 3) 直接用 HTTP 隧道反复尝试启动 WDA + 探活 captured: List[str] = []
deadline = time.time() + (ready_timeout_sec if ready_timeout_sec > 0 else 60.0) out: str = ""
launched = False wda_started = False
mount_done = pre_mount_first
try: try:
while time.time() < deadline: assert proc.stdout is not None
# 已活则成功返回 for line in proc.stdout:
if _wda_alive(1.5): captured.append(line)
print("[activate] WDA detected alive.", flush=True) # 日志长度控制,防止常驻时内存涨太多
launched = True if len(captured) > 20000:
captured = captured[-10000:]
print(f"[tunneld] {line}", end="")
# 捕获虚拟网卡名
for m in self.IFACE_RE.finditer(line):
iface_names.add(m.group(1))
# 子进程若退出则停止读取
if proc.poll() is not None:
break break
# 尝试发起一次 HTTP 启动(失败就下一轮重试 # 捕获 HTTP 网关端口(保持为字符串
try: if http_port is None:
if hasattr(self, "_launch_wda_via_http_tunnel"): m = self.HTTP_RE.search(line)
self._launch_wda_via_http_tunnel( # type: ignore[attr-defined] if m:
http_host = m.group(1)
http_port = m.group(2)
# 简单校验
try:
_ = int(http_port)
except Exception:
print(f"[tunneld] bad http port: {http_port}")
http_host, http_port = None, None
else:
print(f"[tunneld] Tunnel API: {http_host}:{http_port}")
# 只处理当前 UDID 的 RSD 行
if not self._line_is_for_udid(line, udid):
continue
m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line)
if m and rsd_host is None and rsd_port is None:
rsd_host = m.group(1)
rsd_port = m.group(2)
try:
_ = int(rsd_port) # 仅作数字校验
except Exception:
print(f"[tunneld] bad rsd port: {rsd_port}")
rsd_host, rsd_port = None, None
else:
print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).")
# ========= 尝试启动 WDA =========
if (not wda_started) and wda_bundle_id and (rsd_host is not None) and (rsd_port is not None):
if not mount_done:
self._auto_mount_developer_disk(
udid, retries=mount_retries, backoff_seconds=backoff_seconds
)
_t.sleep(2)
mount_done = True
# RSD 优先;探测时临时转 int启动命令仍传 str 端口
rsd_port_int = int(rsd_port)
if self._wait_for_rsd_ready(
rsd_host, rsd_port_int, retries=rsd_probe_retries, delay=rsd_probe_delay_sec
):
# 这里的实现通常会拼 subprocess 命令行,故端口保持 str
self._launch_wda_via_rsd(
bundle_id=wda_bundle_id, bundle_id=wda_bundle_id,
http_host=http_host, rsd_host=rsd_host,
http_port=str(http_port), rsd_port=rsd_port, # ⚠️ 传入 str避免 subprocess 报错
udid=udid, udid=udid,
) )
except Exception as e: wda_started = True
# 仅打印,不中断;下一次循环再试 elif (http_host is not None) and (http_port is not None):
print(f"[activate] _launch_wda_via_http_tunnel error: {e}", flush=True) self._launch_wda_via_http_tunnel(
bundle_id=wda_bundle_id,
http_host=http_host,
http_port=http_port, # ⚠️ 传入 str
udid=udid,
)
wda_started = True
else:
raise RuntimeError("No valid tunnel endpoint for WDA.")
# 启动后给一点时间让 WDA ready # ✅ WDA 已启动;默认一次性模式直接退出读取循环
for _ in range(3): if wda_started and not keep_tunnel:
if _wda_alive(1.0): _t.sleep(0.5) # 给隧道多刷几行
launched = True print("[activate] WDA launched; exiting reader loop (keep_tunnel=False).")
break break
time.sleep(0.5)
if launched: # 超时保护(仅在 WDA 尚未启动时生效)
if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec):
print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.")
break break
time.sleep(1.0) # 下一轮重试 print("[activate] 启动 WDA 读取阶段结束")
out = "".join(captured)
if not launched:
raise RuntimeError(f"WDA not ready within {ready_timeout_sec}s via HTTP tunnel")
print("[activate] Done.", flush=True)
return f"http://{http_host}:{http_port}"
except Exception as e:
print(f"[activate] 发生异常:{e}")
raise
finally: finally:
if not keep_tunnel: if not keep_tunnel:
try: try:
self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit) # type: ignore[attr-defined] self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit)
except Exception as e: except Exception as ce:
print(f"[activate] stop_tunnel warn: {e}", flush=True) print(f"[activate] stop_tunnel 清理异常:{ce}")
print("[activate] Done.")
return out
# =============== 外部可显式调用的清理 =============== # =============== 外部可显式调用的清理 ===============

View File

@@ -556,6 +556,7 @@ class ScriptManager():
return return
else: else:
print(f"找不到输入框") print(f"找不到输入框")
raise Exception("找不到输入框")
input = session.xpath('//XCUIElementTypeSearchField') input = session.xpath('//XCUIElementTypeSearchField')
if input.exists: if input.exists: