优化iproxy看门狗

This commit is contained in:
2025-11-20 16:49:37 +08:00
parent 33f261e8af
commit d96a19c659

View File

@@ -1,12 +1,4 @@
# -*- coding: utf-8 -*-
"""
极简稳定版设备监督器DeviceInfo加详细 print 日志
- 每个关键节点都会 print便于人工观察执行到哪一步
- 保留核心逻辑:监听上下线 / 启动 WDA / 起 iproxy / 通知前端
- 并发提速_add_device 异步化(受控并发)
- iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除
"""
import datetime
import http.client import http.client
import json import json
import os import os
@@ -18,13 +10,11 @@ import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from pathlib import Path from pathlib import Path
from typing import Dict, Optional from typing import Dict, Optional
import psutil import psutil
import tidevice import tidevice
import wda import wda
from tidevice import Usbmux, ConnectionType from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort from Entity.Variables import WdaAppBundleId, wdaScreenPort, wdaFunctionPort
from Module.FlaskSubprocessManager import FlaskSubprocessManager from Module.FlaskSubprocessManager import FlaskSubprocessManager
@@ -119,7 +109,6 @@ class DeviceInfo:
ADD_STABLE_SEC = float(os.getenv("ADD_STABLE_SEC", "2.0")) ADD_STABLE_SEC = float(os.getenv("ADD_STABLE_SEC", "2.0"))
REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0")) REMOVE_GRACE_SEC = float(os.getenv("REMOVE_GRACE_SEC", "6.0"))
WDA_READY_TIMEOUT = float(os.getenv("WDA_READY_TIMEOUT", "35.0"))
def __init__(self) -> None: def __init__(self) -> None:
# 防止多次初始化(因为 __init__ 每次调用 DeviceInfo() 都会执行) # 防止多次初始化(因为 __init__ 每次调用 DeviceInfo() 都会执行)
@@ -170,7 +159,7 @@ class DeviceInfo:
if getattr(self, "_add_executor", None) is None: if getattr(self, "_add_executor", None) is None:
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import os import os
max_workers = int(os.getenv("DEVICE_ADD_WORKERS", "6")) max_workers = 6
self._add_executor = ThreadPoolExecutor( self._add_executor = ThreadPoolExecutor(
max_workers=max_workers, max_workers=max_workers,
thread_name_prefix="dev-add" thread_name_prefix="dev-add"
@@ -231,59 +220,29 @@ class DeviceInfo:
except Exception: except Exception:
pass pass
# =============== iproxy 健康检查 / 自愈 =============== def _iproxy_health_ok(self, port: int) -> bool:
def _iproxy_tcp_probe(self, port: int, timeout: float = 0.6) -> bool:
"""快速 TCP 探测:能建立连接即认为本地监听正常。"""
try: try:
with socket.create_connection(("127.0.0.1", int(port)), timeout=timeout): conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=1.5)
return True conn.request("GET", "/")
except Exception:
return False
def _iproxy_http_status_ok_quick(self, port: int, timeout: float = 1.2) -> bool:
"""
轻量 HTTP 探测GET /status
- 成功返回 2xx/3xx 视为 OK
- 4xx/5xx 也说明链路畅通(服务可交互),这里统一认为 OK避免误判
"""
try:
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout)
conn.request("GET", "/status")
resp = conn.getresponse() resp = conn.getresponse()
_ = resp.read(128)
code = getattr(resp, "status", 0) status = getattr(resp, "status", 0)
ctype = resp.getheader("Content-Type", "") or ""
conn.close() conn.close()
# 任何能返回 HTTP 的,都说明“有服务可交互”
return 100 <= code <= 599 ok = (200 <= status < 400) and ("multipart" in ctype.lower())
except Exception:
if not ok:
LogManager.error(
f"[iproxy] 健康检查失败: status={status}, ctype={ctype!r}, port={port}"
)
return ok
except Exception as e:
LogManager.error(f"[iproxy] 健康检查异常 port={port}: {e}")
return False return False
def _video_stream_ok(self, udid: str, port: int) -> bool:
try:
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=1.2)
conn.request("GET", "/screen") # 你们实际的视频流接口
resp = conn.getresponse()
_ = resp.read(64)
conn.close()
return 200 <= resp.status < 500 # 任何可交互状态都算活着
except:
return False
def _iproxy_health_ok(self, udid: str, port: int) -> bool:
# 1) 监听检测:不通直接 False
if not self._iproxy_tcp_probe(port, timeout=0.6):
LogManager.error("检测到有设备视频流异常")
return False
# 2) 业务探测:/status 慢可能是 WDA 卡顿;失败不等同于“端口坏”
if not self._iproxy_http_status_ok_quick(port, timeout=1.2):
LogManager.error("检测到有设备视频流异常")
return False
# 3) 视频流接口健康才算真健康
if not self._video_stream_ok(udid, port):
LogManager.error("检测到有设备视频流异常")
return False
return True
def _restart_iproxy(self, udid: str, port: int) -> bool: def _restart_iproxy(self, udid: str, port: int) -> bool:
"""干净重启 iproxy先杀旧的再启动新的并等待监听。""" """干净重启 iproxy先杀旧的再启动新的并等待监听。"""
@@ -366,8 +325,7 @@ class DeviceInfo:
print(f"[WDA-guard] 已有现成端口 {port},等待 WDA 在该端口就绪 {udid}") print(f"[WDA-guard] 已有现成端口 {port},等待 WDA 在该端口就绪 {udid}")
ok = self._wait_wda_ready_on_port( ok = self._wait_wda_ready_on_port(
udid, udid,
local_port=port, local_port=wdaFunctionPort,
total_timeout_sec=self.WDA_READY_TIMEOUT,
) )
if not ok: if not ok:
print(f"[WDA-guard] WDA 在端口 {port} 未在超时内就绪 {udid}") print(f"[WDA-guard] WDA 在端口 {port} 未在超时内就绪 {udid}")
@@ -384,204 +342,75 @@ class DeviceInfo:
# =============== 一轮检查:先自愈,仍失败才考虑移除 ================= # =============== 一轮检查:先自愈,仍失败才考虑移除 =================
def check_iproxy_ports(self): def check_iproxy_ports(self):
"""
后台守护 iproxy 健康状态的看门狗线程:
- 定期遍历当前所有设备对应的 iproxy 进程 + 端口 print("[Guard] iproxy+WDA 守护线程启动")
- 先做健康检查TCP + HTTP
- 如不健康:
1先尝试重启 iproxy第一层自愈
2连续多次失败后尝试重启 WDA第二层自愈
3两层自愈都失败多次以后将设备标记为ready=False, streamBroken=True
但不移除设备,避免“列表里忽隐忽现”的抖动
"""
import os
import time
# 失败计数达到多少次,触发 WDA 重启(第二级自愈)
WDA_RESTART_THRESHOLD = int(os.getenv("WDA_RESTART_THRESHOLD", "2"))
# 总失败次数达到多少次,认为这台设备当前整体不可用,降级 ready/streamBroken
FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3"))
# 给整个系统一点启动缓冲时间
time.sleep(int(os.getenv("IPROXY_CHECK_START_DELAY", "10")))
print("[iproxy-check] iproxy 守护线程已启动")
while True: while True:
try: try:
# 复制一份当前快照,避免遍历过程中被修改
with self._lock: with self._lock:
items = list(self._iproxy.items()) udids = list(self._models.keys())
# 同时需要端口映射
ports_map = dict(self._port_by_udid) for udid in udids:
with self._lock:
port = self._port_by_udid.get(udid)
fail = self._iproxy_fail_count.get(udid, 0)
for device_id, proc in items:
# 没端口信息,说明这台设备还没完成初始化/已经被清理,跳过
port = ports_map.get(device_id)
if not port: if not port:
continue continue
# 进程对象可能已经被 kill/None做个防御 # ==== 第一层:视频流健康检查 ====
if proc is None: ok_stream = self._iproxy_health_ok(port)
print(f"[iproxy-check] 发现 {device_id} 没有 iproxy 进程对象,记录一次失败")
fails = self._iproxy_fail_count.get(device_id, 0) + 1
self._iproxy_fail_count[device_id] = fails
continue
# ---------- 第一步:健康探测 ---------- if ok_stream:
ok = False # 成功 → 清零失败计数
try:
ok = self._iproxy_health_ok(device_id, port)
except Exception as e:
ok = False
print(f"[iproxy-check] 健康检查异常 deviceId={device_id} port={port}: {e}")
if ok:
# 健康 → 失败计数清零,如之前标记过 streamBroken/ready=False可考虑恢复
if self._iproxy_fail_count.get(device_id):
print(f"[iproxy-check] 设备恢复健康,清零失败计数 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
# 如果之前降级过,这里顺便恢复 ready/streamBroken
with self._lock: with self._lock:
m = self._models.get(device_id) self._iproxy_fail_count[udid] = 0
need_report = False
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报设备恢复异常 deviceId={device_id}: {e}")
# 这台设备没事了,检查下一台
continue continue
# ---------- 第二步:不健康 → 尝试第一层自愈(重启 iproxy ---------- # ------ 以下为失败处理 ------
print(f"[iproxy-check] 探活失败,准备自愈重启 iproxy deviceId={device_id} port={port}") fail += 1
healed = False with self._lock:
try: self._iproxy_fail_count[udid] = fail
healed = self._restart_iproxy(device_id, port)
except Exception as e:
healed = False
print(f"[iproxy-check] _restart_iproxy 调用异常 deviceId={device_id} port={port}: {e}")
ok2 = False print(f"[Guard] 第 {fail} 次失败 udid={udid}, port={port}")
if healed:
try:
ok2 = self._iproxy_health_ok(device_id, port)
except Exception as e:
ok2 = False
print(f"[iproxy-check] 自愈后健康检查异常 deviceId={device_id} port={port}: {e}")
if ok2: # ==== 第 12 次失败 → 优先重启 iproxy ====
print(f"[iproxy-check] iproxy 自愈成功 deviceId={device_id} port={port}") if fail in (1, 2):
self._iproxy_fail_count[device_id] = 0 print(f"[Guard] 尝试重启 iproxy{fail} 次){udid}")
# 有可能之前是降级状态,这里也做一次恢复 if self._restart_iproxy(udid, port):
with self._lock: time.sleep(1.5)
m = self._models.get(device_id) continue
need_report = False
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报 iproxy 自愈恢复异常 deviceId={device_id}: {e}")
continue
# ---------- 第三步iporxy 自愈失败,累计失败计数 ---------- # ==== 第 34 次失败 → 检查 WDA 状态 ====
fails = self._iproxy_fail_count.get(device_id, 0) + 1 if fail in (3, 4):
self._iproxy_fail_count[device_id] = fails print(f"[Guard] 检查 WDA 状态 {udid}")
print(f"[iproxy-check] iproxy 自愈失败 ×{fails} deviceId={device_id} port={port}") wda_ok = self._wait_wda_ready_on_port(udid, wdaFunctionPort)
# ---------- 第四步:第二级自愈 → 重启 WDA ---------- if not wda_ok:
if fails >= WDA_RESTART_THRESHOLD: print(f"[Guard] WDA 异常 → 尝试重启 WDA{fail} 次){udid}")
print(f"[iproxy-check] 连续失败 {fails} 次,尝试重启 WDA deviceId={device_id}") if self._restart_wda(udid):
time.sleep(1.2)
wda_ok = False continue
try: else:
wda_ok = self._restart_wda(device_id) print(f"[Guard] WDA 正常,但视频流挂了 → 再重启 iproxy")
except Exception as e: if self._restart_iproxy(udid, port):
wda_ok = False time.sleep(1.5)
print(f"[iproxy-check] 调用 _restart_wda 异常 deviceId={device_id}: {e}")
if wda_ok:
# WDA 重启成功后,再做一次健康检查
ok3 = False
try:
ok3 = self._iproxy_health_ok(device_id, port)
except Exception as e:
ok3 = False
print(f"[iproxy-check] WDA 重启后健康检查异常 deviceId={device_id} port={port}: {e}")
if ok3:
print(f"[iproxy-check] WDA 重启后恢复正常 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
with self._lock:
m = self._models.get(device_id)
need_report = False
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报 WDA 自愈恢复异常 deviceId={device_id}: {e}")
# 这台设备已经恢复,继续下一台
continue continue
else:
print(f"[iproxy-check] WDA 重启后仍不健康 deviceId={device_id} port={port}")
# ---------- 第五步:自愈 + 重启 WDA 都不行 → 按 FAIL_THRESHOLD 降级 ---------- # ==== 第 5 次失败 → 移除设备 ====
if fails >= FAIL_THRESHOLD: if fail >= 5:
with self._lock: print(f"[Guard] 连续 5 次失败,移除设备 {udid}")
m = self._models.get(device_id)
if m:
m.ready = False
setattr(m, "streamBroken", True)
try: try:
if m: self._remove_device(udid)
print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}"
)
self._manager_send()
except Exception as e: except Exception as e:
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}") print(f"[Guard] 移除异常: {e}")
continue
# 每轮检查间隔,可按需调整
time.sleep(int(os.getenv("IPROXY_CHECK_INTERVAL", "5")))
except Exception as e: except Exception as e:
# 整个循环防御,避免线程因为异常退出 print(f"[Guard] 守护线程异常: {e}")
print(f"[iproxy-check] 守护线程异常: {e}")
time.sleep(5) time.sleep(2.0)
def listen(self): def listen(self):
LogManager.method_info("进入主循环", "listen", udid="system") LogManager.method_info("进入主循环", "listen", udid="system")
@@ -606,8 +435,6 @@ class DeviceInfo:
for udid in online - known: for udid in online - known:
if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC: if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC:
print(datetime.datetime.now().strftime("%H:%M:%S"))
print(f"[Add] 检测到新设备: {udid}")
try: try:
self._add_device(udid) # ← 并发包装器 self._add_device(udid) # ← 并发包装器
except Exception as e: except Exception as e:
@@ -628,30 +455,16 @@ class DeviceInfo:
time.sleep(1) time.sleep(1)
def _wait_wda_ready_on_port(self, udid: str, local_port: int, total_timeout_sec: float = None) -> bool: # 检测设备wda状态
"""在给定的本地映射端口上等待 /status 就绪。""" def _wait_wda_ready_on_port(self, udid: str, local_port: int) -> bool:
import http.client, time try:
if total_timeout_sec is None: dev = wda.USBClient(udid, local_port)
total_timeout_sec = self.WDA_READY_TIMEOUT info = dev.status() # 调用成功即可说明 WDA 正常
deadline = _monotonic() + total_timeout_sec return info["ready"]
attempt = 0 except Exception as e:
while _monotonic() < deadline: print(f"[WDA] status 异常({udid}): {e}")
attempt += 1 return False
try:
conn = http.client.HTTPConnection("127.0.0.1", local_port, timeout=1.8)
conn.request("GET", "/status")
resp = conn.getresponse()
_ = resp.read(128)
code = getattr(resp, "status", 0)
ok = 200 <= code < 400
print(f"[WDA] /status@{local_port}{attempt}次 code={code}, ok={ok} {udid}")
if ok:
return True
except Exception as e:
print(f"[WDA] /status@{local_port} 异常({attempt}): {e}")
time.sleep(0.5)
print(f"[WDA] /status@{local_port} 等待超时 {udid}")
return False
def _send_snapshot_to_flask(self): def _send_snapshot_to_flask(self):
"""把当前 _models 的全量快照发送给 Flask 进程""" """把当前 _models 的全量快照发送给 Flask 进程"""
@@ -755,7 +568,7 @@ class DeviceInfo:
major = 0 major = 0
# 直接用“正式端口”探测 /status避免再启一次临时 iproxy # 直接用“正式端口”探测 /status避免再启一次临时 iproxy
if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=3.0): if not self._wait_wda_ready_on_port(udid, local_port=wdaFunctionPort):
# 如果还没起来,按你原逻辑拉起 WDA 再等 # 如果还没起来,按你原逻辑拉起 WDA 再等
if major >= 17: if major >= 17:
print("进入 iOS17+ 设备的分支") print("进入 iOS17+ 设备的分支")
@@ -782,7 +595,7 @@ class DeviceInfo:
except Exception as e: except Exception as e:
print(f"[WDA] app_start 异常: {e}") print(f"[WDA] app_start 异常: {e}")
if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=self.WDA_READY_TIMEOUT): if not self._wait_wda_ready_on_port(udid, local_port=wdaFunctionPort):
print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}") print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}")
# 清理已起的正式 iproxy # 清理已起的正式 iproxy
try: try:
@@ -819,7 +632,6 @@ class DeviceInfo:
print(f"[Manager] 准备发送设备数据到前端 {udid}") print(f"[Manager] 准备发送设备数据到前端 {udid}")
self._manager_send() self._manager_send()
print(datetime.datetime.now().strftime("%H:%M:%S"))
print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}") print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}")
def _remove_device(self, udid: str): def _remove_device(self, udid: str):
@@ -873,61 +685,6 @@ class DeviceInfo:
print(f"[Trust] 设备 {udid} 未信任") print(f"[Trust] 设备 {udid} 未信任")
return False return False
def _wda_http_status_ok_once(self, udid: str, timeout_sec: float = 1.8) -> bool:
"""只做一次 /status 探测。任何异常都返回 False不让外层炸掉。"""
tmp_port = None
proc = None
try:
tmp_port = self._alloc_port() # 这里可能抛异常
print(f"[WDA] 启动临时 iproxy 以检测 /status {udid}")
proc = self._spawn_iproxy(udid, local_port=tmp_port, remote_port=wdaScreenPort)
if not proc:
print("[WDA] 启动临时 iproxy 失败")
return False
if not self._wait_until_listening(tmp_port, 3.0):
print(f"[WDA] 临时端口未监听 {tmp_port}")
return False
# 最多两次快速探测
for i in (1, 2):
try:
import http.client
conn = http.client.HTTPConnection("127.0.0.1", tmp_port, timeout=timeout_sec)
conn.request("GET", "/status")
resp = conn.getresponse()
_ = resp.read(128)
code = getattr(resp, "status", 0)
ok = 200 <= code < 400
print(f"[WDA] /status 第{i}次 code={code}, ok={ok}")
if ok:
return True
except Exception as e:
print(f"[WDA] /status 异常({i}): {e}")
time.sleep(0.25)
return False
except Exception as e:
import traceback
print(f"[WDA][probe] 异常:{e}\n{traceback.format_exc()}")
return False
finally:
if proc:
self._kill(proc)
if tmp_port is not None:
self._release_port(tmp_port)
def _wait_wda_ready_http(self, udid: str, total_timeout_sec: float) -> bool:
print(f"[WDA] 等待 WDA Ready (超时 {total_timeout_sec}s) {udid}")
deadline = _monotonic() + total_timeout_sec
while _monotonic() < deadline:
if self._wda_http_status_ok_once(udid):
print(f"[WDA] WDA 就绪 {udid}")
return True
time.sleep(0.6)
print(f"[WDA] WDA 等待超时 {udid}")
return False
def _screen_info(self, udid: str): def _screen_info(self, udid: str):
try: try:
# 避免 c.home() 可能触发的阻塞,直接取 window_size # 避免 c.home() 可能触发的阻塞,直接取 window_size
@@ -1022,21 +779,6 @@ class DeviceInfo:
print(f"[Proc] 结束进程异常: {e}") print(f"[Proc] 结束进程异常: {e}")
def _manager_send(self): def _manager_send(self):
# try:
# if self._manager.send(model.toDict()):
# print(f"[Manager] 已发送前端数据 {model.deviceId}")
# return
# except Exception as e:
# print(f"[Manager] 首次发送异常: {e}")
#
# # 自愈:拉起一次并重试一次(不要用 and 连接)
# try:
# self._manager.start() # 不关心返回值
# if self._manager.send(model.toDict()):
# print(f"[Manager] 重试发送成功 {model.deviceId}")
# return
# except Exception as e:
# print(f"[Manager] 重试发送异常: {e}")
"""对外统一的“通知 Flask 有设备变动”的入口(无参数)。 """对外统一的“通知 Flask 有设备变动”的入口(无参数)。
作用:把当前所有设备的全量快照发给 Flask。 作用:把当前所有设备的全量快照发给 Flask。
""" """