加固添加设备逻辑。加固iproxy逻辑,加固flask

This commit is contained in:
2025-11-05 21:04:04 +08:00
parent 01580e2fb1
commit 36a57b2015
9 changed files with 133 additions and 49 deletions

View File

@@ -11,6 +11,7 @@ import time
import threading import threading
import subprocess import subprocess
import socket import socket
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path from pathlib import Path
from typing import Dict, Optional, List, Any from typing import Dict, Optional, List, Any
import platform import platform
@@ -132,6 +133,11 @@ class DeviceInfo:
self._manager = FlaskSubprocessManager.get_instance() self._manager = FlaskSubprocessManager.get_instance()
self._iproxy_path = self._find_iproxy() self._iproxy_path = self._find_iproxy()
# 懒加载线程池属性(供 _add_device 并发使用)
self._add_lock: Optional[threading.RLock] = None
self._adding_udids: Optional[set[str]] = None
self._add_executor: Optional["ThreadPoolExecutor"] = None
# iproxy 连续失败计数(守护用) # iproxy 连续失败计数(守护用)
self._iproxy_fail_count: Dict[str, int] = {} self._iproxy_fail_count: Dict[str, int] = {}
@@ -147,15 +153,20 @@ class DeviceInfo:
def _ensure_add_executor(self): def _ensure_add_executor(self):
""" """
懒加载:首次调用 _add_device 时初始化线程池与去重集合。 懒加载:首次调用 _add_device 时初始化线程池与去重集合。
不改 __init__避免对现有初始化节奏有影响 注意:不要只用 hasattr属性可能已在 __init__ 里置为 None
""" """
if not hasattr(self, "_add_lock"): # 1) 锁
if getattr(self, "_add_lock", None) is None:
self._add_lock = threading.RLock() self._add_lock = threading.RLock()
if not hasattr(self, "_adding_udids"):
# 2) 去重集合
if getattr(self, "_adding_udids", None) is None:
self._adding_udids = set() self._adding_udids = set()
if not hasattr(self, "_add_executor") or self._add_executor is None:
import os # 3) 线程池
if getattr(self, "_add_executor", None) is None:
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import os
max_workers = max(2, min(6, (os.cpu_count() or 4) // 2)) max_workers = max(2, min(6, (os.cpu_count() or 4) // 2))
self._add_executor = ThreadPoolExecutor( self._add_executor = ThreadPoolExecutor(
max_workers=max_workers, max_workers=max_workers,
@@ -180,28 +191,40 @@ class DeviceInfo:
except Exception: except Exception:
pass pass
finally: finally:
with self._add_lock: lock = getattr(self, "_add_lock", None)
if lock is None:
# 极端容错,避免再次抛异常
self._add_lock = lock = threading.RLock()
with lock:
self._adding_udids.discard(udid) self._adding_udids.discard(udid)
def _add_device(self, udid: str): def _add_device(self, udid: str):
""" """并发包装器:保持所有调用点不变。"""
并发包装器保持所有调用点不变listen 里仍然调用 _add_device
- 懒加载线程池
- 同一 udid 防重提交
- 真实重逻辑放到 _add_device_impl下方已把你的原始实现迁过去
"""
self._ensure_add_executor() self._ensure_add_executor()
with self._add_lock:
if udid in self._adding_udids: # 保险:即使极端情况下属性仍是 None也就地补齐一次
lock = getattr(self, "_add_lock", None)
if lock is None:
self._add_lock = lock = threading.RLock()
adding = getattr(self, "_adding_udids", None)
if adding is None:
self._adding_udids = adding = set()
# 去重:同一 udid 只提交一次
with lock:
if udid in adding:
return return
self._adding_udids.add(udid) adding.add(udid)
try: try:
# 注意submit(fn, udid) —— 这里不是 *args=udid直接传第二个位置参数即可
self._add_executor.submit(self._safe_add_device, udid) self._add_executor.submit(self._safe_add_device, udid)
except Exception as e: except Exception as e:
with self._add_lock: # 提交失败要把去重标记清掉
self._adding_udids.discard(udid) with lock:
adding.discard(udid)
try: try:
LogManager.method_error(f"提交新增任务失败:{e}", "_add_device", udid=udid) LogManager.method_error(text=f"提交新增任务失败:{e}", method="_add_device", udid=udid)
except Exception: except Exception:
pass pass
@@ -233,13 +256,13 @@ class DeviceInfo:
return False return False
def _iproxy_health_ok(self, udid: str, port: int) -> bool: def _iproxy_health_ok(self, udid: str, port: int) -> bool:
"""综合健康判断:先 TCP后 HTTP /status。两者任一失败即为不健康。""" # 1) 监听检测:不通直接 False
# 先看端口是否真在监听
if not self._iproxy_tcp_probe(port, timeout=0.6): if not self._iproxy_tcp_probe(port, timeout=0.6):
return False return False
# 再看链路到后端是否通WDA 会回应 /status # 2) 业务探测:/status 慢可能是 WDA 卡顿;失败不等同于“端口坏”
if not self._iproxy_http_status_ok_quick(port, timeout=1.2): if not self._iproxy_http_status_ok_quick(port, timeout=1.2):
return False print(f"[iproxy-health] /status 超时,视为轻微异常 {udid}:{port}")
return True
return True return True
def _restart_iproxy(self, udid: str, port: int) -> bool: def _restart_iproxy(self, udid: str, port: int) -> bool:
@@ -276,16 +299,16 @@ class DeviceInfo:
""" """
周期性巡检(默认每 10s 一次): 周期性巡检(默认每 10s 一次):
- 在线设备(type=1) - 在线设备(type=1)
1) 先做 TCP 探测127.0.0.1:screenPort 1) 先做 TCP+HTTP(/status) 探测(封装在 _iproxy_health_ok
2) 再做 HTTP /status 探测 2) 失败 → 自愈重启 iproxy仍失败则累计失败计数
3) 任一失败 → 尝试自愈重启 iproxy若仍失败累计失败计数 3) 连续失败次数 >= 阈值 → 【不删除设备】只标记降级ready=False, streamBroken=True
4) 连续失败次数 >= 3 才移除设备(避免短暂抖动 4) 恢复时清零计数并标记恢复ready=True, streamBroken=False
""" """
# 启动延迟,等新增流程跑起来,避免误判 # 启动延迟,等新增流程跑起来,避免误判
time.sleep(20) time.sleep(20)
FAIL_THRESHOLD = 3 # 连续失败阈值 FAIL_THRESHOLD = int(os.getenv("IPROXY_FAIL_THRESHOLD", "3")) # 连续失败阈值(可用环境变量调)
INTERVAL_SEC = 10 # 巡检间隔 INTERVAL_SEC = int(os.getenv("IPROXY_CHECK_INTERVAL", "10")) # 巡检间隔
while True: while True:
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...] snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
@@ -303,9 +326,32 @@ class DeviceInfo:
# 健康探测 # 健康探测
ok = self._iproxy_health_ok(device_id, port) ok = self._iproxy_health_ok(device_id, port)
if ok: if ok:
# 健康清零失败计数 # 健康清零计数
if self._iproxy_fail_count.get(device_id): if self._iproxy_fail_count.get(device_id):
self._iproxy_fail_count[device_id] = 0 self._iproxy_fail_count[device_id] = 0
# CHANGED: 若之前降级过,这里标记恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
# print(f"[iproxy-check] OK deviceId={device_id} port={port}") # print(f"[iproxy-check] OK deviceId={device_id} port={port}")
continue continue
@@ -318,6 +364,27 @@ class DeviceInfo:
if ok2: if ok2:
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}") print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
self._iproxy_fail_count[device_id] = 0 self._iproxy_fail_count[device_id] = 0
# CHANGED: 若之前降级过,这里也顺便恢复并上报
need_report = False
with self._lock:
m = self._models.get(device_id)
if m:
prev_ready = getattr(m, "ready", True)
prev_broken = getattr(m, "streamBroken", False)
if (not prev_ready) or prev_broken:
m.ready = True
if prev_broken:
try:
delattr(m, "streamBroken")
except Exception:
setattr(m, "streamBroken", False)
need_report = True
if need_report and m:
try:
self._manager_send(m)
except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
continue continue
# 自愈失败:累计失败计数 # 自愈失败:累计失败计数
@@ -325,15 +392,20 @@ class DeviceInfo:
self._iproxy_fail_count[device_id] = fails self._iproxy_fail_count[device_id] = fails
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}") print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
# 达阈值才移除(避免误杀 # 达阈值 → 【不移除设备】,改为降级并上报(避免“删了又加”的抖动
if fails >= FAIL_THRESHOLD: if fails >= FAIL_THRESHOLD:
print(f"[iproxy-check] 连续失败 {fails} 次,移除设备 deviceId={device_id} port={port}") with self._lock:
m = self._models.get(device_id)
if m:
m.ready = False
setattr(m, "streamBroken", True)
try: try:
self._remove_device(device_id) if m:
print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
self._manager_send(m)
except Exception as e: except Exception as e:
print(f"[iproxy-check] _remove_device 异常 deviceId={device_id}: {e}") print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
finally:
self._iproxy_fail_count.pop(device_id, None)
except Exception as e: except Exception as e:
print(f"[iproxy-check] 单设备检查异常: {e}") print(f"[iproxy-check] 单设备检查异常: {e}")
@@ -691,19 +763,17 @@ class DeviceInfo:
print(f"[Proc] 结束进程异常: {e}") print(f"[Proc] 结束进程异常: {e}")
def _manager_send(self, model: DeviceModel): def _manager_send(self, model: DeviceModel):
"""
轻量自愈:首次 send 失败 → start() 一次并重试一次;不抛异常。
这样 34566 刚起时不丢“上车”事件,前端更快看到设备。
"""
try: try:
if self._manager.send(model.toDict()): if self._manager.send(model.toDict()):
print(f"[Manager] 已发送前端数据 {model.deviceId}") print(f"[Manager] 已发送前端数据 {model.deviceId}")
return return
except Exception as e: except Exception as e:
print(f"[Manager] 首次发送异常: {e}") print(f"[Manager] 首次发送异常: {e}")
# 自愈:拉起一次并重试一次
# 自愈:拉起一次并重试一次(不要用 and 连接)
try: try:
if self._manager.start() and self._manager.send(model.toDict()): self._manager.start() # 不关心返回值
if self._manager.send(model.toDict()):
print(f"[Manager] 重试发送成功 {model.deviceId}") print(f"[Manager] 重试发送成功 {model.deviceId}")
return return
except Exception as e: except Exception as e:

