优化设备链接逻辑

This commit is contained in:
2025-10-24 16:24:09 +08:00
parent 34b1d1ec77
commit ba4bcff7e1
2 changed files with 250 additions and 109 deletions

View File

@@ -4,14 +4,16 @@ import signal
import subprocess import subprocess
import threading import threading
import time import time
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
from pathlib import Path from pathlib import Path
from typing import Dict, Optional, List from typing import Dict, Optional, List
import random import random
import socket import socket
import http.client import http.client
import psutil import psutil
import hashlib # 保留,如需后续扩展 import hashlib # 保留扩展
import platform
import tidevice import tidevice
import wda import wda
from tidevice import Usbmux, ConnectionType from tidevice import Usbmux, ConnectionType
@@ -24,13 +26,34 @@ from Module.IOSActivator import IOSActivator
from Utils.LogManager import LogManager from Utils.LogManager import LogManager
def _monotonic() -> float:
"""统一用 monotonic 计时,避免系统时钟跳变影响定时/退避。"""
return time.monotonic()
class DeviceInfo: class DeviceInfo:
# --- 时序参数(更稳 --- # --- 时序参数(支持环境变量覆盖 ---
REMOVE_GRACE_SEC = 8.0 # 设备离线宽限期(秒) REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "8.0")) # 设备离线宽限期
ADD_STABLE_SEC = 2.5 # 设备上线稳定期(秒) ADD_STABLE_SEC = float(os.getenv("ADD_STABLE_SEC", "2.5")) # 设备上线稳定期
ORPHAN_COOLDOWN = 8.0 # 拓扑变更后暂停孤儿清理(秒) ORPHAN_COOLDOWN = float(os.getenv("ORPHAN_COOLDOWN", "8.0")) # 拓扑变更后暂停孤儿清理
HEAL_INTERVAL = 5.0 # 健康巡检间隔(秒) HEAL_INTERVAL = float(os.getenv("HEAL_INTERVAL", "5.0")) # 健康巡检间隔
# 端口策略(支持环境变量覆盖)
PORT_RAND_LOW_1 = int(os.getenv("PORT_RAND_LOW_1", "9111"))
PORT_RAND_HIGH_1 = int(os.getenv("PORT_RAND_HIGH_1", "9499"))
PORT_RAND_LOW_2 = int(os.getenv("PORT_RAND_LOW_2", "20000"))
PORT_RAND_HIGH_2 = int(os.getenv("PORT_RAND_HIGH_2", "48000"))
PORT_SCAN_START = int(os.getenv("PORT_SCAN_START", "49152"))
PORT_SCAN_LIMIT = int(os.getenv("PORT_SCAN_LIMIT", "10000"))
# 自愈退避
BACKOFF_MAX_SEC = float(os.getenv("BACKOFF_MAX_SEC", "15.0"))
BACKOFF_MIN_SEC = float(os.getenv("BACKOFF_MIN_SEC", "1.5"))
BACKOFF_GROWTH = float(os.getenv("BACKOFF_GROWTH", "1.7"))
# WDA Ready 等待HTTP 轮询方式,不触发 xctest
WDA_READY_TIMEOUT = float(os.getenv("WDA_READY_TIMEOUT", "35.0"))
def __init__(self): def __init__(self):
# 自增端口游标仅作兜底扫描使用 # 自增端口游标仅作兜底扫描使用
@@ -54,6 +77,10 @@ class DeviceInfo:
self._first_seen: Dict[str, float] = {} # udid -> ts(首次在线) self._first_seen: Dict[str, float] = {} # udid -> ts(首次在线)
self._last_topology_change_ts = 0.0 self._last_topology_change_ts = 0.0
# 短缓存设备信任、WDA运行态仅作节流
self._trusted_cache: Dict[str, float] = {} # udid -> expire_ts
self._wda_ok_cache: Dict[str, float] = {} # udid -> expire_ts
LogManager.info("DeviceInfo init 完成;日志已启用", udid="system") LogManager.info("DeviceInfo init 完成;日志已启用", udid="system")
# ---------------- 主循环 ---------------- # ---------------- 主循环 ----------------
@@ -62,7 +89,7 @@ class DeviceInfo:
LogManager.method_info("进入主循环", method, udid="system") LogManager.method_info("进入主循环", method, udid="system")
orphan_gc_tick = 0 orphan_gc_tick = 0
while True: while True:
now = time.time() now = _monotonic()
try: try:
usb = Usbmux().device_list() usb = Usbmux().device_list()
online_now = {d.udid for d in usb if d.conn_type == ConnectionType.USB} online_now = {d.udid for d in usb if d.conn_type == ConnectionType.USB}
@@ -95,12 +122,18 @@ class DeviceInfo:
if to_add: if to_add:
LogManager.info(f"新增设备稳定上线:{to_add}", udid="system") LogManager.info(f"新增设备稳定上线:{to_add}", udid="system")
futures = {self._pool.submit(self._add_device, u): u for u in to_add} futures = {self._pool.submit(self._add_device, u): u for u in to_add}
for f in as_completed(futures, timeout=45): try:
try: for f in as_completed(futures, timeout=45):
f.result() try:
self._last_topology_change_ts = time.time() f.result()
except Exception as e: self._last_topology_change_ts = _monotonic()
LogManager.error(f"异步连接失败:{e}", udid="system") except Exception as e:
LogManager.error(f"异步连接失败:{e}", udid="system")
except TimeoutError:
for fut, u in futures.items():
if not fut.done():
fut.cancel()
LogManager.method_warning("新增设备任务超时已取消", method, udid=u)
# 定期健康检查 + 自愈 # 定期健康检查 + 自愈
self._check_and_heal_tunnels(interval=self.HEAL_INTERVAL) self._check_and_heal_tunnels(interval=self.HEAL_INTERVAL)
@@ -109,7 +142,7 @@ class DeviceInfo:
orphan_gc_tick += 1 orphan_gc_tick += 1
if orphan_gc_tick >= 10: if orphan_gc_tick >= 10:
orphan_gc_tick = 0 orphan_gc_tick = 0
if (time.time() - self._last_topology_change_ts) >= self.ORPHAN_COOLDOWN: if (_monotonic() - self._last_topology_change_ts) >= self.ORPHAN_COOLDOWN:
self._cleanup_orphan_iproxy() self._cleanup_orphan_iproxy()
time.sleep(1) time.sleep(1)
@@ -123,20 +156,48 @@ class DeviceInfo:
LogManager.method_warning("未信任设备,跳过", method, udid=udid) LogManager.method_warning("未信任设备,跳过", method, udid=udid)
return return
r = self.startWda(udid) # 获取系统主版本
if r is False: try:
LogManager.method_error("启动 WDA 失败,放弃新增", method, udid=udid) dev = tidevice.Device(udid)
return system_version_major = int(dev.product_version.split(".")[0])
except Exception as e:
LogManager.method_warning(f"读取系统版本失败:{e}", method, udid=udid)
system_version_major = 0 # 保底
# iOS 17+ 激活/信任阶段更抖,稍等更稳 # === iOS>17先“被动探测”WDA未运行则交给 IOSActivator并通过 HTTP 轮询等待 ===
time.sleep(5) if system_version_major > 17:
if self._wda_is_running(udid):
LogManager.method_info("检测到 WDA 已运行,直接映射", method, udid=udid)
else:
LogManager.method_info("WDA 未运行,调用 IOSActivatorpymobiledevice3 自动挂载)", method, udid=udid)
try:
ios = IOSActivator()
threading.Thread(target=ios.activate, args=(udid,), daemon=True).start()
except Exception as e:
LogManager.method_error(f"IOSActivator 启动异常:{e}", method, udid=udid)
return
# 关键HTTP 轮询等待 WDA Ready默认最多 35s不会触发 xctest
if not self._wait_wda_ready_http(udid, total_timeout_sec=self.WDA_READY_TIMEOUT):
LogManager.method_error("WDA 未在超时内就绪iOS>17 分支)", method, udid=udid)
return
else:
# iOS <= 17保持原逻辑app_start + 简单等待)
try:
dev = tidevice.Device(udid)
LogManager.method_info(f"app_start WDA: {WdaAppBundleId}", method, udid=udid)
dev.app_start(WdaAppBundleId)
time.sleep(3)
except Exception as e:
LogManager.method_error(f"WDA 启动异常:{e}", method, udid=udid)
return
# 获取屏幕信息
w, h, s = self._screen_info(udid) w, h, s = self._screen_info(udid)
if w == 0 or h == 0 or s == 0: if w == 0 or h == 0 or s == 0:
LogManager.method_warning("未获取到屏幕信息,放弃新增", method, udid=udid) LogManager.method_warning("未获取到屏幕信息,放弃新增", method, udid=udid)
return return
# 不复用端口:直接起一个新端口 # 启动 iproxy不复用端口:直接新端口
proc = self._start_iproxy(udid, port=None) proc = self._start_iproxy(udid, port=None)
if not proc: if not proc:
LogManager.method_error("启动 iproxy 失败,放弃新增", method, udid=udid) LogManager.method_error("启动 iproxy 失败,放弃新增", method, udid=udid)
@@ -152,7 +213,7 @@ class DeviceInfo:
LogManager.method_info(f"设备添加完成port={port}, {w}x{h}@{s}", method, udid=udid) LogManager.method_info(f"设备添加完成port={port}, {w}x{h}@{s}", method, udid=udid)
self._manager_send(model) self._manager_send(model)
# ---------------- 移除设备 ---------------- # ---------------- 移除设备(修复:总是发送离线通知) ----------------
def _remove_device(self, udid: str): def _remove_device(self, udid: str):
method = "_remove_device" method = "_remove_device"
LogManager.method_info("开始移除设备", method, udid=udid) LogManager.method_info("开始移除设备", method, udid=udid)
@@ -161,45 +222,118 @@ class DeviceInfo:
proc = self._procs.pop(udid, None) proc = self._procs.pop(udid, None)
pid = self._pid_by_udid.pop(udid, None) pid = self._pid_by_udid.pop(udid, None)
self._port_by_udid.pop(udid, None) self._port_by_udid.pop(udid, None)
# 清缓存,防止误判
self._trusted_cache.pop(udid, None)
self._wda_ok_cache.pop(udid, None)
self._last_seen.pop(udid, None)
self._first_seen.pop(udid, None)
if not model:
LogManager.method_warning("未找到设备模型,可能重复移除", method, udid=udid)
return
model.type = 2
self._kill(proc) self._kill(proc)
if pid: if pid:
self._kill_pid_gracefully(pid) self._kill_pid_gracefully(pid)
self._manager_send(model)
LogManager.method_info("设备移除完毕", method, udid=udid) if model is None:
# 构造一个最小的“离线模型”通知前端
try:
offline = DeviceModel(deviceId=udid, screenPort=-1, width=0, height=0, scale=0.0, type=2)
offline.ready = False
self._manager_send(offline)
LogManager.method_info("设备移除完毕(无原模型,已发送离线通知)", method, udid=udid)
except Exception as e:
LogManager.method_warning(f"离线通知(构造模型)异常:{e}", method, udid=udid)
return
model.type = 2
model.ready = False
model.screenPort = -1
try:
self._manager_send(model)
finally:
LogManager.method_info("设备移除完毕(已发送离线通知)", method, udid=udid)
# ---------------- 工具函数 ---------------- # ---------------- 工具函数 ----------------
def _trusted(self, udid: str) -> bool: def _trusted(self, udid: str) -> bool:
# 30s 短缓存,减少 IO
now = _monotonic()
exp = self._trusted_cache.get(udid, 0.0)
if exp > now:
return True
try: try:
BaseDevice(udid).get_value("DeviceName") BaseDevice(udid).get_value("DeviceName")
self._trusted_cache[udid] = now + 30.0
return True return True
except Exception: except Exception:
return False return False
def startWda(self, udid): # ======= WDA 探测/等待(仅走 iproxy+HTTP不触发 xctest =======
method = "startWda" def _wda_http_status_ok(self, udid: str, timeout_sec: float = 1.2) -> bool:
LogManager.method_info("进入启动流程", method, udid=udid) """临时 iproxy 转发到 wdaFunctionPortGET /status 成功视为 OK。"""
method = "_wda_http_status_ok"
tmp_port = self._pick_new_port()
proc = None
try: try:
dev = tidevice.Device(udid) cmd = [self._iproxy_path, "-u", udid, str(tmp_port), str(wdaFunctionPort)]
systemVersion = int(dev.product_version.split(".")[0]) proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if systemVersion > 17: if not self._wait_until_listening(tmp_port, initial_timeout=0.8):
LogManager.method_info(f"iOS 主版本 {systemVersion},使用 IOSActivator", method, udid=udid) LogManager.method_info(f"WDA探测临时端口未监听{tmp_port}", method, udid=udid)
ios = IOSActivator() return False
threading.Thread(target=ios.activate, args=(udid,), daemon=True).start()
else: conn = http.client.HTTPConnection("127.0.0.1", tmp_port, timeout=timeout_sec)
LogManager.method_info(f"app_start WDA: {WdaAppBundleId}", method, udid=udid) try:
dev.app_start(WdaAppBundleId) conn.request("GET", "/status")
LogManager.method_info("WDA 启动完成,等待稳定...", method, udid=udid) resp = conn.getresponse()
time.sleep(3) _ = resp.read(256)
code = getattr(resp, "status", 0)
ok = 200 <= code < 400
LogManager.method_info(f"WDA探测/status code={code}, ok={ok}", method, udid=udid)
return ok
except Exception as e:
LogManager.method_info(f"WDA探测异常{e}", method, udid=udid)
return False
finally:
try:
conn.close()
except Exception:
pass
finally:
if proc:
try:
p = psutil.Process(proc.pid)
p.terminate()
p.wait(timeout=0.6)
except Exception:
try:
p.kill()
except Exception:
pass
def _wait_wda_ready_http(self, udid: str, total_timeout_sec: float = None, interval_sec: float = 0.6) -> bool:
"""
通过 _wda_http_status_ok 轮询等待 WDA Ready。
total_timeout_sec 默认取环境变量 WDA_READY_TIMEOUT默认 35s
"""
method = "_wait_wda_ready_http"
if total_timeout_sec is None:
total_timeout_sec = self.WDA_READY_TIMEOUT
deadline = _monotonic() + total_timeout_sec
while _monotonic() < deadline:
if self._wda_http_status_ok(udid, timeout_sec=1.2):
LogManager.method_info("WDA 就绪HTTP轮询", method, udid=udid)
return True
time.sleep(interval_sec)
LogManager.method_warning(f"WDA 等待超时HTTP轮询{total_timeout_sec}s", method, udid=udid)
return False
def _wda_is_running(self, udid: str, cache_sec: float = 2.0) -> bool:
"""轻量速查,走 HTTP /status短缓存节流避免触发 xctest。"""
now = _monotonic()
exp = self._wda_ok_cache.get(udid, 0.0)
if exp > now:
return True return True
except Exception as e: ok = self._wda_http_status_ok(udid, timeout_sec=1.2)
LogManager.method_error(f"WDA 启动异常:{e}", method, udid=udid) if ok:
return False self._wda_ok_cache[udid] = now + cache_sec
return ok
def _screen_info(self, udid: str): def _screen_info(self, udid: str):
method = "_screen_info" method = "_screen_info"
@@ -215,7 +349,7 @@ class DeviceInfo:
return 0, 0, 0 return 0, 0, 0
# ---------------- 端口/进程:不复用端口 ---------------- # ---------------- 端口/进程:不复用端口 ----------------
def _is_port_bindable(self, port: int, host: str = "127.0.0.1") -> bool: def _is_port_free(self, port: int, host: str = "127.0.0.1") -> bool:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -228,36 +362,38 @@ class DeviceInfo:
def _pick_new_port(self, tries: int = 40) -> int: def _pick_new_port(self, tries: int = 40) -> int:
method = "_pick_new_port" method = "_pick_new_port"
# 先在 9111~9499 随机尝试 for _ in range(max(1, tries // 2)):
for _ in range(tries // 2): p = random.randint(self.PORT_RAND_LOW_1, self.PORT_RAND_HIGH_1)
p = random.randint(9111, 9499) if self._is_port_free(p):
if self._is_port_bindable(p): LogManager.method_info(f"端口候选可用(首段){p}", method, udid="system")
LogManager.method_info(f"端口候选可用(9k段){p}", method, udid="system")
return p return p
else: else:
LogManager.method_info(f"端口候选占用(9k段){p}", method, udid="system") LogManager.method_info(f"端口候选占用(段){p}", method, udid="system")
# 再在 20000~48000 随机尝试
for _ in range(tries): for _ in range(tries):
p = random.randint(20000, 48000) p = random.randint(self.PORT_RAND_LOW_2, self.PORT_RAND_HIGH_2)
if self._is_port_bindable(p): if self._is_port_free(p):
LogManager.method_info(f"端口候选可用(20k-48k){p}", method, udid="system") LogManager.method_info(f"端口候选可用(次段){p}", method, udid="system")
return p return p
else: else:
LogManager.method_info(f"端口候选占用(20k-48k){p}", method, udid="system") LogManager.method_info(f"端口候选占用(次段){p}", method, udid="system")
LogManager.method_warning("随机端口尝试耗尽,改顺序扫描", method, udid="system") LogManager.method_warning("随机端口尝试耗尽,改顺序扫描", method, udid="system")
return self._pick_free_port(start=49152, limit=10000) return self._pick_free_port(start=self.PORT_SCAN_START, limit=self.PORT_SCAN_LIMIT)
def _wait_until_listening(self, port: int, timeout: float = 2.0) -> bool: def _wait_until_listening(self, port: int, initial_timeout: float = 2.0) -> bool:
"""自适应等待端口监听2s -> 3s -> 5s最多约10s"""
method = "_wait_until_listening" method = "_wait_until_listening"
deadline = time.time() + timeout timeouts = [initial_timeout, 3.0, 5.0]
while time.time() < deadline: for to in timeouts:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: deadline = _monotonic() + to
s.settimeout(0.2) while _monotonic() < deadline:
if s.connect_ex(("127.0.0.1", port)) == 0: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
LogManager.method_info(f"端口已开始监听:{port}", method, udid="system") s.settimeout(0.2)
return True if s.connect_ex(("127.0.0.1", port)) == 0:
time.sleep(0.05) LogManager.method_info(f"端口已开始监听:{port}", method, udid="system")
LogManager.method_warning(f"监听验收超时:{port}", method, udid="system") return True
time.sleep(0.05)
LogManager.method_info(f"监听验收阶段超时:{port},扩展等待", method, udid="system")
LogManager.method_warning(f"监听验收最终超时:{port}", method, udid="system")
return False return False
def _start_iproxy(self, udid: str, port: Optional[int] = None) -> Optional[subprocess.Popen]: def _start_iproxy(self, udid: str, port: Optional[int] = None) -> Optional[subprocess.Popen]:
@@ -269,13 +405,12 @@ class DeviceInfo:
LogManager.method_info(f"发现旧 iproxy准备结束pid={old_pid}", method, udid=udid) LogManager.method_info(f"发现旧 iproxy准备结束pid={old_pid}", method, udid=udid)
self._kill_pid_gracefully(old_pid) self._kill_pid_gracefully(old_pid)
self._pid_by_udid.pop(udid, None) self._pid_by_udid.pop(udid, None)
time.sleep(0.2)
attempts = 0 attempts = 0
while attempts < 3: while attempts < 3:
attempts += 1 attempts += 1
local_port = port if (attempts == 1 and port is not None) else self._pick_new_port() local_port = port if (attempts == 1 and port is not None) else self._pick_new_port()
if not self._is_port_bindable(local_port): if not self._is_port_free(local_port):
LogManager.method_info(f"[attempt {attempts}] 端口竞争,换候选:{local_port}", method, udid=udid) LogManager.method_info(f"[attempt {attempts}] 端口竞争,换候选:{local_port}", method, udid=udid)
continue continue
@@ -301,10 +436,10 @@ class DeviceInfo:
startupinfo=startupinfo startupinfo=startupinfo
) )
except Exception as e: except Exception as e:
LogManager.method_warning(f"创建进程失败:{e}", method, udid=udid) LogManager.method_warning(f"创建 iproxy 进程失败:{e}", method, udid=udid)
continue continue
if not self._wait_until_listening(local_port, timeout=2.0): if not self._wait_until_listening(local_port, initial_timeout=2.0):
LogManager.method_warning(f"[attempt {attempts}] iproxy 未监听,重试换端口", method, udid=udid) LogManager.method_warning(f"[attempt {attempts}] iproxy 未监听,重试换端口", method, udid=udid)
self._kill(proc) self._kill(proc)
continue continue
@@ -329,20 +464,26 @@ class DeviceInfo:
if not proc: if not proc:
return return
try: try:
proc.terminate() p = psutil.Process(proc.pid)
proc.wait(timeout=2) p.terminate()
LogManager.method_info("进程已正常终止", method, udid="system")
except Exception:
try: try:
os.kill(proc.pid, signal.SIGKILL) p.wait(timeout=2.0)
LogManager.method_info("进程已正常终止", method, udid="system")
except psutil.TimeoutExpired:
p.kill()
LogManager.method_warning("进程被强制杀死", method, udid="system") LogManager.method_warning("进程被强制杀死", method, udid="system")
except Exception as e: except Exception as e:
LogManager.method_warning(f"强杀失败{e}", method, udid="system") LogManager.method_warning(f"结束进程异常{e}", method, udid="system")
# ---------------- 自愈:直接换新端口重启 + 指数退避 ---------------- # ---------------- 自愈:直接换新端口重启 + 指数退避 ----------------
def _next_backoff(self, prev_backoff: float) -> float:
if prev_backoff <= 0:
return self.BACKOFF_MIN_SEC
return min(prev_backoff * self.BACKOFF_GROWTH, self.BACKOFF_MAX_SEC)
def _restart_iproxy(self, udid: str): def _restart_iproxy(self, udid: str):
method = "_restart_iproxy" method = "_restart_iproxy"
now = time.time() now = _monotonic()
next_allowed = self._heal_backoff.get(udid, 0.0) next_allowed = self._heal_backoff.get(udid, 0.0)
if now < next_allowed: if now < next_allowed:
delta = round(next_allowed - now, 2) delta = round(next_allowed - now, 2)
@@ -354,7 +495,6 @@ class DeviceInfo:
if proc: if proc:
LogManager.method_info(f"为重启准备清理旧 iproxypid={proc.pid}", method, udid=udid) LogManager.method_info(f"为重启准备清理旧 iproxypid={proc.pid}", method, udid=udid)
self._kill(proc) self._kill(proc)
time.sleep(0.2)
model = self._models.get(udid) model = self._models.get(udid)
if not model: if not model:
LogManager.method_warning("模型不存在,取消自愈", method, udid=udid) LogManager.method_warning("模型不存在,取消自愈", method, udid=udid)
@@ -362,13 +502,13 @@ class DeviceInfo:
proc2 = self._start_iproxy(udid, port=None) proc2 = self._start_iproxy(udid, port=None)
if not proc2: if not proc2:
backoff_old = max(1.5, next_allowed - now + 1.0) if next_allowed > now else 1.5 prev = max(0.0, next_allowed - now)
backoff = min(backoff_old * 1.7, 15.0) backoff = self._next_backoff(prev)
self._heal_backoff[udid] = now + backoff self._heal_backoff[udid] = now + backoff
LogManager.method_warning(f"重启失败,扩展退避 {round(backoff,2)}s", method, udid=udid) LogManager.method_warning(f"重启失败,扩展退避 {round(backoff,2)}s", method, udid=udid)
return return
# 成功后短退避 # 成功后短退避(抑制频繁重启)
self._heal_backoff[udid] = now + 1.2 self._heal_backoff[udid] = now + 1.2
# 通知前端新端口 # 通知前端新端口
@@ -388,23 +528,19 @@ class DeviceInfo:
conn.request("HEAD", "/") conn.request("HEAD", "/")
resp = conn.getresponse() resp = conn.getresponse()
_ = resp.read(128) _ = resp.read(128)
code = getattr(resp, "status", 0)
conn.close() conn.close()
return True return 200 <= code < 400
except Exception: except Exception:
return False return False
def _health_check_wda(self, udid: str) -> bool: def _health_check_wda(self, udid: str) -> bool:
method = "_health_check_wda" # 使用 HTTP 探测(带短缓存),避免触发 xctest
try: return self._wda_is_running(udid, cache_sec=1.0)
c = wda.USBClient(udid, wdaFunctionPort)
st = c.status()
return bool(st)
except Exception:
return False
def _check_and_heal_tunnels(self, interval: float = 5.0): def _check_and_heal_tunnels(self, interval: float = 5.0):
method = "_check_and_heal_tunnels" method = "_check_and_heal_tunnels"
now = time.time() now = _monotonic()
if now - self._last_heal_check_ts < interval: if now - self._last_heal_check_ts < interval:
return return
self._last_heal_check_ts = now self._last_heal_check_ts = now
@@ -429,16 +565,20 @@ class DeviceInfo:
LogManager.method_warning(f"检测到不健康触发重启port={port}", method, udid=udid) LogManager.method_warning(f"检测到不健康触发重启port={port}", method, udid=udid)
self._restart_iproxy(udid) self._restart_iproxy(udid)
# ---------------- Windows 专用:列出所有 iproxy 命令行 ---------------- # ---------------- Windows/*nix:列出所有 iproxy 命令行 ----------------
def _get_all_iproxy_cmdlines(self) -> List[str]: def _get_all_iproxy_cmdlines(self) -> List[str]:
method = "_get_all_iproxy_cmdlines" method = "_get_all_iproxy_cmdlines"
lines: List[str] = [] lines: List[str] = []
with self._lock: with self._lock:
live_pids = set(self._pid_by_udid.values()) live_pids = set(self._pid_by_udid.values())
is_windows = os.name == "nt"
target_name = "iproxy.exe" if is_windows else "iproxy"
for p in psutil.process_iter(attrs=["name", "cmdline", "pid"]): for p in psutil.process_iter(attrs=["name", "cmdline", "pid"]):
try: try:
name = (p.info.get("name") or "").lower() name = (p.info.get("name") or "").lower()
if name != "iproxy.exe": if name != target_name:
continue continue
if p.info["pid"] in live_pids: if p.info["pid"] in live_pids:
continue continue
@@ -464,12 +604,14 @@ class DeviceInfo:
for ln in self._get_all_iproxy_cmdlines(): for ln in self._get_all_iproxy_cmdlines():
parts = ln.split() parts = ln.split()
try: try:
if "-u" not in parts:
continue
udid = parts[parts.index('-u') + 1] udid = parts[parts.index('-u') + 1]
pid = int(parts[-1]) pid = int(parts[-1])
if pid not in live_pids and udid not in live_udids: if pid not in live_pids and udid not in live_udids:
self._kill_pid_gracefully(pid) self._kill_pid_gracefully(pid)
cleaned += 1 cleaned += 1
LogManager.method_warning(f"孤儿 iproxy 已清理udid={udid}, pid={pid}", method, udid="system") LogManager.method_warning(f"孤儿 iproxy 已清理udid={udid}, pid={pid}", method, udid=udid)
except (ValueError, IndexError): except (ValueError, IndexError):
continue continue
@@ -492,16 +634,6 @@ class DeviceInfo:
LogManager.method_warning(f"kill 进程异常pid={pid}, err={e}", method, udid="system") LogManager.method_warning(f"kill 进程异常pid={pid}, err={e}", method, udid="system")
# ---------------- 端口工具(兜底) ---------------- # ---------------- 端口工具(兜底) ----------------
def _is_port_free(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.settimeout(0.2)
try:
s.bind(("127.0.0.1", port))
return True
except OSError:
return False
def _pick_free_port(self, start: int = None, limit: int = 2000) -> int: def _pick_free_port(self, start: int = None, limit: int = 2000) -> int:
method = "_pick_free_port" method = "_pick_free_port"
p = self._port if start is None else start p = self._port if start is None else start
@@ -525,13 +657,22 @@ class DeviceInfo:
LogManager.method_warning(f"通知管理器异常:{e}", method, udid=model.deviceId) LogManager.method_warning(f"通知管理器异常:{e}", method, udid=model.deviceId)
def _find_iproxy(self) -> str: def _find_iproxy(self) -> str:
"""优先环境变量 IPROXY_PATH否则按平台在 resources/iproxy 查找。"""
method = "_find_iproxy" method = "_find_iproxy"
env_path = os.getenv("IPROXY_PATH")
if env_path and Path(env_path).is_file():
LogManager.method_info(f"使用环境变量指定的 iproxy 路径:{env_path}", method, udid="system")
return env_path
base = Path(__file__).resolve().parent.parent base = Path(__file__).resolve().parent.parent
name = "iproxy.exe" is_windows = os.name == "nt"
name = "iproxy.exe" if is_windows else "iproxy"
path = base / "resources" / "iproxy" / name path = base / "resources" / "iproxy" / name
LogManager.method_info(f"查找 iproxy 路径:{path}", method, udid="system") LogManager.method_info(f"查找 iproxy 路径:{path}", method, udid="system")
if path.is_file(): if path.is_file():
return str(path) return str(path)
err = f"iproxy 不存在: {path}" err = f"iproxy 不存在: {path}"
LogManager.method_error(err, method, udid="system") LogManager.method_error(err, method, udid="system")
raise FileNotFoundError(err) raise FileNotFoundError(err)