Files
iOSAI/Module/DeviceInfo.py

407 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import signal
import sys
import time
import wda
import threading
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
class Deviceinfo(object):
    """Device lifecycle manager; ``deviceModelList`` is the single source of truth.

    The class watches usbmuxd for USB-attached devices, registers a
    ``DeviceModel`` for each trusted device, relays the device port 9100 to a
    local port through an ``iproxy`` child process, and tears everything down
    again once a device has been missing for several consecutive scans.
    """

    # Number of consecutive scans a device may be absent before it is removed.
    _MAX_MISS = 3
    # Upper bound on attempts to free a local port that is still occupied.
    _MAX_PORT_FREE_ATTEMPTS = 3

    def __init__(self):
        """Initialise bookkeeping structures and prepare the iproxy launcher.

        If the iproxy binary cannot be located, ``self._spawn_iproxy`` is set
        to ``None`` and an error is logged instead of raising.
        """
        self.deviceIndex = 0
        self.screenProxy = 9110  # last port handed out; _alloc_port increments first
        self.pidList: List[Dict] = []  # iproxy processes only: {"target": Popen, "id": udid}
        self.manager = FlaskSubprocessManager.get_instance()
        self.deviceModelList: List[DeviceModel] = []  # the authoritative device list
        self.maxDeviceCount = 6
        self._lock = threading.Lock()  # NOT reentrant: never call _remove_model while holding it
        self._model_index: Dict[str, DeviceModel] = {}  # udid -> model
        self._miss_count: Dict[str, int] = {}  # udid -> consecutive scans without the device
        self._port_pool: List[int] = []  # recycled ports available for reuse
        self._port_in_use: set[int] = set()  # ports currently assigned
        # The WDA health-check thread is currently disabled.
        # threading.Thread(target=self._wda_health_checker, daemon=True).start()

        # region iproxy initialisation
        try:
            self.iproxy_path = self._iproxy_path()
            self.iproxy_dir = self.iproxy_path.parent
            os.environ["PATH"] = str(self.iproxy_dir) + os.pathsep + os.environ.get("PATH", "")
            try:
                # Best effort: os.add_dll_directory only exists on Windows.
                os.add_dll_directory(str(self.iproxy_dir))
            except Exception:
                pass
            self._creationflags = 0x08000000 if os.name == "nt" else 0
            self._popen_kwargs = dict(
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=str(self.iproxy_dir),
                shell=False,
                text=True,
                creationflags=self._creationflags,
                encoding="utf-8",
                bufsize=1,
            )

            def _spawn_iproxy(udid: str, local_port: int, remote_port: int = 9100) -> subprocess.Popen:
                """Start iproxy for *udid* and forward its output to the log."""
                args = [str(self.iproxy_path), "-u", udid, str(local_port), str(remote_port)]
                p = subprocess.Popen(args, **self._popen_kwargs)

                def _pipe_to_log(name: str, stream):
                    # Drain the pipe line by line so the child never blocks on a full buffer.
                    try:
                        for line in iter(stream.readline, ''):
                            s = line.strip()
                            if s:
                                LogManager.info(f"[iproxy {name}] {s}", udid)
                    except Exception:
                        # Best effort: the stream is closed when the process dies.
                        pass

                threading.Thread(target=_pipe_to_log, args=("STDOUT", p.stdout), daemon=True).start()
                threading.Thread(target=_pipe_to_log, args=("STDERR", p.stderr), daemon=True).start()
                return p

            self._spawn_iproxy = _spawn_iproxy
            LogManager.info(f"iproxy 启动器已就绪,目录: {self.iproxy_dir}")
        except Exception as e:
            self.iproxy_path = None
            self.iproxy_dir = None
            self._spawn_iproxy = None
            LogManager.error(f"初始化 iproxy 失败:{e}")
        # endregion

    def startDeviceListener(self):
        """Poll usbmuxd forever, removing vanished devices and connecting new ones.

        This method never returns; run it on a dedicated thread.
        """
        while True:
            try:
                lists = Usbmux().device_list()
            except Exception as e:
                LogManager.warning(f"usbmuxd 连接失败: {e},2 秒后重试")
                time.sleep(2)
                continue
            now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB}

            # 1. Register misses under the lock, then remove outside of it
            #    (_remove_model takes the lock itself).
            for gone in self._collect_missing(now_udids):
                self._remove_model(gone)

            # 2. Brand-new devices: only USB, not yet registered, trusted, and below the cap.
            for d in lists:
                if d.conn_type != ConnectionType.USB:
                    continue
                udid = d.udid
                with self._lock:
                    if udid in self._model_index:
                        continue
                if not self.is_device_trusted(udid):
                    continue
                if len(self.deviceModelList) >= self.maxDeviceCount:
                    continue
                try:
                    self.connectDevice(udid)
                except Exception as e:
                    LogManager.error(f"连接设备失败 {udid}: {e}", udid)
            time.sleep(1)

    def _collect_missing(self, now_udids: set) -> List[str]:
        """Update miss counters and return every udid that has just expired.

        A registered device that is absent from *now_udids* gets its counter
        incremented; once the counter reaches ``_MAX_MISS`` the counter is
        dropped and the udid is returned. A device that is present has its
        counter reset. All expired devices of one scan are returned, not just
        the last one.
        """
        expired: List[str] = []
        with self._lock:
            for udid in list(self._model_index):
                if udid in now_udids:
                    self._miss_count.pop(udid, None)
                    continue
                misses = self._miss_count.get(udid, 0) + 1
                if misses >= self._MAX_MISS:
                    self._miss_count.pop(udid, None)
                    expired.append(udid)
                else:
                    self._miss_count[udid] = misses
        return expired

    def _wda_health_checker(self):
        """Probe WDA on every ready device once per second and rebuild dead channels.

        This method never returns; run it on a dedicated thread.
        """
        while True:
            time.sleep(1)
            print(len(self.deviceModelList))
            with self._lock:
                online = [m for m in self.deviceModelList if m.ready]  # only check ready devices
            print(len(online))
            for model in online:
                udid = model.deviceId
                if self._wda_ok(udid):
                    continue
                LogManager.warning(f"WDA 异常,重启通道:{udid}", udid)
                # _remove_model acquires self._lock itself; calling it while
                # holding the (non-reentrant) lock would deadlock.
                self._remove_model(udid)
                try:
                    self.connectDevice(udid)
                except Exception as e:
                    # Keep the checker thread alive, as the listener loop does.
                    LogManager.error(f"连接设备失败 {udid}: {e}", udid)

    def _wda_ok(self, udid: str) -> bool:
        """Return True when WDA on *udid* answers its status call, False otherwise."""
        try:
            c = wda.USBClient(udid, 8100)
            # A status payload whose "state" is "success" is treated as healthy.
            st = c.status()
            if st.get("state") != "success":
                return False
            # A stricter probe could additionally hit /wda/healthcheck:
            # c.http.get("/wda/healthcheck")
            return True
        except Exception as e:
            # Any failure (connection refused, timeout, JSON parse error) counts as dead.
            LogManager.error(f"WDA health-check 异常:{e}", udid)
            return False

    # region ===================== single entry points for add / remove / query =====================
    def _has_model(self, udid: str) -> bool:
        """Return whether a model for *udid* is currently registered."""
        return udid in self._model_index

    def _add_model(self, model: "DeviceModel"):
        """Register *model*, mark it ready and publish the online event.

        A model whose deviceId is already registered is ignored.
        """
        if model.deviceId in self._model_index:
            return  # guard against duplicates
        model.ready = True
        self.deviceModelList.append(model)
        self._model_index[model.deviceId] = model
        try:
            self.manager.send(model.toDict())
        except Exception as e:
            LogManager.warning(f"{model.deviceId} 发送上线事件失败:{e}")
        LogManager.method_info(f"{model.deviceId} 加入设备成功,当前在线数:{len(self.deviceModelList)}", method="device_count")

    def _remove_model(self, udid: str):
        """Unregister *udid*, recycle its port, kill its iproxy and publish the offline event.

        The method acquires ``self._lock`` itself, so callers must NOT hold the
        lock. Removing an unknown udid is a no-op.
        """
        print(f"【删】进入删除方法 udid={udid}")
        LogManager.method_info(f"【删】进入删除方法 udid={udid}", method="device_count")

        # 1. In-memory critical section.
        with self._lock:
            print(f"【删】拿到锁 udid={udid}")
            LogManager.method_info(f"【删】拿到锁 udid={udid}",
                                   method="device_count")
            model = self._model_index.pop(udid, None)
            if not model:
                print(f"【删】模型已空,直接返回 udid={udid}")
                LogManager.method_info(f"【删】模型已空,直接返回 udid={udid}", method="device_count")
                return
            if model.deleting:
                print(f"【删】正在删除中,幂等返回 udid={udid}")
                LogManager.method_info(method="device_count", text=f"【删】正在删除中,幂等返回 udid={udid}")
                return
            model.deleting = True
            # type 2 marks the model as a removed device in the event sent below.
            model.type = 2
            print(f"【删】标记 deleting=True udid={udid}")
            LogManager.method_info(f"【删】标记 deleting=True udid={udid}", "device_count")

            # Drop the model from the list.
            before = len(self.deviceModelList)
            self.deviceModelList = [m for m in self.deviceModelList if m.deviceId != udid]
            after = len(self.deviceModelList)
            print(f"【删】列表过滤 before={before} → after={after} udid={udid}")
            LogManager.method_info(f"【删】列表过滤 before={before} → after={after} udid={udid}", "device_count")

            # Recycle the port.
            self._port_in_use.discard(model.screenPort)
            self._port_pool.append(model.screenPort)
            print(f"【删】回收端口 port={model.screenPort} udid={udid}")
            LogManager.method_info(f"【删】回收端口 port={model.screenPort} udid={udid}", method="device_count")

            # Detach the processes; they are killed after the lock is released.
            to_kill = [item for item in self.pidList if item.get("id") == udid]
            self.pidList = [item for item in self.pidList if item.get("id") != udid]
            print(f"【删】待杀进程数 count={len(to_kill)} udid={udid}")
            LogManager.method_info(f"【删】待杀进程数 count={len(to_kill)} udid={udid}", method="device_count")

        # 2. Process I/O, without the lock.
        for idx, item in enumerate(to_kill, 1):
            print(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}")
            LogManager.method_error(f"【删】杀进程 {idx}/{len(to_kill)} pid={item.get('target').pid} udid={udid}", method="device_count")
            self._terminate_proc(item.get("target"))
        print(f"【删】进程清理完成 udid={udid}")
        LogManager.method_info(f"【删】进程清理完成 udid={udid}", method="device_count")

        # 3. Network I/O: publish the offline event, retrying a few times.
        retry = 3
        while retry:
            try:
                self.manager.send(model.toDict())
                print(f"【删】下线事件已发送 udid={udid}")
                LogManager.method_info(f"【删】下线事件已发送 udid={udid}", method="device_count")
                break
            except Exception as e:
                retry -= 1
                print(f"【删】发送事件失败 retry={retry} err={e} udid={udid}")
                LogManager.method_error(f"【删】发送事件失败 retry={retry} err={e} udid={udid}", method="device_count")
                time.sleep(0.2)
        else:
            # Reached only when every retry failed (the loop ended without break).
            print(f"【删】发送事件彻底失败,主动退出 udid={udid}")
            LogManager.method_error(f"【删】发送事件彻底失败,主动退出 udid={udid}", method="device_count")

        print(f"【删】===== 设备 {udid} 删除全流程结束 =====")
        LogManager.method_info(f"【删】===== 设备 {udid} 删除全流程结束 =====", method="device_count")
        print(len(self.deviceModelList))
        LogManager.method_info(f"当前剩余设备数量:{len(self.deviceModelList)}", method="device_count")

    # region ===================== port allocation and recycling =====================
    def _alloc_port(self) -> int:
        """Return a local port, preferring a recycled one, and mark it in use."""
        if self._port_pool:
            port = self._port_pool.pop()
        else:
            self.screenProxy += 1
            port = self.screenProxy
        self._port_in_use.add(port)
        return port

    def _free_port(self, port: int):
        """Return *port* to the pool if it is currently marked in use."""
        if port in self._port_in_use:
            self._port_in_use.remove(port)
            self._port_pool.append(port)
    # endregion

    # region ===================== single device connection =====================
    def connectDevice(self, udid: str):
        """Register *udid*, launch the WDA app on it and start its iproxy relay.

        Untrusted devices and devices for which the WDA client cannot be
        created are skipped after logging.
        """
        if not self.is_device_trusted(udid):
            LogManager.warning("设备未信任,跳过 WDA 启动", udid)
            return
        try:
            d = wda.USBClient(udid, 8100)
        except Exception as e:
            LogManager.error(f"启动 WDA 失败: {e}", udid)
            return

        # Fall back to zero size / scale 1.0 when the screen info cannot be read.
        width, height, scale = 0, 0, 1.0
        try:
            size = d.window_size()
            width, height = size.width, size.height
            scale = d.scale
        except Exception as e:
            LogManager.warning(f"读取屏幕信息失败:{e}", udid)

        port = self._alloc_port()
        model = DeviceModel(udid, port, width, height, scale, type=1)
        self._add_model(model)

        try:
            d.app_start(WdaAppBundleId)
            d.home()
        except Exception as e:
            LogManager.warning(f"启动/切回桌面失败:{e}", udid)
        time.sleep(2)

        # Clear stale iproxy processes of this device before starting a new one.
        # They must actually be terminated; dropping the entries alone leaks them.
        stale = [item for item in self.pidList if item.get("id") == udid]
        self.pidList = [item for item in self.pidList if item.get("id") != udid]
        for item in stale:
            self._terminate_proc(item.get("target"))

        target = self.relayDeviceScreenPort(udid, port)
        if target:
            self.pidList.append({"target": target, "id": udid})

    # region ===================== utilities =====================
    def is_device_trusted(self, udid: str) -> bool:
        """Return True when the device answers a ``DeviceName`` query without error."""
        try:
            d = BaseDevice(udid)
            d.get_value("DeviceName")
            return True
        except Exception:
            return False

    def relayDeviceScreenPort(self, udid: str, port: int) -> Optional[subprocess.Popen]:
        """Start iproxy forwarding local *port* to device port 9100.

        If the local port is still open, the process listening on it is killed
        first (a bounded number of attempts). Returns the iproxy process, or
        ``None`` when the launcher is unavailable or the spawn fails.
        """
        if not self._spawn_iproxy:
            LogManager.error("iproxy 启动器未就绪", udid)
            return None

        # Port conflict check + forced cleanup. The attempt cap prevents an
        # endless loop when the owner cannot be killed (e.g. no permission).
        attempts = 0
        while (
            attempts < self._MAX_PORT_FREE_ATTEMPTS
            and self._port_in_use
            and self._is_port_open(port)
        ):
            pid = self._get_pid_by_port(port)
            if not pid or pid == os.getpid():
                break
            LogManager.warning(f"端口 {port} 仍被 PID {pid} 占用,尝试释放", udid)
            self._kill_pid_gracefully(pid)
            attempts += 1

        try:
            p = self._spawn_iproxy(udid, port, 9100)
            self._port_in_use.add(port)
            LogManager.info(f"启动 iproxy 成功,本地 {port} -> 设备 9100", udid)
            return p
        except Exception as e:
            LogManager.error(f"启动 iproxy 失败:{e}", udid)
            return None

    def _is_port_open(self, port: int) -> bool:
        """Return True when a TCP connection to 127.0.0.1:*port* succeeds."""
        import socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(("127.0.0.1", port)) == 0

    def _get_pid_by_port(self, port: int) -> Optional[int]:
        """Return the PID listening on *port*, or None when it cannot be determined.

        Uses ``netstat`` on Windows and ``lsof`` elsewhere.
        """
        try:
            if os.name == "nt":
                cmd = ["netstat", "-ano", "-p", "tcp"]
                out = subprocess.check_output(cmd, text=True)
                for line in out.splitlines():
                    if f"127.0.0.1:{port}" in line and "LISTENING" in line:
                        # The PID is the last column of the netstat row.
                        return int(line.strip().split()[-1])
            else:
                cmd = ["lsof", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"]
                out = subprocess.check_output(cmd, text=True)
                return int(out.strip().split()[0])
        except Exception:
            return None

    def _kill_pid_gracefully(self, pid: int):
        """Send SIGTERM to *pid*, wait one second, then send SIGKILL.

        Failures (process already gone, insufficient permission) are ignored.
        """
        # signal.SIGKILL is not defined on Windows; fall back to SIGTERM there.
        hard_signal = getattr(signal, "SIGKILL", signal.SIGTERM)
        try:
            os.kill(pid, signal.SIGTERM)
            time.sleep(1)
            os.kill(pid, hard_signal)
        except OSError:
            pass

    def _terminate_proc(self, p: Optional[subprocess.Popen]):
        """Terminate child process *p*, escalating to a hard kill if needed.

        Does nothing when *p* is None or has already exited.
        """
        if not p or p.poll() is not None:
            return
        try:
            p.terminate()
            p.wait(timeout=3)
        except Exception:
            try:
                # Only kill the whole process group when the child really has
                # its own group; otherwise killpg would hit this application too.
                if os.name == "posix" and os.getpgid(p.pid) != os.getpgrp():
                    os.killpg(os.getpgid(p.pid), signal.SIGKILL)
                else:
                    p.kill()
                p.wait(timeout=2)
            except Exception:
                # Best effort: nothing more can be done for this process.
                pass

    def _base_dir(self) -> Path:
        """Return the executable's directory when frozen, else the directory two levels above this file."""
        if getattr(sys, "frozen", False):
            return Path(sys.executable).resolve().parent
        return Path(__file__).resolve().parents[1]

    def _iproxy_path(self) -> Path:
        """Return the path of the bundled iproxy binary.

        Raises:
            FileNotFoundError: if no candidate path exists.
        """
        exe = "iproxy.exe" if os.name == "nt" else "iproxy"
        base = self._base_dir()
        candidates = [base / "resources" / "iproxy" / exe]
        for p in candidates:
            if p.exists():
                return p
        raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")