优化flask获取设备列表的逻辑

This commit is contained in:
2025-11-14 16:58:55 +08:00
parent bc83fcd9d7
commit 96c62ced12
19 changed files with 99 additions and 18 deletions

View File

@@ -21,6 +21,7 @@ import psutil
import http.client import http.client
import tidevice import tidevice
import wda import wda
from flask import json
from tidevice import Usbmux, ConnectionType from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel from Entity.DeviceModel import DeviceModel
@@ -350,7 +351,7 @@ class DeviceInfo:
if need_report and m: if need_report and m:
try: try:
print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}") print(f"[iproxy-check] 自愈成功,恢复就绪 deviceId={device_id} port={port}")
self._manager_send(m) self._manager_send()
except Exception as e: except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}") print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
@@ -384,7 +385,7 @@ class DeviceInfo:
need_report = True need_report = True
if need_report and m: if need_report and m:
try: try:
self._manager_send(m) self._manager_send()
except Exception as e: except Exception as e:
print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}") print(f"[iproxy-check] 上报恢复异常 deviceId={device_id}: {e}")
continue continue
@@ -405,7 +406,7 @@ class DeviceInfo:
if m: if m:
print( print(
f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}") f"[iproxy-check] 连续失败 {fails} 次,降级设备(保留在线) deviceId={device_id} port={port}")
self._manager_send(m) self._manager_send()
except Exception as e: except Exception as e:
print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}") print(f"[iproxy-check] 上报降级异常 deviceId={device_id}: {e}")
@@ -487,6 +488,29 @@ class DeviceInfo:
print(f"[WDA] /status@{local_port} 等待超时 {udid}") print(f"[WDA] /status@{local_port} 等待超时 {udid}")
return False return False
def _send_snapshot_to_flask(self):
"""把当前 _models 的全量快照发送给 Flask 进程"""
try:
# 1. 把 _models 里的设备转成可 JSON 的 dict 列表
with self._lock:
devices = [m.toDict() for m in self._models.values()]
payload = json.dumps({"devices": devices}, ensure_ascii=False)
# 2. 建立到 Flask 的本地 socket 连接并发送
port = int(os.getenv("FLASK_COMM_PORT", "34566"))
if port <= 0:
LogManager.warning("[SNAPSHOT] 无有效端口,跳过发送")
return
with socket.create_connection(("127.0.0.1", port), timeout=1.5) as s:
s.sendall(payload.encode("utf-8") + b"\n")
print(f"[SNAPSHOT] 已发送 {len(devices)} 台设备快照到 Flask")
LogManager.info(f"[SNAPSHOT] 已发送 {len(devices)} 台设备快照到 Flask")
except Exception as e:
# 不要让异常影响主循环,只打个日志
LogManager.warning(f"[SNAPSHOT] 发送快照失败: {e}")
def _device_online_via_tidevice(self, udid: str) -> bool: def _device_online_via_tidevice(self, udid: str) -> bool:
try: try:
from tidevice import Usbmux, ConnectionType from tidevice import Usbmux, ConnectionType
@@ -629,7 +653,7 @@ class DeviceInfo:
self._iproxy_fail_count[udid] = 0 self._iproxy_fail_count[udid] = 0
print(f"[Manager] 准备发送设备数据到前端 {udid}") print(f"[Manager] 准备发送设备数据到前端 {udid}")
self._manager_send(model) self._manager_send()
print(datetime.datetime.now().strftime("%H:%M:%S")) print(datetime.datetime.now().strftime("%H:%M:%S"))
print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}") print(f"[Add] 设备添加成功 {udid}, port={port}, {w}x{h}@{s}")
@@ -669,7 +693,7 @@ class DeviceInfo:
# 通知上层 # 通知上层
try: try:
self._manager_send(model) self._manager_send()
except Exception as e: except Exception as e:
print(f"[Remove] 通知上层异常 {udid}: {e}") print(f"[Remove] 通知上层异常 {udid}: {e}")
@@ -832,22 +856,44 @@ class DeviceInfo:
except Exception as e: except Exception as e:
print(f"[Proc] 结束进程异常: {e}") print(f"[Proc] 结束进程异常: {e}")
def _manager_send(self, model: DeviceModel): def _manager_send(self):
# try:
# if self._manager.send(model.toDict()):
# print(f"[Manager] 已发送前端数据 {model.deviceId}")
# return
# except Exception as e:
# print(f"[Manager] 首次发送异常: {e}")
#
# # 自愈:拉起一次并重试一次(不要用 and 连接)
# try:
# self._manager.start() # 不关心返回值
# if self._manager.send(model.toDict()):
# print(f"[Manager] 重试发送成功 {model.deviceId}")
# return
# except Exception as e:
# print(f"[Manager] 重试发送异常: {e}")
"""对外统一的“通知 Flask 有设备变动”的入口(无参数)。
作用:把当前所有设备的全量快照发给 Flask。
"""
# 第 1 次:直接发快照
try: try:
if self._manager.send(model.toDict()): self._send_snapshot_to_flask()
print(f"[Manager] 已发送前端数据 {model.deviceId}") return
return
except Exception as e: except Exception as e:
print(f"[Manager] 首次发送异常: {e}") print(f"[Manager] 首次发送快照异常: {e}")
# 自愈:拉起一次并重试一次(不要用 and 连接) # 自愈:尝试拉起 Flask 子进程
try: try:
self._manager.start() # 不关心返回值 self._manager.start()
if self._manager.send(model.toDict()):
print(f"[Manager] 重试发送成功 {model.deviceId}")
return
except Exception as e: except Exception as e:
print(f"[Manager] 重试发送异常: {e}") print(f"[Manager] 拉起 Flask 子进程异常: {e}")
# 第 2 次:再发快照
try:
self._send_snapshot_to_flask()
print(f"[Manager] 重试发送快照成功")
except Exception as e:
print(f"[Manager] 重试发送快照仍失败: {e}")
def _find_iproxy(self) -> str: def _find_iproxy(self) -> str:
env_path = os.getenv("IPROXY_PATH") env_path = os.getenv("IPROXY_PATH")

