Merge remote-tracking branch 'origin/main'

# Conflicts:
#	.idea/workspace.xml
#	Module/DeviceInfo.py
#	build.bat
#	resources/FlashLink.exe
This commit is contained in:
2025-09-17 22:26:04 +08:00
11 changed files with 307 additions and 232 deletions

View File

@@ -3,24 +3,38 @@ import os
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import wda
import threading
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
from Utils.SubprocessKit import check_output as sp_check_output, popen as sp_popen
class Deviceinfo(object):
"""设备生命周期管理:以 deviceModelList 为唯一真理源"""
def __init__(self):
...
# ✅ 新增:连接线程池(最大 6 并发)
self._connect_pool = ThreadPoolExecutor(max_workers=6)
...
if os.name == "nt":
self._si = subprocess.STARTUPINFO()
self._si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self._si.wShowWindow = subprocess.SW_HIDE # 0
else:
self._si = None
self.deviceIndex = 0
self.screenProxy = 9110
self.pidList: List[Dict] = [] # 仅记录 iproxy 进程
@@ -30,14 +44,12 @@ class Deviceinfo(object):
self._lock = threading.Lock()
self._model_index: Dict[str, DeviceModel] = {} # udid -> model
self._miss_count: Dict[str, int] = {} # udid -> 连续未扫描到次数
self._port_pool: List[int] = [] # 端口回收池
self._port_in_use: set[int] = set() # 正在使用的端口
# ✅ 1. 失踪时间戳记录(替代原来的 miss_count
self._last_seen: Dict[str, float] = {}
self._port_pool: List[int] = []
self._port_in_use: set[int] = set()
# 🔥1. 启动 WDA 健康检查线程
# threading.Thread(target=self._wda_health_checker, daemon=True).start()
# region iproxy 初始化
# region iproxy 初始化(原逻辑不变)
try:
self.iproxy_path = self._iproxy_path()
self.iproxy_dir = self.iproxy_path.parent
@@ -48,13 +60,14 @@ class Deviceinfo(object):
pass
self._creationflags = 0x08000000 if os.name == "nt" else 0
self._popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(self.iproxy_dir),
shell=False,
text=True,
creationflags=self._creationflags,
creationflags=0x08000000 if os.name == "nt" else 0, # CREATE_NO_WINDOW
encoding="utf-8",
bufsize=1,
)
@@ -85,7 +98,11 @@ class Deviceinfo(object):
LogManager.error(f"初始化 iproxy 失败:{e}")
# endregion
# ------------------------------------------------------------------
# 主监听循环 → 只负责“发现”和“提交任务”
# ------------------------------------------------------------------
def startDeviceListener(self):
MISS_WINDOW = 5.0
while True:
try:
lists = Usbmux().device_list()
@@ -95,50 +112,55 @@ class Deviceinfo(object):
continue
now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB}
usb_sn_set = self._usb_enumerate_sn()
# 1. 失踪登记 & 累加
need_remove = None # ← 新增:放锁外记录
# 1. 失踪判定(同旧逻辑)
need_remove = []
with self._lock:
for udid in list(self._model_index.keys()):
if udid not in now_udids:
self._miss_count[udid] = self._miss_count.get(udid, 0) + 1
if self._miss_count[udid] >= 3:
self._miss_count.pop(udid, None)
need_remove = udid # ← 只记录,不调用
last = self._last_seen.get(udid, time.time())
if time.time() - last > MISS_WINDOW and udid not in usb_sn_set:
need_remove.append(udid)
else:
self._miss_count.pop(udid, None)
self._last_seen[udid] = time.time()
for udid in need_remove:
self._remove_model(udid)
# 🔓 锁已释放,再删设备(不会重入)
if need_remove:
self._remove_model(need_remove)
# 2. 全新插入(只处理未在线且信任且未满)
for d in lists:
if d.conn_type != ConnectionType.USB:
continue
udid = d.udid
with self._lock:
if udid in self._model_index:
continue
if not self.is_device_trusted(udid):
continue
if len(self.deviceModelList) >= self.maxDeviceCount:
continue
try:
self.connectDevice(udid)
except Exception as e:
LogManager.error(f"连接设备失败 {udid}: {e}", udid)
# 2. 发现新设备 → 并发连接
with self._lock:
new_udids = [d.udid for d in lists
if d.conn_type == ConnectionType.USB and
d.udid not in self._model_index and
len(self.deviceModelList) < self.maxDeviceCount]
if new_udids:
futures = {self._connect_pool.submit(self._connect_device_task, udid): udid
for udid in new_udids}
for f in as_completed(futures, timeout=10):
udid = futures[f]
try:
f.result(timeout=8) # 单台 8 s 硬截止
except Exception as e:
LogManager.error(f"连接任务超时/失败: {e}", udid)
time.sleep(1)
# 🔥2. WDA 健康检查
# ------------------------------------------------------------------
# ✅ 3. USB 层枚举 SN跨平台
# ------------------------------------------------------------------
def _usb_enumerate_sn(self) -> set[str]:
try:
out = sp_check_output(["idevice_id", "-l"], text=True, timeout=3)
return {line.strip() for line in out.splitlines() if line.strip()}
except Exception:
return set()
# ===================== 以下代码与原文件完全一致 =====================
def _wda_health_checker(self):
while True:
time.sleep(1)
print(len(self.deviceModelList))
with self._lock:
online = [m for m in self.deviceModelList if m.ready] # ← 只检查就绪的
print(len(online))
online = [m for m in self.deviceModelList if m.ready]
for model in online:
udid = model.deviceId
if not self._wda_ok(udid):
@@ -147,32 +169,24 @@ class Deviceinfo(object):
self._remove_model(udid)
self.connectDevice(udid)
# 🔥3. 真正做 health-check 的地方
def _wda_ok(self, udid: str) -> bool:
"""返回 True 表示 WDA 活着False 表示已死"""
try:
# 用 2 秒超时快速探测
c = wda.USBClient(udid, 8100)
# 下面这句就是“xctest launched but check failed” 的触发点
# 如果 status 里返回了 WebDriverAgent 运行信息就认为 OK
st = c.status()
if st.get("state") != "success":
return False
# 你也可以再苛刻一点,多做一次 /wda/healthcheck
# c.http.get("/wda/healthcheck")
return True
except Exception as e:
# 任何异常连接拒绝、超时、json 解析失败)都认为已死
LogManager.error(f"WDA health-check 异常:{e}", udid)
return False
# region ===================== 增删改查唯一入口(线程安全) =====================
# -------------------- 增删改查唯一入口(未改动) --------------------
def _has_model(self, udid: str) -> bool:
return udid in self._model_index
def _add_model(self, model: DeviceModel):
if model.deviceId in self._model_index:
return # 防重复
return
model.ready = True
self.deviceModelList.append(model)
self._model_index[model.deviceId] = model
@@ -180,60 +194,48 @@ class Deviceinfo(object):
self.manager.send(model.toDict())
except Exception as e:
LogManager.warning(f"{model.deviceId} 发送上线事件失败:{e}")
LogManager.method_info(f"{model.deviceId} 加入设备成功,当前在线数:{len(self.deviceModelList)}",method="device_count")
LogManager.method_info(f"{model.deviceId} 加入设备成功,当前在线数:{len(self.deviceModelList)}", method="device_count")
# 删除设备
def _remove_model(self, udid: str):
print(f"【删】进入删除方法 udid={udid}")
LogManager.method_info(f"【删】进入删除方法 udid={udid}", method="device_count")
# 1. 纯内存临界区——毫秒级
with self._lock:
print(f"【删】拿到锁 udid={udid}")
LogManager.method_info(f"【删】拿到锁 udid={udid}",
method="device_count")
LogManager.method_info(f"【删】拿到锁 udid={udid}", method="device_count")
model = self._model_index.pop(udid, None)
if not model:
print(f"【删】模型已空,直接返回 udid={udid}")
LogManager.method_info(f"【删】模型已空,直接返回 udid={udid}",method="device_count")
LogManager.method_info(f"【删】模型已空,直接返回 udid={udid}", method="device_count")
return
if model.deleting:
print(f"【删】正在删除中,幂等返回 udid={udid}")
LogManager.method_info(method="device_count", text=f"【删】正在删除中,幂等返回 udid={udid}")
return
model.deleting = True
# 标记维删除设备
model.type = 2
print(f"【删】标记 deleting=True udid={udid}")
LogManager.method_info("【删】标记 deleting=True udid={udid}","device_count")
# 过滤列表
LogManager.method_info("【删】标记 deleting=True udid={udid}", "device_count")
before = len(self.deviceModelList)
self.deviceModelList = [m for m in self.deviceModelList if m.deviceId != udid]
after = len(self.deviceModelList)
print(f"【删】列表过滤 before={before} → after={after} udid={udid}")
LogManager.method_info(f"【删】列表过滤 before={before} → after={after} udid={udid}","device_count")
# 端口
LogManager.method_info(f"【删】列表过滤 before={before} → after={after} udid={udid}", "device_count")
self._port_in_use.discard(model.screenPort)
self._port_pool.append(model.screenPort)
print(f"【删】回收端口 port={model.screenPort} udid={udid}")
LogManager.method_info(f"【删】回收端口 port={model.screenPort} udid={udid}", method="device_count")
# 进程
to_kill = [item for item in self.pidList if item.get("id") == udid]
self.pidList = [item for item in self.pidList if item.get("id") != udid]
print(f"【删】待杀进程数 count={len(to_kill)} udid={udid}")
LogManager.method_info(f"【删】待杀进程数 count={len(to_kill)} udid={udid}", method="device_count")
# 2. IO 区无锁
for idx, item in enumerate(to_kill, 1):
print(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}")
LogManager.method_error(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}", method="device_count")
LogManager.method_info(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}", method="device_count")
self._terminate_proc(item.get("target"))
print(f"【删】进程清理完成 udid={udid}")
LogManager.method_info(f"【删】进程清理完成 udid={udid}", method="device_count")
# 3. 网络 IO
retry = 3
while retry:
try:
@@ -255,7 +257,7 @@ class Deviceinfo(object):
print(len(self.deviceModelList))
LogManager.method_info(f"当前剩余设备数量:{len(self.deviceModelList)}", method="device_count")
# region ===================== 端口分配与回收 =====================
# -------------------- 端口分配与回收(未改动) --------------------
def _alloc_port(self) -> int:
if self._port_pool:
port = self._port_pool.pop()
@@ -269,14 +271,14 @@ class Deviceinfo(object):
if port in self._port_in_use:
self._port_in_use.remove(port)
self._port_pool.append(port)
# endregion
# region ===================== 单台设备连接 =====================
def connectDevice(self, udid: str):
# ------------------------------------------------------------------
# 线程池里真正干活的地方(原 connectDevice 逻辑搬过来)
# ------------------------------------------------------------------
def _connect_device_task(self, udid: str):
if not self.is_device_trusted(udid):
LogManager.warning("设备未信任,跳过 WDA 启动", udid)
return
try:
d = wda.USBClient(udid, 8100)
except Exception as e:
@@ -293,23 +295,34 @@ class Deviceinfo(object):
port = self._alloc_port()
model = DeviceModel(udid, port, width, height, scale, type=1)
self._add_model(model)
# 先做完所有 IO再抢锁写内存
try:
d.app_start(WdaAppBundleId)
d.home()
except Exception as e:
LogManager.warning(f"启动/切回桌面失败:{e}", udid)
time.sleep(2)
time.sleep(2) # 原逻辑保留
# 先清旧进程再启动新进程
self.pidList = [item for item in self.pidList if item.get("id") != udid]
target = self.relayDeviceScreenPort(udid, port)
if target:
self.pidList.append({"target": target, "id": udid})
# region ===================== 工具方法 =====================
# 毫秒级临界区
with self._lock:
if udid in self._model_index: # 并发防重
return
self._add_model(model)
if target:
self.pidList.append({"target": target, "id": udid})
# ------------------------------------------------------------------
# 原函数保留(改名即可)
# ------------------------------------------------------------------
def connectDevice(self, udid: str):
"""对外保留接口,实际走线程池"""
self._connect_pool.submit(self._connect_device_task, udid)
# -------------------- 工具方法(未改动) --------------------
def is_device_trusted(self, udid: str) -> bool:
try:
d = BaseDevice(udid)
@@ -319,22 +332,16 @@ class Deviceinfo(object):
return False
def relayDeviceScreenPort(self, udid: str, port: int) -> Optional[subprocess.Popen]:
"""启动 iproxy 前:端口若仍被占用则先杀掉占用者,再启动"""
if not self._spawn_iproxy:
LogManager.error("iproxy 启动器未就绪", udid)
return None
# --- 新增:端口冲突检查 + 强制清理 ---
while self._port_in_use and self._is_port_open(port):
# 先查是哪个进程占用
pid = self._get_pid_by_port(port)
if pid and pid != os.getpid():
LogManager.warning(f"端口 {port} 仍被 PID {pid} 占用,尝试释放", udid)
self._kill_pid_gracefully(pid)
else:
break
# -------------------------------------
try:
p = self._spawn_iproxy(udid, port, 9100)
self._port_in_use.add(port)
@@ -344,30 +351,25 @@ class Deviceinfo(object):
LogManager.error(f"启动 iproxy 失败:{e}", udid)
return None
# ------------------- 新增三个小工具 -------------------
def _is_port_open(self, port: int) -> bool:
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(("127.0.0.1", port)) == 0
def _get_pid_by_port(self, port: int) -> Optional[int]:
"""跨平台根据端口号查 PID失败返回 None"""
try:
if os.name == "nt":
cmd = ["netstat", "-ano", "-p", "tcp"]
out = subprocess.check_output(cmd, text=True)
out = sp_check_output(["netstat", "-ano", "-p", "tcp"], text=True)
for line in out.splitlines():
if f"127.0.0.1:{port}" in line and "LISTENING" in line:
return int(line.strip().split()[-1])
else:
cmd = ["lsof", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"]
out = subprocess.check_output(cmd, text=True)
out = sp_check_output(["lsof", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"], text=True)
return int(out.strip().split()[0])
except Exception:
return None
def _kill_pid_gracefully(self, pid: int):
"""先 terminate 再 kill -9"""
try:
os.kill(pid, signal.SIGTERM)
time.sleep(1)
@@ -375,7 +377,6 @@ class Deviceinfo(object):
except Exception:
pass
def _terminate_proc(self, p: Optional[subprocess.Popen]):
if not p or p.poll() is not None:
return
@@ -404,4 +405,4 @@ class Deviceinfo(object):
for p in candidates:
if p.exists():
return p
raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")
raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")

View File

@@ -266,8 +266,9 @@ def watchLiveForGrowth():
def stopScript():
body = request.get_json()
udid = body.get("udid")
code, massage = ThreadManager.stop(udid)
return ResultData(code=code, data="", massage=massage).toJson()
LogManager.method_info(f"接口收到 /stopScript udid={udid}", method="task")
code, msg = ThreadManager.stop(udid)
return ResultData(code=code, data="", massage=msg).toJson()
# 关注打招呼

View File

@@ -9,6 +9,8 @@ import time
from pathlib import Path
from typing import Optional, Union, Dict, List
import psutil
from Utils.LogManager import LogManager
@@ -28,9 +30,39 @@ class FlaskSubprocessManager:
self.comm_port = 34566
self._stop_event = threading.Event()
self._monitor_thread: Optional[threading.Thread] = None
# 新增:启动前先把可能残留的 Flask 干掉
self._kill_orphan_flask()
atexit.register(self.stop)
LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system")
def _kill_orphan_flask(self):
"""根据端口 34566 把遗留进程全部杀掉"""
try:
if os.name == "nt":
# Windows
out = subprocess.check_output(
["netstat", "-ano"],
text=True, startupinfo=self._si
)
for line in out.splitlines():
if f"127.0.0.1:{self.comm_port}" in line and "LISTENING" in line:
pid = int(line.strip().split()[-1])
if pid != os.getpid():
subprocess.run(["taskkill", "/F", "/PID", str(pid)],
startupinfo=self._si,
capture_output=True)
else:
# macOS / Linux
out = subprocess.check_output(
["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"],
text=True
)
for pid in map(int, out.split()):
if pid != os.getpid():
os.kill(pid, 9)
except Exception:
pass
# ---------- 启动 ----------
def start(self):
with self._lock:
@@ -108,27 +140,24 @@ class FlaskSubprocessManager:
# ---------- 停止 ----------
def stop(self):
with self._lock:
if getattr(self, 'process', None) is None:
LogManager.info("无子进程需要停止", udid="system")
if not self.process:
return
pid = self.process.pid
LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system")
try:
self.process.terminate()
try:
self.process.wait(timeout=3)
except subprocess.TimeoutExpired:
LogManager.warning("软杀超时,强制杀进程树", udid="system")
import psutil
parent = psutil.Process(pid)
for child in parent.children(recursive=True):
child.kill()
parent.kill()
self.process.wait()
LogManager.info("Flask 子进程已停止", udid="system")
# 1. 杀整棵树Windows 也适用)
parent = psutil.Process(pid)
for child in parent.children(recursive=True):
child.kill()
parent.kill()
gone, alive = psutil.wait_procs([parent] + parent.children(), timeout=3)
for p in alive:
p.kill() # 保险再补一刀
self.process.wait()
except psutil.NoSuchProcess:
pass
except Exception as e:
LogManager.error(f"停止子进程异常:{e}", udid="system")
LogManager.error(f"停止子进程异常:{e}", udid="system")
finally:
self.process = None
self._stop_event.set()