import http.client
import os
import signal
import socket
import subprocess
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import psutil
import tidevice
import wda
from tidevice import ConnectionType, Usbmux
from tidevice._device import BaseDevice

from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager


class DeviceInfo:
    """Track USB-attached devices and keep one iproxy tunnel alive per device.

    For every device reported by usbmux over USB, the class starts WDA, reads
    the screen geometry, spawns an ``iproxy`` process forwarding a local port
    to device port 9100, and reports the resulting ``DeviceModel`` through the
    ``FlaskSubprocessManager`` instance. Tunnels are health-checked and
    restarted, and iproxy processes that belong to no tracked device are
    killed.
    """

    def __init__(self):
        # Port cursor: _pick_free_port() starts searching at cursor + 1.
        self._port = 9110
        self._models: Dict[str, DeviceModel] = {}
        self._procs: Dict[str, subprocess.Popen] = {}
        self._manager = FlaskSubprocessManager.get_instance()
        self._iproxy_path = self._find_iproxy()
        self._pool = ThreadPoolExecutor(max_workers=6)
        self._last_heal_check_ts = 0.0
        # udid -> earliest timestamp at which another restart is allowed.
        self._heal_backoff: Dict[str, float] = defaultdict(float)
        # _add_device runs on pool threads, so port allocation must be
        # serialised. Re-entrant because _pick_port_above() holds the lock
        # while calling _pick_free_port().
        self._port_lock = threading.RLock()
        # udids whose _add_device() call has been submitted and not finished.
        self._pending: Set[str] = set()

    # ---------------- main loop ----------------
    def listen(self):
        """Poll usbmux forever, syncing tracked devices with what is plugged in.

        Each iteration removes unplugged devices synchronously, submits newly
        plugged devices to the thread pool, runs the tunnel health check, and
        every 10 iterations reaps orphan iproxy processes. Never returns.
        """
        orphan_gc_tick = 0
        while True:
            try:
                online = {
                    d.udid
                    for d in Usbmux().device_list()
                    if d.conn_type == ConnectionType.USB
                }
            except Exception as e:
                # A failed query must not be read as "nothing is plugged in";
                # leave the tracked devices untouched and retry.
                LogManager.error(f"获取设备列表失败:{e}")
                time.sleep(1)
                continue

            # Unplugged devices are removed synchronously.
            for udid in list(self._models):
                if udid not in online:
                    self._remove_device(udid)

            # Newly plugged devices are connected asynchronously. Devices that
            # are still being connected from an earlier iteration are skipped
            # so they never get a second WDA launch or a second tunnel.
            new = [
                u for u in online
                if u not in self._models and u not in self._pending
            ]
            if new:
                futures = {}
                for u in new:
                    self._pending.add(u)
                    fut = self._pool.submit(self._add_device, u)
                    fut.add_done_callback(
                        lambda _f, _u=u: self._pending.discard(_u)
                    )
                    futures[fut] = u
                try:
                    for f in as_completed(futures, timeout=30):
                        try:
                            f.result()
                        except Exception as e:
                            LogManager.error(f"异步连接失败:{e}")
                except FuturesTimeoutError:
                    # as_completed raises from the iterator itself; without
                    # this handler one slow device would end the whole loop.
                    LogManager.warning("异步连接超时,未完成的设备将在后台继续连接")

            # Periodic health check + self-healing.
            self._check_and_heal_tunnels(interval=2.0)

            # Every 10 iterations, clean up orphan iproxy processes.
            orphan_gc_tick += 1
            if orphan_gc_tick >= 10:
                orphan_gc_tick = 0
                self._cleanup_orphan_iproxy()

            time.sleep(1)

    # ---------------- add device ----------------
    def _add_device(self, udid: str):
        """Bring one device online: start WDA, read screen info, open a tunnel.

        Returns silently at the first step that fails. On success the model
        and the iproxy process are recorded and the model is sent to the
        manager. Runs on a pool thread.
        """
        if not self._trusted(udid):
            return
        if self.startWda(udid) is False:
            LogManager.info("启动wda失败")
            return
        w, h, s = self._screen_info(udid)
        if w == 0 or h == 0 or s == 0:
            print("未获取到设备屏幕信息")
            return
        # Record the port iproxy was really started on, which may differ from
        # the one requested if that port got taken in the meantime.
        proc, port = self._launch_iproxy(udid, self._alloc_port())
        if not proc:
            return
        model = DeviceModel(
            deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1
        )
        model.ready = True
        self._models[udid] = model
        self._procs[udid] = proc
        self._manager_send(model)

    # ---------------- remove device ----------------
    def _remove_device(self, udid: str):
        """Forget a device, kill its iproxy and notify the manager (type=2)."""
        model = self._models.pop(udid, None)
        if not model:
            return
        model.type = 2
        self._kill(self._procs.pop(udid, None))
        # Drop the backoff entry so the table does not grow with every udid
        # ever seen.
        self._heal_backoff.pop(udid, None)
        self._manager_send(model)

    # ---------------- helpers ----------------
    def _trusted(self, udid: str) -> bool:
        """Return True if ``DeviceName`` can be read from the device."""
        try:
            BaseDevice(udid).get_value("DeviceName")
            return True
        except Exception:
            return False

    def startWda(self, udid):
        """Launch the WDA app on the device; return True on success, else False.

        Sleeps 3 seconds after a successful launch before returning.
        """
        print("进入启动wda方法")
        try:
            dev = tidevice.Device(udid)
            print("获取tidevice对象成功,准备启动wda")
            dev.app_start(WdaAppBundleId)
            print("启动wda成功")
            time.sleep(3)
            return True
        except Exception as e:
            print("启动wda遇到错误:", e)
            return False

    def _screen_info(self, udid: str):
        """Return ``(width, height, scale)`` from WDA, or ``(0, 0, 0)`` on error.

        Note that this presses the home button on the device before reading
        the window size.
        """
        try:
            c = wda.USBClient(udid, 8100)
            c.home()
            size = c.window_size()
            scale = c.scale
            return int(size.width), int(size.height), float(scale)
        except Exception as e:
            print("获取设备信息遇到错误:", e)
            return 0, 0, 0

    # ---------------- iproxy process management ----------------
    def _launch_iproxy(
        self, udid: str, port: int
    ) -> Tuple[Optional[subprocess.Popen], int]:
        """Spawn iproxy for ``udid`` and report the local port actually used.

        If ``port`` is busy another free port is picked. Returns
        ``(process, port)``; ``process`` is None when the launch failed.
        """
        try:
            # Make sure the port is free; otherwise switch to another one.
            if not self._is_port_free(port):
                port = self._pick_port_above(port)

            # Hidden window and own process group (easier to terminate).
            # These flags exist only on Windows; elsewhere they resolve to 0,
            # which is the only value Popen accepts there.
            flags = (
                getattr(subprocess, "CREATE_NO_WINDOW", 0)
                | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
            )
            proc = subprocess.Popen(
                [self._iproxy_path, "-u", udid, str(port), "9100"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                creationflags=flags,
            )
            return proc, port
        except Exception as e:
            print(e)
            return None, port

    def _start_iproxy(self, udid: str, port: int) -> Optional[subprocess.Popen]:
        """Spawn iproxy for ``udid``; return the process or None on failure.

        The process may end up on a different port than requested when
        ``port`` is busy. Use ``_launch_iproxy`` when the actual port matters.
        """
        return self._launch_iproxy(udid, port)[0]

    def _kill(self, proc: Optional[subprocess.Popen]):
        """Terminate ``proc``, escalating to a hard kill if that fails.

        Best effort: never raises. ``None`` is accepted and ignored.
        """
        if not proc:
            return
        try:
            proc.terminate()
            proc.wait(timeout=2)
        except Exception:
            try:
                # Popen.kill() works on every platform; signal.SIGKILL is not
                # defined on Windows.
                proc.kill()
                proc.wait(timeout=2)
            except Exception as e:
                LogManager.warning(f"结束 iproxy 进程失败:{e}")

    def _alloc_port(self) -> int:
        """Reserve and return the next free local port above the cursor."""
        return self._pick_free_port()

    def _manager_send(self, model: DeviceModel):
        """Send the model to the manager; failures are logged, not raised."""
        try:
            self._manager.send(model.toDict())
        except Exception as e:
            LogManager.warning(f"发送设备信息失败:{e}")

    def _find_iproxy(self) -> str:
        """Return the path of the bundled ``iproxy.exe``.

        Looks in ``<two directories above this file>/resources/iproxy``.

        Raises:
            FileNotFoundError: if the executable is not there.
        """
        base = Path(__file__).resolve().parent.parent
        name = "iproxy.exe"
        path = base / "resources" / "iproxy" / name
        print(str(path))
        if path.is_file():
            return str(path)
        raise FileNotFoundError(f"iproxy 不存在: {path}")

    # ------------ Windows only: list all iproxy command lines ------------
    def _get_all_iproxy_cmdlines(self) -> List[str]:
        """Enumerate running iproxy processes with psutil.

        Returns one string per ``iproxy.exe`` process whose command line
        contains ``-u``, in the form ``"<full command line> <pid>"``.
        """
        lines: List[str] = []
        for p in psutil.process_iter(attrs=["name", "cmdline", "pid"]):
            try:
                name = (p.info.get("name") or "").lower()
                if name != "iproxy.exe":
                    continue
                cmdline = p.info.get("cmdline") or []
                if not cmdline:
                    continue
                # Only iproxy processes started with -u carry a udid to parse.
                if "-u" in cmdline:
                    cmd = " ".join(cmdline)
                    lines.append(f"{cmd} {p.info['pid']}")
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return lines

    # ------------ kill orphans ------------
    def _cleanup_orphan_iproxy(self):
        """Kill every iproxy process whose udid is not tracked or in flight."""
        # Devices still being connected may already own a tunnel that is not
        # in _models yet; they must not be reaped.
        live_udids = set(self._models) | set(self._pending)
        for ln in self._get_all_iproxy_cmdlines():
            parts = ln.split()
            try:
                udid = parts[parts.index('-u') + 1]
                pid = int(parts[-1])
                if udid not in live_udids:
                    self._kill_pid_gracefully(pid)
                    LogManager.warning(f'扫到孤儿 iproxy,已清理 {udid} PID={pid}')
            except (ValueError, IndexError):
                continue

    # ------------ kill by PID ------------
    def _kill_pid_gracefully(self, pid: int):
        """Terminate the process, killing it if it has not exited within 1s.

        Best effort: every error is swallowed.
        """
        try:
            p = psutil.Process(pid)
            p.terminate()
            try:
                p.wait(timeout=1.0)
            except psutil.TimeoutExpired:
                p.kill()
        except Exception:
            pass

    def _is_port_free(self, port: int) -> bool:
        """Return True if ``127.0.0.1:port`` can be bound right now."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # On Windows SO_REUSEADDR lets bind() succeed on a port another
            # socket is already using, which would make every port look free;
            # only set it on other platforms.
            if os.name != "nt":
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.settimeout(0.2)
            try:
                s.bind(("127.0.0.1", port))
                return True
            except OSError:
                return False

    def _pick_free_port(self, start: Optional[int] = None, limit: int = 2000) -> int:
        """Find a free port above ``start`` and advance the cursor to it.

        Args:
            start: search begins at ``start + 1``; defaults to the cursor.
            limit: maximum number of ports to probe.

        Raises:
            RuntimeError: if none of the probed ports is free.
        """
        # Held for the whole search so concurrent callers cannot be handed the
        # same port before either of them has bound it.
        with self._port_lock:
            p = self._port if start is None else start
            for _ in range(limit):
                p += 1
                if self._is_port_free(p):
                    self._port = p  # advance the cursor
                    return p
        raise RuntimeError("未找到可用端口(扫描范围内)")

    def _pick_port_above(self, floor: int) -> int:
        """Pick a free port above both ``floor`` and the current cursor."""
        with self._port_lock:
            return self._pick_free_port(max(self._port, floor))

    def _health_check_mjpeg(self, port: int, timeout: float = 1.0) -> bool:
        """Probe ``http://127.0.0.1:<port>/`` with a single lightweight GET.

        Any 2xx or 3xx status counts as alive. At most 256 bytes of the body
        are read. Returns False on any error; never raises.
        """
        conn = None
        try:
            conn = http.client.HTTPConnection("127.0.0.1", port, timeout=timeout)
            conn.request("GET", "/")
            resp = conn.getresponse()
            alive = 200 <= resp.status < 400
            # Read as little as possible to keep the probe cheap.
            try:
                resp.read(256)
            except Exception:
                pass
            return alive
        except Exception:
            return False
        finally:
            # Close on every path so failed probes do not leak sockets.
            if conn is not None:
                conn.close()

    def _restart_iproxy(self, udid: str):
        """Restart the iproxy of ``udid``, honouring its backoff window.

        If the tunnel comes back on a different port, the model is updated and
        re-sent to the manager. A failed start sets a 2s backoff, a successful
        one a 0.5s backoff, both measured from the start of this call.
        """
        now = time.time()
        if now < self._heal_backoff[udid]:
            return  # still inside the backoff window

        proc = self._procs.get(udid)
        if proc:
            self._kill(proc)
            # Give the OS a moment to really release the port.
            time.sleep(0.3)

        model = self._models.get(udid)
        if not model:
            return

        old_port = model.screenPort
        new_proc, port = self._launch_iproxy(udid, old_port)
        if not new_proc:
            self._heal_backoff[udid] = now + 2.0
            return

        self._procs[udid] = new_proc
        if port != old_port:
            # Announce the port change only once the tunnel is actually up.
            model.screenPort = port
            self._manager_send(model)
        self._heal_backoff[udid] = now + 0.5

    def _check_and_heal_tunnels(self, interval: float = 2.0):
        """Probe every tracked device's local port; restart iproxy if dead.

        Does nothing if the previous check ran less than ``interval`` seconds
        ago.
        """
        now = time.time()
        if now - self._last_heal_check_ts < interval:
            return
        self._last_heal_check_ts = now

        for udid, model in list(self._models.items()):
            port = model.screenPort
            if port <= 0:
                continue
            ok = self._health_check_mjpeg(port, timeout=0.8)
            if not ok:
                LogManager.warning(f"端口失活,准备自愈:udid={udid} port={port}")
                self._restart_iproxy(udid)