View File

@@ -6,7 +6,7 @@ import threading
import time import time
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from typing import Any, Dict from typing import Any, Dict, List
from Entity import Variables from Entity import Variables
from Utils.AiUtils import AiUtils from Utils.AiUtils import AiUtils
from Utils.IOSAIStorage import IOSAIStorage from Utils.IOSAIStorage import IOSAIStorage
@@ -85,6 +85,29 @@ def _normalize_type(v) -> int:
def _is_online(d: Dict[str, Any]) -> bool: def _is_online(d: Dict[str, Any]) -> bool:
return _normalize_type(d.get("type", 1)) == 1 return _normalize_type(d.get("type", 1)) == 1
def _apply_device_snapshot(devices: List[Dict[str, Any]]):
"""接收 DeviceInfo 送来的全量设备列表,直接覆盖 listData"""
global listData
try:
normed = []
for d in devices:
# 拷贝一份,避免引用共享
d = dict(d)
d["type"] = _normalize_type(d.get("type", 1)) # 规范成 0/1
normed.append(d)
with listLock:
before = len(listData)
listData[:] = normed # 全量覆盖
_log_device_changes("SNAPSHOT")
try:
LogManager.info(f"[DEVICE][SNAPSHOT] size={len(normed)} (was={before})")
except Exception:
print(f"[DEVICE][SNAPSHOT] size={len(normed)} (was={before})")
except Exception as e:
LogManager.error(f"[DEVICE][SNAPSHOT][ERROR] {e}")
def _apply_device_event(obj: Dict[str, Any]): def _apply_device_event(obj: Dict[str, Any]):
"""把单条设备上线/下线事件落到 listData并打印关键日志""" """把单条设备上线/下线事件落到 listData并打印关键日志"""
try: try:
@@ -140,13 +163,25 @@ def _handle_conn(conn: socket.socket, addr):
LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}") LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}")
continue continue
# === 新增:如果是全量快照(包含 devices 字段) ===
if "devices" in obj:
devs = obj.get("devices") or []
LogManager.info(f"[SOCKET][RECV][SNAPSHOT] size={len(devs)} keys={list(obj.keys())}")
try:
_apply_device_snapshot(devs)
LogManager.info(f"[SOCKET][APPLY][SNAPSHOT] size={len(devs)}")
except Exception as e:
LogManager.error(f"[DEVICE][APPLY_SNAPSHOT][ERROR] {e}")
continue # 处理完这一条,继续下一条 JSON
# === 否则按原来的单条设备事件处理(兼容旧逻辑) ===
dev_id = obj.get("deviceId") dev_id = obj.get("deviceId")
typ = _normalize_type(obj.get("type", 1)) typ = _normalize_type(obj.get("type", 1))
obj["type"] = typ # 规范 1/0 obj["type"] = typ # 规范 1/0
LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}") LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}")
try: try:
_apply_device_event(obj) # 保持你的原设备增删逻辑 _apply_device_event(obj) # 保持你原来的增删逻辑
LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}") LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}")
except Exception as e: except Exception as e:
# 单条业务异常不让线程死 # 单条业务异常不让线程死