import json
import os
import socket
import threading
import time
import subprocess
from typing import Dict

import tidevice
import wda
from tidevice import Usbmux, ConnectionType

from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId, wdaFunctionPort
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Module.IOSActivator import IOSActivator
from Utils.LogManager import LogManager


class DeviceInfo:
    """Singleton that tracks USB-connected iOS devices.

    ``listen`` polls the usbmux device list once per second, registers new
    devices (screen-port allocation, iproxy forwarding, WDA start-up),
    removes devices that have not been seen for longer than
    ``REMOVE_GRACE_SEC`` and pushes a JSON snapshot of all known devices to
    a local TCP port (``FLASK_COMM_PORT``, default 34566).
    """

    _instance = None
    _instance_lock = threading.Lock()

    # Offline grace period (seconds): a device missing from the usbmux list
    # is only removed once it has been unseen for longer than this.
    REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0"))

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not cls._instance:
            with cls._instance_lock:
                # Re-check under the lock so only one instance is created.
                if not cls._instance:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        """Initialise state once; later constructions return immediately."""
        if getattr(self, "_initialized", False):
            return

        self._lock = threading.RLock()
        self._models: Dict[str, DeviceModel] = {}
        self._manager = FlaskSubprocessManager.get_instance()
        # Last allocated screen port; each new device gets the next one.
        self.screenPort = 9110

        # udid -> time.time() of the last poll in which the device was online.
        self._last_seen: Dict[str, float] = {}

        # iproxy child processes: udid -> Popen.
        self._iproxy_process: Dict[str, subprocess.Popen] = {}

        # On Windows, hide the child-process console window (used for iproxy).
        self._creationflags = 0
        self._startupinfo = None
        if os.name == "nt":
            # Falls back to 0 when the constant is unavailable.
            self._creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
            si = subprocess.STARTUPINFO()
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            si.wShowWindow = 0  # SW_HIDE
            self._startupinfo = si

        LogManager.info("DeviceInfo 初始化完成", udid="system")
        print("[Init] DeviceInfo 初始化完成")
        self._initialized = True

    # ==========================
    # Main loop
    # ==========================
    def listen(self):
        """Poll usbmux forever, adding/removing devices and syncing snapshots.

        Never returns. Each iteration:
          1. reads the set of USB-connected udids (a failure is treated as
             "no devices this round");
          2. refreshes ``_last_seen`` and adds devices not yet known;
          3. removes known devices that are offline for longer than
             ``REMOVE_GRACE_SEC``;
          4. pushes a full snapshot roughly every 5 seconds;
          5. sleeps 1 second.
        """
        LogManager.method_info("进入主循环", "listen", udid="system")
        print("[Listen] 开始监听设备上下线...")

        last_broadcast = 0.0  # timestamp of the last periodic full sync

        while True:
            try:
                online = {
                    d.udid
                    for d in Usbmux().device_list()
                    if d.conn_type == ConnectionType.USB
                }
            except Exception as e:
                LogManager.warning(f"[device_list] 异常:{e}", udid="system")
                print("[Listen] device_list 出错,本轮视为无设备,防止状态脏死")
                online = set()

            with self._lock:
                # Private copy so the loops below run without holding the lock.
                known = set(self._models)

            # ---------- 1. New devices ----------
            now = time.time()
            for udid in online:
                self._last_seen[udid] = now
                if udid in known:
                    continue
                try:
                    self._add_device(udid)
                except Exception as e:
                    LogManager.warning(f"[Add] 处理设备 {udid} 异常: {e}", udid=udid)
                    print(f"[Add] 处理设备 {udid} 异常: {e}")

            # ---------- 2. Possibly offline devices ----------
            for udid in known - online:
                last = self._last_seen.get(udid, 0)
                if now - last > self.REMOVE_GRACE_SEC:
                    try:
                        self._remove_device(udid)
                    except Exception as e:
                        LogManager.method_error(f"移除失败:{e}", "listen", udid=udid)
                        print(f"[Remove] 移除失败 {udid}: {e}")

            # ---------- 3. Heartbeat: force a sync every 5 seconds ----------
            if now - last_broadcast > 5.0:
                try:
                    self._manager_send()
                except Exception as e:
                    print(f"[Listen] 周期同步到 Flask 失败: {e}")
                else:
                    last_broadcast = now

            time.sleep(1)

    # ==========================
    # Add device
    # ==========================
    def _add_device(self, udid: str):
        """Register a newly seen device and start its supporting services.

        Inserts a ``DeviceModel`` with zero width/height/scale, pushes a
        snapshot, starts iproxy for the allocated screen port and starts WDA.
        Screen dimensions are filled in later by a background thread.

        Args:
            udid: Device identifier as reported by usbmux.
        """
        with self._lock:
            if udid in self._models:
                print(f"[Add] 已存在,跳过: {udid}")
                return

        print(f"[Add] 新增设备 {udid}")

        # Determine the iOS major version; 0 means "unknown" and selects the
        # legacy (tidevice) WDA start-up path below.
        try:
            t = tidevice.Device(udid)
            version_major = float(t.product_version.split(".")[0])
        except Exception as e:
            print(f"[Add] 获取系统版本失败 {udid}: {e}")
            version_major = 0.0

        # Allocate the screen port and store the model first (size 0);
        # the real size is updated asynchronously.
        with self._lock:
            self.screenPort += 1
            screen_port = self.screenPort

            model = DeviceModel(
                deviceId=udid,
                screenPort=screen_port,
                width=0,
                height=0,
                scale=0,
                type=1,
            )
            self._models[udid] = model

        print(f"[Add] 新设备完成 {udid}, screenPort={screen_port}")
        self._manager_send()

        # Start iproxy (screen-cast forwarding).
        try:
            self._start_iproxy(udid, screen_port)
        except Exception as e:
            print(f"[iproxy] 启动失败 {udid}: {e}")

        # Start WDA.
        if version_major >= 17.0:
            # iOS 17+ goes through go-ios; the callback fetches the screen
            # size once WDA is up.
            threading.Thread(
                target=IOSActivator().activate_ios17,
                args=(udid, self._on_wda_ready),
                daemon=True,
            ).start()
        else:
            # Older versions start WDA directly via tidevice, then fetch the
            # screen size asynchronously.
            try:
                tidevice.Device(udid).app_start(WdaAppBundleId)
            except Exception as e:
                print(f"[Add] 使用 tidevice 启动 WDA 失败 {udid}: {e}")
            else:
                threading.Thread(
                    target=self._fetch_screen_and_notify,
                    args=(udid,),
                    daemon=True,
                ).start()

    # ==========================
    # WDA ready callback (iOS 17+)
    # ==========================
    def _on_wda_ready(self, udid: str):
        """Callback passed to ``activate_ios17``; kicks off the screen query."""
        print(f"[WDA] 回调触发,准备获取屏幕信息 udid={udid}")
        # Wait briefly before connecting; WDA may be unstable right after start.
        time.sleep(1)
        threading.Thread(
            target=self._fetch_screen_and_notify,
            args=(udid,),
            daemon=True,
        ).start()

    # ==========================
    # Query screen info through WDA
    # ==========================
    def _screen_info(self, udid: str):
        """Return ``(width, height, scale)`` from WDA, or ``(0, 0, 0.0)`` on error."""
        try:
            # USBClient talks to WDA through the WDA function port.
            c = wda.USBClient(udid, wdaFunctionPort)
            size = c.window_size()
            w = int(size.width)
            h = int(size.height)
            s = float(c.scale)  # facebook-wda exposes scale on the client
            print(f"[Screen] 成功获取屏幕 {w}x{h} scale={s} {udid}")
            return w, h, s
        except Exception as e:
            print(f"[Screen] 获取屏幕失败: {e} udid={udid}")
            return 0, 0, 0.0

    # ==========================
    # Fetch screen size asynchronously and notify Flask
    # ==========================
    def _fetch_screen_and_notify(self, udid: str):
        """Retry the WDA screen query in a background thread.

        Waits 2 seconds, then tries up to 15 times at 1-second intervals.
        On success the model is updated and one snapshot is sent. Stops
        early if the device has been removed in the meantime.
        """
        max_retry = 15
        interval = 1.0

        # Give WDA some start-up slack.
        time.sleep(2.0)

        for _ in range(max_retry):
            # Stop trying once the device has been removed.
            with self._lock:
                if udid not in self._models:
                    print(f"[Screen] 设备已移除,停止获取屏幕信息 udid={udid}")
                    return

            w, h, s = self._screen_info(udid)
            if w > 0 and h > 0:
                # Update the model.
                with self._lock:
                    m = self._models.get(udid)
                    if not m:
                        print(f"[Screen] 模型已不存在,无法更新 udid={udid}")
                        return
                    m.width = w
                    m.height = h
                    m.scale = s

                print(f"[Screen] 屏幕信息更新完成,准备推送到 Flask udid={udid}")
                try:
                    self._manager_send()
                except Exception as e:
                    print(f"[Screen] 发送屏幕更新到 Flask 失败 udid={udid}, err={e}")
                return

            time.sleep(interval)

        print(f"[Screen] 多次尝试仍未获取到屏幕信息 udid={udid}")

    # ==========================
    # iproxy management
    # ==========================
    def _start_iproxy(self, udid: str, local_port: int):
        """Spawn an iproxy process forwarding ``local_port`` to device port 9567.

        Does nothing if a process for ``udid`` is already running.

        Raises:
            OSError: propagated from ``subprocess.Popen`` (e.g. the iproxy
                binary is missing).
        """
        iproxy_path = self._find_iproxy()

        # An existing process that is still running is not started again.
        p = self._iproxy_process.get(udid)
        if p is not None and p.poll() is None:
            print(f"[iproxy] 已存在运行中的进程,跳过 {udid}")
            return

        args = [
            iproxy_path,
            "-u", udid,
            str(local_port),  # local port (screen cast)
            "9567",           # device port (go-ios screencast)
        ]

        print(f"[iproxy] 启动进程: {args}")

        # No PIPE: nobody reads the output, so a pipe buffer could fill up.
        # The window is hidden using the flags prepared in __init__.
        proc = subprocess.Popen(
            args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=self._creationflags,
            startupinfo=self._startupinfo,
        )

        self._iproxy_process[udid] = proc

    def _stop_iproxy(self, udid: str):
        """Terminate the iproxy process for ``udid``, if any (best effort).

        Sends terminate, waits up to 2 seconds, then escalates to kill. The
        killed process is waited on as well so it is reaped instead of being
        left behind. Errors are logged, never raised.
        """
        p = self._iproxy_process.get(udid)
        if p is None:
            return
        try:
            p.terminate()
            try:
                p.wait(timeout=2)
            except Exception:
                # Exit could not be confirmed in time: force-kill, then reap.
                p.kill()
                p.wait(timeout=2)
        except Exception as e:
            print(f"[iproxy] 停止进程时出错 {udid}: {e}")
        self._iproxy_process.pop(udid, None)
        print(f"[iproxy] 已停止 {udid}")

    # ==========================
    # Remove device
    # ==========================
    def _remove_device(self, udid: str):
        """Stop iproxy, forget the device and push an updated snapshot."""
        print(f"[Remove] 移除设备 {udid}")

        # Stop iproxy first.
        self._stop_iproxy(udid)

        with self._lock:
            self._models.pop(udid, None)
            self._last_seen.pop(udid, None)

        self._manager_send()

    # ==========================
    # Utilities
    # ==========================
    def _find_iproxy(self) -> str:
        """Return the bundled iproxy path: ``<base>/resources/iproxy/iproxy[.exe]``.

        ``<base>`` is the parent of the directory containing this file.
        The path is not checked for existence.
        """
        base = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        name = "iproxy.exe" if os.name == "nt" else "iproxy"
        return os.path.join(base, "resources", "iproxy", name)

    # ==========================
    # Sync data to Flask
    # ==========================
    def _manager_send(self):
        """Send a snapshot; on failure call ``_manager.start()`` and retry once.

        Best effort: never raises. Each failure is printed so a broken link
        does not go unnoticed.
        """
        try:
            self._send_snapshot_to_flask()
            return
        except Exception as e:
            print(f"[SNAPSHOT] 发送失败,调用 manager.start() 后重试: {e}")

        try:
            self._manager.start()
        except Exception as e:
            print(f"[SNAPSHOT] manager.start() 失败: {e}")

        try:
            self._send_snapshot_to_flask()
        except Exception as e:
            print(f"[SNAPSHOT] 重试发送仍然失败: {e}")

    def _send_snapshot_to_flask(self):
        """Send ``{"devices": [...]}`` as one JSON line to 127.0.0.1.

        The port comes from ``FLASK_COMM_PORT`` (default 34566); the
        connection timeout is 1.5 seconds.

        Raises:
            OSError: if the connection or send fails.
            ValueError: if ``FLASK_COMM_PORT`` is not an integer.
        """
        with self._lock:
            devices = [m.toDict() for m in self._models.values()]
        payload = json.dumps({"devices": devices}, ensure_ascii=False)
        port = int(os.getenv("FLASK_COMM_PORT", "34566"))
        with socket.create_connection(("127.0.0.1", port), timeout=1.5) as s:
            s.sendall(payload.encode() + b"\n")
        print(f"[SNAPSHOT] 已发送 {len(devices)} 台设备")