This commit is contained in:
2025-11-12 13:25:07 +08:00
5 changed files with 100 additions and 93 deletions

View File

@@ -6,6 +6,7 @@
- 并发提速_add_device 异步化(受控并发)
- iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除
"""
import datetime
import os
import time
import threading
@@ -310,107 +311,111 @@ class DeviceInfo:
FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3")) # 连续失败阈值(可用环境变量调)
INTERVAL_SEC = int(os.getenv("IPROXY_CHECK_INTERVAL", "10")) # 巡检间隔
while True:
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
for device_id, model in snapshot:
try:
if model.type != 1:
# 离线设备清零计数
self._iproxy_fail_count.pop(device_id, None)
continue
try:
while True:
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
for device_id, model in snapshot:
try:
if model.type != 1:
# 离线设备清零计数
self._iproxy_fail_count.pop(device_id, None)
continue
port = int(model.screenPort)
if port <= 0 or port > 65535:
continue
port = int(model.screenPort)
if port <= 0 or port > 65535:
continue
# 健康探测
ok = self._iproxy_health_ok(device_id, port)
if ok:
# 健康:清零计数
if self._iproxy_fail_count.get(device_id):
# 健康探测
ok = self._iproxy_health_ok(device_id, port)
if ok:
# 健康:清零计数
if self._iproxy_fail_count.get(device_id):
self._iproxy_fail_count[device_id] = 0
# CHANGED: 若之前降级过,这里标记恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
continue
# 第一次失败:尝试自愈重启
print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
healed = self._restart_iproxy(device_id, port)
# 重启后再探测一次
ok2 = self._iproxy_health_ok(device_id, port) if healed else False
if ok2:
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
# CHANGED: 若之前降级过,这里标记恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
# CHANGED: 若之前降级过,这里也顺便恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
continue
# 自愈失败:累计失败计数
fails = self._iproxy_fail_count.get(device_id, 0) + 1
self._iproxy_fail_count[device_id] = fails
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
# 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动)
if fails >= FAIL_THRESHOLD:
with self._lock:
m = self._models.get(device_id)
if m:
m.ready = False
setattr(m, "streamBroken", True)
try:
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
self._manager_send(m)
if m:
print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
continue
except Exception as e:
print(f"[iproxy-check] 单设备检查异常: {e}")
# 第一次失败:尝试自愈重启
print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
healed = self._restart_iproxy(device_id, port)
# 重启后再探测一次
ok2 = self._iproxy_health_ok(device_id, port) if healed else False
if ok2:
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0
# CHANGED: 若之前降级过,这里也顺便恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
continue
# 自愈失败:累计失败计数
fails = self._iproxy_fail_count.get(device_id, 0) + 1
self._iproxy_fail_count[device_id] = fails
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
# 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动)
if fails >= FAIL_THRESHOLD:
with self._lock:
m = self._models.get(device_id)
if m:
m.ready = False
setattr(m, "streamBroken", True)
try:
if m:
print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
except Exception as e:
print(f"[iproxy-check] 单设备检查异常: {e}")
time.sleep(INTERVAL_SEC)
time.sleep(INTERVAL_SEC)
except Exception as e:
print("检查iproxy状态遇到错误",e)
LogManager.error("检查iproxy状态遇到错误",e)
def listen(self):
LogManager.method_info("进入主循环", "listen", udid="system")
@@ -435,6 +440,7 @@ class DeviceInfo:
for udid in online - known:
if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC:
print(datetime.datetime.now().strftime("%H:%M:%S"))
print(f"[Add] 检测到新设备: {udid}")
try:
self._add_device(udid) # ← 并发包装器
@@ -561,6 +567,7 @@ class DeviceInfo:
print(f"[Manager] 准备发送设备数据到前端 {udid}")
self._manager_send(model)
print(datetime.datetime.now().strftime("%H:%M:%S"))
print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}")
def _remove_device(self, udid: str):