import subprocess import threading import time import wda from tidevice import Usbmux from Entity.DeviceModel import DeviceModel from Entity.Variables import tikTokApp, WdaAppBundleId from Module.FlaskSubprocessManager import FlaskSubprocessManager from Utils.LogManager import LogManager threadLock = threading.Lock() class Deviceinfo(object): def __init__(self): self.deviceIndex = 0 # 投屏端口 self.screenProxy = 9110 # 存放pid的数组 self.pidList = [] # 设备列表 self.deviceArray = [] # 获取到县城管理类 self.manager = FlaskSubprocessManager.get_instance() # 给前端的设备模型数组 self.deviceModelList = [] # 监听设备连接 def startDeviceListener(self): while True: lists = Usbmux().device_list() # 添加设备逻辑 for device in lists: if device not in self.deviceArray: self.screenProxy += 1 self.connectDevice(device.udid) self.deviceArray.append(device) # 创建模型 model = DeviceModel(device.udid,self.screenProxy,type=1) self.deviceModelList.append(model) # 发送数据 self.manager.send(model.toDict()) # 处理拔出设备的逻辑 def removeDevice(): set1 = set(self.deviceArray) set2 = set(lists) difference = set1 - set2 differenceList = list(difference) for i in differenceList: for j in self.deviceArray: # 判断是否为差异设备 if i.udid == j.udid: # 从设备模型中删除数据 for a in self.deviceModelList: if i.udid == a.deviceId: a.type = 2 # 发送数据 self.manager.send(a.toDict()) self.deviceModelList.remove(a) for k in self.pidList: # 干掉端口短发进程 if j.udid == k["id"]: target = k["target"] target.kill() self.pidList.remove(k) # 删除已经拔出的设备 self.deviceArray.remove(j) removeDevice() time.sleep(1) # 连接设备 def connectDevice(self, identifier): try: d = wda.USBClient(identifier, 8100) LogManager.info("启动wda成功", identifier) except Exception as e: LogManager.error("启动wda失败。请检查wda是否正常", identifier) return d.app_start(WdaAppBundleId) d.home() time.sleep(2) target = self.relayDeviceScreenPort() self.pidList.append({ "target": target, "id": identifier }) # 转发设备端口 def relayDeviceScreenPort(self): try: command = f"iproxy.exe {self.screenProxy} 9100" # 创建一个没有窗口的进程 startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.wShowWindow = 0 r = subprocess.Popen(command, shell=True, startupinfo=startupinfo) return r except Exception as e: print(e) return 0