Files
iOSAI/Module/DeviceInfo.py
2025-09-23 20:17:33 +08:00

180 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Dict, Optional, List
import wda
from tidevice import Usbmux, ConnectionType
from tidevice._device import BaseDevice
from Entity.DeviceModel import DeviceModel
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
class DeviceInfo:
def __init__(self):
self._port = 9110
self._models: Dict[str, DeviceModel] = {}
self._procs: Dict[str, subprocess.Popen] = {}
self._manager = FlaskSubprocessManager.get_instance()
self._iproxy_path = self._find_iproxy()
self._pool = ThreadPoolExecutor(max_workers=6)
# ---------------- 主循环 ----------------
def listen(self):
while True:
online = {d.udid for d in Usbmux().device_list() if d.conn_type == ConnectionType.USB}
# 拔掉——同步
for udid in list(self._models):
if udid not in online:
self._remove_device(udid)
# 插上——异步
new = [u for u in online if u not in self._models]
if new:
futures = {self._pool.submit(self._add_device, u): u for u in new}
for f in as_completed(futures, timeout=30):
try:
f.result()
except Exception as e:
LogManager.error(f"异步连接失败:{e}")
time.sleep(1)
# ---------------- 新增设备 ----------------
def _add_device(self, udid: str):
if not self._trusted(udid):
return
port = self._alloc_port()
proc = self._start_iproxy(udid, port)
if not proc:
return
w, h, s = self._screen_info(udid)
model = DeviceModel(deviceId=udid, screenPort=port,
width=w, height=h, scale=s, type=1)
model.ready = True
self._models[udid] = model
self._procs[udid] = proc
self._manager_send(model)
# ---------------- 移除设备 ----------------
def _remove_device(self, udid: str):
model = self._models.pop(udid, None)
if not model:
return
model.type = 2
self._kill(self._procs.pop(udid, None))
self._manager_send(model)
# ---------------- 工具函数 ----------------
def _trusted(self, udid: str) -> bool:
try:
BaseDevice(udid).get_value("DeviceName")
return True
except Exception:
return False
def _screen_info(self, udid: str):
try:
c = wda.USBClient(udid, 8100)
size = c.window_size()
scale = c.scale
return int(size.width), int(size.height), float(scale)
except Exception:
return 828, 1792, 2.0
...
# ---------------- 原来代码不变,只替换下面一个函数 ----------------
def _start_iproxy(self, udid: str, port: int) -> Optional[subprocess.Popen]:
try:
# 隐藏窗口的核心参数
kw = {"creationflags": subprocess.CREATE_NO_WINDOW}
return subprocess.Popen(
[self._iproxy_path, "-u", udid, str(port), "9100"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
**kw
)
except Exception as e:
print(e)
return None
def _kill(self, proc: Optional[subprocess.Popen]):
if not proc:
return
try:
proc.terminate()
proc.wait(timeout=2)
except Exception:
try:
os.kill(proc.pid, signal.SIGKILL)
except Exception:
pass
def _alloc_port(self) -> int:
self._port += 1
return self._port
def _manager_send(self, model: DeviceModel):
try:
self._manager.send(model.toDict())
except Exception:
pass
def _find_iproxy(self) -> str:
base = Path(__file__).resolve().parent.parent
name = "iproxy.exe"
path = base / "resources" / "iproxy" / name
print(str(path))
if path.is_file():
return str(path)
raise FileNotFoundError(f"iproxy 不存在: {path}")
# ------------ Windows 专用:列出所有 iproxy 命令行 ------------
def _get_all_iproxy_cmdlines(self) -> List[str]:
try:
raw = subprocess.check_output(
['wmic', 'process', 'where', "name='iproxy.exe'",
'get', 'CommandLine,ProcessId', '/value'],
stderr=subprocess.DEVNULL, text=True
)
except subprocess.CalledProcessError:
return []
lines: List[str] = []
for block in raw.split('\n\n'):
cmd = pid = ''
for line in block.splitlines():
line = line.strip()
if line.startswith('CommandLine='):
cmd = line[len('CommandLine='):].strip()
elif line.startswith('ProcessId='):
pid = line[len('ProcessId='):].strip()
if cmd and pid and '-u' in cmd:
lines.append(f'{cmd} {pid}')
return lines
# ------------ 杀孤儿 ------------
def _cleanup_orphan_iproxy(self):
live_udids = set(self._models.keys())
for ln in self._get_all_iproxy_cmdlines():
parts = ln.split()
try:
udid = parts[parts.index('-u') + 1]
pid = int(parts[-1])
if udid not in live_udids:
self._kill_pid_gracefully(pid)
LogManager.warning(f'扫到孤儿 iproxy已清理 {udid} PID={pid}')
except (ValueError, IndexError):
continue
# ------------ 按 PID 强杀 ------------
def _kill_pid_gracefully(self, pid: int):
try:
os.kill(pid, signal.SIGTERM)
time.sleep(1)
os.kill(pid, signal.SIGKILL)
except Exception:
pass