优化iproxy看门狗

This commit is contained in:
2025-11-19 20:47:28 +08:00
parent 51f638f389
commit 611f4e46ac
3 changed files with 272 additions and 108 deletions

View File

@@ -258,14 +258,31 @@ class DeviceInfo:
except Exception:
return False
def _video_stream_ok(self, udid: str, port: int) -> bool:
    """Probe the local video-stream endpoint and report whether it responds.

    Sends ``GET /screen`` to ``127.0.0.1:<port>`` with a 1.2 s timeout and
    reads at most 64 bytes of the response body.

    Args:
        udid: Device identifier. Not used by the probe itself; kept so the
            signature matches the other per-device health helpers.
        port: Local TCP port to probe. Anything ``int()`` accepts.

    Returns:
        True when the server answers with a status in ``[200, 500)``.
        False on any failure: invalid port, refused connection, timeout,
        or malformed HTTP response.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=1.2)
        conn.request("GET", "/screen")  # the project's actual video-stream endpoint
        resp = conn.getresponse()
        # Pull only a few bytes: enough to prove data flows without
        # draining what may be an endless stream.
        resp.read(64)
        # Any status below 5xx shows the server is up and responding.
        return 200 <= resp.status < 500
    except Exception:
        # Best-effort probe: every ordinary failure means "not healthy".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit
        # propagate so the watchdog thread can still be stopped.
        return False
    finally:
        # Always release the socket, including when the probe failed midway.
        if conn is not None:
            conn.close()
def _iproxy_health_ok(self, udid: str, port: int) -> bool:
# Layered health probe for the iproxy forward on `port`: a TCP probe, then an
# HTTP /status probe, then the video-stream probe. Returns a bool verdict.
# 1) Listener check: if the port is not reachable, return False right away.
if not self._iproxy_tcp_probe(port, timeout=0.6):
LogManager.error("检测到有设备视频流异常")
return False
# 2) Service-level probe: a slow /status may just mean WDA is stalling; a
#    failure here does not by itself mean the port is broken.
if not self._iproxy_http_status_ok_quick(port, timeout=1.2):
print(f"[iproxy-health] /status 超时,视为轻微异常 {udid}:{port}")
return True
# NOTE(review): as rendered here, the next two statements directly follow
# `return True` and look unreachable. Presumably this view interleaves the
# removed and added lines of a diff; confirm against the real file which pair
# (print + return True, or LogManager.error + return False) is current.
LogManager.error("检测到有设备视频流异常")
return False
# 3) Only count the device as truly healthy if the video-stream endpoint is
#    healthy as well.
if not self._video_stream_ok(udid, port):
LogManager.error("检测到有设备视频流异常")
return False
return True
def _restart_iproxy(self, udid: str, port: int) -> bool:
@@ -297,127 +314,274 @@ class DeviceInfo:
print(f"[iproxy-guard] 重启成功 {udid} port={port}")
return True
# =============== 一轮检查:先自愈,仍失败才考虑移除 =================
def check_iproxy_ports(self, connect_timeout: float = 3) -> None:
def _restart_wda(self, udid: str) -> bool:
"""
周期性巡检(默认每 10s 一次
- 在线设备(type=1)
1) 先做 TCP+HTTP(/status) 探测(封装在 _iproxy_health_ok
2) 失败 → 自愈重启 iproxy仍失败则累计失败计数
3) 连续失败次数 >= 阈值 → 【不删除设备】只标记降级ready=False, streamBroken=True
4) 恢复时清零计数并标记恢复ready=True, streamBroken=False
重启指定设备上的 WDA用于已在系统中“在线”的设备
- 假定该设备已经完成过信任/配对,不再重复配对
- iOS 17+:直接调用 IOSActivator().activate(udid)
- iOS <=16走 tidevice.app_start(WdaAppBundleId)
如果当前已存在映射端口,则在该端口上等待 WDA /status 就绪。
"""
# 启动延迟,等新增流程跑起来,避免误判
time.sleep(20)
FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3")) # 连续失败阈值(可用环境变量调)
INTERVAL_SEC = int(os.getenv("IPROXY_CHECK_INTERVAL", "10")) # 巡检间隔
print(f"[WDA-guard] 尝试重启 WDA: {udid}")
try:
while True:
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
for device_id, model in snapshot:
try:
if model.type != 1:
# 离线设备清零计数
self._iproxy_fail_count.pop(device_id, None)
continue
dev = tidevice.Device(udid)
try:
major = int(dev.product_version.split(".")[0])
except Exception:
major = 0
print(f"[WDA-guard] 设备 {udid} iOS 主版本号 = {major}")
port = int(model.screenPort)
if port <= 0 or port > 65535:
continue
if major >= 17:
# -------- iOS 17+:不再重复配对,直接激活 --------
print(f"[WDA-guard] iOS17+ 设备,直接通过 IOSActivator 激活 {udid}")
try:
IOSActivator().activate(udid)
print(f"[WDA-guard] iOS17+ 通过 IOSActivator 激活完成 {udid}")
except Exception as e:
print(f"[WDA-guard] iOS17+ 激活 WDA 异常: {e}")
return False
# 健康探测
ok = self._iproxy_health_ok(device_id, port)
if ok:
# 健康:清零计数
if self._iproxy_fail_count.get(device_id):
self._iproxy_fail_count[device_id] = 0
else:
# -------- iOS 16 及以下:直接 app_start WDA --------
print(f"[WDA-guard] iOS<=16 设备,准备通过 tidevice.app_start 启动 WDA {udid}")
# app_stop 失败不致命,做一下容错
try:
dev.app_stop(WdaAppBundleId)
except Exception as e:
print(f"[WDA-guard] app_stop 异常(忽略):{e}")
try:
dev.app_start(WdaAppBundleId)
print(f"[WDA-guard] app_start 已调用 {udid}")
except Exception as e:
print(f"[WDA-guard] app_start 异常: {e}")
return False
# CHANGED: 若之前降级过,这里标记恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
# -------- 如果这台设备已经有固定的 screenPort就在该端口上等 WDA Ready --------
port = None
with self._lock:
port = self._port_by_udid.get(udid)
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
continue
if port:
print(f"[WDA-guard] 已有现成端口 {port},等待 WDA 在该端口就绪 {udid}")
ok = self._wait_wda_ready_on_port(
udid,
local_port=port,
total_timeout_sec=self.WDA_READY_TIMEOUT,
)
if not ok:
print(f"[WDA-guard] WDA 在端口 {port} 未在超时内就绪 {udid}")
return False
else:
print(f"[WDA-guard] 当前无已记录端口_port_by_udid 无 {udid}),仅完成 WDA 启动,不做就绪检测")
# 第一次失败:尝试自愈重启
print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
healed = self._restart_iproxy(device_id, port)
print(f"[WDA-guard] WDA 重启完成 {udid}")
return True
# 重启后再探测一次
ok2 = self._iproxy_health_ok(device_id, port) if healed else False
if ok2:
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
except Exception as e:
print(f"[WDA-guard] 重启 WDA 总体异常: {e}")
return False
# CHANGED: 若之前降级过,这里也顺便恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
continue
# =============== 一轮检查:先自愈,仍失败才考虑移除 =================
def check_iproxy_ports(self):
"""
后台守护 iproxy 健康状态的看门狗线程:
# 自愈失败:累计失败计数
- 定期遍历当前所有设备对应的 iproxy 进程 + 端口
- 先做健康检查TCP + HTTP
- 如不健康:
1先尝试重启 iproxy第一层自愈
2连续多次失败后尝试重启 WDA第二层自愈
3两层自愈都失败多次以后将设备标记为ready=False, streamBroken=True
但不移除设备,避免“列表里忽隐忽现”的抖动
"""
import os
import time
# 失败计数达到多少次,触发 WDA 重启(第二级自愈)
WDA_RESTART_THRESHOLD = int(os.getenv("WDA_RESTART_THRESHOLD", "2"))
# 总失败次数达到多少次,认为这台设备当前整体不可用,降级 ready/streamBroken
FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3"))
# 给整个系统一点启动缓冲时间
time.sleep(int(os.getenv("IPROXY_CHECK_START_DELAY", "10")))
print("[iproxy-check] iproxy 守护线程已启动")
while True:
try:
# 复制一份当前快照,避免遍历过程中被修改
with self._lock:
items = list(self._iproxy.items())
# 同时需要端口映射
ports_map = dict(self._port_by_udid)
for device_id, proc in items:
# 没端口信息,说明这台设备还没完成初始化/已经被清理,跳过
port = ports_map.get(device_id)
if not port:
continue
# 进程对象可能已经被 kill/None做个防御
if proc is None:
print(f"[iproxy-check] 发现 {device_id} 没有 iproxy 进程对象,记录一次失败")
fails = self._iproxy_fail_count.get(device_id, 0) + 1
self._iproxy_fail_count[device_id] = fails
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
# 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动)
if fails >= FAIL_THRESHOLD:
with self._lock:
m = self._models.get(device_id)
if m:
m.ready = False
setattr(m, "streamBroken", True)
try:
if m:
print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
continue
# ---------- 第一步:健康探测 ----------
ok = False
try:
ok = self._iproxy_health_ok(device_id, port)
except Exception as e:
print(f"[iproxy-check] 单设备检查异常: {e}")
ok = False
print(f"[iproxy-check] 健康检查异常 deviceId={device_id} port={port}: {e}")
time.sleep(INTERVAL_SEC)
except Exception as e:
print("检查iproxy状态遇到错误",e)
LogManager.error("检查iproxy状态遇到错误:",e)
if ok:
# 健康 → 失败计数清零,如之前标记过 streamBroken/ready=False可考虑恢复
if self._iproxy_fail_count.get(device_id):
print(f"[iproxy-check] 设备恢复健康,清零失败计数 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
# 如果之前降级过,这里顺便恢复 ready/streamBroken
with self._lock:
m = self._models.get(device_id)
need_report = False
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报设备恢复异常 deviceId={device_id}: {e}")
# 这台设备没事了,检查下一台
continue
# ---------- 第二步:不健康 → 尝试第一层自愈(重启 iproxy ----------
print(f"[iproxy-check] 探活失败,准备自愈重启 iproxy deviceId={device_id} port={port}")
healed = False
try:
healed = self._restart_iproxy(device_id, port)
except Exception as e:
healed = False
print(f"[iproxy-check] _restart_iproxy 调用异常 deviceId={device_id} port={port}: {e}")
ok2 = False
if healed:
try:
ok2 = self._iproxy_health_ok(device_id, port)
except Exception as e:
ok2 = False
print(f"[iproxy-check] 自愈后健康检查异常 deviceId={device_id} port={port}: {e}")
if ok2:
print(f"[iproxy-check] iproxy 自愈成功 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
# 有可能之前是降级状态,这里也做一次恢复
with self._lock:
m = self._models.get(device_id)
need_report = False
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报 iproxy 自愈恢复异常 deviceId={device_id}: {e}")
continue
# ---------- 第三步:iproxy 自愈失败,累计失败计数 ----------
fails = self._iproxy_fail_count.get(device_id, 0) + 1
self._iproxy_fail_count[device_id] = fails
print(f"[iproxy-check] iproxy 自愈失败 ×{fails} deviceId={device_id} port={port}")
# ---------- 第四步:第二级自愈 → 重启 WDA ----------
if fails >= WDA_RESTART_THRESHOLD:
print(f"[iproxy-check] 连续失败 {fails} 次,尝试重启 WDA deviceId={device_id}")
wda_ok = False
try:
wda_ok = self._restart_wda(device_id)
except Exception as e:
wda_ok = False
print(f"[iproxy-check] 调用 _restart_wda 异常 deviceId={device_id}: {e}")
if wda_ok:
# WDA 重启成功后,再做一次健康检查
ok3 = False
try:
ok3 = self._iproxy_health_ok(device_id, port)
except Exception as e:
ok3 = False
print(f"[iproxy-check] WDA 重启后健康检查异常 deviceId={device_id} port={port}: {e}")
if ok3:
print(f"[iproxy-check] WDA 重启后恢复正常 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
with self._lock:
m = self._models.get(device_id)
need_report = False
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report:
try:
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报 WDA 自愈恢复异常 deviceId={device_id}: {e}")
# 这台设备已经恢复,继续下一台
continue
else:
print(f"[iproxy-check] WDA 重启后仍不健康 deviceId={device_id} port={port}")
# ---------- 第五步:自愈 + 重启 WDA 都不行 → 按 FAIL_THRESHOLD 降级 ----------
if fails >= FAIL_THRESHOLD:
with self._lock:
m = self._models.get(device_id)
if m:
m.ready = False
setattr(m, "streamBroken", True)
try:
if m:
print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}"
)
self._manager_send()
except Exception as e:
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
# 每轮检查间隔,可按需调整
time.sleep(int(os.getenv("IPROXY_CHECK_INTERVAL", "5")))
except Exception as e:
# 整个循环防御,避免线程因为异常退出
print(f"[iproxy-check] 守护线程异常: {e}")
time.sleep(5)
def listen(self):
LogManager.method_info("进入主循环", "listen", udid="system")

View File

@@ -26,7 +26,7 @@ def _force_utf8_everywhere():
except Exception:
pass
# _force_utf8_everywhere()
_force_utf8_everywhere()
class LogManager:
"""

View File

@@ -15,7 +15,7 @@ python -m nuitka Module\Main.py ^
--include-data-dir=resources=resources ^
--include-data-dir=SupportFiles=SupportFiles ^
--include-data-files="resources/iproxy/*=resources/iproxy/" ^
--include-data-files=resources/icon.ico=resources/icon.ico ^Z
--include-data-files=resources/icon.ico=resources/icon.ico ^
--windows-icon-from-ico=resources/icon.ico ^
--noinclude-default-mode=nofollow ^
--nofollow-import-to=pytest,py.test,unittest,setuptools,doctest,IPython,pydoc ^