# -*- coding: utf-8 -*- import os import signal import sys import time import wda import threading import subprocess from pathlib import Path from typing import List, Dict, Optional from tidevice import Usbmux, ConnectionType from tidevice._device import BaseDevice from Entity.DeviceModel import DeviceModel from Entity.Variables import WdaAppBundleId from Module.FlaskSubprocessManager import FlaskSubprocessManager from Utils.LogManager import LogManager class Deviceinfo(object): """设备生命周期管理:以 deviceModelList 为唯一真理源""" def __init__(self): self.deviceIndex = 0 self.screenProxy = 9110 self.pidList: List[Dict] = [] # 仅记录 iproxy 进程 self.manager = FlaskSubprocessManager.get_instance() self.deviceModelList: List[DeviceModel] = [] # 根基,不动 self.maxDeviceCount = 6 self._lock = threading.Lock() self._model_index: Dict[str, DeviceModel] = {} # udid -> model self._miss_count: Dict[str, int] = {} # udid -> 连续未扫描到次数 self._port_pool: List[int] = [] # 端口回收池 self._port_in_use: set[int] = set() # 正在使用的端口 # region iproxy 初始化 try: self.iproxy_path = self._iproxy_path() self.iproxy_dir = self.iproxy_path.parent os.environ["PATH"] = str(self.iproxy_dir) + os.pathsep + os.environ.get("PATH", "") try: os.add_dll_directory(str(self.iproxy_dir)) except Exception: pass self._creationflags = 0x08000000 if os.name == "nt" else 0 self._popen_kwargs = dict( stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=str(self.iproxy_dir), shell=False, text=True, creationflags=self._creationflags, encoding="utf-8", bufsize=1, ) def _spawn_iproxy(udid: str, local_port: int, remote_port: int = 9100) -> subprocess.Popen: args = [str(self.iproxy_path), "-u", udid, str(local_port), str(remote_port)] p = subprocess.Popen(args, **self._popen_kwargs) def _pipe_to_log(name: str, stream): try: for line in iter(stream.readline, ''): s = line.strip() if s: LogManager.info(f"[iproxy {name}] {s}", udid) except Exception: pass threading.Thread(target=_pipe_to_log, args=("STDOUT", p.stdout), daemon=True).start() threading.Thread(target=_pipe_to_log, args=("STDERR", p.stderr), daemon=True).start() return p self._spawn_iproxy = _spawn_iproxy LogManager.info(f"iproxy 启动器已就绪,目录: {self.iproxy_dir}") except Exception as e: self.iproxy_path = None self.iproxy_dir = None self._spawn_iproxy = None LogManager.error(f"初始化 iproxy 失败:{e}") # endregion def startDeviceListener(self): """死循环监听设备插拔;以 deviceModelList 为准""" while True: try: lists = Usbmux().device_list() except Exception as e: LogManager.warning(f"usbmuxd 连接失败: {e},2 秒后重试") time.sleep(2) continue now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB} # 0. 首次失踪登记:已在线设备若突然扫不到,计数器归零 with self._lock: for udid in list(self._model_index.keys()): if udid not in now_udids and udid not in self._miss_count: self._miss_count[udid] = 0 LogManager.info(f"[DEBUG] 首次失踪登记:{udid}", udid) # 1. 处理已在线设备的失联计数 with self._lock: for udid in list(self._miss_count.keys()): if udid not in now_udids: self._miss_count[udid] += 1 LogManager.info(f"[DEBUG] 累加 {udid} -> {self._miss_count[udid]}", udid) if self._miss_count[udid] >= 3: print("设备下线了") LogManager.info(f"[DEBUG] 触发下线 {udid}", udid) self._remove_model(udid) self._miss_count.pop(udid, None) else: LogManager.info(f"[DEBUG] 设备仍在,清零 {udid}", udid) self._miss_count.pop(udid, None) # 2. 处理新插入 for d in lists: if d.conn_type != ConnectionType.USB: continue udid = d.udid with self._lock: if udid in self._model_index: continue # 已存在 if not self.is_device_trusted(udid): LogManager.warning("设备未信任,跳过", udid) continue if len(self.deviceModelList) >= self.maxDeviceCount: continue try: self.connectDevice(udid) # 内部会 _add_model except Exception as e: LogManager.error(f"连接设备失败 {udid}: {e}", udid) time.sleep(1) # endregion # region ===================== 增删改查唯一入口(线程安全) ===================== def _has_model(self, udid: str) -> bool: with self._lock: return udid in self._model_index def _add_model(self, model: DeviceModel): with self._lock: if model.deviceId in self._model_index: return # 防重复 self.deviceModelList.append(model) self._model_index[model.deviceId] = model try: self.manager.send(model.toDict()) except Exception as e: LogManager.warning(f"发送上线事件失败:{e}", model.deviceId) LogManager.info(f"设备上线,当前在线数:{len(self.deviceModelList)}", model.deviceId) def _remove_model(self, udid: str): model = self._model_index.pop(udid, None) if not model: return model.type = 2 print(model.toDict()) # ① 关键:重试 3 次,必须送达,否则崩溃 retry = 3 while retry: try: self.manager.send(model.toDict()) break except Exception as e: retry -= 1 LogManager.error(f"发送下线事件失败,剩余重试 {retry}:{e}", udid) time.sleep(0.2) else: LogManager.error("发送下线事件彻底失败,主动崩溃防止状态不一致", udid) os._exit(1) # ② 安全删除 try: idx = self.deviceModelList.index(model) self.deviceModelList.pop(idx) print(len(self.deviceModelList)) except Exception as e: print("22222222") print(f"[FlaskSubprocessManager] 发送失败,异常类型:{type(e).__name__},内容:{e}") # ③ 回收端口 self._free_port(model.screenPort) print("继续执行了") # ④ 清理 iproxy survivors = [item for item in self.pidList if item.get("id") != udid] for item in self.pidList: if item.get("id") == udid: self._terminate_proc(item.get("target")) self.pidList = survivors print("设备下线。删除设备成功") LogManager.info(f"设备下线,当前在线数:{len(self.deviceModelList)}", udid) LogManager.info(f"[Deviceinfo] 下线包已送进队列 -> type=2", udid) # endregion # region ===================== 端口分配与回收 ===================== def _alloc_port(self) -> int: with self._lock: if self._port_pool: port = self._port_pool.pop() else: self.screenProxy += 1 port = self.screenProxy self._port_in_use.add(port) return port def _free_port(self, port: int): with self._lock: if port in self._port_in_use: self._port_in_use.remove(port) self._port_pool.append(port) # endregion # region ===================== 单台设备连接 ===================== def connectDevice(self, udid: str): if not self.is_device_trusted(udid): LogManager.warning("设备未信任,跳过 WDA 启动", udid) return if self._has_model(udid): LogManager.warning("设备已存在,跳过重复连接", udid) return try: d = wda.USBClient(udid, 8100) except Exception as e: LogManager.error(f"启动 WDA 失败: {e}", udid) return width, height, scale = 0, 0, 1.0 try: size = d.window_size() width, height = size.width, size.height scale = d.scale except Exception as e: LogManager.warning(f"读取屏幕信息失败:{e}", udid) port = self._alloc_port() model = DeviceModel(udid, port, width, height, scale, type=1) self._add_model(model) try: d.app_start(WdaAppBundleId) d.home() except Exception as e: LogManager.warning(f"启动/切回桌面失败:{e}", udid) time.sleep(2) # 先清旧进程再启动新进程 with self._lock: self.pidList = [item for item in self.pidList if item.get("id") != udid] target = self.relayDeviceScreenPort(udid, port) if target: with self._lock: self.pidList.append({"target": target, "id": udid}) # endregion # region ===================== 工具方法 ===================== def is_device_trusted(self, udid: str) -> bool: try: d = BaseDevice(udid) d.get_value("DeviceName") return True except Exception: return False def relayDeviceScreenPort(self, udid: str, port: int) -> Optional[subprocess.Popen]: if not self._spawn_iproxy: LogManager.error("iproxy 启动器未就绪,无法建立端口映射", udid) return None try: p = self._spawn_iproxy(udid, port, 9100) LogManager.info(f"启动 iproxy 成功,本地 {port} -> 设备 9100", udid) return p except Exception as e: LogManager.error(f"启动 iproxy 失败:{e}", udid) return None def _terminate_proc(self, p: Optional[subprocess.Popen]): if not p or p.poll() is not None: return try: p.terminate() p.wait(timeout=3) except Exception: try: if os.name == "posix": os.killpg(os.getpgid(p.pid), signal.SIGKILL) else: p.kill() p.wait(timeout=2) except Exception: pass def _base_dir(self) -> Path: if getattr(sys, "frozen", False): return Path(sys.executable).resolve().parent return Path(__file__).resolve().parents[1] def _iproxy_path(self) -> Path: exe = "iproxy.exe" if os.name == "nt" else "iproxy" base = self._base_dir() candidates = [base / "resources" / "iproxy" / exe] for p in candidates: if p.exists(): return p raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}") # endregion