修复bug

This commit is contained in:
2025-11-05 17:07:51 +08:00
parent 242c2e99c5
commit 01e18bdc03
10 changed files with 497 additions and 202 deletions

View File

@@ -3,6 +3,8 @@
极简稳定版设备监督器DeviceInfo加详细 print 日志
- 每个关键节点都会 print便于人工观察执行到哪一步
- 保留核心逻辑:监听上下线 / 启动 WDA / 起 iproxy / 通知前端
- 并发提速_add_device 异步化(受控并发)
- iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除
"""
import os
import time
@@ -129,72 +131,214 @@ class DeviceInfo:
self._last_seen: Dict[str, float] = {}
self._manager = FlaskSubprocessManager.get_instance()
self._iproxy_path = self._find_iproxy()
# iproxy 连续失败计数(守护用)
self._iproxy_fail_count: Dict[str, int] = {}
LogManager.info("DeviceInfo 初始化完成", udid="system")
print("[Init] DeviceInfo 初始化完成")
threading.Thread(target=self.check_iproxy_ports).start()
# iproxy 守护线程(端口+HTTP探活 → 自愈重启 → 达阈值才移除)
threading.Thread(target=self.check_iproxy_ports, daemon=True).start()
# =============== 核心端口连通性检测HTTP 方式) =================
def _is_local_port_open(self, port: int, udid: str, timeout: float = 5) -> bool:
"""
使用 HTTP 方式检测:向 http://127.0.0.1:port/ 发送一次 HEAD 请求。
只要建立连接并收到合法的 HTTP 响应(任意 1xx~5xx 状态码),即认为 HTTP 可达。
遇到连接失败、超时、协议不对等异常,视为不可用。
"""
if not isinstance(port, int) or port <= 0 or port > 65535:
LogManager.error("端口不可用(非法端口号)", udid=udid)
return False
self._initialized = True # 标记已初始化
# =============== 并发添加设备:最小改动(包装 _add_device ===============
def _ensure_add_executor(self):
"""
懒加载:首次调用 _add_device 时初始化线程池与去重集合。
不改 __init__避免对现有初始化节奏有影响。
"""
if not hasattr(self, "_add_lock"):
self._add_lock = threading.RLock()
if not hasattr(self, "_adding_udids"):
self._adding_udids = set()
if not hasattr(self, "_add_executor") or self._add_executor is None:
import os
from concurrent.futures import ThreadPoolExecutor
max_workers = max(2, min(6, (os.cpu_count() or 4) // 2))
self._add_executor = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="dev-add"
)
try:
LogManager.info(f"[Init] Device add executor started, max_workers={max_workers}", udid="system")
except Exception:
pass
def _safe_add_device(self, udid: str):
"""
后台执行真正的新增实现_add_device_impl
- 任何异常只记日志,不抛出
- 无论成功与否,都在 finally 里清理“正在添加”标记
"""
try:
# HEAD 更轻;若后端对 HEAD 不友好,可改为 "GET", "/"
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout)
conn.request("HEAD", "/")
resp = conn.getresponse()
status = resp.status
# 读到响应即可关闭
conn.close()
# 任何合法 HTTP 状态码都说明“HTTP 服务在监听且可交互”,包括 404/401/403/5xx
if 100 <= status <= 599:
return True
else:
LogManager.error(f"HTTP状态码异常: {status}", udid=udid)
return False
self._add_device_impl(udid) # ← 这是你原来的重逻辑(见下方)
except Exception as e:
# 连接被拒绝、超时、不是HTTP协议正确响应比如返回了非HTTP的字节流都会到这里
LogManager.error(f"HTTP检测失败{e}", udid=udid)
try:
LogManager.method_error(f"_add_device_impl 异常:{e}", "_safe_add_device", udid=udid)
except Exception:
pass
finally:
with self._add_lock:
self._adding_udids.discard(udid)
def _add_device(self, udid: str):
"""
并发包装器保持所有调用点不变listen 里仍然调用 _add_device
- 懒加载线程池
- 同一 udid 防重提交
- 真实重逻辑放到 _add_device_impl下方已把你的原始实现迁过去
"""
self._ensure_add_executor()
with self._add_lock:
if udid in self._adding_udids:
return
self._adding_udids.add(udid)
try:
self._add_executor.submit(self._safe_add_device, udid)
except Exception as e:
with self._add_lock:
self._adding_udids.discard(udid)
try:
LogManager.method_error(f"提交新增任务失败:{e}", "_add_device", udid=udid)
except Exception:
pass
# =============== iproxy 健康检查 / 自愈 ===============
def _iproxy_tcp_probe(self, port: int, timeout: float = 0.6) -> bool:
"""快速 TCP 探测:能建立连接即认为本地监听正常。"""
try:
with socket.create_connection(("127.0.0.1", int(port)), timeout=timeout):
return True
except Exception:
return False
# =============== 一轮检查:发现不通就移除 =================
def _iproxy_http_status_ok_quick(self, port: int, timeout: float = 1.2) -> bool:
"""
轻量 HTTP 探测GET /status
- 成功返回 2xx/3xx 视为 OK
- 4xx/5xx 也说明链路畅通(服务可交互),这里统一认为 OK避免误判
"""
try:
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout)
conn.request("GET", "/status")
resp = conn.getresponse()
_ = resp.read(128)
code = getattr(resp, "status", 0)
conn.close()
# 任何能返回 HTTP 的,都说明“有服务可交互”
return 100 <= code <= 599
except Exception:
return False
def _iproxy_health_ok(self, udid: str, port: int) -> bool:
"""综合健康判断:先 TCP后 HTTP /status。两者任一失败即为不健康。"""
# 先看端口是否真在监听
if not self._iproxy_tcp_probe(port, timeout=0.6):
return False
# 再看链路到后端是否通WDA 会回应 /status
if not self._iproxy_http_status_ok_quick(port, timeout=1.2):
return False
return True
def _restart_iproxy(self, udid: str, port: int) -> bool:
"""干净重启 iproxy先杀旧的再启动新的并等待监听。"""
print(f"[iproxy-guard] 准备重启 iproxy {udid} on {port}")
proc = None
with self._lock:
old = self._iproxy.get(udid)
try:
if old:
self._kill(old)
except Exception as e:
print(f"[iproxy-guard] 杀旧进程异常 {udid}: {e}")
# 重新拉起
try:
proc = self._start_iproxy(udid, local_port=port)
except Exception as e:
print(f"[iproxy-guard] 重启失败 {udid}: {e}")
proc = None
if not proc:
return False
# 写回进程表
with self._lock:
self._iproxy[udid] = proc
print(f"[iproxy-guard] 重启成功 {udid} port={port}")
return True
# =============== 一轮检查:先自愈,仍失败才考虑移除 =================
def check_iproxy_ports(self, connect_timeout: float = 3) -> None:
    """Guard loop that periodically checks every online device's iproxy.

    Runs forever (meant to be the target of a daemon thread). After an
    initial 20 s delay that gives the add-device flow time to get going,
    each round (every 10 s) does the following for every device whose
    ``type`` is 1 and whose ``screenPort`` is a valid port number:

    1. Health check via ``_iproxy_health_ok`` (TCP probe, then HTTP
       ``/status``).
    2. On failure, try to self-heal with ``_restart_iproxy`` and probe once
       more.
    3. If the device is still unhealthy, increment its consecutive-failure
       counter; a healthy result resets the counter.
    4. Only after 3 consecutive failures is the device removed, so a short
       glitch does not drop it.

    Devices that are not online have their failure counter cleared. Errors
    while handling one device are printed and do not stop the loop.

    Args:
        connect_timeout: Not used by the current probes, which have their
            own fixed timeouts; kept so existing callers remain valid.
    """
    # Startup delay so devices still being added are not misjudged.
    time.sleep(20)

    FAIL_THRESHOLD = 3  # consecutive failures before a device is removed
    INTERVAL_SEC = 10   # pause between rounds

    while True:
        # Iterate over a copy: _remove_device mutates self._models.
        snapshot = list(self._models.items())  # [(deviceId, DeviceModel), ...]
        for device_id, model in snapshot:
            try:
                # Only online devices are checked; offline ones lose their count.
                if model.type != 1:
                    self._iproxy_fail_count.pop(device_id, None)
                    continue

                port = int(model.screenPort)
                if port <= 0 or port > 65535:
                    continue

                if self._iproxy_health_ok(device_id, port):
                    # Healthy: reset a non-zero failure count.
                    if self._iproxy_fail_count.get(device_id):
                        self._iproxy_fail_count[device_id] = 0
                    continue

                # Unhealthy: attempt a self-healing restart, then probe again.
                print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
                healed = self._restart_iproxy(device_id, port)
                ok2 = self._iproxy_health_ok(device_id, port) if healed else False
                if ok2:
                    print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
                    self._iproxy_fail_count[device_id] = 0
                    continue

                # Self-heal failed: count it.
                fails = self._iproxy_fail_count.get(device_id, 0) + 1
                self._iproxy_fail_count[device_id] = fails
                print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")

                # Remove only once the threshold is reached.
                if fails >= FAIL_THRESHOLD:
                    print(f"[iproxy-check] 连续失败 {fails} 次,移除设备 deviceId={device_id} port={port}")
                    try:
                        self._remove_device(device_id)
                    except Exception as e:
                        print(f"[iproxy-check] _remove_device 异常 deviceId={device_id}: {e}")
                    finally:
                        self._iproxy_fail_count.pop(device_id, None)
            except Exception as e:
                print(f"[iproxy-check] 单设备检查异常: {e}")

        time.sleep(INTERVAL_SEC)
def listen(self):
LogManager.method_info("进入主循环", "listen", udid="system")
@@ -221,7 +365,7 @@ class DeviceInfo:
if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC:
print(f"[Add] 检测到新设备: {udid}")
try:
self._add_device(udid)
self._add_device(udid) # ← 并发包装器
except Exception as e:
LogManager.method_error(f"新增失败:{e}", "listen", udid=udid)
print(f"[Add] 新增失败 {udid}: {e}")
@@ -265,65 +409,83 @@ class DeviceInfo:
print(f"[WDA] /status@{local_port} 等待超时 {udid}")
return False
def _add_device_impl(self, udid: str):
    """Bring one device online (the heavy part of adding a device).

    Steps, in order:

    1. Skip devices that are not trusted.
    2. Allocate the local port and start the single iproxy used for the
       device's whole lifetime.
    3. Probe WDA ``/status`` through that port; if it does not answer
       within 3 s, launch WDA (``IOSActivator`` when the major iOS version
       is above 17, ``app_start`` otherwise) and wait up to
       ``WDA_READY_TIMEOUT``.
    4. Query the screen size with a timeout and a few quick retries; the
       device is still added when the values stay empty.
    5. Register the model, process and port, reset the iproxy failure
       counter, and send the model to the frontend.

    Whenever the add is abandoned after the port was allocated, whether by
    an early return or by an unexpected exception, the iproxy process is
    killed and the port released so neither leaks. Unexpected exceptions are
    re-raised after that cleanup.
    """
    print(f"[Add] 开始新增设备 {udid}")
    if not self._trusted(udid):
        print(f"[Add] 未信任设备 {udid}, 跳过")
        return

    port = self._alloc_port()
    proc = None

    def _abandon():
        # Best-effort teardown of whatever was brought up for this attempt.
        if proc:
            try:
                self._kill(proc)
            except Exception:
                pass
        self._release_port(port)

    try:
        print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort} (正式)")
        proc = self._start_iproxy(udid, local_port=port)
        if not proc:
            _abandon()
            print(f"[iproxy] 启动失败,放弃新增 {udid}")
            return

        # Major iOS version decides how WDA is launched; 0 when it cannot be read.
        try:
            dev = tidevice.Device(udid)
            major = int(dev.product_version.split(".")[0])
        except Exception:
            major = 0

        # Probe through the final port, so no temporary iproxy is needed.
        if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=3.0):
            # NOTE(review): `major > 17` leaves iOS 17 itself on the app_start
            # path although the log text says iOS17. Confirm the intended cut-off.
            if major > 17:
                print("进入iOS17设备的分支")
                try:
                    IOSActivator().activate(udid)
                    print("wda启动完成")
                except Exception as e:
                    print(f"[WDA] iOS17 激活异常: {e}")
            else:
                print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})")
                try:
                    dev = tidevice.Device(udid)
                    dev.app_start(WdaAppBundleId)
                    time.sleep(2)
                except Exception as e:
                    print(f"[WDA] app_start 异常: {e}")

            if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=self.WDA_READY_TIMEOUT):
                print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}")
                _abandon()
                return

        print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}")
        # Short settle time before the first query.
        time.sleep(0.5)

        # Screen info with a timeout, plus a few quick retries.
        w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
        if not (w and h and s):
            for i in range(4):
                print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}")
                time.sleep(0.6)
                w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
                if w and h and s:
                    break
        if not (w and h and s):
            print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}")

        model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1)
        model.ready = True
    except Exception:
        # Do not leave an orphaned iproxy or a reserved port behind.
        _abandon()
        raise

    # Register the device, then notify the frontend.
    with self._lock:
        self._models[udid] = model
        self._iproxy[udid] = proc
        self._port_by_udid[udid] = port
        if hasattr(self, "_iproxy_fail_count"):
            self._iproxy_fail_count[udid] = 0

    print(f"[Manager] 准备发送设备数据到前端 {udid}")
    self._manager_send(model)
@@ -343,6 +505,7 @@ class DeviceInfo:
self._port_by_udid.pop(udid, None)
self._first_seen.pop(udid, None)
self._last_seen.pop(udid, None)
self._iproxy_fail_count.pop(udid, None)
# --- 2. 锁外执行重操作 ---
# 杀进程
@@ -528,11 +691,23 @@ class DeviceInfo:
print(f"[Proc] 结束进程异常: {e}")
def _manager_send(self, model: DeviceModel):
"""
轻量自愈:首次 send 失败 → start() 一次并重试一次;不抛异常。
这样 34566 刚起时不丢“上车”事件,前端更快看到设备。
"""
try:
self._manager.send(model.toDict())
print(f"[Manager] 已发送前端数据 {model.deviceId}")
if self._manager.send(model.toDict()):
print(f"[Manager] 已发送前端数据 {model.deviceId}")
return
except Exception as e:
print(f"[Manager] 发送异常: {e}")
print(f"[Manager] 首次发送异常: {e}")
# 自愈:拉起一次并重试一次
try:
if self._manager.start() and self._manager.send(model.toDict()):
print(f"[Manager] 重试发送成功 {model.deviceId}")
return
except Exception as e:
print(f"[Manager] 重试发送异常: {e}")
def _find_iproxy(self) -> str:
env_path = os.getenv("IPROXY_PATH")
@@ -545,4 +720,4 @@ class DeviceInfo:
if path.is_file():
print(f"[iproxy] 使用默认路径 {path}")
return str(path)
raise FileNotFoundError(f"iproxy 不存在: {path}")
raise FileNotFoundError(f"iproxy 不存在: {path}")