View File

@@ -188,7 +188,7 @@ def start_socket_listener():
backoff = min(backoff * 2, 8.0) backoff = min(backoff * 2, 8.0)
continue continue
s.listen() s.listen(256)
try: try:
s.settimeout(1.5) # accept 超时,便于检查自愈循环 s.settimeout(1.5) # accept 超时,便于检查自愈循环
except Exception: except Exception:

View File

@@ -34,8 +34,8 @@ class FlaskSubprocessManager:
self._monitor_thread: Optional[threading.Thread] = None self._monitor_thread: Optional[threading.Thread] = None
# 看门狗参数 # 看门狗参数
self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "3")) # 连续失败多少次重启 self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "5")) # 连续失败多少次重启
self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "8.0")) # 两次重启间隔 self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "10")) # 两次重启间隔
self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5")) # 10分钟最多几次重启 self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5")) # 10分钟最多几次重启
self._RESTART_WINDOW = 600 # 10分钟 self._RESTART_WINDOW = 600 # 10分钟
self._restart_times: List[float] = [] self._restart_times: List[float] = []
@@ -176,6 +176,15 @@ class FlaskSubprocessManager:
# ========= 停止 ========= # ========= 停止 =========
def stop(self): def stop(self):
with self._lock:
if not self.process: return
try:
if self.process.stdout:
self.process.stdout.flush()
time.sleep(0.1) # 让读取线程跟上
except Exception:
pass
with self._lock: with self._lock:
if not self.process: if not self.process:
return return
@@ -253,12 +262,17 @@ class FlaskSubprocessManager:
# ========= 辅助 ========= # ========= 辅助 =========
def _port_alive(self) -> bool: def _port_alive(self) -> bool:
def ping(p):
try: try:
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.6): with socket.create_connection(("127.0.0.1", p), timeout=0.6):
return True return True
except Exception: except Exception:
return False return False
p1 = self.comm_port
p2 = self.comm_port + 1
return ping(p1) or ping(p2)
def _wait_port_open(self, timeout: float) -> bool: def _wait_port_open(self, timeout: float) -> bool:
start = time.time() start = time.time()
while time.time() - start < timeout: while time.time() - start < timeout: