优化处监听iproxy逻辑,增加安全性
This commit is contained in:
@@ -6,6 +6,7 @@
|
||||
- 并发提速:_add_device 异步化(受控并发)
|
||||
- iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除
|
||||
"""
|
||||
import datetime
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
@@ -310,107 +311,111 @@ class DeviceInfo:
|
||||
FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3")) # 连续失败阈值(可用环境变量调)
|
||||
INTERVAL_SEC = int(os.getenv("IPROXY_CHECK_INTERVAL", "10")) # 巡检间隔
|
||||
|
||||
while True:
|
||||
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
|
||||
for device_id, model in snapshot:
|
||||
try:
|
||||
if model.type != 1:
|
||||
# 离线设备清零计数
|
||||
self._iproxy_fail_count.pop(device_id, None)
|
||||
continue
|
||||
try:
|
||||
while True:
|
||||
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
|
||||
for device_id, model in snapshot:
|
||||
try:
|
||||
if model.type != 1:
|
||||
# 离线设备清零计数
|
||||
self._iproxy_fail_count.pop(device_id, None)
|
||||
continue
|
||||
|
||||
port = int(model.screenPort)
|
||||
if port <= 0 or port > 65535:
|
||||
continue
|
||||
port = int(model.screenPort)
|
||||
if port <= 0 or port > 65535:
|
||||
continue
|
||||
|
||||
# 健康探测
|
||||
ok = self._iproxy_health_ok(device_id, port)
|
||||
if ok:
|
||||
# 健康:清零计数
|
||||
if self._iproxy_fail_count.get(device_id):
|
||||
# 健康探测
|
||||
ok = self._iproxy_health_ok(device_id, port)
|
||||
if ok:
|
||||
# 健康:清零计数
|
||||
if self._iproxy_fail_count.get(device_id):
|
||||
self._iproxy_fail_count[device_id] = 0
|
||||
|
||||
# CHANGED: 若之前降级过,这里标记恢复并上报
|
||||
need_report = False
|
||||
with self._lock:
|
||||
m = self._models.get(device_id)
|
||||
if m:
|
||||
prev_ready = getattr(m, "ready", True)
|
||||
prev_broken = getattr(m, "streamBroken", False)
|
||||
if (not prev_ready) or prev_broken:
|
||||
m.ready = True
|
||||
if prev_broken:
|
||||
try:
|
||||
delattr(m, "streamBroken")
|
||||
except Exception:
|
||||
setattr(m, "streamBroken", False)
|
||||
need_report = True
|
||||
if need_report and m:
|
||||
try:
|
||||
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
|
||||
self._manager_send(m)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
|
||||
|
||||
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
|
||||
continue
|
||||
|
||||
# 第一次失败:尝试自愈重启
|
||||
print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
|
||||
healed = self._restart_iproxy(device_id, port)
|
||||
|
||||
# 重启后再探测一次
|
||||
ok2 = self._iproxy_health_ok(device_id, port) if healed else False
|
||||
if ok2:
|
||||
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
|
||||
self._iproxy_fail_count[device_id] = 0
|
||||
|
||||
# CHANGED: 若之前降级过,这里标记恢复并上报
|
||||
need_report = False
|
||||
with self._lock:
|
||||
m = self._models.get(device_id)
|
||||
if m:
|
||||
prev_ready = getattr(m, "ready", True)
|
||||
prev_broken = getattr(m, "streamBroken", False)
|
||||
if (not prev_ready) or prev_broken:
|
||||
m.ready = True
|
||||
if prev_broken:
|
||||
try:
|
||||
delattr(m, "streamBroken")
|
||||
except Exception:
|
||||
setattr(m, "streamBroken", False)
|
||||
need_report = True
|
||||
if need_report and m:
|
||||
# CHANGED: 若之前降级过,这里也顺便恢复并上报
|
||||
need_report = False
|
||||
with self._lock:
|
||||
m = self._models.get(device_id)
|
||||
if m:
|
||||
prev_ready = getattr(m, "ready", True)
|
||||
prev_broken = getattr(m, "streamBroken", False)
|
||||
if (not prev_ready) or prev_broken:
|
||||
m.ready = True
|
||||
if prev_broken:
|
||||
try:
|
||||
delattr(m, "streamBroken")
|
||||
except Exception:
|
||||
setattr(m, "streamBroken", False)
|
||||
need_report = True
|
||||
if need_report and m:
|
||||
try:
|
||||
self._manager_send(m)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
|
||||
continue
|
||||
|
||||
# 自愈失败:累计失败计数
|
||||
fails = self._iproxy_fail_count.get(device_id, 0) + 1
|
||||
self._iproxy_fail_count[device_id] = fails
|
||||
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
|
||||
|
||||
# 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动)
|
||||
if fails >= FAIL_THRESHOLD:
|
||||
with self._lock:
|
||||
m = self._models.get(device_id)
|
||||
if m:
|
||||
m.ready = False
|
||||
setattr(m, "streamBroken", True)
|
||||
try:
|
||||
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
|
||||
self._manager_send(m)
|
||||
if m:
|
||||
print(
|
||||
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
|
||||
self._manager_send(m)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
|
||||
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
|
||||
|
||||
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 单设备检查异常: {e}")
|
||||
|
||||
# 第一次失败:尝试自愈重启
|
||||
print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
|
||||
healed = self._restart_iproxy(device_id, port)
|
||||
|
||||
# 重启后再探测一次
|
||||
ok2 = self._iproxy_health_ok(device_id, port) if healed else False
|
||||
if ok2:
|
||||
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
|
||||
self._iproxy_fail_count[device_id] = 0
|
||||
|
||||
# CHANGED: 若之前降级过,这里也顺便恢复并上报
|
||||
need_report = False
|
||||
with self._lock:
|
||||
m = self._models.get(device_id)
|
||||
if m:
|
||||
prev_ready = getattr(m, "ready", True)
|
||||
prev_broken = getattr(m, "streamBroken", False)
|
||||
if (not prev_ready) or prev_broken:
|
||||
m.ready = True
|
||||
if prev_broken:
|
||||
try:
|
||||
delattr(m, "streamBroken")
|
||||
except Exception:
|
||||
setattr(m, "streamBroken", False)
|
||||
need_report = True
|
||||
if need_report and m:
|
||||
try:
|
||||
self._manager_send(m)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
|
||||
continue
|
||||
|
||||
# 自愈失败:累计失败计数
|
||||
fails = self._iproxy_fail_count.get(device_id, 0) + 1
|
||||
self._iproxy_fail_count[device_id] = fails
|
||||
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
|
||||
|
||||
# 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动)
|
||||
if fails >= FAIL_THRESHOLD:
|
||||
with self._lock:
|
||||
m = self._models.get(device_id)
|
||||
if m:
|
||||
m.ready = False
|
||||
setattr(m, "streamBroken", True)
|
||||
try:
|
||||
if m:
|
||||
print(
|
||||
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
|
||||
self._manager_send(m)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 单设备检查异常: {e}")
|
||||
|
||||
time.sleep(INTERVAL_SEC)
|
||||
time.sleep(INTERVAL_SEC)
|
||||
except Exception as e:
|
||||
print("检查iproxy状态遇到错误:",e)
|
||||
LogManager.error("检查iproxy状态遇到错误:",e)
|
||||
|
||||
def listen(self):
|
||||
LogManager.method_info("进入主循环", "listen", udid="system")
|
||||
@@ -435,6 +440,7 @@ class DeviceInfo:
|
||||
|
||||
for udid in online - known:
|
||||
if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC:
|
||||
print(datetime.datetime.now().strftime("%H:%M:%S"))
|
||||
print(f"[Add] 检测到新设备: {udid}")
|
||||
try:
|
||||
self._add_device(udid) # ← 并发包装器
|
||||
@@ -561,6 +567,7 @@ class DeviceInfo:
|
||||
|
||||
print(f"[Manager] 准备发送设备数据到前端 {udid}")
|
||||
self._manager_send(model)
|
||||
print(datetime.datetime.now().strftime("%H:%M:%S"))
|
||||
print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}")
|
||||
|
||||
def _remove_device(self, udid: str):
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user