# -*- coding: utf-8 -*-
import os
import signal
import socket
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional

import wda
from tidevice import ConnectionType, Usbmux
from tidevice._device import BaseDevice

from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager


class Deviceinfo(object):
    """Device lifecycle manager; ``deviceModelList`` is the single source of truth.

    ``startDeviceListener`` polls usbmux for USB devices, registers new ones
    through ``connectDevice`` and removes devices that stay missing for longer
    than ``_MISS_WINDOW`` seconds.  ``_wda_health_checker`` polls WDA on every
    ready device and rebuilds the channel when the check fails.  Both are
    blocking loops meant to run on their own threads.
    """

    # Seconds a device must stay missing from usbmux before it is removed.
    _MISS_WINDOW = 5.0
    # Upper bound on attempts to free a local port held by another process.
    _PORT_RELEASE_ATTEMPTS = 3

    def __init__(self):
        self.deviceIndex = 0
        # Last screen-relay port handed out; _alloc_port increments it first.
        self.screenProxy = 9110
        # One {"target": Popen, "id": udid} entry per running iproxy process.
        self.pidList: List[Dict] = []
        self.manager = FlaskSubprocessManager.get_instance()
        self.deviceModelList: List[DeviceModel] = []
        self.maxDeviceCount = 6
        # Reentrant on purpose: _remove_model / _add_model take the lock
        # themselves, so a caller that already holds it must not deadlock.
        self._lock = threading.RLock()
        self._model_index: Dict[str, DeviceModel] = {}  # udid -> model
        # udid -> time.time() of the last scan in which the device was listed.
        self._last_seen: Dict[str, float] = {}
        self._port_pool: List[int] = []
        self._port_in_use: set[int] = set()

        # region iproxy bootstrap
        try:
            self.iproxy_path = self._iproxy_path()
            self.iproxy_dir = self.iproxy_path.parent
            os.environ["PATH"] = str(self.iproxy_dir) + os.pathsep + os.environ.get("PATH", "")
            try:
                # Not available on every platform/version; failure is harmless.
                os.add_dll_directory(str(self.iproxy_dir))
            except Exception:
                pass
            # 0x08000000 is CREATE_NO_WINDOW; only meaningful on Windows.
            self._creationflags = 0x08000000 if os.name == "nt" else 0
            self._popen_kwargs = dict(
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=str(self.iproxy_dir),
                shell=False,
                text=True,
                creationflags=self._creationflags,
                encoding="utf-8",
                # An undecodable byte must not kill the log-drain thread,
                # otherwise the pipe fills up and iproxy blocks on write.
                errors="replace",
                bufsize=1,
            )
            self._spawn_iproxy = self._start_iproxy
            LogManager.info(f"iproxy 启动器已就绪,目录: {self.iproxy_dir}")
        except Exception as e:
            self.iproxy_path = None
            self.iproxy_dir = None
            self._spawn_iproxy = None
            LogManager.error(f"初始化 iproxy 失败:{e}")
        # endregion

    def _start_iproxy(self, udid: str, local_port: int, remote_port: int = 9100) -> subprocess.Popen:
        """Start ``iproxy -u <udid> <local_port> <remote_port>`` and drain its output.

        Two daemon threads forward the child's stdout and stderr to the log.

        Returns:
            The ``Popen`` handle of the started process.
        """
        args = [str(self.iproxy_path), "-u", udid, str(local_port), str(remote_port)]
        proc = subprocess.Popen(args, **self._popen_kwargs)
        for name, stream in (("STDOUT", proc.stdout), ("STDERR", proc.stderr)):
            threading.Thread(
                target=self._pipe_to_log,
                args=(name, stream, udid),
                daemon=True,
            ).start()
        return proc

    @staticmethod
    def _pipe_to_log(name: str, stream, udid: str) -> None:
        """Forward every non-blank line of ``stream`` to the log until EOF."""
        try:
            for line in iter(stream.readline, ""):
                text = line.strip()
                if text:
                    LogManager.info(f"[iproxy {name}] {text}", udid)
        except Exception:
            # Best effort: the stream goes away when the process is torn down.
            pass

    # ------------------------------------------------------------------
    # Main listener loop (time window + USB-level fallback)
    # ------------------------------------------------------------------
    def startDeviceListener(self):
        """Poll usbmux forever, adding new USB devices and removing lost ones.

        A registered device is removed only when it has been absent from the
        usbmux list for more than ``_MISS_WINDOW`` seconds and is also absent
        from the ``idevice_id -l`` enumeration.
        """
        while True:
            try:
                lists = Usbmux().device_list()
            except Exception as e:
                LogManager.warning(f"usbmuxd 连接失败: {e},2 秒后重试")
                time.sleep(2)
                continue

            now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB}
            now = time.time()
            expired: List[str] = []
            with self._lock:
                for udid in list(self._model_index):
                    if udid in now_udids:
                        self._last_seen[udid] = now
                        continue
                    # setdefault pins the reference time of a device that has
                    # never been stamped; a plain .get(..., now) would restart
                    # the window on every pass and the device would never expire.
                    reference = self._last_seen.setdefault(udid, now)
                    if now - reference > self._MISS_WINDOW:
                        expired.append(udid)

            if expired:
                # Only spawn idevice_id when there is something to confirm.
                usb_sn_set = self._usb_enumerate_sn()
                for udid in expired:
                    if udid not in usb_sn_set:
                        self._remove_model(udid)

            # Register newly attached devices.
            for d in lists:
                if d.conn_type != ConnectionType.USB:
                    continue
                udid = d.udid
                with self._lock:
                    known = udid in self._model_index
                if known:
                    continue
                if not self.is_device_trusted(udid):
                    continue
                if len(self.deviceModelList) >= self.maxDeviceCount:
                    continue
                try:
                    self.connectDevice(udid)
                except Exception as e:
                    LogManager.error(f"连接设备失败 {udid}: {e}", udid)

            time.sleep(1)

    # ------------------------------------------------------------------
    # USB-level serial enumeration
    # ------------------------------------------------------------------
    def _usb_enumerate_sn(self) -> set[str]:
        """Return the non-blank lines printed by ``idevice_id -l``.

        Returns an empty set when the command fails, is missing, or does not
        finish within 3 seconds.
        """
        try:
            out = subprocess.check_output(["idevice_id", "-l"], text=True, timeout=3)
        except Exception:
            return set()
        return {line.strip() for line in out.splitlines() if line.strip()}

    # ------------------------------------------------------------------
    # WDA health check
    # ------------------------------------------------------------------
    def _wda_health_checker(self):
        """Check WDA on every ready device once per second; rebuild on failure."""
        while True:
            time.sleep(1)
            with self._lock:
                online = [m for m in self.deviceModelList if m.ready]
            for model in online:
                udid = model.deviceId
                if self._wda_ok(udid):
                    continue
                LogManager.warning(f"WDA 异常,重启通道:{udid}", udid)
                # _remove_model takes the lock itself; do not hold it here.
                self._remove_model(udid)
                try:
                    self.connectDevice(udid)
                except Exception as e:
                    # Keep the checker thread alive if a reconnect blows up.
                    LogManager.error(f"连接设备失败 {udid}: {e}", udid)

    def _wda_ok(self, udid: str) -> bool:
        """Return True when WDA's status call reports ``state == "success"``."""
        try:
            client = wda.USBClient(udid, 8100)
            status = client.status()
        except Exception as e:
            LogManager.error(f"WDA health-check 异常:{e}", udid)
            return False
        return status.get("state") == "success"

    # -------------------- model registry (single entry points) --------------------
    def _has_model(self, udid: str) -> bool:
        """Return True when ``udid`` is currently registered."""
        return udid in self._model_index

    def _add_model(self, model: DeviceModel):
        """Register ``model`` and send its dict to the manager.

        Does nothing when a model with the same ``deviceId`` is already
        registered.
        """
        with self._lock:
            if model.deviceId in self._model_index:
                return
            model.ready = True
            self.deviceModelList.append(model)
            self._model_index[model.deviceId] = model
        try:
            self.manager.send(model.toDict())
        except Exception as e:
            LogManager.warning(f"{model.deviceId} 发送上线事件失败:{e}")
        LogManager.method_info(
            f"{model.deviceId} 加入设备成功,当前在线数:{len(self.deviceModelList)}",
            method="device_count",
        )

    @staticmethod
    def _trace_removal(message: str, error: bool = False) -> None:
        """Print ``message`` and write it to the ``device_count`` method log."""
        print(message)
        if error:
            LogManager.method_error(message, method="device_count")
        else:
            LogManager.method_info(message, method="device_count")

    def _remove_model(self, udid: str):
        """Unregister ``udid``, recycle its port, stop its iproxy, send the event.

        Bookkeeping is done under the lock; terminating processes and sending
        the event (with retries and sleeps) happen after the lock is released.
        Returns early when the udid is not registered or the model is already
        flagged as deleting.
        """
        self._trace_removal(f"【删】进入删除方法 udid={udid}")
        with self._lock:
            self._trace_removal(f"【删】拿到锁 udid={udid}")
            model = self._model_index.pop(udid, None)
            if model is None:
                self._trace_removal(f"【删】模型已空,直接返回 udid={udid}")
                return
            if model.deleting:
                self._trace_removal(f"【删】正在删除中,幂等返回 udid={udid}")
                return
            model.deleting = True
            model.type = 2
            self._trace_removal(f"【删】标记 deleting=True udid={udid}")

            # Drop the stale timestamp so a later re-attach starts clean.
            self._last_seen.pop(udid, None)

            before = len(self.deviceModelList)
            self.deviceModelList = [m for m in self.deviceModelList if m.deviceId != udid]
            after = len(self.deviceModelList)
            self._trace_removal(f"【删】列表过滤 before={before} → after={after} udid={udid}")

            # _free_port only pools a port that is actually marked in use,
            # which keeps duplicates out of the pool.
            self._free_port(model.screenPort)
            self._trace_removal(f"【删】回收端口 port={model.screenPort} udid={udid}")

            to_kill = [item for item in self.pidList if item.get("id") == udid]
            self.pidList = [item for item in self.pidList if item.get("id") != udid]

        self._trace_removal(f"【删】待杀进程数 count={len(to_kill)} udid={udid}")
        for idx, item in enumerate(to_kill, 1):
            proc = item.get("target")
            self._trace_removal(f"【删】杀进程 {idx}/{len(to_kill)} pid={proc.pid} udid={udid}")
            self._terminate_proc(proc)
        self._trace_removal(f"【删】进程清理完成 udid={udid}")

        retry = 3
        while retry:
            try:
                self.manager.send(model.toDict())
                self._trace_removal(f"【删】下线事件已发送 udid={udid}")
                break
            except Exception as e:
                retry -= 1
                self._trace_removal(
                    f"【删】发送事件失败 retry={retry} err={e} udid={udid}",
                    error=True,
                )
                time.sleep(0.2)
        else:
            # Reached only when every attempt failed (no break).
            self._trace_removal(f"【删】发送事件彻底失败,主动退出 udid={udid}", error=True)

        self._trace_removal(f"【删】===== 设备 {udid} 删除全流程结束 =====")
        print(len(self.deviceModelList))
        LogManager.method_info(
            f"当前剩余设备数量:{len(self.deviceModelList)}",
            method="device_count",
        )

    # -------------------- port allocation and recycling --------------------
    def _alloc_port(self) -> int:
        """Return a local port: a pooled one if available, else the next new one."""
        if self._port_pool:
            port = self._port_pool.pop()
        else:
            self.screenProxy += 1
            port = self.screenProxy
        self._port_in_use.add(port)
        return port

    def _free_port(self, port: int):
        """Move ``port`` from the in-use set back to the pool, if it was in use."""
        if port in self._port_in_use:
            self._port_in_use.remove(port)
            self._port_pool.append(port)

    # -------------------- single device connection --------------------
    def connectDevice(self, udid: str):
        """Bring one device online: WDA client, model registration, screen relay.

        Returns without doing anything when the device is already registered
        or is not trusted, or when the WDA client cannot be created.
        """
        if self._has_model(udid):
            # Re-running the setup would leak a port and start a second relay
            # while the registered model still points at the first one.
            return
        if not self.is_device_trusted(udid):
            LogManager.warning("设备未信任,跳过 WDA 启动", udid)
            return
        try:
            d = wda.USBClient(udid, 8100)
        except Exception as e:
            LogManager.error(f"启动 WDA 失败: {e}", udid)
            return

        width, height, scale = 0, 0, 1.0
        try:
            size = d.window_size()
            width, height = size.width, size.height
            scale = d.scale
        except Exception as e:
            LogManager.warning(f"读取屏幕信息失败:{e}", udid)

        port = self._alloc_port()
        model = DeviceModel(udid, port, width, height, scale, type=1)
        self._add_model(model)

        try:
            d.app_start(WdaAppBundleId)
            d.home()
        except Exception as e:
            LogManager.warning(f"启动/切回桌面失败:{e}", udid)
        time.sleep(2)

        # Stop any relay still recorded for this udid instead of just
        # forgetting its handle, which would orphan the process.
        stale = [item for item in self.pidList if item.get("id") == udid]
        self.pidList = [item for item in self.pidList if item.get("id") != udid]
        for item in stale:
            self._terminate_proc(item.get("target"))

        target = self.relayDeviceScreenPort(udid, port)
        if target:
            self.pidList.append({"target": target, "id": udid})

    # -------------------- utilities --------------------
    def is_device_trusted(self, udid: str) -> bool:
        """Return True when reading ``DeviceName`` from the device succeeds."""
        try:
            BaseDevice(udid).get_value("DeviceName")
        except Exception:
            return False
        return True

    def relayDeviceScreenPort(self, udid: str, port: int) -> Optional[subprocess.Popen]:
        """Start iproxy forwarding local ``port`` to device port 9100.

        If the local port is already open, the listening process (other than
        this one) is killed first, for at most ``_PORT_RELEASE_ATTEMPTS``
        rounds so an unkillable owner cannot stall the caller forever.

        Returns:
            The iproxy ``Popen`` handle, or ``None`` when the launcher is not
            available or the process could not be started.
        """
        if not self._spawn_iproxy:
            LogManager.error("iproxy 启动器未就绪", udid)
            return None

        for _ in range(self._PORT_RELEASE_ATTEMPTS):
            if not (self._port_in_use and self._is_port_open(port)):
                break
            pid = self._get_pid_by_port(port)
            if not pid or pid == os.getpid():
                break
            LogManager.warning(f"端口 {port} 仍被 PID {pid} 占用,尝试释放", udid)
            self._kill_pid_gracefully(pid)

        try:
            proc = self._spawn_iproxy(udid, port, 9100)
            self._port_in_use.add(port)
            LogManager.info(f"启动 iproxy 成功,本地 {port} -> 设备 9100", udid)
            return proc
        except Exception as e:
            LogManager.error(f"启动 iproxy 失败:{e}", udid)
            return None

    def _is_port_open(self, port: int) -> bool:
        """Return True when a TCP connect to ``127.0.0.1:port`` succeeds."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1.0)
            return s.connect_ex(("127.0.0.1", port)) == 0

    def _get_pid_by_port(self, port: int) -> Optional[int]:
        """Return the PID listening on ``port``, or ``None`` if not found.

        Parses ``netstat -ano -p tcp`` on Windows and ``lsof -t`` elsewhere.
        Any failure (tool missing, no match, unparsable output) yields ``None``.
        """
        try:
            if os.name == "nt":
                out = subprocess.check_output(["netstat", "-ano", "-p", "tcp"], text=True)
                for line in out.splitlines():
                    if f"127.0.0.1:{port}" in line and "LISTENING" in line:
                        return int(line.strip().split()[-1])
                return None
            out = subprocess.check_output(
                ["lsof", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"], text=True
            )
            return int(out.strip().split()[0])
        except Exception:
            return None

    def _kill_pid_gracefully(self, pid: int):
        """Send SIGTERM, wait one second, then SIGKILL where the platform has it."""
        try:
            os.kill(pid, signal.SIGTERM)
            time.sleep(1)
            # signal.SIGKILL is not defined on Windows.
            sigkill = getattr(signal, "SIGKILL", None)
            if sigkill is not None:
                os.kill(pid, sigkill)
        except Exception:
            # Best effort: the process may already be gone or not be ours.
            pass

    def _terminate_proc(self, p: Optional[subprocess.Popen]):
        """Stop a child process: ``terminate()`` first, ``kill()`` as fallback."""
        if not p or p.poll() is not None:
            return
        try:
            p.terminate()
            p.wait(timeout=3)
        except Exception:
            try:
                # Kill only this child.  The child is not started in its own
                # session, so signalling its process group would also hit
                # this process.
                p.kill()
                p.wait(timeout=2)
            except Exception:
                pass

    def _base_dir(self) -> Path:
        """Return the executable's directory when frozen, else two levels above this file."""
        if getattr(sys, "frozen", False):
            return Path(sys.executable).resolve().parent
        return Path(__file__).resolve().parents[1]

    def _iproxy_path(self) -> Path:
        """Locate the bundled iproxy binary under ``resources/iproxy``.

        Raises:
            FileNotFoundError: when no candidate path exists.
        """
        exe = "iproxy.exe" if os.name == "nt" else "iproxy"
        base = self._base_dir()
        candidates = [base / "resources" / "iproxy" / exe]
        for candidate in candidates:
            if candidate.exists():
                return candidate
        raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")