Compare commits
2 Commits
267d87e43d
...
01580e2fb1
| Author | SHA1 | Date | |
|---|---|---|---|
| 01580e2fb1 | |||
| 01e18bdc03 |
@@ -3,6 +3,8 @@
|
||||
极简稳定版设备监督器(DeviceInfo):加详细 print 日志
|
||||
- 每个关键节点都会 print,便于人工观察执行到哪一步
|
||||
- 保留核心逻辑:监听上下线 / 启动 WDA / 起 iproxy / 通知前端
|
||||
- 并发提速:_add_device 异步化(受控并发)
|
||||
- iproxy 守护:本地端口 + /status 探活,不通则自愈重启;连续失败达阈值才移除
|
||||
"""
|
||||
import os
|
||||
import time
|
||||
@@ -129,72 +131,214 @@ class DeviceInfo:
|
||||
self._last_seen: Dict[str, float] = {}
|
||||
self._manager = FlaskSubprocessManager.get_instance()
|
||||
self._iproxy_path = self._find_iproxy()
|
||||
|
||||
# iproxy 连续失败计数(守护用)
|
||||
self._iproxy_fail_count: Dict[str, int] = {}
|
||||
|
||||
LogManager.info("DeviceInfo 初始化完成", udid="system")
|
||||
print("[Init] DeviceInfo 初始化完成")
|
||||
|
||||
threading.Thread(target=self.check_iproxy_ports).start()
|
||||
# iproxy 守护线程(端口+HTTP探活 → 自愈重启 → 达阈值才移除)
|
||||
threading.Thread(target=self.check_iproxy_ports, daemon=True).start()
|
||||
|
||||
# =============== 核心:端口连通性检测(HTTP 方式) =================
|
||||
def _is_local_port_open(self, port: int, udid: str, timeout: float = 5) -> bool:
|
||||
"""
|
||||
使用 HTTP 方式检测:向 http://127.0.0.1:port/ 发送一次 HEAD 请求。
|
||||
只要建立连接并收到合法的 HTTP 响应(任意 1xx~5xx 状态码),即认为 HTTP 可达。
|
||||
遇到连接失败、超时、协议不对等异常,视为不可用。
|
||||
"""
|
||||
if not isinstance(port, int) or port <= 0 or port > 65535:
|
||||
LogManager.error("端口不可用(非法端口号)", udid=udid)
|
||||
return False
|
||||
self._initialized = True # 标记已初始化
|
||||
|
||||
# =============== 并发添加设备:最小改动(包装 _add_device) ===============
|
||||
def _ensure_add_executor(self):
|
||||
"""
|
||||
懒加载:首次调用 _add_device 时初始化线程池与去重集合。
|
||||
不改 __init__,避免对现有初始化节奏有影响。
|
||||
"""
|
||||
if not hasattr(self, "_add_lock"):
|
||||
self._add_lock = threading.RLock()
|
||||
if not hasattr(self, "_adding_udids"):
|
||||
self._adding_udids = set()
|
||||
if not hasattr(self, "_add_executor") or self._add_executor is None:
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
max_workers = max(2, min(6, (os.cpu_count() or 4) // 2))
|
||||
self._add_executor = ThreadPoolExecutor(
|
||||
max_workers=max_workers,
|
||||
thread_name_prefix="dev-add"
|
||||
)
|
||||
try:
|
||||
LogManager.info(f"[Init] Device add executor started, max_workers={max_workers}", udid="system")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _safe_add_device(self, udid: str):
|
||||
"""
|
||||
后台执行真正的新增实现(_add_device_impl):
|
||||
- 任何异常只记日志,不抛出
|
||||
- 无论成功与否,都在 finally 里清理“正在添加”标记
|
||||
"""
|
||||
try:
|
||||
# HEAD 更轻;若后端对 HEAD 不友好,可改为 "GET", "/"
|
||||
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout)
|
||||
conn.request("HEAD", "/")
|
||||
resp = conn.getresponse()
|
||||
status = resp.status
|
||||
# 读到响应即可关闭
|
||||
conn.close()
|
||||
# 任何合法 HTTP 状态码都说明“HTTP 服务在监听且可交互”,包括 404/401/403/5xx
|
||||
if 100 <= status <= 599:
|
||||
return True
|
||||
else:
|
||||
LogManager.error(f"HTTP状态码异常: {status}", udid=udid)
|
||||
return False
|
||||
|
||||
self._add_device_impl(udid) # ← 这是你原来的重逻辑(见下方)
|
||||
except Exception as e:
|
||||
# 连接被拒绝、超时、不是HTTP协议正确响应(比如返回了非HTTP的字节流)都会到这里
|
||||
LogManager.error(f"HTTP检测失败:{e}", udid=udid)
|
||||
try:
|
||||
LogManager.method_error(f"_add_device_impl 异常:{e}", "_safe_add_device", udid=udid)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
with self._add_lock:
|
||||
self._adding_udids.discard(udid)
|
||||
|
||||
def _add_device(self, udid: str):
|
||||
"""
|
||||
并发包装器:保持所有调用点不变(listen 里仍然调用 _add_device)。
|
||||
- 懒加载线程池
|
||||
- 同一 udid 防重提交
|
||||
- 真实重逻辑放到 _add_device_impl(下方,已把你的原始实现迁过去)
|
||||
"""
|
||||
self._ensure_add_executor()
|
||||
with self._add_lock:
|
||||
if udid in self._adding_udids:
|
||||
return
|
||||
self._adding_udids.add(udid)
|
||||
try:
|
||||
self._add_executor.submit(self._safe_add_device, udid)
|
||||
except Exception as e:
|
||||
with self._add_lock:
|
||||
self._adding_udids.discard(udid)
|
||||
try:
|
||||
LogManager.method_error(f"提交新增任务失败:{e}", "_add_device", udid=udid)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# =============== iproxy 健康检查 / 自愈 ===============
|
||||
def _iproxy_tcp_probe(self, port: int, timeout: float = 0.6) -> bool:
|
||||
"""快速 TCP 探测:能建立连接即认为本地监听正常。"""
|
||||
try:
|
||||
with socket.create_connection(("127.0.0.1", int(port)), timeout=timeout):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
# =============== 一轮检查:发现不通就移除 =================
|
||||
def _iproxy_http_status_ok_quick(self, port: int, timeout: float = 1.2) -> bool:
|
||||
"""
|
||||
轻量 HTTP 探测:GET /status
|
||||
- 成功返回 2xx/3xx 视为 OK
|
||||
- 4xx/5xx 也说明链路畅通(服务可交互),这里统一认为 OK(避免误判)
|
||||
"""
|
||||
try:
|
||||
conn = http.client.HTTPConnection("127.0.0.1", int(port), timeout=timeout)
|
||||
conn.request("GET", "/status")
|
||||
resp = conn.getresponse()
|
||||
_ = resp.read(128)
|
||||
code = getattr(resp, "status", 0)
|
||||
conn.close()
|
||||
# 任何能返回 HTTP 的,都说明“有服务可交互”
|
||||
return 100 <= code <= 599
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _iproxy_health_ok(self, udid: str, port: int) -> bool:
|
||||
"""综合健康判断:先 TCP,后 HTTP /status。两者任一失败即为不健康。"""
|
||||
# 先看端口是否真在监听
|
||||
if not self._iproxy_tcp_probe(port, timeout=0.6):
|
||||
return False
|
||||
# 再看链路到后端是否通(WDA 会回应 /status)
|
||||
if not self._iproxy_http_status_ok_quick(port, timeout=1.2):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _restart_iproxy(self, udid: str, port: int) -> bool:
|
||||
"""干净重启 iproxy:先杀旧的,再启动新的,并等待监听。"""
|
||||
print(f"[iproxy-guard] 准备重启 iproxy {udid} on {port}")
|
||||
proc = None
|
||||
with self._lock:
|
||||
old = self._iproxy.get(udid)
|
||||
try:
|
||||
if old:
|
||||
self._kill(old)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-guard] 杀旧进程异常 {udid}: {e}")
|
||||
|
||||
# 重新拉起
|
||||
try:
|
||||
proc = self._start_iproxy(udid, local_port=port)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-guard] 重启失败 {udid}: {e}")
|
||||
proc = None
|
||||
|
||||
if not proc:
|
||||
return False
|
||||
|
||||
# 写回进程表
|
||||
with self._lock:
|
||||
self._iproxy[udid] = proc
|
||||
|
||||
print(f"[iproxy-guard] 重启成功 {udid} port={port}")
|
||||
return True
|
||||
|
||||
# =============== 一轮检查:先自愈,仍失败才考虑移除 =================
|
||||
def check_iproxy_ports(self, connect_timeout: float = 3) -> None:
|
||||
"""
|
||||
周期性巡检(默认每 10s 一次):
|
||||
- 在线设备(type=1):
|
||||
1) 先做 TCP 探测(127.0.0.1:screenPort)
|
||||
2) 再做 HTTP /status 探测
|
||||
3) 任一失败 → 尝试自愈重启 iproxy;若仍失败,累计失败计数
|
||||
4) 连续失败次数 >= 3 才移除设备(避免短暂抖动)
|
||||
"""
|
||||
# 启动延迟,等新增流程跑起来,避免误判
|
||||
time.sleep(20)
|
||||
|
||||
FAIL_THRESHOLD = 3 # 连续失败阈值
|
||||
INTERVAL_SEC = 10 # 巡检间隔
|
||||
|
||||
while True:
|
||||
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
|
||||
for device_id, model in snapshot:
|
||||
try:
|
||||
# 只处理在线且端口合法的设备
|
||||
if model.type != 1:
|
||||
# 离线设备清零计数
|
||||
self._iproxy_fail_count.pop(device_id, None)
|
||||
continue
|
||||
|
||||
port = int(model.screenPort)
|
||||
if port <= 0 or port > 65535:
|
||||
continue
|
||||
|
||||
ok = self._is_local_port_open(port, timeout=connect_timeout, udid=device_id)
|
||||
if not ok:
|
||||
print(f"[iproxy-check] 端口不可连,移除设备 deviceId={device_id} port={port}")
|
||||
# 健康探测
|
||||
ok = self._iproxy_health_ok(device_id, port)
|
||||
if ok:
|
||||
# 健康则清零失败计数
|
||||
if self._iproxy_fail_count.get(device_id):
|
||||
self._iproxy_fail_count[device_id] = 0
|
||||
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
|
||||
continue
|
||||
|
||||
# 第一次失败:尝试自愈重启
|
||||
print(f"[iproxy-check] 探活失败,准备自愈重启 deviceId={device_id} port={port}")
|
||||
healed = self._restart_iproxy(device_id, port)
|
||||
|
||||
# 重启后再探测一次
|
||||
ok2 = self._iproxy_health_ok(device_id, port) if healed else False
|
||||
if ok2:
|
||||
print(f"[iproxy-check] 自愈成功 deviceId={device_id} port={port}")
|
||||
self._iproxy_fail_count[device_id] = 0
|
||||
continue
|
||||
|
||||
# 自愈失败:累计失败计数
|
||||
fails = self._iproxy_fail_count.get(device_id, 0) + 1
|
||||
self._iproxy_fail_count[device_id] = fails
|
||||
print(f"[iproxy-check] 自愈失败 ×{fails} deviceId={device_id} port={port}")
|
||||
|
||||
# 达阈值才移除(避免误杀)
|
||||
if fails >= FAIL_THRESHOLD:
|
||||
print(f"[iproxy-check] 连续失败 {fails} 次,移除设备 deviceId={device_id} port={port}")
|
||||
try:
|
||||
self._remove_device(device_id) # 这里面可安全地改 self._models
|
||||
self._remove_device(device_id)
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] _remove_device 异常 deviceId={device_id}: {e}")
|
||||
else:
|
||||
# 心跳日志按需开启,避免刷屏
|
||||
# print(f"[iproxy-check] OK deviceId={device_id} port={port}")
|
||||
pass
|
||||
finally:
|
||||
self._iproxy_fail_count.pop(device_id, None)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[iproxy-check] 单设备检查异常: {e}")
|
||||
# 8秒间隔
|
||||
time.sleep(10)
|
||||
|
||||
time.sleep(INTERVAL_SEC)
|
||||
|
||||
def listen(self):
|
||||
LogManager.method_info("进入主循环", "listen", udid="system")
|
||||
@@ -221,7 +365,7 @@ class DeviceInfo:
|
||||
if (now - self._first_seen.get(udid, now)) >= self.ADD_STABLE_SEC:
|
||||
print(f"[Add] 检测到新设备: {udid}")
|
||||
try:
|
||||
self._add_device(udid)
|
||||
self._add_device(udid) # ← 并发包装器
|
||||
except Exception as e:
|
||||
LogManager.method_error(f"新增失败:{e}", "listen", udid=udid)
|
||||
print(f"[Add] 新增失败 {udid}: {e}")
|
||||
@@ -265,65 +409,83 @@ class DeviceInfo:
|
||||
print(f"[WDA] /status@{local_port} 等待超时 {udid}")
|
||||
return False
|
||||
|
||||
|
||||
def _add_device(self, udid: str):
|
||||
# ---------------- 原 _add_device 实现:整体改名为 _add_device_impl ----------------
|
||||
def _add_device_impl(self, udid: str):
|
||||
print(f"[Add] 开始新增设备 {udid}")
|
||||
|
||||
if not self._trusted(udid):
|
||||
print(f"[Add] 未信任设备 {udid}, 跳过")
|
||||
return
|
||||
|
||||
try:
|
||||
dev = tidevice.Device(udid)
|
||||
major = int(dev.product_version.split(".")[0])
|
||||
except Exception:
|
||||
major = 0
|
||||
|
||||
if not self._wda_http_status_ok_once(udid):
|
||||
if major > 17:
|
||||
print("进入iOS17设备的分支")
|
||||
out = IOSActivator().activate(udid)
|
||||
print("wda启动完成")
|
||||
else:
|
||||
print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})")
|
||||
dev = tidevice.Device(udid)
|
||||
dev.app_start(WdaAppBundleId)
|
||||
time.sleep(2)
|
||||
if not self._wait_wda_ready_http(udid, self.WDA_READY_TIMEOUT):
|
||||
print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}")
|
||||
return
|
||||
|
||||
print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}")
|
||||
# 给 WDA 一点稳定时间,避免刚 ready 就查询卡住
|
||||
time.sleep(0.5)
|
||||
# 带超时的屏幕信息获取,避免卡死在 USBClient 调用里
|
||||
w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
|
||||
if not (w and h and s):
|
||||
# 再做几次快速重试(带超时)
|
||||
for i in range(4):
|
||||
print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}")
|
||||
time.sleep(0.6)
|
||||
w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
|
||||
if w and h and s:
|
||||
break
|
||||
|
||||
if not (w and h and s):
|
||||
print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}")
|
||||
|
||||
# 先分配一个“正式使用”的本地端口,并立即起 iproxy(只起这一回)
|
||||
port = self._alloc_port()
|
||||
print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort}")
|
||||
print(f"[iproxy] 准备启动 iproxy 映射 {port}->{wdaScreenPort} (正式)")
|
||||
proc = self._start_iproxy(udid, local_port=port)
|
||||
if not proc:
|
||||
self._release_port(port)
|
||||
print(f"[iproxy] 启动失败,放弃新增 {udid}")
|
||||
return
|
||||
|
||||
# 判断 WDA 是否已就绪;如果未就绪,按原逻辑拉起 WDA 并等到就绪
|
||||
try:
|
||||
dev = tidevice.Device(udid)
|
||||
major = int(dev.product_version.split(".")[0])
|
||||
except Exception:
|
||||
major = 0
|
||||
|
||||
# 直接用“正式端口”探测 /status,避免再启一次临时 iproxy
|
||||
if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=3.0):
|
||||
# 如果还没起来,按你原逻辑拉起 WDA 再等
|
||||
if major > 17:
|
||||
print("进入iOS17设备的分支")
|
||||
try:
|
||||
IOSActivator().activate(udid)
|
||||
print("wda启动完成")
|
||||
except Exception as e:
|
||||
print(f"[WDA] iOS17 激活异常: {e}")
|
||||
else:
|
||||
print(f"[WDA] iOS<=17 启动 WDA app_start (port={wdaScreenPort})")
|
||||
try:
|
||||
dev = tidevice.Device(udid)
|
||||
dev.app_start(WdaAppBundleId)
|
||||
time.sleep(2)
|
||||
except Exception as e:
|
||||
print(f"[WDA] app_start 异常: {e}")
|
||||
|
||||
if not self._wait_wda_ready_on_port(udid, local_port=port, total_timeout_sec=self.WDA_READY_TIMEOUT):
|
||||
print(f"[WDA] WDA 未在超时内就绪, 放弃新增 {udid}")
|
||||
# 清理已起的正式 iproxy
|
||||
try:
|
||||
self._kill(proc)
|
||||
except Exception:
|
||||
pass
|
||||
self._release_port(port)
|
||||
return
|
||||
|
||||
print(f"[WDA] WDA 就绪,准备获取屏幕信息 {udid}")
|
||||
time.sleep(0.5)
|
||||
|
||||
# 带超时的屏幕信息获取(保留你原有容错/重试)
|
||||
w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
|
||||
if not (w and h and s):
|
||||
for i in range(4):
|
||||
print(f"[Screen] 第{i + 1}次获取失败, 重试中... {udid}")
|
||||
time.sleep(0.6)
|
||||
w, h, s = self._screen_info_with_timeout(udid, timeout=3.5)
|
||||
if w and h and s:
|
||||
break
|
||||
if not (w and h and s):
|
||||
print(f"[Screen] 屏幕信息仍为空,继续添加 {udid}")
|
||||
|
||||
# 写入模型 & 发送前端
|
||||
with self._lock:
|
||||
model = DeviceModel(deviceId=udid, screenPort=port, width=w, height=h, scale=s, type=1)
|
||||
model.ready = True
|
||||
self._models[udid] = model
|
||||
self._iproxy[udid] = proc
|
||||
self._port_by_udid[udid] = port
|
||||
if hasattr(self, "_iproxy_fail_count"):
|
||||
self._iproxy_fail_count[udid] = 0
|
||||
|
||||
print(f"[Manager] 准备发送设备数据到前端 {udid}")
|
||||
self._manager_send(model)
|
||||
@@ -343,6 +505,7 @@ class DeviceInfo:
|
||||
self._port_by_udid.pop(udid, None)
|
||||
self._first_seen.pop(udid, None)
|
||||
self._last_seen.pop(udid, None)
|
||||
self._iproxy_fail_count.pop(udid, None)
|
||||
|
||||
# --- 2. 锁外执行重操作 ---
|
||||
# 杀进程
|
||||
@@ -528,11 +691,23 @@ class DeviceInfo:
|
||||
print(f"[Proc] 结束进程异常: {e}")
|
||||
|
||||
def _manager_send(self, model: DeviceModel):
|
||||
"""
|
||||
轻量自愈:首次 send 失败 → start() 一次并重试一次;不抛异常。
|
||||
这样 34566 刚起时不丢“上车”事件,前端更快看到设备。
|
||||
"""
|
||||
try:
|
||||
self._manager.send(model.toDict())
|
||||
print(f"[Manager] 已发送前端数据 {model.deviceId}")
|
||||
if self._manager.send(model.toDict()):
|
||||
print(f"[Manager] 已发送前端数据 {model.deviceId}")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"[Manager] 发送异常: {e}")
|
||||
print(f"[Manager] 首次发送异常: {e}")
|
||||
# 自愈:拉起一次并重试一次
|
||||
try:
|
||||
if self._manager.start() and self._manager.send(model.toDict()):
|
||||
print(f"[Manager] 重试发送成功 {model.deviceId}")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"[Manager] 重试发送异常: {e}")
|
||||
|
||||
def _find_iproxy(self) -> str:
|
||||
env_path = os.getenv("IPROXY_PATH")
|
||||
@@ -545,4 +720,4 @@ class DeviceInfo:
|
||||
if path.is_file():
|
||||
print(f"[iproxy] 使用默认路径 {path}")
|
||||
return str(path)
|
||||
raise FileNotFoundError(f"iproxy 不存在: {path}")
|
||||
raise FileNotFoundError(f"iproxy 不存在: {path}")
|
||||
@@ -103,39 +103,59 @@ def _apply_device_event(obj: Dict[str, Any]):
|
||||
|
||||
# ============ 设备事件 socket 监听 ============
|
||||
def _handle_conn(conn: socket.socket, addr):
|
||||
"""统一的连接处理函数(外部全局,避免内嵌函数覆盖)"""
|
||||
"""统一的连接处理函数(拆 JSON 行 → 正常化 type → 应用到 listData)"""
|
||||
try:
|
||||
with conn:
|
||||
try:
|
||||
conn.settimeout(3.0) # 避免永久阻塞
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
buffer = ""
|
||||
while True:
|
||||
data = conn.recv(1024)
|
||||
if not data: # 对端关闭
|
||||
break
|
||||
buffer += data.decode('utf-8', errors='ignore')
|
||||
# 按行切 JSON;发送端每条以 '\n' 结尾
|
||||
while True:
|
||||
line, sep, buffer = buffer.partition('\n')
|
||||
if not sep:
|
||||
try:
|
||||
data = conn.recv(1024)
|
||||
if not data: # 对端关闭
|
||||
break
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except json.JSONDecodeError as e:
|
||||
LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}")
|
||||
continue
|
||||
dev_id = obj.get("deviceId")
|
||||
typ = _normalize_type(obj.get("type", 1))
|
||||
obj["type"] = typ
|
||||
LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}")
|
||||
_apply_device_event(obj)
|
||||
LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}")
|
||||
buffer += data.decode('utf-8', errors='ignore')
|
||||
|
||||
# 按行切 JSON;发送端每条以 '\n' 结尾
|
||||
while True:
|
||||
line, sep, buffer = buffer.partition('\n')
|
||||
if not sep:
|
||||
break
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except json.JSONDecodeError as e:
|
||||
LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}")
|
||||
continue
|
||||
|
||||
dev_id = obj.get("deviceId")
|
||||
typ = _normalize_type(obj.get("type", 1))
|
||||
obj["type"] = typ # 规范 1/0
|
||||
LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}")
|
||||
|
||||
try:
|
||||
_apply_device_event(obj) # ← 保持你的原设备增删逻辑
|
||||
LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}")
|
||||
except Exception as e:
|
||||
# 单条业务异常不让线程死
|
||||
LogManager.error(f"[DEVICE][APPLY_EVT][ERROR] {e}")
|
||||
|
||||
except (socket.timeout, ConnectionResetError, BrokenPipeError):
|
||||
# 连接级异常:关闭该连接,回到 accept
|
||||
break
|
||||
except Exception as e:
|
||||
LogManager.warning(f"[SOCKET][WARN] recv error: {e}")
|
||||
break
|
||||
except Exception as e:
|
||||
LogManager.error(f"[SOCKET][ERROR] 连接处理异常: {e}")
|
||||
|
||||
def start_socket_listener():
|
||||
"""启动设备事件监听(与 HTTP 端口无关,走 FLASK_COMM_PORT)"""
|
||||
"""启动设备事件监听(仅走 FLASK_COMM_PORT;增强健壮性,不改业务)"""
|
||||
# 统一使用 FLASK_COMM_PORT,默认 34566
|
||||
port = int(os.getenv('FLASK_COMM_PORT', 34566))
|
||||
LogManager.info(f"Received port from environment: {port}")
|
||||
@@ -146,29 +166,64 @@ def start_socket_listener():
|
||||
print("未获取到通信端口,跳过Socket监听")
|
||||
return
|
||||
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
try:
|
||||
s.bind(('127.0.0.1', port))
|
||||
print(f"[INFO] Socket successfully bound to port {port}")
|
||||
LogManager.info(f"[INFO] Socket successfully bound to port {port}")
|
||||
except Exception as bind_error:
|
||||
print(f"[ERROR]端口绑定失败: {bind_error}")
|
||||
LogManager.info(f"[ERROR]端口绑定失败: {bind_error}")
|
||||
return
|
||||
|
||||
s.listen()
|
||||
LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...")
|
||||
print(f"[INFO] Socket listener started on port {port}, waiting for connections...")
|
||||
|
||||
backoff = 0.5 # 自愈退避,起于 0.5s,上限 8s
|
||||
while True:
|
||||
s = None
|
||||
try:
|
||||
conn, addr = s.accept()
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
except Exception as e:
|
||||
LogManager.warning(f"[SOCKET][WARN] setsockopt SO_REUSEADDR failed: {e}")
|
||||
|
||||
try:
|
||||
s.bind(('127.0.0.1', port))
|
||||
print(f"[INFO] Socket successfully bound to port {port}")
|
||||
LogManager.info(f"[INFO] Socket successfully bound to port {port}")
|
||||
except Exception as bind_error:
|
||||
print(f"[ERROR]端口绑定失败: {bind_error}")
|
||||
LogManager.info(f"[ERROR]端口绑定失败: {bind_error}")
|
||||
# 绑定失败通常是端口未释放/竞争,退避后重试
|
||||
time.sleep(backoff)
|
||||
backoff = min(backoff * 2, 8.0)
|
||||
continue
|
||||
|
||||
s.listen()
|
||||
try:
|
||||
s.settimeout(1.5) # accept 超时,便于检查自愈循环
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...")
|
||||
print(f"[INFO] Socket listener started on port {port}, waiting for connections...")
|
||||
# 监听成功 → 退避复位
|
||||
backoff = 0.5
|
||||
|
||||
while True:
|
||||
try:
|
||||
conn, addr = s.accept()
|
||||
except socket.timeout:
|
||||
# 定期“透气”,避免永久卡死;继续等待
|
||||
continue
|
||||
except Exception as e:
|
||||
# 发生 accept 级错误:重建 socket(进入外层 while 自愈)
|
||||
LogManager.error(f"[ERROR] accept 失败: {e}")
|
||||
break
|
||||
|
||||
# 每个连接独立线程处理,保持你原来的做法
|
||||
threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start()
|
||||
|
||||
except Exception as e:
|
||||
LogManager.error(f"[ERROR] accept 失败: {e}")
|
||||
continue
|
||||
threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start()
|
||||
# 任意未兜住的异常,记录并进入退避自愈
|
||||
LogManager.error(f"[SOCKET][ERROR] 监听主循环异常: {e}")
|
||||
time.sleep(backoff)
|
||||
backoff = min(backoff * 2, 8.0)
|
||||
finally:
|
||||
try:
|
||||
if s:
|
||||
s.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 独立线程启动 Socket 服务 + 看门狗
|
||||
listener_thread = threading.Thread(target=start_socket_listener, daemon=True)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
@@ -14,8 +15,10 @@ from Utils.LogManager import LogManager
|
||||
|
||||
|
||||
class FlaskSubprocessManager:
|
||||
"""Flask 子进程守护 + 看门狗 + 稳定增强"""
|
||||
|
||||
_instance: Optional['FlaskSubprocessManager'] = None
|
||||
_lock: threading.Lock = threading.Lock()
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
with cls._lock:
|
||||
@@ -29,48 +32,75 @@ class FlaskSubprocessManager:
|
||||
self.comm_port = 34566
|
||||
self._stop_event = threading.Event()
|
||||
self._monitor_thread: Optional[threading.Thread] = None
|
||||
# 新增:启动前先把可能残留的 Flask 干掉
|
||||
|
||||
# 看门狗参数
|
||||
self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "3")) # 连续失败多少次重启
|
||||
self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "8.0")) # 两次重启间隔
|
||||
self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5")) # 10分钟最多几次重启
|
||||
self._RESTART_WINDOW = 600 # 10分钟
|
||||
self._restart_times: List[float] = []
|
||||
self._fail_count = 0
|
||||
self._last_restart_time = 0.0
|
||||
|
||||
# Windows 隐藏子窗口启动参数
|
||||
self._si = None
|
||||
if os.name == "nt":
|
||||
si = subprocess.STARTUPINFO()
|
||||
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||||
si.wShowWindow = 0
|
||||
self._si = si
|
||||
|
||||
self._kill_orphan_flask()
|
||||
atexit.register(self.stop)
|
||||
LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system")
|
||||
self._log("info", "FlaskSubprocessManager 初始化完成")
|
||||
|
||||
# ========= 日志工具 =========
|
||||
def _log(self, level: str, msg: str, udid="system"):
|
||||
"""同时写 LogManager + 控制台"""
|
||||
try:
|
||||
if level == "info":
|
||||
LogManager.info(msg, udid=udid)
|
||||
elif level in ("warn", "warning"):
|
||||
LogManager.warning(msg, udid=udid)
|
||||
elif level == "error":
|
||||
LogManager.error(msg, udid=udid)
|
||||
else:
|
||||
LogManager.info(msg, udid=udid)
|
||||
except Exception:
|
||||
pass
|
||||
print(msg)
|
||||
|
||||
# ========= 杀残留 Flask =========
|
||||
def _kill_orphan_flask(self):
|
||||
"""根据端口 34566 把遗留进程全部杀掉"""
|
||||
try:
|
||||
if os.name == "nt":
|
||||
# Windows
|
||||
out = subprocess.check_output(
|
||||
["netstat", "-ano"],
|
||||
text=True, startupinfo=self._si
|
||||
)
|
||||
out = subprocess.check_output(["netstat", "-ano"], text=True, startupinfo=self._si)
|
||||
for line in out.splitlines():
|
||||
if f"127.0.0.1:{self.comm_port}" in line and "LISTENING" in line:
|
||||
pid = int(line.strip().split()[-1])
|
||||
if pid != os.getpid():
|
||||
subprocess.run(["taskkill", "/F", "/PID", str(pid)],
|
||||
startupinfo=self._si,
|
||||
capture_output=True)
|
||||
startupinfo=self._si, capture_output=True)
|
||||
self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}")
|
||||
else:
|
||||
# macOS / Linux
|
||||
out = subprocess.check_output(
|
||||
["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"],
|
||||
text=True
|
||||
)
|
||||
out = subprocess.check_output(["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"], text=True)
|
||||
for pid in map(int, out.split()):
|
||||
if pid != os.getpid():
|
||||
os.kill(pid, 9)
|
||||
self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ---------- 启动 ----------
|
||||
# ========= 启动 =========
|
||||
def start(self):
|
||||
with self._lock:
|
||||
if self._is_alive():
|
||||
LogManager.warning("子进程已在运行,无需重复启动", udid="system")
|
||||
self._log("warn", "[FlaskMgr] 子进程已在运行,无需重复启动")
|
||||
return
|
||||
|
||||
env = os.environ.copy()
|
||||
env["FLASK_COMM_PORT"] = str(self.comm_port)
|
||||
|
||||
exe_path = Path(sys.executable).resolve()
|
||||
if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
|
||||
exe_path = Path(sys.argv[0]).resolve()
|
||||
@@ -80,13 +110,20 @@ class FlaskSubprocessManager:
|
||||
cmd = [str(exe_path), "--role=flask"]
|
||||
cwd = str(exe_path.parent)
|
||||
else:
|
||||
cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
|
||||
cwd = str(Path(__file__).resolve().parent)
|
||||
project_root = Path(__file__).resolve().parents[1]
|
||||
candidates = [
|
||||
project_root / "Module" / "Main.py",
|
||||
project_root / "Main.py",
|
||||
]
|
||||
main_path = next((p for p in candidates if p.is_file()), None)
|
||||
if main_path:
|
||||
cmd = [sys.executable, "-u", str(main_path), "--role=flask"]
|
||||
else:
|
||||
cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
|
||||
cwd = str(project_root)
|
||||
|
||||
LogManager.info(f"准备启动 Flask 子进程: {cmd} cwd={cwd}", udid="system")
|
||||
self._log("info", f"[FlaskMgr] 启动命令: {cmd}, cwd={cwd}")
|
||||
|
||||
# 关键:不再自己 open 文件,直接走 LogManager
|
||||
# 用 PIPE 捕获,再转存到 system 级日志
|
||||
self.process = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.DEVNULL,
|
||||
@@ -98,112 +135,140 @@ class FlaskSubprocessManager:
|
||||
bufsize=1,
|
||||
env=env,
|
||||
cwd=cwd,
|
||||
start_new_session=True
|
||||
start_new_session=True,
|
||||
startupinfo=self._si
|
||||
)
|
||||
|
||||
# 守护线程:把子进程 stdout → LogManager.info/system
|
||||
threading.Thread(target=self._flush_stdout, daemon=True).start()
|
||||
LogManager.info(f"Flask 子进程已启动,PID={self.process.pid},端口={self.comm_port}", udid="system")
|
||||
self._log("info", f"[FlaskMgr] Flask 子进程已启动,PID={self.process.pid}")
|
||||
|
||||
if not self._wait_port_open(timeout=10):
|
||||
LogManager.error("等待端口监听超时,启动失败", udid="system")
|
||||
self._log("error", "[FlaskMgr] 启动失败,端口未监听")
|
||||
self.stop()
|
||||
raise RuntimeError("Flask 启动后 10 s 内未监听端口")
|
||||
raise RuntimeError("Flask 启动后 10s 内未监听端口")
|
||||
|
||||
self._monitor_thread = threading.Thread(target=self._monitor, daemon=True)
|
||||
self._monitor_thread.start()
|
||||
LogManager.info("端口守护线程已启动", udid="system")
|
||||
if not self._monitor_thread or not self._monitor_thread.is_alive():
|
||||
self._monitor_thread = threading.Thread(target=self._monitor, daemon=True)
|
||||
self._monitor_thread.start()
|
||||
self._log("info", "[FlaskWD] 守护线程已启动")
|
||||
|
||||
# ---------- 实时把子进程 stdout 刷到 system 日志 ----------
|
||||
# ========= stdout捕获 =========
|
||||
def _flush_stdout(self):
|
||||
if not self.process or not self.process.stdout:
|
||||
return
|
||||
for line in iter(self.process.stdout.readline, ""):
|
||||
if line:
|
||||
LogManager.info(line.rstrip(), udid="system")
|
||||
# 同时输出到控制台
|
||||
print(line.rstrip()) # 打印到主进程的控制台
|
||||
self._log("info", line.rstrip())
|
||||
self.process.stdout.close()
|
||||
|
||||
# ---------- 发送 ----------
|
||||
# ========= 发送 =========
|
||||
def send(self, data: Union[str, Dict, List]) -> bool:
|
||||
if isinstance(data, (dict, list)):
|
||||
data = json.dumps(data, ensure_ascii=False)
|
||||
try:
|
||||
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s:
|
||||
s.sendall((data + "\n").encode("utf-8"))
|
||||
LogManager.info(f"数据已成功发送到 Flask 端口:{self.comm_port}", udid="system")
|
||||
return True
|
||||
self._log("info", f"[FlaskMgr] 数据已发送到端口 {self.comm_port}")
|
||||
return True
|
||||
except Exception as e:
|
||||
LogManager.error(f"发送失败:{e}", udid="system")
|
||||
self._log("error", f"[FlaskMgr] 发送失败: {e}")
|
||||
return False
|
||||
|
||||
# ---------- 停止 ----------
|
||||
# ========= 停止 =========
|
||||
def stop(self):
|
||||
with self._lock:
|
||||
if not self.process:
|
||||
return
|
||||
pid = self.process.pid
|
||||
LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system")
|
||||
self._log("info", f"[FlaskMgr] 正在停止子进程 PID={pid}")
|
||||
try:
|
||||
# 1. 杀整棵树(Windows 也适用)
|
||||
parent = psutil.Process(pid)
|
||||
for child in parent.children(recursive=True):
|
||||
child.kill()
|
||||
try:
|
||||
child.kill()
|
||||
except Exception:
|
||||
pass
|
||||
parent.kill()
|
||||
gone, alive = psutil.wait_procs([parent] + parent.children(), timeout=3)
|
||||
for p in alive:
|
||||
p.kill() # 保险再补一刀
|
||||
self.process.wait()
|
||||
parent.wait(timeout=3)
|
||||
except psutil.NoSuchProcess:
|
||||
pass
|
||||
except Exception as e:
|
||||
LogManager.error(f"停止子进程异常:{e}", udid="system")
|
||||
self._log("error", f"[FlaskMgr] 停止子进程异常: {e}")
|
||||
finally:
|
||||
self.process = None
|
||||
self._stop_event.set()
|
||||
|
||||
# ---------- 端口守护 ----------
|
||||
# ========= 看门狗 =========
|
||||
def _monitor(self):
|
||||
LogManager.info("守护线程开始运行,周期性检查端口存活", udid="system")
|
||||
while not self._stop_event.wait(1.0):
|
||||
if not self._port_alive():
|
||||
LogManager.error("检测到端口不通,准备重启 Flask", udid="system")
|
||||
self._log("info", "[FlaskWD] 看门狗线程启动")
|
||||
verbose = os.getenv("FLASK_WD_VERBOSE", "0") == "1"
|
||||
last_ok = 0.0
|
||||
|
||||
while not self._stop_event.wait(2.0):
|
||||
alive = self._port_alive()
|
||||
if alive:
|
||||
self._fail_count = 0
|
||||
if verbose and (time.time() - last_ok) >= 60:
|
||||
self._log("info", f"[FlaskWD] OK {self.comm_port} alive")
|
||||
last_ok = time.time()
|
||||
continue
|
||||
|
||||
self._fail_count += 1
|
||||
self._log("warn", f"[FlaskWD] 探测失败 {self._fail_count}/{self._FAIL_THRESHOLD}")
|
||||
|
||||
if self._fail_count >= self._FAIL_THRESHOLD:
|
||||
now = time.time()
|
||||
if now - self._last_restart_time < self._COOLDOWN_SEC:
|
||||
self._log("warn", "[FlaskWD] 冷却中,跳过重启")
|
||||
continue
|
||||
|
||||
# 限速:10分钟内超过MAX_RESTARTS则不再重启
|
||||
self._restart_times = [t for t in self._restart_times if now - t < self._RESTART_WINDOW]
|
||||
if len(self._restart_times) >= self._MAX_RESTARTS:
|
||||
self._log("error", f"[FlaskWD] 10分钟内重启次数过多({len(self._restart_times)}次),暂停看门狗")
|
||||
break
|
||||
|
||||
self._restart_times.append(now)
|
||||
self._log("warn", "[FlaskWD] 端口不通,准备重启 Flask")
|
||||
|
||||
with self._lock:
|
||||
if self.process and self.process.poll() is None:
|
||||
try:
|
||||
self.stop()
|
||||
try:
|
||||
self.start()
|
||||
from Module.DeviceInfo import DeviceInfo
|
||||
# 重新发送设备相关数据到flask
|
||||
info = DeviceInfo()
|
||||
for model in info._models.keys():
|
||||
self.send(model)
|
||||
except Exception as e:
|
||||
LogManager.error(f"自动重启失败:{e}", udid="system")
|
||||
time.sleep(2)
|
||||
|
||||
# ---------- 辅助 ----------
|
||||
def _is_port_busy(self, port: int) -> bool:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.settimeout(0.2)
|
||||
return s.connect_ex(("127.0.0.1", port)) == 0
|
||||
time.sleep(1)
|
||||
self.start()
|
||||
self._fail_count = 0
|
||||
self._last_restart_time = now
|
||||
self._log("info", "[FlaskWD] Flask 已成功重启")
|
||||
from Module.DeviceInfo import DeviceInfo
|
||||
info = DeviceInfo()
|
||||
with info._lock:
|
||||
for m in info._models.values():
|
||||
try:
|
||||
self.send(m.toDict())
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
self._log("error", f"[FlaskWD] 自动重启失败: {e}")
|
||||
time.sleep(3)
|
||||
|
||||
# ========= 辅助 =========
|
||||
def _port_alive(self) -> bool:
|
||||
try:
|
||||
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.5):
|
||||
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.6):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _wait_port_open(self, timeout: float) -> bool:
|
||||
t0 = time.time()
|
||||
while time.time() - t0 < timeout:
|
||||
start = time.time()
|
||||
while time.time() - start < timeout:
|
||||
if self._port_alive():
|
||||
return True
|
||||
time.sleep(0.2)
|
||||
return False
|
||||
|
||||
def _is_alive(self) -> bool:
|
||||
return self.process is not None and self.process.poll() is None and self._port_alive()
|
||||
return self.process and self.process.poll() is None and self._port_alive()
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls) -> 'FlaskSubprocessManager':
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user