# -*- coding: utf-8 -*- import os import signal import subprocess import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Dict, Optional, List import random import socket import http.client import psutil import hashlib # 仍保留,如需后续扩展 import tidevice import wda from tidevice import Usbmux, ConnectionType from tidevice._device import BaseDevice from Entity.DeviceModel import DeviceModel from Entity.Variables import WdaAppBundleId, wdaFunctionPort, wdaScreenPort from Module.FlaskSubprocessManager import FlaskSubprocessManager from Module.IOSActivator import IOSActivator from Utils.LogManager import LogManager class DeviceInfo: # --- 时序参数(更稳) --- REMOVE_GRACE_SEC = 8.0 # 设备离线宽限期(秒) ADD_STABLE_SEC = 2.5 # 设备上线稳定期(秒) ORPHAN_COOLDOWN = 8.0 # 拓扑变更后暂停孤儿清理(秒) HEAL_INTERVAL = 5.0 # 健康巡检间隔(秒) def __init__(self): # 自增端口游标仅作兜底扫描使用 self._port = 9110 self._models: Dict[str, DeviceModel] = {} self._procs: Dict[str, subprocess.Popen] = {} self._manager = FlaskSubprocessManager.get_instance() self._iproxy_path = self._find_iproxy() self._pool = ThreadPoolExecutor(max_workers=6) self._last_heal_check_ts = 0.0 self._heal_backoff: Dict[str, float] = {} # udid -> next_allowed_ts # 并发保护 & 状态表 self._lock = threading.RLock() self._port_by_udid: Dict[str, int] = {} # UDID -> 当前使用的本地端口 self._pid_by_udid: Dict[str, int] = {} # UDID -> iproxy PID # 抗抖 self._last_seen: Dict[str, float] = {} # udid -> ts self._first_seen: Dict[str, float] = {} # udid -> ts(首次在线) self._last_topology_change_ts = 0.0 LogManager.info("DeviceInfo init 完成;日志已启用", udid="system") # ---------------- 主循环 ---------------- def listen(self): method = "listen" LogManager.method_info("进入主循环", method, udid="system") orphan_gc_tick = 0 while True: now = time.time() try: usb = Usbmux().device_list() online_now = {d.udid for d in usb if d.conn_type == ConnectionType.USB} except Exception as e: LogManager.warning(f"[device_list] 异常:{e}", udid="system") time.sleep(1) continue # 记录“看到”的时间戳 for u in online_now: if u not in self._first_seen: self._first_seen[u] = now LogManager.method_info("first seen", method, udid=u) self._last_seen[u] = now with self._lock: known = set(self._models.keys()) # 真正移除(连续缺席超过宽限期) for udid in list(known): last = self._last_seen.get(udid, 0.0) if udid not in online_now and (now - last) >= self.REMOVE_GRACE_SEC: LogManager.info(f"设备判定离线(超过宽限期 {self.REMOVE_GRACE_SEC}s),last_seen={last}", udid=udid) self._remove_device(udid) self._last_topology_change_ts = now # 真正新增(连续在线超过稳定期) new_candidates = [u for u in online_now if u not in known] to_add = [u for u in new_candidates if (now - self._first_seen.get(u, now)) >= self.ADD_STABLE_SEC] if to_add: LogManager.info(f"新增设备稳定上线:{to_add}", udid="system") futures = {self._pool.submit(self._add_device, u): u for u in to_add} for f in as_completed(futures, timeout=45): try: f.result() self._last_topology_change_ts = time.time() except Exception as e: LogManager.error(f"异步连接失败:{e}", udid="system") # 定期健康检查 + 自愈 self._check_and_heal_tunnels(interval=self.HEAL_INTERVAL) # 周期性孤儿清理(拓扑变更冷却之后) orphan_gc_tick += 1 if orphan_gc_tick >= 10: orphan_gc_tick = 0 if (time.time() - self._last_topology_change_ts) >= self.ORPHAN_COOLDOWN: self._cleanup_orphan_iproxy() time.sleep(1) # ---------------- 新增设备 ---------------- def _add_device(self, udid: str): method = "_add_device" LogManager.method_info("开始新增设备", method, udid=udid) if not self._trusted(udid): LogManager.method_warning("未信任设备,跳过", method, udid=udid) return r = self.startWda(udid) if r is False: LogManager.method_error("启动 WDA 失败,放弃新增", method, udid=udid) return # iOS 17+ 激活/信任阶段更抖,稍等更稳 time.sleep(5) w, h, s = self._screen_info(udid) if w == 0 or h == 0 or s == 0: LogManager.method_warning("未获取到屏幕信息,放弃新增", method, udid=udid) return # 不复用端口:直接起一个新端口 proc = self._start_iproxy(udid, port=None) if not proc: LogManager.method_error("启动 iproxy 失败,放弃新增", method, udid=udid) return with self._lock: port = self._port_by_udid[udid] model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1) model.ready = True self._models[udid] = model self._procs[udid] = proc LogManager.method_info(f"设备添加完成,port={port}, {w}x{h}@{s}", method, udid=udid) self._manager_send(model) # ---------------- 移除设备 ---------------- def _remove_device(self, udid: str): method = "_remove_device" LogManager.method_info("开始移除设备", method, udid=udid) with self._lock: model = self._models.pop(udid, None) proc = self._procs.pop(udid, None) pid = self._pid_by_udid.pop(udid, None) self._port_by_udid.pop(udid, None) if not model: LogManager.method_warning("未找到设备模型,可能重复移除", method, udid=udid) return model.type = 2 self._kill(proc) if pid: self._kill_pid_gracefully(pid) self._manager_send(model) LogManager.method_info("设备移除完毕", method, udid=udid) # ---------------- 工具函数 ---------------- def _trusted(self, udid: str) -> bool: try: BaseDevice(udid).get_value("DeviceName") return True except Exception: return False def startWda(self, udid): method = "startWda" LogManager.method_info("进入启动流程", method, udid=udid) try: dev = tidevice.Device(udid) systemVersion = int(dev.product_version.split(".")[0]) if systemVersion > 17: LogManager.method_info(f"iOS 主版本 {systemVersion},使用 IOSActivator", method, udid=udid) ios = IOSActivator() threading.Thread(target=ios.activate, args=(udid,), daemon=True).start() else: LogManager.method_info(f"app_start WDA: {WdaAppBundleId}", method, udid=udid) dev.app_start(WdaAppBundleId) LogManager.method_info("WDA 启动完成,等待稳定...", method, udid=udid) time.sleep(3) return True except Exception as e: LogManager.method_error(f"WDA 启动异常:{e}", method, udid=udid) return False def _screen_info(self, udid: str): method = "_screen_info" try: c = wda.USBClient(udid, wdaFunctionPort) c.home() size = c.window_size() scale = c.scale LogManager.method_info(f"屏幕信息:{int(size.width)}x{int(size.height)}@{float(scale)}", method, udid=udid) return int(size.width), int(size.height), float(scale) except Exception as e: LogManager.method_warning(f"获取屏幕信息异常:{e}", method, udid=udid) return 0, 0, 0 # ---------------- 端口/进程:不复用端口 ---------------- def _is_port_bindable(self, port: int, host: str = "127.0.0.1") -> bool: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((host, port)) return True except OSError: return False finally: s.close() def _pick_new_port(self, tries: int = 40) -> int: method = "_pick_new_port" # 先在 9111~9499 随机尝试 for _ in range(tries // 2): p = random.randint(9111, 9499) if self._is_port_bindable(p): LogManager.method_info(f"端口候选可用(9k段):{p}", method, udid="system") return p else: LogManager.method_info(f"端口候选占用(9k段):{p}", method, udid="system") # 再在 20000~48000 随机尝试 for _ in range(tries): p = random.randint(20000, 48000) if self._is_port_bindable(p): LogManager.method_info(f"端口候选可用(20k-48k):{p}", method, udid="system") return p else: LogManager.method_info(f"端口候选占用(20k-48k):{p}", method, udid="system") LogManager.method_warning("随机端口尝试耗尽,改顺序扫描", method, udid="system") return self._pick_free_port(start=49152, limit=10000) def _wait_until_listening(self, port: int, timeout: float = 2.0) -> bool: method = "_wait_until_listening" deadline = time.time() + timeout while time.time() < deadline: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(0.2) if s.connect_ex(("127.0.0.1", port)) == 0: LogManager.method_info(f"端口已开始监听:{port}", method, udid="system") return True time.sleep(0.05) LogManager.method_warning(f"监听验收超时:{port}", method, udid="system") return False def _start_iproxy(self, udid: str, port: Optional[int] = None) -> Optional[subprocess.Popen]: method = "_start_iproxy" try: with self._lock: old_pid = self._pid_by_udid.get(udid) if old_pid: LogManager.method_info(f"发现旧 iproxy,准备结束:pid={old_pid}", method, udid=udid) self._kill_pid_gracefully(old_pid) self._pid_by_udid.pop(udid, None) time.sleep(0.2) attempts = 0 while attempts < 3: attempts += 1 local_port = port if (attempts == 1 and port is not None) else self._pick_new_port() if not self._is_port_bindable(local_port): LogManager.method_info(f"[attempt {attempts}] 端口竞争,换候选:{local_port}", method, udid=udid) continue LogManager.method_info(f"[attempt {attempts}] 启动 iproxy,port={local_port}", method, udid=udid) creationflags = 0 startupinfo = None if os.name == "nt": creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000) | \ getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0x00000200) si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW si.wShowWindow = 0 startupinfo = si cmd = [self._iproxy_path, "-u", udid, str(local_port), str(wdaScreenPort)] try: proc = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags, startupinfo=startupinfo ) except Exception as e: LogManager.method_warning(f"创建进程失败:{e}", method, udid=udid) continue if not self._wait_until_listening(local_port, timeout=2.0): LogManager.method_warning(f"[attempt {attempts}] iproxy 未监听,重试换端口", method, udid=udid) self._kill(proc) continue with self._lock: self._procs[udid] = proc self._pid_by_udid[udid] = proc.pid self._port_by_udid[udid] = local_port LogManager.method_info(f"iproxy 启动成功并监听,pid={proc.pid}, port={local_port}", method, udid=udid) return proc LogManager.method_error("iproxy 启动多次失败", method, udid=udid) return None except Exception as e: LogManager.method_error(f"_start_iproxy 异常:{e}", method, udid=udid) return None def _kill(self, proc: Optional[subprocess.Popen]): method = "_kill" if not proc: return try: proc.terminate() proc.wait(timeout=2) LogManager.method_info("进程已正常终止", method, udid="system") except Exception: try: os.kill(proc.pid, signal.SIGKILL) LogManager.method_warning("进程被强制杀死", method, udid="system") except Exception as e: LogManager.method_warning(f"强杀失败:{e}", method, udid="system") # ---------------- 自愈:直接换新端口重启 + 指数退避 ---------------- def _restart_iproxy(self, udid: str): method = "_restart_iproxy" now = time.time() next_allowed = self._heal_backoff.get(udid, 0.0) if now < next_allowed: delta = round(next_allowed - now, 2) LogManager.method_info(f"自愈被退避抑制,剩余 {delta}s", method, udid=udid) return with self._lock: proc = self._procs.get(udid) if proc: LogManager.method_info(f"为重启准备清理旧 iproxy,pid={proc.pid}", method, udid=udid) self._kill(proc) time.sleep(0.2) model = self._models.get(udid) if not model: LogManager.method_warning("模型不存在,取消自愈", method, udid=udid) return proc2 = self._start_iproxy(udid, port=None) if not proc2: backoff_old = max(1.5, next_allowed - now + 1.0) if next_allowed > now else 1.5 backoff = min(backoff_old * 1.7, 15.0) self._heal_backoff[udid] = now + backoff LogManager.method_warning(f"重启失败,扩展退避 {round(backoff,2)}s", method, udid=udid) return # 成功后短退避 self._heal_backoff[udid] = now + 1.2 # 通知前端新端口 with self._lock: model = self._models.get(udid) if model: model.screenPort = self._port_by_udid.get(udid, model.screenPort) self._models[udid] = model self._manager_send(model) LogManager.method_info(f"重启成功,使用新端口 {self._port_by_udid.get(udid)}", method, udid=udid) # ---------------- 健康检查 ---------------- def _health_check_mjpeg(self, port: int, timeout: float = 0.8) -> bool: method = "_health_check_mjpeg" try: conn = http.client.HTTPConnection("127.0.0.1", port, timeout=timeout) conn.request("HEAD", "/") resp = conn.getresponse() _ = resp.read(128) conn.close() return True except Exception: return False def _health_check_wda(self, udid: str) -> bool: method = "_health_check_wda" try: c = wda.USBClient(udid, wdaFunctionPort) st = c.status() return bool(st) except Exception: return False def _check_and_heal_tunnels(self, interval: float = 5.0): method = "_check_and_heal_tunnels" now = time.time() if now - self._last_heal_check_ts < interval: return self._last_heal_check_ts = now if (now - self._last_topology_change_ts) < max(self.ORPHAN_COOLDOWN, 6.0): LogManager.method_info("拓扑变更冷却中,本轮跳过自愈", method, udid="system") return with self._lock: items = list(self._models.items()) for udid, model in items: port = model.screenPort if port <= 0: continue ok_local = self._health_check_mjpeg(port, timeout=0.8) ok_wda = self._health_check_wda(udid) LogManager.method_info(f"健康检查:mjpeg={ok_local}, wda={ok_wda}, port={port}", method, udid=udid) if not (ok_local and ok_wda): LogManager.method_warning(f"检测到不健康,触发重启;port={port}", method, udid=udid) self._restart_iproxy(udid) # ---------------- Windows 专用:列出所有 iproxy 命令行 ---------------- def _get_all_iproxy_cmdlines(self) -> List[str]: method = "_get_all_iproxy_cmdlines" lines: List[str] = [] with self._lock: live_pids = set(self._pid_by_udid.values()) for p in psutil.process_iter(attrs=["name", "cmdline", "pid"]): try: name = (p.info.get("name") or "").lower() if name != "iproxy.exe": continue if p.info["pid"] in live_pids: continue cmdline = p.info.get("cmdline") or [] if not cmdline: continue if "-u" in cmdline: cmd = " ".join(cmdline) lines.append(f"{cmd} {p.info['pid']}") except (psutil.NoSuchProcess, psutil.AccessDenied): continue LogManager.method_info(f"扫描到候选 iproxy 进程数={len(lines)}", method, udid="system") return lines # ---------------- 杀孤儿 ---------------- def _cleanup_orphan_iproxy(self): method = "_cleanup_orphan_iproxy" with self._lock: live_udids = set(self._models.keys()) live_pids = set(self._pid_by_udid.values()) cleaned = 0 for ln in self._get_all_iproxy_cmdlines(): parts = ln.split() try: udid = parts[parts.index('-u') + 1] pid = int(parts[-1]) if pid not in live_pids and udid not in live_udids: self._kill_pid_gracefully(pid) cleaned += 1 LogManager.method_warning(f"孤儿 iproxy 已清理:udid={udid}, pid={pid}", method, udid="system") except (ValueError, IndexError): continue if cleaned: LogManager.method_info(f"孤儿清理完成,数量={cleaned}", method, udid="system") # ---------------- 按 PID 强杀 ---------------- def _kill_pid_gracefully(self, pid: int): method = "_kill_pid_gracefully" try: p = psutil.Process(pid) p.terminate() try: p.wait(timeout=1.0) LogManager.method_info(f"进程已终止:pid={pid}", method, udid="system") except psutil.TimeoutExpired: p.kill() LogManager.method_warning(f"进程被强制 kill:pid={pid}", method, udid="system") except Exception as e: LogManager.method_warning(f"kill 进程异常:pid={pid}, err={e}", method, udid="system") # ---------------- 端口工具(兜底) ---------------- def _is_port_free(self, port: int) -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.settimeout(0.2) try: s.bind(("127.0.0.1", port)) return True except OSError: return False def _pick_free_port(self, start: int = None, limit: int = 2000) -> int: method = "_pick_free_port" p = self._port if start is None else start tried = 0 while tried < limit: p += 1 tried += 1 if self._is_port_free(p): LogManager.method_info(f"顺序扫描找到端口:{p}", method, udid="system") return p LogManager.method_error("顺序扫描未找到可用端口(范围内)", method, udid="system") raise RuntimeError("未找到可用端口(扫描范围内)") # ---------------- 其他 ---------------- def _manager_send(self, model: DeviceModel): method = "_manager_send" try: self._manager.send(model.toDict()) LogManager.method_info("已通知管理器(前端)", method, udid=model.deviceId) except Exception as e: LogManager.method_warning(f"通知管理器异常:{e}", method, udid=model.deviceId) def _find_iproxy(self) -> str: method = "_find_iproxy" base = Path(__file__).resolve().parent.parent name = "iproxy.exe" path = base / "resources" / "iproxy" / name LogManager.method_info(f"查找 iproxy 路径:{path}", method, udid="system") if path.is_file(): return str(path) err = f"iproxy 不存在: {path}" LogManager.method_error(err, method, udid="system") raise FileNotFoundError(err)