diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index 699c641..97ebf9b 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -1,20 +1,15 @@ - -import http.client import json import os import socket -import subprocess -import sys import threading import time -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path +import subprocess from typing import Dict, Optional -import psutil + import tidevice -import wda +# import wda # 目前先不用 wda 获取屏幕,避免触发 tidevice 的那套 WDA 启动 from tidevice import Usbmux, ConnectionType -from tidevice._device import BaseDevice + from Entity.DeviceModel import DeviceModel from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort from Module.FlaskSubprocessManager import FlaskSubprocessManager @@ -22,795 +17,298 @@ from Module.IOSActivator import IOSActivator from Utils.LogManager import LogManager -def _monotonic() -> float: - return time.monotonic() - -def _is_port_free(port: int, host: str = "127.0.0.1") -> bool: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((host, port)) - return True - except OSError: - return False - finally: - s.close() - -def _pick_free_port(low: int = 20000, high: int = 48000) -> int: - """全局兜底的端口选择:先随机后顺扫,避免固定起点导致碰撞。支持通过环境变量覆盖范围: - PORT_RANGE_LOW / PORT_RANGE_HIGH - """ - try: - low = int(os.getenv("PORT_RANGE_LOW", str(low))) - high = int(os.getenv("PORT_RANGE_HIGH", str(high))) - except Exception: - pass - if high - low < 100: - high = low + 100 - import random - # 随机尝试 64 次 - tried = set() - for _ in range(64): - p = random.randint(low, high) - if p in tried: - continue - tried.add(p) - if _is_port_free(p): - return p - # 顺序兜底 - for p in range(low, high): - if p in tried: - continue - if _is_port_free(p): - return p - raise RuntimeError("未找到可用端口") - - class DeviceInfo: _instance = None _instance_lock = threading.Lock() def __new__(cls, *args, **kwargs): - # 双重检查锁,确保线程安全单例 if not cls._instance: with cls._instance_lock: if not cls._instance: cls._instance = super().__new__(cls) return cls._instance - # ---- 端口分配:加一个最小的“保留池”,避免并发选到同一个端口 ---- - def _alloc_port(self) -> int: - with self._lock: - busy = set(self._port_by_udid.values()) | set(self._reserved_ports) - # 优先随机尝试若干次,减少并发碰撞 - import random - low = int(os.getenv("PORT_RANGE_LOW", "20000")) - high = int(os.getenv("PORT_RANGE_HIGH", "48000")) - for _ in range(128): - p = random.randint(low, high) - with self._lock: - if p not in busy and p not in self._reserved_ports and _is_port_free(p): - self._reserved_ports.add(p) - return p - # 兜底顺序扫描 - for p in range(low, high): - with self._lock: - if p in self._reserved_ports or p in busy: - continue - if _is_port_free(p): - with self._lock: - self._reserved_ports.add(p) - return p - raise RuntimeError("端口分配失败:没有可用端口") - - def _release_port(self, port: int): - with self._lock: - self._reserved_ports.discard(port) - - ADD_STABLE_SEC = float(os.getenv("ADD_STABLE_SEC", "2.0")) REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0")) def __init__(self) -> None: - # 防止多次初始化(因为 __init__ 每次调用 DeviceInfo() 都会执行) if getattr(self, "_initialized", False): return self._lock = threading.RLock() self._models: Dict[str, DeviceModel] = {} - self._iproxy: Dict[str, subprocess.Popen] = {} - self._port_by_udid: Dict[str, int] = {} - self._reserved_ports: set[int] = set() - self._first_seen: Dict[str, float] = {} - self._last_seen: Dict[str, float] = {} self._manager = FlaskSubprocessManager.get_instance() - self._iproxy_path = self._find_iproxy() + self.screenPort = 9110 - # 懒加载线程池属性(供 _add_device 并发使用) - self._add_lock: Optional[threading.RLock] = None - self._adding_udids: Optional[set[str]] = None - self._add_executor: Optional["ThreadPoolExecutor"] = None + # 设备心跳时间 + self._last_seen: Dict[str, float] = {} - # iproxy 连续失败计数(守护用) - self._iproxy_fail_count: Dict[str, int] = {} + # iproxy 子进程:udid -> Popen + self._iproxy_process: Dict[str, subprocess.Popen] = {} + + # Windows 下隐藏子进程窗口(给 iproxy 用) + self._creationflags = 0 + self._startupinfo = None + if os.name == "nt": + try: + self._creationflags = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined] + except Exception: + self._creationflags = 0 + + si = subprocess.STARTUPINFO() + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + si.wShowWindow = 0 # SW_HIDE + self._startupinfo = si LogManager.info("DeviceInfo 初始化完成", udid="system") print("[Init] DeviceInfo 初始化完成") + self._initialized = True - # iproxy 守护线程(端口+HTTP探活 → 自愈重启 → 达阈值才移除) - threading.Thread(target=self.check_iproxy_ports, daemon=True).start() - - self._initialized = True # 标记已初始化 - - # =============== 并发添加设备:最小改动(包装 _add_device) =============== - def _ensure_add_executor(self): - """ - 懒加载:首次调用 _add_device 时初始化线程池与去重集合。 - 注意:不要只用 hasattr;属性可能已在 __init__ 里置为 None。 - """ - # 1) 锁 - if getattr(self, "_add_lock", None) is None: - self._add_lock = threading.RLock() - - # 2) 去重集合 - if getattr(self, "_adding_udids", None) is None: - self._adding_udids = set() - - # 3) 线程池 - if getattr(self, "_add_executor", None) is None: - from concurrent.futures import ThreadPoolExecutor - import os - max_workers = 6 - self._add_executor = ThreadPoolExecutor( - max_workers=max_workers, - thread_name_prefix="dev-add" - ) - try: - LogManager.info(f"[Init] Device add executor started, max_workers={max_workers}", udid="system") - except Exception: - pass - - def _safe_add_device(self, udid: str): - """ - 后台执行真正的新增实现(_add_device_impl): - - 任何异常只记日志,不抛出 - - 无论成功与否,都在 finally 里清理“正在添加”标记 - """ - try: - self._add_device_impl(udid) # ← 这是你原来的重逻辑(见下方) - except Exception as e: - try: - LogManager.method_error(f"_add_device_impl 异常:{e}", "_safe_add_device", udid=udid) - except Exception: - pass - finally: - lock = getattr(self, "_add_lock", None) - if lock is None: - # 极端容错,避免再次抛异常 - self._add_lock = lock = threading.RLock() - with lock: - self._adding_udids.discard(udid) - - def _add_device(self, udid: str): - """并发包装器:保持所有调用点不变。""" - self._ensure_add_executor() - - # 保险:即使极端情况下属性仍是 None,也就地补齐一次 - lock = getattr(self, "_add_lock", None) - if lock is None: - self._add_lock = lock = threading.RLock() - adding = getattr(self, "_adding_udids", None) - if adding is None: - self._adding_udids = adding = set() - - # 去重:同一 udid 只提交一次 - with lock: - if udid in adding: - return - adding.add(udid) - - try: - # 注意:submit(fn, udid) —— 这里不是 *args=udid,直接传第二个位置参数即可 - self._add_executor.submit(self._safe_add_device, udid) - except Exception as e: - # 提交失败要把去重标记清掉 - with lock: - adding.discard(udid) - try: - LogManager.method_error(text=f"提交新增任务失败:{e}", method="_add_device", udid=udid) - except Exception: - pass - - def _iproxy_health_ok(self, port: int) -> bool: - try: - conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=1.5) - conn.request("GET", "/") - resp = conn.getresponse() - - status = getattr(resp, "status", 0) - ctype = resp.getheader("Content-Type", "") or "" - - conn.close() - - ok = (200 <= status < 400) and ("multipart" in ctype.lower()) - - if not ok: - LogManager.error( - f"[iproxy] 健康检查失败: status={status}, ctype={ctype!r}, port={port}" - ) - return ok - - except Exception as e: - LogManager.error(f"[iproxy] 健康检查异常 port={port}: {e}") - return False - - - def _restart_iproxy(self, udid: str, port: int) -> bool: - """干净重启 iproxy:先杀旧的,再启动新的,并等待监听。""" - print(f"[iproxy-guard] 准备重启 iproxy {udid} on {port}") - proc = None - with self._lock: - old = self._iproxy.get(udid) - try: - if old: - self._kill(old) - except Exception as e: - print(f"[iproxy-guard] 杀旧进程异常 {udid}: {e}") - - # 重新拉起 - try: - proc = self._start_iproxy(udid, local_port=port) - except Exception as e: - print(f"[iproxy-guard] 重启失败 {udid}: {e}") - proc = None - - if not proc: - return False - - # 写回进程表 - with self._lock: - self._iproxy[udid] = proc - - print(f"[iproxy-guard] 重启成功 {udid} port={port}") - return True - - def _restart_wda(self, udid: str) -> bool: - """ - 重启指定设备上的 WDA(用于已在系统中“在线”的设备): - - 假定该设备已经完成过信任/配对,不再重复配对 - - iOS 17+:直接调用 IOSActivator().activate(udid) - - iOS <=16:走 tidevice.app_start(WdaAppBundleId) - 如果当前已存在映射端口,则在该端口上等待 WDA /status 就绪。 - """ - print(f"[WDA-guard] 尝试重启 WDA: {udid}") - - try: - dev = tidevice.Device(udid) - try: - major = int(dev.product_version.split(".")[0]) - except Exception: - major = 0 - print(f"[WDA-guard] 设备 {udid} iOS 主版本号 = {major}") - - if major >= 17: - # -------- iOS 17+:不再重复配对,直接激活 -------- - print(f"[WDA-guard] iOS17+ 设备,直接通过 IOSActivator 激活 {udid}") - try: - IOSActivator().activate(udid) - print(f"[WDA-guard] iOS17+ 通过 IOSActivator 激活完成 {udid}") - except Exception as e: - print(f"[WDA-guard] iOS17+ 激活 WDA 异常: {e}") - return False - - else: - # -------- iOS 16 及以下:直接 app_start WDA -------- - print(f"[WDA-guard] iOS<=16 设备,准备通过 tidevice.app_start 启动 WDA {udid}") - # app_stop 失败不致命,做一下容错 - try: - dev.app_stop(WdaAppBundleId) - except Exception as e: - print(f"[WDA-guard] app_stop 异常(忽略):{e}") - try: - dev.app_start(WdaAppBundleId) - print(f"[WDA-guard] app_start 已调用 {udid}") - except Exception as e: - print(f"[WDA-guard] app_start 异常: {e}") - return False - - # -------- 如果这台设备已经有固定的 screenPort,就在该端口上等 WDA Ready -------- - port = None - with self._lock: - port = self._port_by_udid.get(udid) - - if port: - print(f"[WDA-guard] 已有现成端口 {port},等待 WDA 在该端口就绪 {udid}") - ok = self._wait_wda_ready_on_port( - udid, - local_port=wdaFunctionPort, - ) - if not ok: - print(f"[WDA-guard] WDA 在端口 {port} 未在超时内就绪 {udid}") - return False - else: - print(f"[WDA-guard] 当前无已记录端口(_port_by_udid 无 {udid}),仅完成 WDA 启动,不做就绪检测") - - print(f"[WDA-guard] WDA 重启完成 {udid}") - return True - - except Exception as e: - print(f"[WDA-guard] 重启 WDA 总体异常: {e}") - return False - - # =============== 一轮检查:先自愈,仍失败才考虑移除 ================= - def check_iproxy_ports(self): - - print("[Guard] iproxy+WDA 守护线程启动") - - while True: - try: - with self._lock: - udids = list(self._models.keys()) - - for udid in udids: - with self._lock: - port = self._port_by_udid.get(udid) - fail = self._iproxy_fail_count.get(udid, 0) - - if not port: - continue - - # ==== 第一层:视频流健康检查 ==== - ok_stream = self._iproxy_health_ok(port) - - if ok_stream: - # 成功 → 清零失败计数 - with self._lock: - self._iproxy_fail_count[udid] = 0 - continue - - # ------ 以下为失败处理 ------ - fail += 1 - with self._lock: - self._iproxy_fail_count[udid] = fail - - print(f"[Guard] 第 {fail} 次失败 udid={udid}, port={port}") - - # ==== 第 1~2 次失败 → 优先重启 iproxy ==== - if fail in (1, 2): - print(f"[Guard] 尝试重启 iproxy(第 {fail} 次){udid}") - if self._restart_iproxy(udid, port): - time.sleep(1.5) - continue - - # ==== 第 3~4 次失败 → 检查 WDA 状态 ==== - if fail in (3, 4): - print(f"[Guard] 检查 WDA 状态 {udid}") - wda_ok = self._wait_wda_ready_on_port(udid, wdaFunctionPort) - - if not wda_ok: - print(f"[Guard] WDA 异常 → 尝试重启 WDA(第 {fail} 次){udid}") - if self._restart_wda(udid): - time.sleep(1.2) - continue - else: - print(f"[Guard] WDA 正常,但视频流挂了 → 再重启 iproxy") - if self._restart_iproxy(udid, port): - time.sleep(1.5) - continue - - # ==== 第 5 次失败 → 移除设备 ==== - if fail >= 5: - print(f"[Guard] 连续 5 次失败,移除设备 {udid}") - try: - self._remove_device(udid) - except Exception as e: - print(f"[Guard] 移除异常: {e}") - continue - - except Exception as e: - print(f"[Guard] 守护线程异常: {e}") - - time.sleep(2.0) - - + # --------------------------- + # 主循环 + # --------------------------- def listen(self): LogManager.method_info("进入主循环", "listen", udid="system") print("[Listen] 开始监听设备上下线...") + while True: try: usb = Usbmux().device_list() online = {d.udid for d in usb if d.conn_type == ConnectionType.USB} except Exception as e: LogManager.warning(f"[device_list] 异常:{e}", udid="system") - print(f"[Listen] 获取设备列表异常: {e}") time.sleep(1) continue - now = _monotonic() - for u in online: - self._first_seen.setdefault(u, now) - self._last_seen[u] = now - with self._lock: known = set(self._models.keys()) - for udid in online - known: - if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC: + # ---------- 1. 新设备 ---------- + for udid in online: + self._last_seen[udid] = time.time() + if udid not in known: try: - self._add_device(udid) # ← 并发包装器 + self._add_device(udid) except Exception as e: - LogManager.method_error(f"新增失败:{e}", "listen", udid=udid) - print(f"[Add] 新增失败 {udid}: {e}") + # 关键:单设备异常不能干掉整个循环 + LogManager.warning(f"[Add] 处理设备 {udid} 异常: {e}", udid=udid) + print(f"[Add] 处理设备 {udid} 异常: {e}") + # ---------- 2. 可能离线设备 ---------- + now = time.time() for udid in list(known): - if udid in online: - continue - last = self._last_seen.get(udid, 0.0) - if (now - last) >= self.REMOVE_GRACE_SEC: - print(f"[Remove] 检测到设备离线: {udid}") - try: - self._remove_device(udid) - except Exception as e: - LogManager.method_error(f"移除失败:{e}", "listen", udid=udid) - print(f"[Remove] 移除失败 {udid}: {e}") + if udid not in online: + last = self._last_seen.get(udid, 0) + if now - last > self.REMOVE_GRACE_SEC: + try: + self._remove_device(udid) + except Exception as e: + LogManager.method_error(f"移除失败:{e}", "listen", udid=udid) + print(f"[Remove] 移除失败 {udid}: {e}") time.sleep(1) - # 检测设备wda状态 - def _wait_wda_ready_on_port(self, udid: str, local_port: int) -> bool: - try: - dev = wda.USBClient(udid, local_port) - info = dev.status() # 调用成功即可说明 WDA 正常 - return info["ready"] - except Exception as e: - print(f"[WDA] status 异常({udid}): {e}") - return False - - - def _send_snapshot_to_flask(self): - """把当前 _models 的全量快照发送给 Flask 进程""" - try: - # 1. 把 _models 里的设备转成可 JSON 的 dict 列表 - with self._lock: - devices = [m.toDict() for m in self._models.values()] - - payload = json.dumps({"devices": devices}, ensure_ascii=False) - - # 2. 建立到 Flask 的本地 socket 连接并发送 - port = int(os.getenv("FLASK_COMM_PORT", "34566")) - if port <= 0: - LogManager.warning("[SNAPSHOT] 无有效端口,跳过发送") + # --------------------------- + # 添加设备 + # --------------------------- + def _add_device(self, udid: str): + with self._lock: + if udid in self._models: + print(f"[Add] 已存在,跳过: {udid}") return + print(f"[Add] 新增设备 {udid}") - with socket.create_connection(("127.0.0.1", port), timeout=1.5) as s: - s.sendall(payload.encode("utf-8") + b"\n") - print(f"[SNAPSHOT] 已发送 {len(devices)} 台设备快照到 Flask") - LogManager.info(f"[SNAPSHOT] 已发送 {len(devices)} 台设备快照到 Flask") - except Exception as e: - # 不要让异常影响主循环,只打个日志 - LogManager.warning(f"[SNAPSHOT] 发送快照失败: {e}") - - def _device_online_via_tidevice(self, udid: str) -> bool: + # 判断 iOS 版本 try: - from tidevice import Usbmux, ConnectionType - devs = Usbmux().device_list() - return any(d.udid == udid and d.conn_type == ConnectionType.USB for d in devs) - except Exception: - # 容错:tidevice 异常时,假定在线,避免误判;后续命令会再校验 - return True + t = tidevice.Device(udid) + version_major = float(t.product_version.split(".")[0]) + except Exception as e: + print(f"[Add] 获取系统版本失败 {udid}: {e}") + version_major = 0 - def _pair_if_needed_for_ios17(self, udid: str, timeout_sec: float | None = None) -> bool: - """ - iOS 17+ 配对:已信任直接 True;否则触发配对并无限等待(设备离线则 False) - 使用 “python -m pymobiledevice3 lockdown pair -u ” 做配对,规避 API 版本差异。 - timeout_sec=None 表示无限等待;若传入数字则为最多等待秒数。 - """ - # 已信任直接过 - if self._trusted(udid): - return True - - print(f"[Pair][CLI] iOS17+ 需要配对,等待手机上点击“信任”… {udid}") - last_log = 0.0 - - # 轮询直到配对成功/超时/设备离线 - while True: - # 1) 设备在线性检查(防止卡等已拔掉的设备) - if not self._device_online_via_tidevice(udid): - print(f"[Pair][CLI] 设备已离线,停止等待 {udid}") - return False - - # 2) 触发一次配对尝试 - cmd = [sys.executable, "-m", "pymobiledevice3", "lockdown", "pair", "-u", udid] + # 启动 WDA + if version_major >= 17.0: + threading.Thread( + target=IOSActivator().activate_ios17, + args=(udid,), + daemon=True, + ).start() + else: try: - # 不打印子进程输出,保持你现有日志风格;需要可改为 PIPE 查看 - res = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - if res.returncode == 0: - print(f"[Pair][CLI] 配对成功 {udid}") - return True + tidevice.Device(udid).app_start(WdaAppBundleId) except Exception as e: - print(f"[Pair][CLI] 调用失败:{e}") + print(f"[Add] 使用 tidevice 启动 WDA 失败 {udid}: {e}") - # 3) 日志节流 + 可选超时 - now = time.monotonic() - if now - last_log >= 10.0: - print(f"[Pair][CLI] 仍在等待用户在手机上点“信任”… {udid}") - last_log = now - - if timeout_sec is not None: - timeout_sec -= 2.0 - if timeout_sec <= 0: - print(f"[Pair][CLI] 等待配对超时(达到设定时长){udid}") - return False - - time.sleep(2.0) - - # ---------------- 原 _add_device 实现:整体改名为 _add_device_impl ---------------- - def _add_device_impl(self, udid: str): - print(f"[Add] 开始新增设备 {udid}") - - if not self._trusted(udid): - print(f"[Add] 未信任设备 {udid}, 跳过") - return - - # 先分配一个“正式使用”的本地端口,并立即起 iproxy(只起这一回) - port = self._alloc_port() - print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort} (正式)") - proc = self._start_iproxy(udid, local_port=port) - if not proc: - self._release_port(port) - print(f"[iproxy] 启动失败,放弃新增 {udid}") - return - - # 判断 WDA 是否已就绪;如果未就绪,按原逻辑拉起 WDA 并等到就绪 - try: - dev = tidevice.Device(udid) - major = int(dev.product_version.split(".")[0]) - except Exception: - major = 0 - - # 直接用“正式端口”探测 /status,避免再启一次临时 iproxy - if not self._wait_wda_ready_on_port(udid, local_port=wdaFunctionPort): - # 如果还没起来,按你原逻辑拉起 WDA 再等 - if major >= 17: - print("进入 iOS17+ 设备的分支") - if not self._pair_if_needed_for_ios17(udid, timeout_sec=None): # 无限等;传秒数则有限时 - print(f"[WDA] iOS17+ 配对失败或设备离线,放弃新增 {udid}") - try: - self._kill(proc) - except Exception: - pass - self._release_port(port) - return - - try: - IOSActivator().activate(udid) - print("wda启动完成") - except Exception as e: - print(f"[WDA] iOS17 激活异常: {e}") - else: - print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})") - try: - dev = tidevice.Device(udid) - dev.app_start(WdaAppBundleId) - time.sleep(2) - except Exception as e: - print(f"[WDA] app_start 异常: {e}") - - if not self._wait_wda_ready_on_port(udid, local_port=wdaFunctionPort): - print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}") - # 清理已起的正式 iproxy - try: - self._kill(proc) - except Exception: - pass - self._release_port(port) - return - - print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}") - time.sleep(0.5) - - # 带超时的屏幕信息获取(保留你原有容错/重试) - w, h, s = self._screen_info_with_timeout(udid, timeout=3.5) - if not (w and h and s): - for i in range(4): - print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}") - time.sleep(0.6) - w, h, s = self._screen_info_with_timeout(udid, timeout=3.5) - if w and h and s: - break - if not (w and h and s): - print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}") - - # 写入模型 & 发送前端 + # 分配投屏端口 & 写入模型 with self._lock: - model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1) - model.ready = True - self._models[udid] = model - self._iproxy[udid] = proc - self._port_by_udid[udid] = port - if hasattr(self, "_iproxy_fail_count"): - self._iproxy_fail_count[udid] = 0 + self.screenPort += 1 + screen_port = self.screenPort - print(f"[Manager] 准备发送设备数据到前端 {udid}") - self._manager_send() - print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}") - - def _remove_device(self, udid: str): - """ - 移除设备及其转发,通知上层。 - 幂等:重复调用不会出错。 - """ - print(f"[Remove] 正在移除设备 {udid}") - - # --- 1. 锁内执行所有轻量字典操作 --- - with self._lock: - model = self._models.pop(udid, None) - proc = self._iproxy.pop(udid, None) - self._port_by_udid.pop(udid, None) - self._first_seen.pop(udid, None) - self._last_seen.pop(udid, None) - self._iproxy_fail_count.pop(udid, None) - - # --- 2. 锁外执行重操作 --- - # 杀进程 - try: - self._kill(proc) - except Exception as e: - print(f"[Remove] 杀进程异常 {udid}: {e}") - - # 准备下线模型(model 可能为 None) - if model is None: model = DeviceModel( - deviceId=udid, screenPort=-1, width=0, height=0, scale=0.0, type=2 + deviceId=udid, + screenPort=screen_port, + width=0, + height=0, + scale=0, + type=1 ) + self._models[udid] = model - # 标记状态为离线 - model.type = 2 - model.ready = False - model.screenPort = -1 + print(f"[Add] 新设备完成 {udid}, screenPort={screen_port}") + self._manager_send() - # 通知上层 + # 启动 iproxy(投屏转发) try: - self._manager_send() + self._start_iproxy(udid, screen_port) except Exception as e: - print(f"[Remove] 通知上层异常 {udid}: {e}") + print(f"[iproxy] 启动失败 {udid}: {e}") - print(f"[Remove] 设备移除完成 {udid}") + # 异步等待 WDA + 更新屏幕尺寸(后面你要真用尺寸,再把 _screen_info 换成 wda 版本即可) + threading.Thread( + target=self._async_wait_wda_and_update_screen, + args=(udid,), + daemon=True + ).start() - def _trusted(self, udid: str) -> bool: - try: - BaseDevice(udid).get_value("DeviceName") - print(f"[Trust] 设备 {udid} 已信任") - return True - except Exception: - print(f"[Trust] 设备 {udid} 未信任") - return False + # --------------------------- + # 启动 iproxy(投屏) + # --------------------------- + def _start_iproxy(self, udid: str, local_port: int): + iproxy_path = self._find_iproxy() - def _screen_info(self, udid: str): - try: - # 避免 c.home() 可能触发的阻塞,直接取 window_size - c = wda.USBClient(udid, wdaFunctionPort) - size = c.window_size() - print(f"[Screen] 成功获取屏幕 {int(size.width)}x{int(size.height)} {udid}") - return int(size.width), int(size.height), float(c.scale) - except Exception as e: - print(f"[Screen] 获取屏幕信息异常: {e} {udid}") - return 0, 0, 0.0 + # 已有进程并且还在跑,就不重复起 + p = self._iproxy_process.get(udid) + if p is not None and p.poll() is None: + print(f"[iproxy] 已存在运行中的进程,跳过 {udid}") + return - def _screen_info_with_timeout(self, udid: str, timeout: float = 3.5): - """在线程里调用 _screen_info,超时返回 0 值,防止卡死。""" - import threading - result = {"val": (0, 0, 0.0)} - done = threading.Event() + args = [ + iproxy_path, + "-u", udid, + str(local_port), # 本地端口(投屏) + "9567" # 手机端口(go-ios screencast) + ] - def _target(): - try: - result["val"] = self._screen_info(udid) - finally: - done.set() + print(f"[iproxy] 启动进程: {args}") - t = threading.Thread(target=_target, daemon=True) - t.start() - if not done.wait(timeout): - print(f"[Screen] 获取屏幕信息超时({timeout}s) {udid}") - return 0, 0, 0.0 - return result["val"] + # 不用 PIPE,防止没人读导致缓冲爆掉;窗口用前面配置隐藏 + proc = subprocess.Popen( + args, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + creationflags=self._creationflags, + startupinfo=self._startupinfo, + ) - def _wait_until_listening(self, port: int, timeout: float) -> bool: - for to in (1.5, 2.5, 3.5): - deadline = _monotonic() + to - while _monotonic() < deadline: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(0.25) - if s.connect_ex(("127.0.0.1", port)) == 0: - print(f"[Port] 端口 {port} 已监听") - return True - time.sleep(0.05) - print(f"[Port] 端口 {port} 未监听") - return False + self._iproxy_process[udid] = proc - def _spawn_iproxy(self, udid: str, local_port: int, remote_port: int) -> Optional[subprocess.Popen]: - creationflags = 0 - startupinfo = None - if os.name == "nt": - creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000) | \ - getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0x00000200) - si = subprocess.STARTUPINFO() - si.dwFlags |= subprocess.STARTF_USESHOWWINDOW - si.wShowWindow = 0 - startupinfo = si - cmd = [self._iproxy_path, "-u", udid, str(local_port), str(remote_port)] - try: - print(f"[iproxy] 启动进程 {cmd}") - return subprocess.Popen( - cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - creationflags=creationflags, - startupinfo=startupinfo, - ) - except Exception as e: - print(f"[iproxy] 创建进程失败: {e}") - return None - - def _start_iproxy(self, udid: str, local_port: int) -> Optional[subprocess.Popen]: - proc = self._spawn_iproxy(udid, local_port=local_port, remote_port=wdaScreenPort) - if not proc: - print(f"[iproxy] 启动失败 {udid}") - return None - if not self._wait_until_listening(local_port, 3.0): - self._kill(proc) - print(f"[iproxy] 未监听, 已杀死 {udid}") - return None - print(f"[iproxy] 启动成功 port={local_port} {udid}") - return proc - - def _kill(self, proc: Optional[subprocess.Popen]): - if not proc: + # --------------------------- + # 停止 iproxy + # --------------------------- + def _stop_iproxy(self, udid: str): + p = self._iproxy_process.get(udid) + if not p: return try: - p = psutil.Process(proc.pid) p.terminate() try: - p.wait(timeout=1.5) - except psutil.TimeoutExpired: - p.kill(); p.wait(timeout=1.5) - print(f"[Proc] 已结束进程 PID={proc.pid}") - except Exception as e: - print(f"[Proc] 结束进程异常: {e}") + p.wait(timeout=2) + except Exception: + p.kill() + except Exception: + pass + self._iproxy_process.pop(udid, None) + print(f"[iproxy] 已停止 {udid}") + + # --------------------------- + # 异步等待 WDA 然后更新屏幕大小 + # --------------------------- + def _async_wait_wda_and_update_screen(self, udid: str): + print(f"[WDA] 等待 WDA 就绪 {udid}") - def _manager_send(self): - """对外统一的“通知 Flask 有设备变动”的入口(无参数)。 - 作用:把当前所有设备的全量快照发给 Flask。 - """ - # 第 1 次:直接发快照 try: - self._send_snapshot_to_flask() - return + # 最长等待 20 秒(你后面要真用屏幕尺寸,再把 _screen_info 换回 wda 的实现) + for _ in range(20): + w, h, s = self._screen_info(udid) + if w > 0: + print(f"[WDA] 屏幕信息成功 {udid} {w}x{h} scale={s}") + with self._lock: + m = self._models.get(udid) + if m: + m.width = w + m.height = h + m.scale = s + self._manager_send() + return + time.sleep(1) except Exception as e: - print(f"[Manager] 首次发送快照异常: {e}") + print(f"[WDA] 获取屏幕信息异常 {udid}: {e}") - # 自愈:尝试拉起 Flask 子进程 - try: - self._manager.start() - except Exception as e: - print(f"[Manager] 拉起 Flask 子进程异常: {e}") + print(f"[WDA] 屏幕信息获取失败(超时) {udid}") - # 第 2 次:再发快照 - try: - self._send_snapshot_to_flask() - print(f"[Manager] 重试发送快照成功") - except Exception as e: - print(f"[Manager] 重试发送快照仍失败: {e}") + # --------------------------- + # 移除设备 + # --------------------------- + def _remove_device(self, udid: str): + print(f"[Remove] 移除设备 {udid}") + # 先停 iproxy(只管自己的,不影响其他设备) + self._stop_iproxy(udid) + + with self._lock: + self._models.pop(udid, None) + self._last_seen.pop(udid, None) + + self._manager_send() + + # --------------------------- + # WDA 屏幕查询(当前保持空实现) + # --------------------------- + def _screen_info(self, udid: str): + # 现在先不通过 wda 取屏幕,避免触发 tidevice 那套 WDA 启动逻辑 + # 你如果确认 go-ios 跑起来后用 facebook-wda 取尺寸是安全的,可以改回下面这种: + # + # try: + # c = wda.USBClient(udid, wdaFunctionPort) + # size = c.window_size() + # return int(size.width), int(size.height), float(c.scale) + # except Exception as e: + # print(f"[Screen] 获取屏幕信息异常: {e} {udid}") + # return 0, 0, 0.0 + return 0, 0, 0.0 + + # --------------------------- + # 找 iproxy + # --------------------------- def _find_iproxy(self) -> str: - env_path = os.getenv("IPROXY_PATH") - if env_path and Path(env_path).is_file(): - print(f"[iproxy] 使用环境变量路径 {env_path}") - return env_path - base = Path(__file__).resolve().parent.parent + base = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) name = "iproxy.exe" if os.name == "nt" else "iproxy" - path = base / "resources" / "iproxy" / name - if path.is_file(): - print(f"[iproxy] 使用默认路径 {path}") - return str(path) - raise FileNotFoundError(f"iproxy 不存在: {path}") \ No newline at end of file + path = os.path.join(base, "resources", "iproxy", name) + return path + + # --------------------------- + # 数据同步到 Flask + # --------------------------- + def _manager_send(self): + try: + self._send_snapshot_to_flask() + except Exception: + try: + self._manager.start() + except Exception: + pass + try: + self._send_snapshot_to_flask() + except Exception: + pass + + def _send_snapshot_to_flask(self): + with self._lock: + devices = [m.toDict() for m in self._models.values()] + + payload = json.dumps({"devices": devices}, ensure_ascii=False) + + port = int(os.getenv("FLASK_COMM_PORT", "34566")) + with socket.create_connection(("127.0.0.1", port), timeout=1.5) as s: + s.sendall(payload.encode() + b"\n") + + print(f"[SNAPSHOT] 已发送 {len(devices)} 台设备") \ No newline at end of file diff --git a/Module/IOSActivator.py b/Module/IOSActivator.py index 3769409..04a58d4 100644 --- a/Module/IOSActivator.py +++ b/Module/IOSActivator.py @@ -1,540 +1,273 @@ -# -*- coding: utf-8 -*- -import os -import re -import sys -import atexit -import signal -import socket import subprocess -from typing import Optional, List, Tuple, Dict, Set +import threading +import time +import os +import sys +from typing import Tuple, Optional from Entity.Variables import WdaAppBundleId -import time as _t class IOSActivator: """ - 轻量 iOS 激活器(仅代码调用) - 进程/网卡可控版 - 1) 启动 `pymobiledevice3 remote tunneld`(可常驻/可一次性) - 2) 自动挂载 DDI - 3) 隧道就绪后启动 WDA - 4) 程序退出或 keep_tunnel=False 时,确保 tunneld 进程与虚拟网卡被清理 + 专门给 iOS17+ 设备用的 go-ios 激活器: + - 外部先探测 WDA,不存在时再调用 activate_ios17 + - 内部流程:tunnel start -> pair(等待成功) -> image auto -> runwda """ - # ---------- 正则 ---------- - HTTP_RE = re.compile(r"http://([0-9.]+):(\d+)") - RSD_CREATED_RE = re.compile(r"Created tunnel\s+--rsd\s+([^\s]+)\s+(\d+)") - RSD_FALLBACK_RE = re.compile(r"--rsd\s+(\S+?)[\s:](\d+)") - IFACE_RE = re.compile(r"\b(pymobiledevice3-tunnel-[^\s/\\]+)\b", re.IGNORECASE) - - def __init__(self, python_executable: Optional[str] = None): - self.python = python_executable or None - self._live_procs: Dict[str, subprocess.Popen] = {} # udid -> tunneld proc - self._live_ifaces: Dict[str, Set[str]] = {} # udid -> {iface names} - self._registered = False - - - # =============== 公共入口 =============== - def activate( - self, - udid: str, - wda_bundle_id: Optional[str] = WdaAppBundleId, - ready_timeout_sec: float = 120.0, - mount_retries: int = 3, - backoff_seconds: float = 2.0, - rsd_probe_retries: int = 5, - rsd_probe_delay_sec: float = 3.0, - pre_mount_first: bool = True, - keep_tunnel: bool = False, # 默认 False:WDA 拉起后关闭隧道 - broad_cleanup_on_exit: bool = True, # 退出时顺带清理所有 pmd3 残留网卡 - ) -> str: + def __init__( + self, + ios_path: Optional[str] = None, + pair_timeout: int = 60, # 配对最多等多久 + pair_retry_interval: int = 3, # 每次重试间隔 + ): """ - 流程:挂镜像(可选) -> 开隧道 -> 等 RSD -> 启动 WDA - - keep_tunnel=False:WDA 启动后关闭隧道并清理 - - keep_tunnel=True:隧道常驻,由上层/atexit 清理 + :param ios_path: ios.exe 的绝对路径,例如 E:\\code\\Python\\iOSAi\\resources\\ios.exe + 如果为 None,则自动从项目的 resources 目录中寻找 ios.exe """ - if not udid or not isinstance(udid, str): - raise ValueError("udid is required and must be a non-empty string") - print(f"[activate] UDID = {udid}") - self._ensure_exit_hooks(broad_cleanup_on_exit=broad_cleanup_on_exit) + # ==== 统一获取 resources 目录(支持源码运行 + Nuitka EXE) ==== + if "__compiled__" in globals(): + # 被 Nuitka 编译后的 exe 运行时 + base_dir = os.path.dirname(sys.executable) # exe 所在目录 + else: + # 开发环境,直接跑 .py + cur_file = os.path.abspath(__file__) # 当前 .py 文件所在目录 + base_dir = os.path.dirname(os.path.dirname(cur_file)) # 回到项目根 iOSAi - # Windows 管理员检测 + resource_dir = os.path.join(base_dir, "resources") + + # 如果外部没有显式传 ios_path,就用 resources/ios.exe + if ios_path is None or ios_path == "": + ios_path = os.path.join(resource_dir, "ios.exe") + + self.ios_path = ios_path + self.pair_timeout = pair_timeout + self.pair_retry_interval = pair_retry_interval + self._lock = threading.Lock() + + # go-ios tunnel 的后台进程 + self._tunnel_proc: Optional[subprocess.Popen] = None + + # Windows: 避免弹黑框 + self._creationflags = 0 + self._startupinfo = None # ⭐ 新增:统一控制窗口隐藏 if os.name == "nt": - import ctypes try: - is_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 + self._creationflags = subprocess.CREATE_NO_WINDOW # type: ignore[attr-defined] except Exception: - is_admin = False - if not is_admin: - print("[⚠] 未以管理员运行:若需要移除虚拟网卡,可能失败。") + self._creationflags = 0 - start_ts = _t.time() + # ⭐ 用 STARTUPINFO + STARTF_USESHOWWINDOW 彻底隐藏窗口 + si = subprocess.STARTUPINFO() + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + si.wShowWindow = 0 # SW_HIDE + self._startupinfo = si - # 1) 预挂载(失败不致命) - if pre_mount_first: - try: - self._auto_mount_developer_disk( - udid, retries=mount_retries, backoff_seconds=backoff_seconds - ) - _t.sleep(2) - except Exception as e: - print(f"[activate] 预挂载失败(稍后再试):{e}") - - # 2) 启动 tunneld - http_host: Optional[str] = None - http_port: Optional[str] = None # ⚠️ 端口以 str 存储 - rsd_host: Optional[str] = None - rsd_port: Optional[str] = None # ⚠️ 端口以 str 存储 - iface_names: Set[str] = set() - - proc, _port_ignored = self._start_tunneld(udid) - self._live_procs[udid] = proc - self._live_ifaces[udid] = iface_names - - captured: List[str] = [] - out: str = "" - wda_started = False - mount_done = pre_mount_first + # ======================= + # 基础执行封装 + # ======================= + def _run( + self, + args, + desc: str = "", + timeout: Optional[int] = None, + check: bool = True, + ) -> Tuple[int, str, str]: + """ + 同步执行 ios.exe,等它返回。 + :return: (returncode, stdout, stderr) + """ + cmd = [self.ios_path] + list(args) try: - assert proc.stdout is not None - for line in proc.stdout: - captured.append(line) - # 日志长度控制,防止常驻时内存涨太多 - if len(captured) > 20000: - captured = captured[-10000:] - - print(f"[tunneld] {line}", end="") - - # 捕获虚拟网卡名 - for m in self.IFACE_RE.finditer(line): - iface_names.add(m.group(1)) - - # 子进程若退出则停止读取 - if proc.poll() is not None: - break - - # 捕获 HTTP 网关端口(保持为字符串) - if http_port is None: - m = self.HTTP_RE.search(line) - if m: - http_host = m.group(1) - http_port = m.group(2) - # 简单校验 - try: - _ = int(http_port) - except Exception: - print(f"[tunneld] bad http port: {http_port}") - http_host, http_port = None, None - else: - print(f"[tunneld] Tunnel API: {http_host}:{http_port}") - - # 只处理当前 UDID 的 RSD 行 - if not self._line_is_for_udid(line, udid): - continue - - m = self.RSD_CREATED_RE.search(line) or self.RSD_FALLBACK_RE.search(line) - if m and rsd_host is None and rsd_port is None: - rsd_host = m.group(1) - rsd_port = m.group(2) - try: - _ = int(rsd_port) # 仅作数字校验 - except Exception: - print(f"[tunneld] bad rsd port: {rsd_port}") - rsd_host, rsd_port = None, None - else: - print(f"[tunneld] Device-level tunnel ready (RSD {rsd_host}:{rsd_port}).") - - # ========= 尝试启动 WDA ========= - if (not wda_started) and wda_bundle_id and (rsd_host is not None) and (rsd_port is not None): - if not mount_done: - self._auto_mount_developer_disk( - udid, retries=mount_retries, backoff_seconds=backoff_seconds - ) - _t.sleep(2) - mount_done = True - - # RSD 优先;探测时临时转 int;启动命令仍传 str 端口 - rsd_port_int = int(rsd_port) - if self._wait_for_rsd_ready( - rsd_host, rsd_port_int, retries=rsd_probe_retries, delay=rsd_probe_delay_sec - ): - # 这里的实现通常会拼 subprocess 命令行,故端口保持 str - self._launch_wda_via_rsd( - bundle_id=wda_bundle_id, - rsd_host=rsd_host, - rsd_port=rsd_port, # ⚠️ 传入 str,避免 subprocess 报错 - udid=udid, - ) - wda_started = True - elif (http_host is not None) and (http_port is not None): - self._launch_wda_via_http_tunnel( - bundle_id=wda_bundle_id, - http_host=http_host, - http_port=http_port, # ⚠️ 传入 str - udid=udid, - ) - wda_started = True - else: - raise RuntimeError("No valid tunnel endpoint for WDA.") - - # ✅ WDA 已启动;默认一次性模式直接退出读取循环 - if wda_started and not keep_tunnel: - _t.sleep(0.5) # 给隧道多刷几行 - print("[activate] WDA launched; exiting reader loop (keep_tunnel=False).") - break - - # 超时保护(仅在 WDA 尚未启动时生效) - if (not wda_started) and ready_timeout_sec > 0 and (_t.time() - start_ts > ready_timeout_sec): - print(f"[tunneld] Timeout waiting for device tunnel ({ready_timeout_sec}s). Aborting.") - break - - print("[activate] 启动 WDA 读取阶段结束") - out = "".join(captured) - - except Exception as e: - print(f"[activate] 发生异常:{e}") - raise - finally: - if not keep_tunnel: - try: - self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit) - except Exception as ce: - print(f"[activate] stop_tunnel 清理异常:{ce}") - - print("[activate] Done.") - return out - - - # =============== 外部可显式调用的清理 =============== - def stop_tunnel(self, udid: str, broad_cleanup: bool = True): - """关闭某 UDID 的 tunneld,并清理已知/残留的 pmd3 虚拟网卡。""" - proc = self._live_procs.pop(udid, None) - ifaces = self._live_ifaces.pop(udid, set()) - - # 1) 结束进程 - if proc: - try: - proc.terminate() - proc.wait(timeout=5) - except subprocess.TimeoutExpired: - try: - proc.kill() - proc.wait(timeout=3) - except Exception: - pass - # 兜底:杀掉本进程树内可能残留的 tunneld - self._kill_stray_tunneld_children() - - # 2) 清理网卡 - try: - if os.name == "nt": - # 先按已知名精确删除 - for name in sorted(ifaces): - self._win_remove_adapter(name) - # 宽匹配兜底 - if broad_cleanup: - self._win_remove_all_pmd3_adapters() - else: - # *nix 基本不需要手动删,若有需要可在此处添加 ip link delete 等 - pass - except Exception as e: - print(f"[cleanup] adapter cleanup error: {e}") - - # =============== 内部:启动 tunneld =============== - def _start_tunneld(self, udid: str) -> Tuple[subprocess.Popen, int]: - port = self._pick_available_port() - launcher, env2 = self._resolve_pmd3_argv_and_env() - env2["PYTHONUNBUFFERED"] = "1" - env2.setdefault("PYTHONIOENCODING", "utf-8") - env2["PYMOBILEDEVICE3_UDID"] = udid - - cmd = self._ensure_str_list([*launcher, "remote", "tunneld", "--port", str(port)]) - print("[activate] 启动隧道:", " ".join(cmd)) - - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - universal_newlines=True, - env=env2, - **self._win_hidden_popen_kwargs() - ) - return proc, port - - # =============== 退出/信号回收 =============== - def _ensure_exit_hooks(self, broad_cleanup_on_exit: bool): - if self._registered: - return - self._registered = True - - def _on_exit(): - # 逐个关闭存活隧道 - for udid in list(self._live_procs.keys()): - try: - self.stop_tunnel(udid, broad_cleanup=broad_cleanup_on_exit) - except Exception: - pass - - atexit.register(_on_exit) - - def _signal_handler(signum, frame): - _on_exit() - # 默认行为:再次发出信号退出 - try: - signal.signal(signum, signal.SIG_DFL) - except Exception: - pass - os.kill(os.getpid(), signum) - - for sig in (signal.SIGINT, signal.SIGTERM): - try: - signal.signal(sig, _signal_handler) - except Exception: - pass # 某些环境不允许设置 - - # =============== Windows 虚拟网卡清理 =============== - def _win_remove_adapter(self, name: str): - """按名删除一个虚拟网卡。""" - print(f"[cleanup] remove adapter: {name}") - # 先尝试 PowerShell(需要管理员) - ps = [ - "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", - f"$a=Get-NetAdapter -Name '{name}' -ErrorAction SilentlyContinue; " - f"if($a){{ Disable-NetAdapter -Name '{name}' -Confirm:$false -PassThru | Out-Null; " - f"Remove-NetAdapter -Name '{name}' -Confirm:$false -ErrorAction SilentlyContinue; }}" - ] - try: - subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()) - return - except Exception: - pass - - # 兜底:netsh 禁用 - try: - subprocess.run( - ["netsh", "interface", "set", "interface", name, "disable"], - check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs() - ) - except Exception: - pass - - def _win_remove_all_pmd3_adapters(self): - """宽匹配删除所有 pymobiledevice3-tunnel-* 网卡(防残留)。""" - print("[cleanup] sweeping all pymobiledevice3-tunnel-* adapters") - ps = [ - "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", - r"$nics=Get-NetAdapter -Name 'pymobiledevice3-tunnel-*' -ErrorAction SilentlyContinue; " - r"foreach($n in $nics){ Disable-NetAdapter -Name $n.Name -Confirm:$false -PassThru | Out-Null; " - r"Remove-NetAdapter -Name $n.Name -Confirm:$false -ErrorAction SilentlyContinue; }" - ] - try: - subprocess.run(ps, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **self._win_hidden_popen_kwargs()) - except Exception: - pass - - def _kill_stray_tunneld_children(self): - """在当前进程空间内尽量清理残留的 tunneld 子进程。""" - import psutil - try: - me = psutil.Process(os.getpid()) - for ch in me.children(recursive=True): - try: - cmd = " ".join(ch.cmdline()).lower() - except Exception: - cmd = "" - if "pymobiledevice3" in cmd and "remote" in cmd and "tunneld" in cmd: - try: - ch.terminate() - ch.wait(2) - except Exception: - try: - ch.kill() - except Exception: - pass - except Exception: - pass - - # =============== 其它工具 & 你原有的方法(未改动核心逻辑) =============== - def _pmd3_run(self, args: List[str], udid: str, extra_env: Optional[dict] = None) -> str: - launcher, env = self._resolve_pmd3_argv_and_env() - env["PYMOBILEDEVICE3_UDID"] = udid - env["PYTHONUNBUFFERED"] = "1" - env.setdefault("PYTHONIOENCODING", "utf-8") - if extra_env: - for k, v in extra_env.items(): - if v is None: - env.pop(k, None) - else: - env[k] = str(v) - cmd = [*launcher, *args] - print("[pmd3]", " ".join(map(str, cmd))) - try: - return subprocess.check_output( + proc = subprocess.run( cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True, - stderr=subprocess.STDOUT, - env=env, - **self._win_hidden_popen_kwargs() - ) or "" - except subprocess.CalledProcessError as exc: - raise RuntimeError(exc.output or f"pymobiledevice3 执行失败,退出码 {exc.returncode}") + timeout=timeout, + creationflags=self._creationflags, + startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口 + ) + except subprocess.TimeoutExpired: + if check: + raise + return -1, "", "timeout" - def _ensure_str_list(self, seq): - return [str(x) for x in seq] + out = proc.stdout or "" + err = proc.stderr or "" - def _win_hidden_popen_kwargs(self): - if os.name != "nt": - return {} - si = subprocess.STARTUPINFO() - si.dwFlags |= subprocess.STARTF_USESHOWWINDOW - si.wShowWindow = 0 # SW_HIDE - return { - "startupinfo": si, - "creationflags": getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000), - } + if check and proc.returncode != 0: + raise RuntimeError(f"[ios] 命令失败({desc}), returncode={proc.returncode}") - def _resolve_pmd3_argv_and_env(self): - import shutil, subprocess - from pathlib import Path + return proc.returncode, out, err - env = os.environ.copy() - env["PYTHONUNBUFFERED"] = "1" - env.setdefault("PYTHONIOENCODING", "utf-8") - - prefer_py = env.get("IOSAI_PYTHON") - base = Path(sys.argv[0]).resolve() - base_dir = base.parent if base.is_file() else base - py_name = "python.exe" if os.name == "nt" else "python" - - sidecar_candidates = [ - base_dir / "python-rt" / py_name, - base_dir / "python-rt" / "Scripts" / py_name, - base_dir.parent / "python-rt" / py_name, - base_dir.parent / "python-rt" / "Scripts" / py_name, - ] - if prefer_py: - sidecar_candidates.insert(0, Path(prefer_py)) - - for cand in sidecar_candidates: - print(f"[IOSAI] 🔎 probing sidecar at: {cand}") - if cand.is_file(): - try: - out = subprocess.check_output( - [str(cand), "-c", "import pymobiledevice3;print('ok')"], - text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs() - ) - if "ok" in out: - print(f"[IOSAI] ✅ sidecar selected: {cand}") - return ([str(cand), "-u", "-m", "pymobiledevice3"], env) - except Exception: - pass - - exe = shutil.which("pymobiledevice3") - if exe: - print(f"[IOSAI] ✅ use PATH executable: {exe}") - return ([exe], env) - - py_candidates = [] - base_exec = getattr(sys, "_base_executable", None) - if base_exec and os.path.isfile(base_exec): - py_candidates.append(base_exec) - for name in ("python3.exe", "python.exe", "py.exe", "python3", "python"): - p = shutil.which(name) - if p and p not in py_candidates: - py_candidates.append(p) - - for py in py_candidates: - print(f"[IOSAI] 🔎 probing system python: {py}") - try: - out = subprocess.check_output( - [py, "-c", "import pymobiledevice3;print('ok')"], - text=True, stderr=subprocess.STDOUT, env=env, timeout=6, **self._win_hidden_popen_kwargs() - ) - if "ok" in out: - print(f"[IOSAI] ✅ system python selected: {py}") - return ([py, "-u", "-m", "pymobiledevice3"], env) - except Exception: - continue - - raise RuntimeError("未检测到可用的 pymobiledevice3(建议携带 python-rt 或安装系统 Python+pmd3)。") - - # -------- DDI / RSD / 启动 WDA (与你原逻辑一致) -------- - def _auto_mount_developer_disk(self, udid: str, retries: int = 3, backoff_seconds: float = 2.0) -> None: - import time as _t - last_err = "" - for i in range(max(1, retries)): - try: - out = self._pmd3_run(["mounter", "auto-mount"], udid) - if out: - for line in out.splitlines(): - print(f"[mounter] {line}") - if "already mounted" in (out or "").lower(): - print("[mounter] Developer disk image already mounted.") - else: - print("[mounter] Developer disk image mounted.") + def _spawn_tunnel(self) -> None: + with self._lock: + if self._tunnel_proc is not None and self._tunnel_proc.poll() is None: + print("[ios] tunnel 已经在运行,跳过重新启动") return - except Exception as e: - last_err = str(e) - if i < retries - 1: - print(f"[mounter] attempt {i+1}/{retries} failed, retrying in {backoff_seconds}s ...") - _t.sleep(backoff_seconds) - else: - raise RuntimeError(f"Auto-mount failed after {retries} attempts.\n{last_err}") - def _is_ipv4_host(self, host: str) -> bool: - return bool(re.match(r"^\d{1,3}(\.\d{1,3}){3}$", host)) + cmd = [self.ios_path, "tunnel", "start"] + print("[ios] 启动 go-ios tunnel: %s", " ".join(cmd)) - def _wait_for_rsd_ready(self, rsd_host: str, rsd_port: str, retries: int = 5, delay: float = 3.0) -> bool: - port_int = int(rsd_port) - for i in range(1, retries + 1): - print(f"[rsd] Probing RSD {rsd_host}:{rsd_port} (attempt {i}/{retries}) ...") try: - with socket.create_connection((rsd_host, port_int), timeout=2): - print("[rsd] ✅ RSD is reachable and ready.") - return True - except (socket.timeout, ConnectionRefusedError, OSError) as e: - print(f"[rsd] Not ready yet ({e}). Retrying...") - import time as _t - _t.sleep(delay) - print("[rsd] ❌ RSD did not become ready after retries.") - return False + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + creationflags=self._creationflags, + startupinfo=self._startupinfo, # ⭐ 关键:隐藏窗口 + ) + except Exception as e: + # 这里改成 warning,并且直接返回,不往外抛 + print("[ios] 启动 tunnel 失败(忽略): %s", e) + return - def _launch_wda_via_rsd(self, bundle_id: str, rsd_host: str, rsd_port: str, udid: str) -> None: - print(f"[wda] Launch via RSD {rsd_host}:{rsd_port}, bundle: {bundle_id}") - out = self._pmd3_run(["developer", "dvt", "launch", bundle_id, "--rsd", rsd_host, rsd_port], udid=udid) - if out: - for line in out.splitlines(): - print(f"[wda] {line}") - print("[wda] Launch via RSD completed.") + self._tunnel_proc = proc + print("[ios] tunnel 启动成功, PID=%s", proc.pid) - def _launch_wda_via_http_tunnel(self, bundle_id: str, http_host: str, http_port: str, udid: str) -> None: - if not self._is_ipv4_host(http_host): - raise RuntimeError(f"HTTP tunnel host must be IPv4, got {http_host}") - tunnel_endpoint = f"{http_host}:{http_port}" - print(f"[wda] Launch via HTTP tunnel {tunnel_endpoint}, bundle: {bundle_id}") - out = self._pmd3_run(["developer", "dvt", "launch", bundle_id], udid=udid, - extra_env={"PYMOBILEDEVICE3_TUNNEL": tunnel_endpoint}) - if out: - for line in out.splitlines(): - print(f"[wda] {line}") - print("[wda] Launch via HTTP tunnel completed.") + threading.Thread( + target=self._drain_process_output, + args=(proc, "tunnel"), + daemon=True, + ).start() - # -------- 端口挑选 -------- - def _pick_available_port(self, base=49151, step=10) -> int: - for i in range(0, 1000, step): - port = base + i - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("127.0.0.1", port)) != 0: - return port - raise RuntimeError("No free port found for tunneld") - - # -------- UDID 过滤 -------- - def _line_is_for_udid(self, line: str, udid: str) -> bool: + def _drain_process_output(self, proc: subprocess.Popen, name: str): + """简单把后台进程的输出打到日志里,避免缓冲区阻塞。""" try: - return udid.lower() in (line or "").lower() - except Exception: - return False \ No newline at end of file + if proc.stdout: + for line in proc.stdout: + line = line.rstrip() + print(line) + except Exception as e: + print("[ios][%s] 读取 stdout 异常: %s", name, e) + + try: + if proc.stderr: + for line in proc.stderr: + line = line.rstrip() + if line: + print("[ios][%s][stderr] %s", name, line) + except Exception as e: + print("[ios][%s] 读取 stderr 异常: %s", name, e) + + # ======================= + # 具体步骤封装 + # ======================= + def _pair_until_success(self, udid: str) -> None: + """ + 调用 `ios --udid pair`,直到成功或者超时。 + 成功条件:stdout 中出现 "Successfully paired" + """ + deadline = time.time() + self.pair_timeout + attempt = 0 + + while True: + attempt += 1 + print("[ios] 开始配对设备(%s),第 %d 次尝试", udid, attempt) + + rc, out, err = self._run( + ["--udid", udid, "pair"], + desc=f"pair({udid})", + timeout=20, + check=False, + ) + + text = (out or "") + "\n" + (err or "") + if "Successfully paired" in text: + print("[ios] 设备 %s 配对成功", udid) + return + + if time.time() >= deadline: + raise RuntimeError(f"[ios] 设备 {udid} 在超时时间内配对失败") + + time.sleep(self.pair_retry_interval) + + def _mount_dev_image(self, udid: str) -> None: + """ + `ios --udid image auto` 挂载开发者镜像。 + 成功条件:输出里出现 "success mounting image" 或 + "there is already a developer image mounted" 之类的提示。 + 挂载失败现在只打 warning,不再抛异常,避免阻断后续 runwda。 + """ + print("[ios] 开始为设备 %s 挂载开发者镜像 (image auto)", udid) + + rc, out, err = self._run( + ["--udid", udid, "image", "auto"], + desc=f"image auto({udid})", + timeout=300, + check=False, + ) + + text = (out or "") + "\n" + (err or "") + text_lower = text.lower() + + # 这些都当成功处理 + success_keywords = [ + "success mounting image", + "there is already a developer image mounted", + ] + if any(k in text_lower for k in success_keywords): + print("[ios] 设备 %s 开发者镜像挂载完成", udid) + if text.strip(): + print("[ios][image auto] output:\n%s", text.strip()) + return + + # 到这里说明没找到成功关键字,当成“不可靠但非致命” + print( + "[ios] 设备 %s 挂载开发者镜像可能失败(rc=%s),输出:\n%s", + udid, rc, text.strip() + ) + # 关键:不再 raise,直接 return,让后续 runwda 继续试 + return + + def _run_wda(self, udid: str) -> None: + # ⭐ 按你验证的命令构造参数(绝对正确) + args = [ + f"--udid={udid}", + "runwda", + f"--bundleid={WdaAppBundleId}", + f"--testrunnerbundleid={WdaAppBundleId}", + "--xctestconfig=yolo.xctest", # ⭐ 你亲自验证成功的值 + ] + + rc, out, err = self._run( + args, + desc=f"runwda({udid})", + timeout=300, + check=False, + ) + + # ======================= + # 对外主流程 + # ======================= + def activate_ios17(self, udid: str) -> None: + print("[WDA] iOS17+ 激活开始,udid=%s", udid) + + # 1. 启动 tunnel + self._spawn_tunnel() + + # 2. 一直等到 pair 成功(pair 不成功就没法玩了,直接返回) + try: + self._pair_until_success(udid) + except Exception as e: + print("[WDA] pair 失败,终止激活流程 udid=%s, err=%s", udid, e) + return + + # 3. 挂载开发者镜像(现在是非致命错误) + try: + self._mount_dev_image(udid) + except Exception as e: + # 理论上不会再进到这里,但为了稳妥,多一层保护 + print("[WDA] 挂载开发者镜像出现异常,忽略继续 udid=%s, err=%s", udid, e) + + # 4. 尝试启动 WDA + try: + self._run_wda(udid) + except Exception as e: + print("[WDA] runwda 调用异常 udid=%s, err=%s", udid, e) + + print("[WDA] iOS17+ 激活流程结束(不代表一定成功),udid=%s", udid) \ No newline at end of file diff --git a/Module/Main.py b/Module/Main.py index bae5465..930d259 100644 --- a/Module/Main.py +++ b/Module/Main.py @@ -4,6 +4,7 @@ import os import sys from pathlib import Path +import tidevice from hypercorn.asyncio import serve from hypercorn.config import Config diff --git a/resources/ios.exe b/resources/ios.exe new file mode 100644 index 0000000..520f015 Binary files /dev/null and b/resources/ios.exe differ diff --git a/resources/wintun.dll b/resources/wintun.dll new file mode 100644 index 0000000..aee04e7 Binary files /dev/null and b/resources/wintun.dll differ