import os
import signal
import subprocess
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import tidevice
import wda
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice

from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager


class DeviceInfo:
    """Track USB-attached iOS devices and keep one iproxy forwarder per device.

    ``listen`` polls usbmux once per second. Newly attached devices are set up
    on a thread pool (start WDA, read screen geometry, start iproxy) and then
    reported through ``FlaskSubprocessManager``; detached devices have their
    iproxy process stopped and are reported again with ``type = 2``.
    """

    def __init__(self):
        # Last port handed out; _alloc_port() increments before returning.
        self._port = 9110
        # _add_device runs on several pool threads, so allocation is locked.
        self._port_lock = threading.Lock()
        self._models: Dict[str, DeviceModel] = {}
        self._procs: Dict[str, subprocess.Popen] = {}
        self._manager = FlaskSubprocessManager.get_instance()
        self._iproxy_path = self._find_iproxy()
        self._pool = ThreadPoolExecutor(max_workers=6)

    # ---------------- main loop ----------------
    def listen(self):
        """Poll usbmux forever, removing unplugged devices and adding new ones.

        Removal is synchronous. Additions are submitted to the thread pool and
        awaited for at most 30 seconds per polling round; attempts that take
        longer keep running in the background and are not re-submitted until
        they finish.
        """
        # udid -> future of an _add_device call that has not been reaped yet.
        pending: Dict[str, Future] = {}
        while True:
            online = {
                d.udid
                for d in Usbmux().device_list()
                if d.conn_type == ConnectionType.USB
            }

            # Unplugged -> handled synchronously.
            for udid in list(self._models):
                if udid not in online:
                    self._remove_device(udid)

            # Reap attempts that outlived an earlier 30 s wait so that their
            # errors are still logged and the udid becomes eligible again.
            for udid in [u for u, f in pending.items() if f.done()]:
                self._log_add_failure(pending.pop(udid))

            # Plugged in -> handled asynchronously. Skipping udids that are
            # still pending prevents a second iproxy for the same device.
            new = [
                u for u in online
                if u not in self._models and u not in pending
            ]
            if new:
                futures = {self._pool.submit(self._add_device, u): u for u in new}
                pending.update((u, f) for f, u in futures.items())
                try:
                    for f in as_completed(futures, timeout=30):
                        pending.pop(futures[f], None)
                        self._log_add_failure(f)
                except FuturesTimeoutError:
                    # as_completed raises once the deadline passes; without
                    # this handler the whole monitor loop would terminate.
                    slow = [u for f, u in futures.items() if not f.done()]
                    LogManager.warning(f"异步连接超时,后台继续:{slow}")

            time.sleep(1)

    @staticmethod
    def _log_add_failure(future: Future) -> None:
        """Log the exception, if any, raised by a finished add-device future."""
        try:
            future.result()
        except Exception as e:
            LogManager.error(f"异步连接失败:{e}")

    # ---------------- add device ----------------
    def _add_device(self, udid: str):
        """Bring one device online; return silently if any step fails.

        Steps: check the device answers a lockdown query, start WDA, read the
        screen size and scale, allocate a local port, start iproxy, then
        register the model and send it to the manager.
        """
        if not self._trusted(udid):
            return
        if not self.startWda(udid):
            LogManager.info("启动wda失败")
            return
        w, h, s = self._screen_info(udid)
        # _screen_info reports failure as zeros.
        if w == 0 or h == 0 or s == 0:
            print("未获取到设备屏幕信息")
            return
        port = self._alloc_port()
        proc = self._start_iproxy(udid, port)
        if not proc:
            return
        # type=1 is sent on attach and type=2 on detach (see _remove_device).
        model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1)
        model.ready = True
        self._models[udid] = model
        self._procs[udid] = proc
        self._manager_send(model)

    # ---------------- remove device ----------------
    def _remove_device(self, udid: str):
        """Forget ``udid``, stop its iproxy and notify the manager (type=2)."""
        model = self._models.pop(udid, None)
        if model is None:
            return
        model.type = 2
        self._kill(self._procs.pop(udid, None))
        self._manager_send(model)

    # ---------------- helpers ----------------
    def _trusted(self, udid: str) -> bool:
        """Return True if reading ``DeviceName`` from the device succeeds."""
        try:
            BaseDevice(udid).get_value("DeviceName")
            return True
        except Exception:
            return False

    def startWda(self, udid):
        """Launch the WDA app on the device; return True on success.

        Sleeps 3 seconds after the launch call before returning.
        """
        print("进入启动wda方法")
        try:
            dev = tidevice.Device(udid)
            print("获取tidevice对象成功,准备启动wda")
            dev.app_start(WdaAppBundleId)
            print("启动wda成功")
            time.sleep(3)
            return True
        except Exception as e:
            print("启动wda遇到错误:", e)
            return False

    def _screen_info(self, udid: str) -> Tuple[int, int, float]:
        """Return ``(width, height, scale)`` from WDA, or ``(0, 0, 0)`` on error.

        Connects with ``wda.USBClient(udid, 8100)`` and calls ``home()`` before
        reading the window size.
        """
        try:
            c = wda.USBClient(udid, 8100)
            c.home()
            size = c.window_size()
            scale = c.scale
            return int(size.width), int(size.height), float(scale)
        except Exception as e:
            print("获取设备信息遇到错误:", e)
            return 0, 0, 0

    # NOTE(review): the original text had a bare `...` placeholder here with a
    # comment saying the earlier code is unchanged and only the next function
    # was replaced - confirm nothing was elided from this file.
    def _start_iproxy(self, udid: str, port: int) -> Optional[subprocess.Popen]:
        """Start ``iproxy -u <udid> <port> 9100`` and return the process.

        Returns None if the process cannot be started.
        """
        # CREATE_NO_WINDOW hides the console window; the constant only exists
        # on Windows, so fall back to 0 (no flags) elsewhere.
        flags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
        try:
            return subprocess.Popen(
                [self._iproxy_path, "-u", udid, str(port), "9100"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                creationflags=flags,
            )
        except (OSError, ValueError) as e:
            print(e)
            return None

    def _kill(self, proc: Optional[subprocess.Popen]):
        """Terminate ``proc``, escalating to ``kill()`` if it does not exit in 2 s."""
        if proc is None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=2)
            return
        except (OSError, subprocess.TimeoutExpired):
            pass  # fall through to the forced kill below
        try:
            # Popen.kill() works on every platform; signal.SIGKILL does not
            # exist on Windows.
            proc.kill()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired) as e:
            LogManager.warning(f"结束 iproxy 进程失败 PID={proc.pid}: {e}")

    def _alloc_port(self) -> int:
        """Return the next unused local port number (thread-safe)."""
        with self._port_lock:
            self._port += 1
            return self._port

    def _manager_send(self, model: DeviceModel):
        """Send ``model.toDict()`` to the manager; failures are logged, not raised."""
        try:
            self._manager.send(model.toDict())
        except Exception as e:
            LogManager.error(f"发送设备信息失败:{e}")

    def _find_iproxy(self) -> str:
        """Return the path of ``resources/iproxy/iproxy.exe`` two levels above this file.

        Raises:
            FileNotFoundError: if the executable is not present.
        """
        base = Path(__file__).resolve().parent.parent
        name = "iproxy.exe"
        path = base / "resources" / "iproxy" / name
        print(str(path))
        if path.is_file():
            return str(path)
        raise FileNotFoundError(f"iproxy 不存在: {path}")

    # ------------ Windows only: list every iproxy command line ------------
    def _get_all_iproxy_cmdlines(self) -> List[str]:
        """Return ``"<command line> <pid>"`` for each running iproxy started with ``-u``.

        Returns an empty list if ``wmic`` fails or is not available.
        """
        try:
            raw = subprocess.check_output(
                ['wmic', 'process', 'where', "name='iproxy.exe'",
                 'get', 'CommandLine,ProcessId', '/value'],
                stderr=subprocess.DEVNULL, text=True
            )
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a missing wmic executable.
            return []
        return self._parse_wmic_values(raw)

    @staticmethod
    def _parse_wmic_values(raw: str) -> List[str]:
        """Pair up ``CommandLine=`` / ``ProcessId=`` lines from ``wmic /value`` output.

        Records are rebuilt from the key/value lines themselves instead of
        splitting on blank lines.
        NOTE(review): piped wmic output is reported to use ``\\r\\r\\n`` line
        endings, which text mode would turn into a blank line between every
        property - confirm on a target machine.
        """
        entries: List[str] = []
        cmd: Optional[str] = None
        pid: Optional[str] = None
        for line in raw.splitlines():
            line = line.strip()
            if line.startswith('CommandLine='):
                if cmd is not None:
                    pid = None  # a new record started before the old one completed
                cmd = line[len('CommandLine='):].strip()
            elif line.startswith('ProcessId='):
                if pid is not None:
                    cmd = None
                pid = line[len('ProcessId='):].strip()
            else:
                continue
            if cmd is not None and pid is not None:
                if cmd and pid and '-u' in cmd:
                    entries.append(f'{cmd} {pid}')
                cmd = pid = None
        return entries

    # ------------ kill orphans ------------
    def _cleanup_orphan_iproxy(self):
        """Kill every running iproxy whose ``-u`` udid is not a tracked device."""
        live_udids = set(self._models)
        for ln in self._get_all_iproxy_cmdlines():
            parts = ln.split()
            try:
                udid = parts[parts.index('-u') + 1]
                pid = int(parts[-1])
            except (ValueError, IndexError):
                # No standalone '-u' token, or the trailing PID is not numeric.
                continue
            if udid not in live_udids:
                self._kill_pid_gracefully(pid)
                LogManager.warning(f'扫到孤儿 iproxy,已清理 {udid} PID={pid}')

    # ------------ kill by PID ------------
    def _kill_pid_gracefully(self, pid: int):
        """Send SIGTERM to ``pid``, then SIGKILL one second later where available."""
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            return  # process already gone or not ours to signal
        force = getattr(signal, "SIGKILL", None)
        if force is None:
            # Windows has no SIGKILL; os.kill() there already terminates the
            # process unconditionally, so there is nothing left to escalate to.
            return
        time.sleep(1)
        try:
            os.kill(pid, force)
        except OSError:
            pass  # exited after SIGTERM, which is the expected outcome