调整deviceinfo内部逻辑

This commit is contained in:
2025-09-22 14:36:05 +08:00
parent 8f290cf610
commit 81e3462f15
12 changed files with 174 additions and 512 deletions

View File

@@ -1,467 +1,177 @@
# -*- coding: utf-8 -*-
import os
import signal
import sys
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict, Optional
import threading
import subprocess
from typing import Dict, Optional, List
import wda
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
from Utils.SubprocessKit import check_output as sp_check_output, popen as sp_popen
class Deviceinfo(object):
"""设备生命周期管理:以 deviceModelList 为唯一真理源"""
class DeviceInfo:
def __init__(self):
# ✅ 连接线程池(最大 6 并发)
self._connect_pool = ThreadPoolExecutor(max_workers=6)
self._port = 9110
self._models: Dict[str, DeviceModel] = {} # udid -> model
self._procs: Dict[str, subprocess.Popen] = {} # udid -> iproxy proc
self._manager = FlaskSubprocessManager.get_instance()
self._iproxy_path = self._find_iproxy()
self._pool = ThreadPoolExecutor(max_workers=6)
if os.name == "nt":
self._si = subprocess.STARTUPINFO()
self._si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self._si.wShowWindow = subprocess.SW_HIDE # 0
else:
self._si = None
# 可选10 秒一次扫野进程
threading.Thread(target=self._janitor, daemon=True).start()
self.deviceIndex = 0
self.screenProxy = 9110
self.pidList: List[Dict] = [] # 仅记录 iproxy 进程
self.manager = FlaskSubprocessManager.get_instance()
self.deviceModelList: List[DeviceModel] = [] # 根基,不动
self.maxDeviceCount = 6
# ---------------- 主循环 ----------------
def listen(self):
while True:
online = {d.udid for d in Usbmux().device_list() if d.conn_type == ConnectionType.USB}
# 拔掉——同步
for udid in list(self._models):
if udid not in online:
self._remove_device(udid)
# 插上——异步
new = [u for u in online if u not in self._models]
if new:
futures = {self._pool.submit(self._add_device, u): u for u in new}
for f in as_completed(futures, timeout=30):
try:
f.result()
except Exception as e:
LogManager.error(f"异步连接失败:{e}")
time.sleep(1)
self._lock = threading.Lock()
self._model_index: Dict[str, DeviceModel] = {} # udid -> model
# ✅ 失踪时间戳记录(替代原来的 miss_count
self._last_seen: Dict[str, float] = {}
self._port_pool: List[int] = []
self._port_in_use: set[int] = set()
# ---------------- 新增设备 ----------------
def _add_device(self, udid: str):
if not self._trusted(udid):
return
port = self._alloc_port()
proc = self._start_iproxy(udid, port)
if not proc:
return
w, h, s = self._screen_info(udid)
model = DeviceModel(deviceId=udid, screenPort=port,
width=w, height=h, scale=s, type=1)
model.ready = True
self._models[udid] = model
self._procs[udid] = proc
self._manager_send(model)
# ✅ 新增:全局 iproxy 进程注册表 udid -> Popen
self._iproxy_registry: Dict[str, subprocess.Popen] = {}
# ---------------- 移除设备 ----------------
def _remove_device(self, udid: str):
model = self._models.pop(udid, None)
if not model:
return
model.type = 2
self._kill(self._procs.pop(udid, None))
self._manager_send(model)
# region iproxy 初始化(原逻辑不变)
# ---------------- 工具函数 ----------------
def _trusted(self, udid: str) -> bool:
try:
self.iproxy_path = self._iproxy_path()
self.iproxy_dir = self.iproxy_path.parent
os.environ["PATH"] = str(self.iproxy_dir) + os.pathsep + os.environ.get("PATH", "")
BaseDevice(udid).get_value("DeviceName")
return True
except Exception:
return False
def _screen_info(self, udid: str):
try:
c = wda.USBClient(udid, 8100)
size = c.window_size()
scale = c.scale
return int(size.width), int(size.height), float(scale)
except Exception:
return 828, 1792, 2.0
def _start_iproxy(self, udid: str, port: int) -> Optional[subprocess.Popen]:
try:
return subprocess.Popen(
[self._iproxy_path, "-u", udid, str(port), "9100"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
except Exception:
return None
def _kill(self, proc: Optional[subprocess.Popen]):
if not proc:
return
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
os.add_dll_directory(str(self.iproxy_dir))
os.kill(proc.pid, signal.SIGKILL)
except Exception:
pass
self._creationflags = 0x08000000 if os.name == "nt" else 0
def _alloc_port(self) -> int:
self._port += 1
return self._port
self._popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(self.iproxy_dir),
shell=False,
text=True,
creationflags=0x08000000 if os.name == "nt" else 0, # CREATE_NO_WINDOW
encoding="utf-8",
bufsize=1,
)
def _manager_send(self, model: DeviceModel):
try:
self._manager.send(model.toDict())
except Exception:
pass
def _spawn_iproxy(udid: str, local_port: int, remote_port: int = 9100) -> subprocess.Popen:
args = [str(self.iproxy_path), "-u", udid, str(local_port), str(remote_port)]
p = subprocess.Popen(args, **self._popen_kwargs)
def _find_iproxy(self) -> str:
base = Path(__file__).resolve().parent.parent
name = "iproxy.exe"
path = base / "resources" / "iproxy" / name
if path.is_file():
return str(path)
raise FileNotFoundError(f"iproxy 不存在: {path}")
# ✅ 注册到全局表
self._iproxy_registry[udid] = p
def _pipe_to_log(name: str, stream):
try:
for line in iter(stream.readline, ''):
s = line.strip()
if s:
LogManager.info(f"[iproxy {name}] {s}", udid)
except Exception:
pass
threading.Thread(target=_pipe_to_log, args=("STDOUT", p.stdout), daemon=True).start()
threading.Thread(target=_pipe_to_log, args=("STDERR", p.stderr), daemon=True).start()
return p
self._spawn_iproxy = _spawn_iproxy
LogManager.info(f"iproxy 启动器已就绪,目录: {self.iproxy_dir}")
except Exception as e:
self.iproxy_path = None
self.iproxy_dir = None
self._spawn_iproxy = None
LogManager.error(f"初始化 iproxy 失败:{e}")
# endregion
# ------------------------------------------------------------------
# 主监听循环 → 只负责“发现”和“提交任务”
# ------------------------------------------------------------------
def startDeviceListener(self):
MISS_WINDOW = 5.0
# ---------------- janitor扫野进程 ----------------
def _janitor(self):
while True:
try:
lists = Usbmux().device_list()
except Exception as e:
LogManager.warning(f"usbmuxd 连接失败: {e}2 秒后重试")
time.sleep(2)
continue
now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB}
usb_sn_set = self._usb_enumerate_sn()
# 1. 失踪判定(同旧逻辑)
need_remove = []
with self._lock:
for udid in list(self._model_index.keys()):
if udid not in now_udids:
last = self._last_seen.get(udid, time.time())
if time.time() - last > MISS_WINDOW and udid not in usb_sn_set:
need_remove.append(udid)
else:
self._last_seen[udid] = time.time()
for udid in need_remove:
self._remove_model(udid)
# ✅ 实时清理孤儿 iproxy原 10 秒改为每次循环)
time.sleep(10)
self._cleanup_orphan_iproxy()
# ✅ 设备全空时核平所有 iproxy
if not self.deviceModelList:
self._kill_all_iproxy()
# 2. 发现新设备 → 并发连接
with self._lock:
new_udids = [d.udid for d in lists
if d.conn_type == ConnectionType.USB and
d.udid not in self._model_index and
len(self.deviceModelList) < self.maxDeviceCount]
if new_udids:
futures = {self._connect_pool.submit(self._connect_device_task, udid): udid
for udid in new_udids}
try:
for f in as_completed(futures, timeout=30):
udid = futures[f]
try:
f.result(timeout=8)
except Exception as e:
LogManager.error(text=f"连接任务失败:{e}", udid=udid)
except TimeoutError:
LogManager.warning(text="部分设备连接超时,已跳过")
time.sleep(1)
# ------------------------------------------------------------------
# ✅ USB 层枚举 SN跨平台
# ------------------------------------------------------------------
def _usb_enumerate_sn(self) -> set[str]:
# ------------ Windows 专用:列出所有 iproxy 命令行 ------------
def _get_all_iproxy_cmdlines(self) -> List[str]:
try:
out = sp_check_output(["idevice_id", "-l"], text=True, timeout=3)
return {line.strip() for line in out.splitlines() if line.strip()}
except Exception:
return set()
raw = subprocess.check_output(
['wmic', 'process', 'where', "name='iproxy.exe'",
'get', 'CommandLine,ProcessId', '/value'],
stderr=subprocess.DEVNULL, text=True
)
except subprocess.CalledProcessError:
return []
lines: List[str] = []
for block in raw.split('\n\n'):
cmd = pid = ''
for line in block.splitlines():
line = line.strip()
if line.startswith('CommandLine='):
cmd = line[len('CommandLine='):].strip()
elif line.startswith('ProcessId='):
pid = line[len('ProcessId='):].strip()
if cmd and pid and '-u' in cmd:
lines.append(f'{cmd} {pid}')
return lines
# ----------------------------------------------------------
# ✅ 清理孤儿 iproxy
# ----------------------------------------------------------
# ------------ 杀孤儿 ------------
def _cleanup_orphan_iproxy(self):
live_udids = set(self._model_index.keys())
for udid, proc in list(self._iproxy_registry.items()):
if udid not in live_udids:
LogManager.warning(f"发现孤儿 iproxy 进程UDID 不在线:{udid},正在清理")
self._terminate_proc(proc)
self._iproxy_registry.pop(udid, None)
# ----------------------------------------------------------
# ✅ 核平所有 iproxyWindows / macOS 通用)
# ----------------------------------------------------------
def _kill_all_iproxy(self):
try:
if os.name == "nt":
subprocess.run(["taskkill", "/F", "/IM", "iproxy.exe"], check=False)
else:
subprocess.run(["pkill", "-f", "iproxy"], check=False)
self._iproxy_registry.clear()
LogManager.info("已强制清理所有 iproxy 进程")
except Exception as e:
LogManager.warning(f"强制清理 iproxy 失败:{e}")
# -------------------- 以下代码与原文件完全一致 --------------------
def _wda_health_checker(self):
while True:
time.sleep(1)
with self._lock:
online = [m for m in self.deviceModelList if m.ready]
for model in online:
udid = model.deviceId
if not self._wda_ok(udid):
LogManager.warning(f"WDA 异常,重启通道:{udid}", udid)
with self._lock:
self._remove_model(udid)
self.connectDevice(udid)
def _wda_ok(self, udid: str) -> bool:
try:
c = wda.USBClient(udid, 8100)
st = c.status()
if st.get("state") != "success":
return False
return True
except Exception as e:
LogManager.error(f"WDA health-check 异常:{e}", udid)
return False
# -------------------- 增删改查唯一入口(未改动) --------------------
def _has_model(self, udid: str) -> bool:
return udid in self._model_index
def _add_model(self, model: DeviceModel):
if model.deviceId in self._model_index:
return
model.ready = True
self.deviceModelList.append(model)
self._model_index[model.deviceId] = model
try:
self.manager.send(model.toDict())
except Exception as e:
LogManager.warning(f"{model.deviceId} 发送上线事件失败:{e}")
LogManager.method_info(f"{model.deviceId} 加入设备成功,当前在线数:{len(self.deviceModelList)}", method="device_count")
def _remove_model(self, udid: str):
print(f"【删】进入删除方法 udid={udid}")
LogManager.method_info(f"【删】进入删除方法 udid={udid}", method="device_count")
with self._lock:
print(f"【删】拿到锁 udid={udid}")
LogManager.method_info(f"【删】拿到锁 udid={udid}", method="device_count")
model = self._model_index.pop(udid, None)
if not model:
print(f"【删】模型已空,直接返回 udid={udid}")
LogManager.method_info(f"【删】模型已空,直接返回 udid={udid}", method="device_count")
return
if model.deleting:
print(f"【删】正在删除中,幂等返回 udid={udid}")
LogManager.method_info(method="device_count", text=f"【删】正在删除中,幂等返回 udid={udid}")
return
model.deleting = True
model.type = 2
print(f"【删】标记 deleting=True udid={udid}")
LogManager.method_info("【删】标记 deleting=True udid={udid}", "device_count")
before = len(self.deviceModelList)
self.deviceModelList = [m for m in self.deviceModelList if m.deviceId != udid]
after = len(self.deviceModelList)
print(f"【删】列表过滤 before={before} → after={after} udid={udid}")
LogManager.method_info(f"【删】列表过滤 before={before} → after={after} udid={udid}", "device_count")
self._port_in_use.discard(model.screenPort)
self._port_pool.append(model.screenPort)
print(f"【删】回收端口 port={model.screenPort} udid={udid}")
LogManager.method_info(f"【删】回收端口 port={model.screenPort} udid={udid}", method="device_count")
to_kill = [item for item in self.pidList if item.get("id") == udid]
self.pidList = [item for item in self.pidList if item.get("id") != udid]
print(f"【删】待杀进程数 count={len(to_kill)} udid={udid}")
LogManager.method_info(f"【删】待杀进程数 count={len(to_kill)} udid={udid}", method="device_count")
# ✅ 先清理注册表中的 iproxy
iproxy_proc = self._iproxy_registry.pop(udid, None)
if iproxy_proc:
self._terminate_proc(iproxy_proc)
for idx, item in enumerate(to_kill, 1):
print(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}")
LogManager.method_info(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}", method="device_count")
self._terminate_proc(item.get("target"))
print(f"【删】进程清理完成 udid={udid}")
LogManager.method_info(f"【删】进程清理完成 udid={udid}", method="device_count")
retry = 3
while retry:
live_udids = set(self._models.keys())
for ln in self._get_all_iproxy_cmdlines():
parts = ln.split()
try:
self.manager.send(model.toDict())
print(f"【删】下线事件已发送 udid={udid}")
LogManager.method_info(f"【删】下线事件已发送 udid={udid}", method="device_count")
break
except Exception as e:
retry -= 1
print(f"【删】发送事件失败 retry={retry} err={e} udid={udid}")
LogManager.method_error(f"【删】发送事件失败 retry={retry} err={e} udid={udid}", method="device_count")
time.sleep(0.2)
else:
print(f"【删】发送事件彻底失败,主动退出 udid={udid}")
LogManager.method_error(f"【删】发送事件彻底失败,主动退出 udid={udid}", method="device_count")
print(f"【删】===== 设备 {udid} 删除全流程结束 =====")
LogManager.method_info(f"【删】===== 设备 {udid} 删除全流程结束 =====", method="device_count")
print(len(self.deviceModelList))
LogManager.method_info(f"当前剩余设备数量:{len(self.deviceModelList)}", method="device_count")
# -------------------- 端口分配与回收(未改动) --------------------
def _alloc_port(self) -> int:
if self._port_pool:
port = self._port_pool.pop()
else:
self.screenProxy += 1
port = self.screenProxy
self._port_in_use.add(port)
return port
def _free_port(self, port: int):
if port in self._port_in_use:
self._port_in_use.remove(port)
self._port_pool.append(port)
# 检测usb是否超时
def _usb_client_with_timeout(self,udid: str, timeout: 8):
LogManager.info(f"[CONNECT FLOW] 即将创建 USBClient超时 {timeout}s", udid)
with ThreadPoolExecutor(max_workers=1) as exe:
fut = exe.submit(wda.USBClient, udid, 8100)
try:
client = fut.result(timeout=timeout)
LogManager.info("[CONNECT FLOW] USBClient 创建成功", udid)
return client
except Exception as e:
LogManager.error(f"[CONNECT FLOW] USBClient 创建失败(超时或异常): {e}", udid)
return None
# ------------------------------------------------------------------
# 线程池里真正干活的地方(原 connectDevice 逻辑搬过来)
# ------------------------------------------------------------------
def _connect_device_task(self, udid: str):
LogManager.info(f"[CONNECT FLOW] >>>>>>>>> 开始处理设备 {udid} >>>>>>>>>", udid)
# 1. 信任检测
trusted = self.is_device_trusted(udid)
LogManager.info(f"[CONNECT FLOW] 信任检测结果 trusted={trusted}", udid)
if not trusted:
LogManager.warning("[CONNECT FLOW] 设备未信任,直接返回", udid)
return
LogManager.info("[CONNECT FLOW] 信任检测通过,准备创建 USBClient", udid)
# 2. 创建 WDA 客户端(带超时)
try:
d = self._usb_client_with_timeout(udid, 8)
if d is None:
LogManager.error("[CONNECT FLOW] USBClient 返回 None超时或异常直接返回", udid)
return
LogManager.info("[CONNECT FLOW] USBClient 创建成功", udid)
except Exception as e:
LogManager.error(f"[CONNECT FLOW] USBClient 抛异常: {e}", udid)
return
# 3. 读取屏幕信息
width, height, scale = 0, 0, 1.0
try:
size = d.window_size()
width, height = size.width, size.height
scale = d.scale
LogManager.info(f"[CONNECT FLOW] 屏幕信息 width={width} height={height} scale={scale}", udid)
except Exception as e:
LogManager.warning(f"[CONNECT FLOW] 读取屏幕信息失败: {e},继续使用默认值", udid)
# 4. 分配端口
port = self._alloc_port()
LogManager.info(f"[CONNECT FLOW] 分配投屏端口 {port}", udid)
# 5. 启动 WDA
try:
LogManager.info("[CONNECT FLOW] 正在启动 WDA 应用", udid)
d.app_start(WdaAppBundleId)
LogManager.info("[CONNECT FLOW] WDA 应用启动完成,等待 2s", udid)
except Exception as e:
LogManager.warning(f"[CONNECT FLOW] 启动 WDA 失败: {e},仍继续", udid)
time.sleep(2)
# 6. 启动 iproxy
LogManager.info(f"[CONNECT FLOW] 准备启动 iproxy 本地 {port} -> 设备 9100", udid)
target = self.relayDeviceScreenPort(udid, port)
if target is None:
LogManager.error("[CONNECT FLOW] iproxy 启动失败,释放端口并返回", udid)
self._free_port(port)
return
LogManager.info("[CONNECT FLOW] iproxy 启动成功,进程已注册", udid)
# 7. 抢锁写内存
with self._lock:
if udid in self._model_index:
LogManager.warning(f"[CONNECT FLOW] 并发重复,已存在内存中,放弃本次任务", udid)
self._terminate_proc(target) # 避免孤儿
self._free_port(port)
return
model = DeviceModel(udid, port, width, height, scale, type=1)
self._add_model(model)
self.pidList.append({"target": target, "id": udid})
LogManager.info(f"[CONNECT FLOW] 设备模型已加入内存,当前在线数: {len(self.deviceModelList)}", udid)
LogManager.info(f"[CONNECT FLOW] <<<<<<<<< 设备 {udid} 处理完成 <<<<<<<<", udid)
# ------------------------------------------------------------------
# 原函数保留(改名即可)
# ------------------------------------------------------------------
def connectDevice(self, udid: str):
"""对外保留接口,实际走线程池"""
self._connect_pool.submit(self._connect_device_task, udid)
# -------------------- 工具方法(未改动) --------------------
def is_device_trusted(self, udid: str) -> bool:
try:
LogManager.info(f"[CONNECT FLOW] 开始信任检测", udid)
d = BaseDevice(udid)
name = d.get_value("DeviceName")
LogManager.info(f"[CONNECT FLOW] 信任检测成功DeviceName={name}", udid)
return True
except Exception as e:
LogManager.warning(f"[CONNECT FLOW] 信任检测失败: {e}", udid)
return False
def relayDeviceScreenPort(self, udid: str, port: int) -> Optional[subprocess.Popen]:
LogManager.info(f"[CONNECT FLOW] 进入 relayDeviceScreenPort目标端口 {port}", udid)
if not self._spawn_iproxy:
LogManager.error("[CONNECT FLOW] _spawn_iproxy 未初始化,返回 None", udid)
return None
for attempt in range(5):
if not self._is_port_open(port):
LogManager.info(f"[CONNECT FLOW] 端口 {port} 检测为空闲", udid)
break
LogManager.warning(f"[CONNECT FLOW] 端口 {port} 仍被占用,第 {attempt + 1}/5 次尝试释放", udid)
pid = self._get_pid_by_port(port)
if pid and pid != os.getpid():
LogManager.info(f"[CONNECT FLOW] 准备 kill 占用端口 {port} 的 PID {pid}", udid)
self._kill_pid_gracefully(pid)
time.sleep(0.2)
else:
LogManager.error("[CONNECT FLOW] 连续 5 次无法释放端口,放弃", udid)
return None
try:
p = self._spawn_iproxy(udid, port, 9100)
LogManager.info(f"[CONNECT FLOW] iproxy 启动完成PID={p.pid}", udid)
return p
except Exception as e:
LogManager.error(f"[CONNECT FLOW] iproxy 启动异常: {e}", udid)
return None
def _is_port_open(self, port: int) -> bool:
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex(("127.0.0.1", port)) == 0
def _get_pid_by_port(self, port: int) -> Optional[int]:
try:
if os.name == "nt":
out = sp_check_output(["netstat", "-ano", "-p", "tcp"], text=True)
for line in out.splitlines():
if f"127.0.0.1:{port}" in line and "LISTENING" in line:
return int(line.strip().split()[-1])
else:
out = sp_check_output(["lsof", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"], text=True)
return int(out.strip().split()[0])
except Exception:
return None
udid = parts[parts.index('-u') + 1]
pid = int(parts[-1])
if udid not in live_udids:
self._kill_pid_gracefully(pid)
LogManager.warning(f'扫到孤儿 iproxy已清理 {udid} PID={pid}')
except (ValueError, IndexError):
continue
# ------------ 按 PID 强杀 ------------
def _kill_pid_gracefully(self, pid: int):
try:
os.kill(pid, signal.SIGTERM)
@@ -470,32 +180,3 @@ class Deviceinfo(object):
except Exception:
pass
def _terminate_proc(self, p: Optional[subprocess.Popen]):
if not p or p.poll() is not None:
return
try:
p.terminate()
p.wait(timeout=3)
except Exception:
try:
if os.name == "posix":
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
else:
p.kill()
p.wait(timeout=2)
except Exception:
pass
def _base_dir(self) -> Path:
if getattr(sys, "frozen", False):
return Path(sys.executable).resolve().parent
return Path(__file__).resolve().parents[1]
def _iproxy_path(self) -> Path:
exe = "iproxy.exe" if os.name == "nt" else "iproxy"
base = self._base_dir()
candidates = [base / "resources" / "iproxy" / exe]
for p in candidates:
if p.exists():
return p
raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")