Files
iOSAI/Module/DeviceInfo.py
2025-09-08 13:48:21 +08:00

291 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import signal
import sys
import time
import json
import wda
import threading
import subprocess
from pathlib import Path
from typing import List, Dict, Optional
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel
from Entity.Variables import WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
class Deviceinfo(object):
def __init__(self):
self.deviceIndex = 0
self.screenProxy = 9110
self.pidList: List[Dict] = []
self.deviceArray: List = []
self.manager = FlaskSubprocessManager.get_instance()
self.deviceModelList: List[DeviceModel] = []
self.maxDeviceCount = 6
self._lock = threading.Lock()
self._pending_udids = set()
try:
self.iproxy_path = self._iproxy_path()
self.iproxy_dir = self.iproxy_path.parent
os.environ["PATH"] = str(self.iproxy_dir) + os.pathsep + os.environ.get("PATH", "")
try:
os.add_dll_directory(str(self.iproxy_dir))
except Exception:
pass
self._creationflags = 0x08000000 if os.name == "nt" else 0
self._popen_kwargs = dict(
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(self.iproxy_dir),
shell=False,
text=True,
creationflags=self._creationflags,
encoding="utf-8",
bufsize=1,
)
def _spawn_iproxy(udid: str, local_port: int, remote_port: int = 9100) -> subprocess.Popen:
args = [str(self.iproxy_path), "-u", udid, str(local_port), str(remote_port)]
p = subprocess.Popen(args, **self._popen_kwargs)
def _pipe_to_log(name: str, stream):
try:
for line in iter(stream.readline, ''):
s = line.strip()
if s:
LogManager.info(f"[iproxy {name}] {s}", udid)
except Exception:
pass
try:
threading.Thread(target=_pipe_to_log, args=("STDOUT", p.stdout), daemon=True).start()
threading.Thread(target=_pipe_to_log, args=("STDERR", p.stderr), daemon=True).start()
except Exception:
pass
return p
self._spawn_iproxy = _spawn_iproxy
LogManager.info(f"iproxy 启动器已就绪,目录: {self.iproxy_dir}")
except Exception as e:
self.iproxy_path = None
self.iproxy_dir = None
self._spawn_iproxy = None
LogManager.error(f"初始化 iproxy 失败:{e}")
# ----------------------------
# 监听设备连接(死循环,内部捕获异常)
# ----------------------------
def startDeviceListener(self):
while True:
try:
lists = Usbmux().device_list()
except Exception as e:
LogManager.warning(
f"usbmuxd 连接失败: {e}。请确认已安装 iTunes/Apple Mobile Device Support并在手机上“信任此电脑”")
time.sleep(2)
continue
now_udids = {d.udid for d in lists if d.conn_type == ConnectionType.USB}
# 1. 处理“已插入但未信任”的设备,一旦信任就补连接
for udid in list(self._pending_udids):
if udid not in now_udids:
# 设备已拔出,从 pending 移除
self._pending_udids.discard(udid)
continue
if self.is_device_trusted(udid):
# 已信任,补连接
self._pending_udids.discard(udid)
self.screenProxy += 1
try:
self.connectDevice(udid)
# 补加入 deviceArray用 usbmux 对象)
for d in lists:
if d.udid == udid:
self.deviceArray.append(d)
break
except Exception as e:
LogManager.error(f"补连接设备失败 {udid}: {e}", udid)
# 2. 处理全新插入的设备
for device in lists:
if device.conn_type == ConnectionType.USB and device not in self.deviceArray and len(
self.deviceArray) < self.maxDeviceCount:
if not self.is_device_trusted(device.udid):
# 未信任,记入 pending下次循环再判
self._pending_udids.add(device.udid)
LogManager.warning("设备未信任,已记录,等待信任后自动连接", device.udid)
continue
# 已信任,直接走完整流程
self.screenProxy += 1
try:
self.connectDevice(device.udid)
self.deviceArray.append(device)
except Exception as e:
LogManager.error(f"连接设备失败 {device.udid}: {e}", device.udid)
# 3. 处理拔出
self._removeDisconnected(lists)
time.sleep(1)
# ----------------------------
# 判断设备是否已信任
# ----------------------------
def is_device_trusted(self, udid: str) -> bool:
try:
d = BaseDevice(udid)
d.get_value("DeviceName") # 任意读取一个值,失败即未信任
return True
except Exception:
return False
# ----------------------------
# 连接单台设备:先判断是否信任,再启动 WDA
# ----------------------------
def connectDevice(self, identifier: str):
if not self.is_device_trusted(identifier):
LogManager.warning("设备未信任,跳过 WDA 启动,等待信任后再试", identifier)
return
try:
d = wda.USBClient(identifier, 8100)
LogManager.info("启动 WDA 成功", identifier)
except Exception as e:
LogManager.error(f"启动 WDA 失败请检查手机是否已信任、WDA 是否正常。错误: {e}", identifier)
return
width, height, scale = 0, 0, 1.0
try:
size = d.window_size()
width, height = size.width, size.height
scale = d.scale
except Exception as e:
LogManager.warning(f"读取屏幕信息失败:{e}", identifier)
model = DeviceModel(identifier, self.screenProxy, width, height, scale, type=1)
self.deviceModelList.append(model)
try:
self.manager.send(model.toDict())
except Exception as e:
LogManager.warning(f"向前端发送设备模型失败:{e}", identifier)
try:
d.app_start(WdaAppBundleId)
d.home()
except Exception as e:
LogManager.warning(f"启动/切回桌面失败:{e}", identifier)
time.sleep(2)
target = self.relayDeviceScreenPort(identifier)
if target is not None:
with self._lock:
self.pidList.append({"target": target, "id": identifier})
# ----------------------------
# 以下方法未改动,省略以节省篇幅
# ----------------------------
def _terminate_proc(self, p: subprocess.Popen):
if not p:
return
if p.poll() is not None:
return
try:
p.terminate()
p.wait(timeout=3)
except Exception:
try:
if os.name == "posix":
try:
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
except Exception:
p.kill()
else:
p.kill()
p.wait(timeout=2)
except Exception:
pass
def _removeDisconnected(self, current_list):
try:
prev_udids = {getattr(d, "udid", None) for d in self.deviceArray if getattr(d, "udid", None)}
now_udids = {getattr(d, "udid", None) for d in current_list if getattr(d, "udid", None)}
except Exception as e:
LogManager.error(f"收集 UDID 失败:{e}", "")
return
removed_udids = prev_udids - now_udids
if not removed_udids:
return
if not hasattr(self, "_lock"):
self._lock = threading.RLock()
with self._lock:
for udid in list(removed_udids):
for a in list(self.deviceModelList):
if udid == getattr(a, "deviceId", None):
a.type = 2
try:
self.manager.send(a.toDict())
except Exception as e:
LogManager.warning(f"发送下线事件失败:{e}", udid)
try:
self.deviceModelList.remove(a)
except ValueError:
pass
survivors = []
for k in list(self.pidList):
kid = k.get("id")
if kid in removed_udids:
p = k.get("target")
try:
self._terminate_proc(p)
except Exception as e:
LogManager.warning(f"关闭 iproxy 异常:{e}", kid)
else:
survivors.append(k)
self.pidList = survivors
self.deviceArray = [d for d in self.deviceArray if getattr(d, "udid", None) not in removed_udids]
for udid in removed_udids:
LogManager.info("设备已拔出,清理完成(下线通知 + 端口映射关闭 + 状态移除)", udid)
def _base_dir(self) -> Path:
if getattr(sys, "frozen", False):
return Path(sys.executable).resolve().parent
return Path(__file__).resolve().parents[1]
def _iproxy_path(self) -> Path:
exe = "iproxy.exe" if os.name == "nt" else "iproxy"
base = self._base_dir()
candidates = [
base / "resources" / "iproxy" / exe,
]
for p in candidates:
if p.exists():
return p
raise FileNotFoundError(f"iproxy not found, tried: {[str(c) for c in candidates]}")
def relayDeviceScreenPort(self, udid: str) -> Optional[subprocess.Popen]:
if not self._spawn_iproxy:
LogManager.error("iproxy 启动器未就绪,无法建立端口映射(初始化时未找到 iproxy", udid)
return None
try:
p = self._spawn_iproxy(udid, self.screenProxy, 9100)
LogManager.info(f"启动 iproxy 成功,本地 {self.screenProxy} -> 设备 9100", udid)
return p
except Exception as e:
LogManager.error(f"启动 iproxy 失败:{e}", udid)
return None