Files
iOSAI/Utils/AiUtils.py

1462 lines
57 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import html
import json
import os
import re
import signal
import socket
import subprocess
import sys
import time
import xml.etree.ElementTree as ET
from pathlib import Path
import cv2
import numpy as np
import unicodedata
import wda
from lxml import etree
from wda import Client
from Entity.Variables import wdaFunctionPort
from Utils.LogManager import LogManager
# 工具类
class AiUtils(object):
# 在屏幕中找到对应的图片
# @classmethod
# def findImageInScreen(cls, target, udid):
# try:
# # 加载原始图像和模板图像
# image_path = AiUtils.imagePathWithName(udid, "bgv") # 替换为你的图像路径
# template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
#
# # 读取图像和模板,确保它们都是单通道灰度图
# image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
# template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
#
# if image is None:
# LogManager.error("加载背景图失败")
# return -1, -1
#
# if template is None:
# LogManager.error("加载模板图失败")
# return -1, -1
#
# # 获取模板的宽度和高度
# w, h = template.shape[::-1]
#
# # 使用模板匹配方法
# res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
# threshold = 0.7 # 匹配度阈值,可以根据需要调整
# loc = np.where(res >= threshold)
#
# # 检查是否有匹配结果
# if loc[0].size > 0:
# # 取第一个匹配位置
# pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
# center_x = int(pt[0] + w // 2)
# center_y = int(pt[1] + h // 2)
# # print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
# return center_x, center_y
# else:
# return -1, -1
# except Exception as e:
# LogManager.error(f"加载素材失败:{e}", udid)
# print(e)
# return -1, -1
@classmethod
def flask_port_free(cls,port):
"""无需 psutil 的版本,通过系统命令查 PID"""
def can_bind(p):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(("0.0.0.0", p))
s.close()
return True
except OSError:
s.close()
return False
if can_bind(port):
return
print(f"[ensure_port_free] Port {port} is occupied. Searching PID...")
pids = set()
if sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
cmd = f"lsof -t -i:{port}"
proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
for line in proc.stdout.splitlines():
if line.strip().isdigit():
pids.add(int(line.strip()))
elif sys.platform.startswith("win"):
cmd = f"netstat -ano | findstr :{port}"
proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
for line in proc.stdout.splitlines():
parts = line.split()
if len(parts) >= 5 and parts[-1].isdigit():
pids.add(int(parts[-1]))
else:
raise RuntimeError("Unsupported platform for ensure_port_free")
for pid in pids:
try:
print(f"[ensure_port_free] Killing PID {pid}...")
os.kill(pid, signal.SIGKILL)
except Exception as e:
print(f"[ensure_port_free] Failed to kill PID {pid}: {e}")
time.sleep(0.3)
if not can_bind(port):
raise RuntimeError(f"[ensure_port_free] Port {port} still occupied after kill.")
@classmethod
def findImageInScreen(cls, target, udid):
try:
# 加载原始图像和模板图像
image_path = AiUtils.imagePathWithName(udid, "bgv")
template_path = AiUtils.imagePathWithName("", target)
# 读取图像和模板,确保它们都是单通道灰度图
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if image is None:
LogManager.error("加载背景图失败", udid)
return -1, -1
if template is None:
LogManager.error("加载模板图失败", udid)
return -1, -1
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 模板匹配
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.85
loc = np.where(res >= threshold)
# 放在 cv2.matchTemplate 之前
cv2.imwrite(f'/tmp/runtime_bg_{udid}.png', image)
cv2.imwrite(f'/tmp/runtime_tpl_{udid}.png', template)
print(f'>>> 设备{udid} 模板{target} 最高相似度:', cv2.minMaxLoc(res)[1])
# 安全取出第一个匹配点
matches = list(zip(*loc[::-1]))
if not matches:
return -1, -1
pt = matches[0]
center_x = int(pt[0] + w // 2)
center_y = int(pt[1] + h // 2)
return center_x, center_y
except Exception as e:
LogManager.error(f"加载素材失败:{e}", udid)
return -1, -1
# 使用正则查找字符串中的数字
@classmethod
def findNumber(cls, str):
# 使用正则表达式匹配数字
match = re.search(r'\d+', str)
if match:
return int(match.group()) # 将匹配到的数字转换为整数
return None # 如果没有找到数字,返回 None
# 选择截图
@classmethod
def screenshot(cls, session):
image = session.screenshot()
image_path = "screenshot.png"
image.save(image_path)
image = cv2.imread(image_path)
# 如果图像过大,缩小显示
scale_percent = 50 # 缩小比例
width = int(image.shape[1] * scale_percent / 100)
height = int(image.shape[0] * scale_percent / 100)
dim = (width, height)
resized_image = cv2.resize(image, dim, interpolation=cv2.INTER_AREA)
# 创建一个窗口并显示缩小后的图像
cv2.namedWindow("Image")
cv2.imshow("Image", resized_image)
print("请在图像上选择爱心图标区域然后按Enter键确认。")
# 使用selectROI函数手动选择区域
roi = cv2.selectROI("Image", resized_image, showCrosshair=True, fromCenter=False)
# 将ROI坐标按原始图像尺寸放大
x, y, w, h = roi
x = int(x * image.shape[1] / resized_image.shape[1])
y = int(y * image.shape[0] / resized_image.shape[0])
w = int(w * image.shape[1] / resized_image.shape[1])
h = int(h * image.shape[0] / resized_image.shape[0])
# 根据选择的ROI提取爱心图标
if w > 0 and h > 0: # 确保选择的区域有宽度和高度
heart_icon = image[y:y + h, x:x + w]
# 转换为HSV颜色空间
hsv = cv2.cvtColor(heart_icon, cv2.COLOR_BGR2HSV)
# 定义红色的HSV范围
lower_red1 = np.array([0, 120, 70])
upper_red1 = np.array([10, 255, 255])
lower_red2 = np.array([170, 120, 70])
upper_red2 = np.array([180, 255, 255])
# 创建掩模
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
mask = mask1 + mask2
# 反转掩模,因为我们想要的是爱心图标,而不是背景
mask_inv = cv2.bitwise_not(mask)
# 应用掩模
heart_icon = cv2.bitwise_and(heart_icon, heart_icon, mask=mask_inv)
# 创建一个全透明的背景
height, width, channels = heart_icon.shape
roi = np.zeros((height, width, channels), dtype=np.uint8)
# 将爱心图标粘贴到透明背景上
for c in range(channels):
roi[:, :, c] = np.where(mask_inv == 255, heart_icon[:, :, c], roi[:, :, c])
# 图片名称
imgName = "temp.png"
# 保存结果
cv2.imwrite(imgName, roi)
# 显示结果
cv2.imshow("Heart Icon with Transparent Background", roi)
cv2.waitKey(0)
cv2.destroyAllWindows()
else:
print("未选择有效区域。")
# 根据名称获取图片完整地址
@classmethod
def imagePathWithName(cls, udid, name):
current_file_path = os.path.abspath(__file__)
# 获取当前文件所在的目录即script目录
current_dir = os.path.dirname(current_file_path)
# 由于script目录位于项目根目录下一级因此需要向上一级目录移动两次
project_root = os.path.abspath(os.path.join(current_dir, '..'))
# 构建资源文件的完整路径,向上两级目录,然后进入 resources 目录
resource_path = os.path.abspath(os.path.join(project_root, "resources", udid, name + ".png")).replace('/', '\\')
return resource_path
# 获取根目录
@classmethod
def getRootDir(cls):
current_file = os.path.abspath(__file__)
# 获取当前文件所在的目录
current_dir = os.path.dirname(current_file)
# 获取项目根目录(假设根目录是当前文件的父目录的父目录)
project_root = os.path.dirname(current_dir)
# 返回根目录
return project_root
# 创建一个以udid命名的目录
@classmethod
def makeUdidDir(cls, udid):
# 获取项目根目录
home = cls.getRootDir()
# 拼接 resources 目录的路径
resources_dir = os.path.join(home, "resources")
# 拼接 udid 目录的路径
udid_dir = os.path.join(resources_dir, udid)
# 检查 udid 目录是否存在,如果不存在则创建
if not os.path.exists(udid_dir):
try:
os.makedirs(udid_dir)
LogManager.info(f"目录 {udid_dir} 创建成功", udid)
print(f"目录 {udid_dir} 创建成功")
except Exception as e:
print(f"创建目录时出错: {e}")
LogManager.error(f"创建目录时出错: {e}", udid)
else:
LogManager.info(f"目录 {udid_dir} 已存在,跳过创建", udid)
print(f"目录 {udid_dir} 已存在,跳过创建")
# 查找首页按钮
# uuid 设备id
# click 是否点击该按钮
@classmethod
def findHomeButton(cls, session):
session.appium_settings({"snapshotMaxDepth": 10})
homeButton = session.xpath("//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']")
try:
if homeButton.exists:
print("找到首页了")
return homeButton
else:
print("没找到首页")
return None
except Exception as e:
print(e)
return None
# 查找关闭按钮
@classmethod
def findLiveCloseButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid,wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
try:
if r.label == "关闭屏幕":
return r
else:
return None
except Exception as e:
print(e)
return None
# 获取直播间窗口数量
@classmethod
def count_add_by_xml(cls, session):
xml = session.source()
root = ET.fromstring(xml)
return sum(
1 for e in root.iter()
if e.get('type') in ('XCUIElementTypeButton', 'XCUIElementTypeImage')
and (e.get('name') == '添加' or e.get('label') == '添加')
and (e.get('visible') in (None, 'true'))
)
# 获取关注按钮
@classmethod
def getFollowButton(cls, session: Client):
# followButton = session.xpath("//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[2]/Other[1]/Other[1]/Other[3]/Other[1]/Other[1]/Button[1]")
session.appium_settings({"snapshotMaxDepth": 20})
followButton = session.xpath('//XCUIElementTypeButton[@name="关注" or @label="关注"]')
if followButton.exists:
print("2.关注找到了")
LogManager.info("2.关注找到了")
return followButton
else:
print("2.关注没找到")
print("2.关注没找到")
return None
# 查找发消息按钮
@classmethod
def getSendMesageButton(cls, session: Client):
# msgButton = session.xpath(
# '//XCUIElementTypeButton['
# '(@name="发消息" or @label="发消息" or '
# '@name="发送 👋" or @label="发送 👋" or '
# '@name="消息" or @label="消息")'
# ' and @visible="true"]'
# )
msgButton = session.xpath(
'//XCUIElementTypeButton['
'(@name="发消息" or @label="发消息" or '
'@name="发送 👋" or @label="发送 👋" or '
'@name="消息" or @label="消息" or '
'@name="Message" or @label="Message")'
' and @visible="true"]'
)
if msgButton.exists:
print("3.发消息按钮找到了")
LogManager.info("3.发消息按钮找到了")
return msgButton
else:
print("3.发消息按钮没找到")
LogManager.info("3.发消息按钮没找到")
return None
# 获取当前屏幕上的节点
@classmethod
def getCurrentScreenSource(cls):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa",wdaFunctionPort)
print(client.source())
# 查找app主页上的收件箱按钮
@classmethod
def getMsgBoxButton(cls, session: Client):
# box = session.xpath("//XCUIElementTypeButton[name='a11y_vo_inbox']")
box = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
if box.exists:
return box
else:
return None
# 获取收件箱中的未读消息数
@classmethod
def getUnReadMsgCount(cls, session: Client):
btn = cls.getMsgBoxButton(session)
print(f"btn:{btn}")
return cls.findNumber(btn.label)
@classmethod
def parse_float(cls, el, attr, default=0.0):
try:
return float(el.get(attr, default))
except Exception:
return default
# @classmethod
# def extract_messages_from_xml(cls, xml: str):
# """
# 解析 TikTok 聊天 XML返回当前屏幕可见的消息与时间分隔
# [{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
# 兼容 Table / CollectionView / ScrollView过滤系统提示/底部工具栏;可见性使用“重叠可视+容差”。
# """
# if not isinstance(xml, str) or not xml.strip():
# return []
#
# try:
# root = etree.fromstring(xml.encode("utf-8"))
# except Exception:
# return []
#
# # ---------- 小工具 ----------
# def get_text(el):
# s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
# return html.unescape(s.strip())
#
# def is_visible(el):
# v = el.get('visible')
# return (v is None) or (v.lower() == 'true')
#
# def get_ancestor_cell(el):
# p = el
# while p is not None and p.get('type') != 'XCUIElementTypeCell':
# p = p.getparent()
# return p
#
# # ---------- 屏幕尺寸 ----------
# app = root.xpath('/XCUIElementTypeApplication')
# screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
# screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
#
# # ---------- 主容器探测 ----------
# def pick_container():
# cands = []
# for xp, ctype in (
# ('//XCUIElementTypeTable', 'table'),
# ('//XCUIElementTypeCollectionView', 'collection'),
# ('//XCUIElementTypeScrollView', 'scroll'),
# ):
# nodes = [n for n in root.xpath(xp) if is_visible(n)]
# for n in nodes:
# y = cls.parse_float(n, 'y', 0.0)
# h = cls.parse_float(n, 'height', screen_h)
# cells = n.xpath('.//XCUIElementTypeCell')
# score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
# cands.append((score, n, ctype))
# if cands:
# cands.sort(key=lambda t: t[0], reverse=True)
# return cands[0][1], cands[0][2]
# return None, None
#
# container, container_type = pick_container()
#
# # ---------- 可视区 ----------
# if container is not None:
# area_top = cls.parse_float(container, 'y', 0.0)
# area_h = cls.parse_float(container, 'height', screen_h)
# area_bot = area_top + area_h
# else:
# blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
# is_visible(n)]
# area_top = 0.0
# if blocks:
# blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
# b = blocks[0]
# area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
# tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
# if tvs:
# tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
# area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
# else:
# area_bot = screen_h
# if area_bot - area_top < 100:
# area_top, area_bot = 0.0, screen_h
#
# def in_view(el) -> bool:
# if not is_visible(el):
# return False
# y = cls.parse_float(el, 'y', -1e9)
# h = cls.parse_float(el, 'height', 0.0)
# by = y + h
# tol = 8.0
# return not (by <= area_top + tol or y >= area_bot - tol)
#
# # ---------- 时间分隔 ----------
# items = []
# for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
# if not in_view(t):
# continue
# txt = get_text(t)
# if txt:
# items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
#
# # ---------- 系统提示/横幅过滤 ----------
# EXCLUDES_LITERAL = {
# 'Heart', 'Lol', 'ThumbsUp',
# '分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
# '关注',
# }
# SYSTEM_PATTERNS = [
# r"消息请求已被接受。你们可以开始聊天了。",
# r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*",
# r"开启(私信)?通知", r"开启通知",
# r"你打开了这个与 .* 的聊天。.*隐私",
# r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
# r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
# r"未发送$",
# r"Turn on (DM|message|direct message)?\s*notifications",
# r"Enable notifications",
# r"Get notified when .* replies",
# r"You opened this chat .* privacy",
# r"Only \d+ message can be sent .* accepts .* request",
# r"此消息可能违反.*",
# r"无法发送",
# r"请告知我们"
# ]
# SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
#
# # ---------- 资料卡片(个人信息)剔除 ----------
# PROFILE_RE = re.compile(
# r"@[\w\.\-]+|粉丝|followers?|following|关注账号",
# re.IGNORECASE
# )
#
# def is_profile_cell(cell) -> bool:
# if cell is None:
# return False
# if cell.xpath(
# './/XCUIElementTypeButton[@name="关注" or @label="关注" or '
# 'contains(translate(@name,"FOLW","folw"),"follow") or '
# 'contains(translate(@label,"FOLW","folw"),"follow")]'
# ):
# return True
# texts = []
# for t in cell.xpath('.//*[@name or @label or @value]'):
# s = get_text(t)
# if s:
# texts.append(s)
# if len(texts) > 40:
# break
# joined = " ".join(texts)
# if PROFILE_RE.search(joined):
# return True
# cy = cls.parse_float(cell, 'y', 0.0)
# ch = cls.parse_float(cell, 'height', 0.0)
# if cy < area_top + 140 and ch >= 150:
# return True
# return False
#
# def is_toolbar_like(o) -> bool:
# txt = get_text(o)
# if txt in EXCLUDES_LITERAL:
# return True
# y = cls.parse_float(o, 'y', 0.0)
# h = cls.parse_float(o, 'height', 0.0)
# near_bottom = (area_bot - (y + h)) < 48
# is_short = h <= 40
# return near_bottom and is_short
#
# # ---------- 收集消息候选 ----------
# msg_nodes = []
# if container is not None:
# cand = container.xpath(
# './/XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
# '[@y and (@name or @label or @value)]'
# )
# for o in cand:
# if not in_view(o):
# continue
# if is_toolbar_like(o):
# continue
# cell = get_ancestor_cell(o)
# if is_profile_cell(cell):
# continue
# txt = get_text(o)
# if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
# continue
# msg_nodes.append(o)
# else:
# cand = root.xpath(
# '//XCUIElementTypeOther[@y and (@name or @label or @value)]'
# ' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
# ' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
# )
# for o in cand:
# p = o.getparent()
# if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
# continue
# if not in_view(o) or is_toolbar_like(o):
# continue
# cell = get_ancestor_cell(o)
# if is_profile_cell(cell):
# continue
# txt = get_text(o)
# if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
# continue
# msg_nodes.append(o)
#
# # ---------- 方向判定 & 组装(中心点法) ----------
# CENTER_MARGIN = max(12.0, screen_w * 0.02) # 中线容差
#
# for o in msg_nodes:
# txt = get_text(o)
# if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
# continue
#
# x = cls.parse_float(o, 'x', 0.0)
# y = cls.parse_float(o, 'y', 0.0)
# w = cls.parse_float(o, 'width', 0.0)
#
# center_x = x + w / 2.0
# screen_center = screen_w / 2.0
#
# if center_x < screen_center - CENTER_MARGIN:
# direction = 'in' # 左侧:对方
# elif center_x > screen_center + CENTER_MARGIN:
# direction = 'out' # 右侧:自己
# else:
# # 处在中线附近,用右缘兜底
# right_edge = x + w
# direction = 'out' if right_edge >= screen_center else 'in'
#
# items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
#
# # ---------- 排序 & 收尾 ----------
# if items:
# items.sort(key=lambda i: i.get('y', 0.0))
# for it in items:
# it.pop('y', None)
# return items
@staticmethod
def parse_float(el, key: str, default: float = 0.0) -> float:
"""稳健读取浮点属性"""
if el is None:
return default
v = el.get(key)
if v is None or v == "":
return default
try:
return float(v)
except Exception:
try:
# 某些抓取会出现 '20.0px' / '20,' 等
v2 = re.sub(r"[^\d\.\-]+", "", v)
return float(v2) if v2 else default
except Exception:
return default
@classmethod
def extract_messages_from_xml(cls, xml: str):
"""
解析 TikTok 聊天 XML返回当前屏幕可见的消息与时间分隔
[{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
兼容 Table / CollectionView / ScrollView过滤系统提示/底部工具栏;
资料卡只过滤“资料区块”而非整 Cell可见性使用“重叠可视+容差”。
"""
if not isinstance(xml, str) or not xml.strip():
return []
try:
root = etree.fromstring(xml.encode("utf-8"))
except Exception:
return []
# ---------- 小工具 ----------
def get_text(el):
s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
return html.unescape(s.strip())
def is_visible(el):
v = el.get('visible')
return (v is None) or (v.lower() == 'true')
def get_ancestor_cell(el):
p = el
while p is not None and p.get('type') != 'XCUIElementTypeCell':
p = p.getparent()
return p
def _bbox(el):
return (
cls.parse_float(el, 'x', 0.0),
cls.parse_float(el, 'y', 0.0),
cls.parse_float(el, 'width', 0.0),
cls.parse_float(el, 'height', 0.0),
)
# ---------- 屏幕尺寸 ----------
app = root.xpath('/XCUIElementTypeApplication')
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
# ---------- 主容器探测 ----------
def pick_container():
cands = []
for xp, ctype in (
('//XCUIElementTypeTable', 'table'),
('//XCUIElementTypeCollectionView', 'collection'),
('//XCUIElementTypeScrollView', 'scroll'),
):
nodes = [n for n in root.xpath(xp) if is_visible(n)]
for n in nodes:
y = cls.parse_float(n, 'y', 0.0)
h = cls.parse_float(n, 'height', screen_h)
cells = n.xpath('.//XCUIElementTypeCell')
score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
cands.append((score, n, ctype))
if cands:
cands.sort(key=lambda t: t[0], reverse=True)
return cands[0][1], cands[0][2]
return None, None
container, container_type = pick_container()
# ---------- 可视区 ----------
if container is not None:
area_top = cls.parse_float(container, 'y', 0.0)
area_h = cls.parse_float(container, 'height', screen_h)
area_bot = area_top + area_h
else:
blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
is_visible(n)]
area_top = 0.0
if blocks:
blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
b = blocks[0]
area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
if tvs:
tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
else:
area_bot = screen_h
if area_bot - area_top < 100:
area_top, area_bot = 0.0, screen_h
def in_view(el) -> bool:
if not is_visible(el):
return False
y = cls.parse_float(el, 'y', -1e9)
h = cls.parse_float(el, 'height', 0.0)
by = y + h
tol = 8.0
return not (by <= area_top + tol or y >= area_bot - tol)
# ---------- 时间分隔 ----------
items = []
for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
if not in_view(t):
continue
txt = get_text(t)
if txt:
items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
# ---------- 系统提示/横幅过滤 ----------
EXCLUDES_LITERAL = {
'Heart', 'Lol', 'ThumbsUp',
'分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
'关注', # 注意:仅用于按钮/工具条等短元素,后续还会叠加区域过滤,避免误杀消息
}
SYSTEM_PATTERNS = [
r"消息请求已被接受。你们可以开始聊天了。",
r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*",
r"开启(私信)?通知", r"开启通知",
r"你打开了这个与 .* 的聊天。.*隐私",
r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
r"未发送$",
r"Turn on (DM|message|direct message)?\s*notifications",
r"Enable notifications",
r"Get notified when .* replies",
r"You opened this chat .* privacy",
r"Only \d+ message can be sent .* accepts .* request",
r"此消息可能违反.*",
r"无法发送",
r"请告知我们",
r"已看过"
]
SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
# ---------- 资料卡片(个人信息)剔除:仅过滤“资料区块” ----------
PROFILE_RE = re.compile(
r"@[\w\.\-]+|粉丝|followers?|following|关注账号",
re.IGNORECASE
)
def is_profile_cell(cell) -> bool:
"""更严格:至少同时命中 >=2 个信号才认定为资料卡片 Cell。"""
if cell is None:
return False
has_follow_btn = bool(cell.xpath(
'.//XCUIElementTypeButton['
'@name="关注" or @label="关注" or '
'contains(translate(@name,"FOLW","folw"),"follow") or '
'contains(translate(@label,"FOLW","folw"),"follow")]'
))
has_view_profile = bool(cell.xpath(
'.//XCUIElementTypeButton['
'@name="查看主页" or @label="查看主页" or '
'contains(translate(@name,"VIEW PROFILE","view profile"),"view profile") or '
'contains(translate(@label,"VIEW PROFILE","view profile"),"view profile")]'
))
has_live_ended = bool(cell.xpath(
'.//XCUIElementTypeStaticText['
'@name="直播已结束" or @label="直播已结束" or '
'contains(translate(@name,"LIVE ENDED","live ended"),"live ended") or '
'contains(translate(@label,"LIVE ENDED","live ended"),"live ended")]'
))
cy = cls.parse_float(cell, 'y', 0.0)
ch = cls.parse_float(cell, 'height', 0.0)
looks_large_card = ch >= 180 # 大卡片外观
# 再做一次文本特征检查(防止仅一个“关注”误杀)
texts = []
for t in cell.xpath('.//*[@name or @label or @value]'):
s = get_text(t)
if s:
texts.append(s)
if len(texts) > 40:
break
joined = " ".join(texts)
has_profile_terms = bool(PROFILE_RE.search(joined))
# 命中信号计数至少2个
signals = sum([has_follow_btn, has_view_profile, has_live_ended, looks_large_card, has_profile_terms])
return signals >= 2
def profile_region_y_range(cell):
"""
在资料卡 Cell 内,估算“资料区块”的 y 范围min_y, max_y
用关键元素(关注按钮 / 查看主页 / 直播已结束 / 短用户名)来圈定范围。
"""
if cell is None:
return None
key_nodes = []
key_nodes += cell.xpath('.//XCUIElementTypeButton[@name="关注" or @label="关注"]')
key_nodes += cell.xpath('.//XCUIElementTypeButton[@name="查看主页" or @label="查看主页"]')
key_nodes += cell.xpath('.//XCUIElementTypeStaticText[@name="直播已结束" or @label="直播已结束"]')
# 用户名/昵称:长度较短更像资料区标签
for t in cell.xpath('.//XCUIElementTypeStaticText[@name or @label]'):
s = (t.get('label') or t.get('name') or '') or ''
st = s.strip()
if st and len(st) <= 30:
key_nodes.append(t)
ys = []
for n in key_nodes:
_, y, _, h = _bbox(n)
ys += [y, y + h]
if not ys:
return None # 没有关键元素则不定义资料区
min_y, max_y = min(ys), max(ys)
pad = 12.0
return (min_y - pad, max_y + pad)
def belongs_to_profile_region(node, cell) -> bool:
"""判断候选 node 是否落在资料区块的 y 范围内"""
rng = profile_region_y_range(cell)
if not rng:
return False
_, y, _, h = _bbox(node)
ny1, ny2 = y, y + h
ry1, ry2 = rng
return not (ny2 < ry1 or ny1 > ry2) # 任意重叠即算属于资料区
def is_toolbar_like(o) -> bool:
# 在 Cell 里的元素绝不是底部工具条避免误杀“hgh”这类贴着底部的最后一条消息
if get_ancestor_cell(o) is not None:
return False
txt = get_text(o)
if txt in EXCLUDES_LITERAL:
return True
y = cls.parse_float(o, 'y', 0.0)
h = cls.parse_float(o, 'height', 0.0)
near_bottom = (area_bot - (y + h)) < 48
is_short = h <= 40
return near_bottom and is_short
# ---------- 收集消息候选 ----------
msg_nodes = []
if container is not None:
cand = container.xpath(
'.//XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
'[@y and (@name or @label or @value)]'
)
for o in cand:
if not in_view(o):
continue
if is_toolbar_like(o):
continue
cell = get_ancestor_cell(o)
# 仅在“资料卡 Cell 且节点位于资料区块范围内”时过滤
if is_profile_cell(cell) and belongs_to_profile_region(o, cell):
continue
txt = get_text(o)
if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
continue
msg_nodes.append(o)
else:
cand = root.xpath(
'//XCUIElementTypeOther[@y and (@name or @label or @value)]'
' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
)
for o in cand:
p = o.getparent()
if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
continue
if not in_view(o) or is_toolbar_like(o):
continue
cell = get_ancestor_cell(o)
if is_profile_cell(cell) and belongs_to_profile_region(o, cell):
continue
txt = get_text(o)
if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
continue
msg_nodes.append(o)
# ---------- 方向判定 & 组装(中心点法) ----------
CENTER_MARGIN = max(12.0, screen_w * 0.02) # 中线容差
for o in msg_nodes:
txt = get_text(o)
if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
continue
x = cls.parse_float(o, 'x', 0.0)
y = cls.parse_float(o, 'y', 0.0)
w = cls.parse_float(o, 'width', 0.0)
center_x = x + w / 2.0
screen_center = screen_w / 2.0
if center_x < screen_center - CENTER_MARGIN:
direction = 'in' # 左侧:对方
elif center_x > screen_center + CENTER_MARGIN:
direction = 'out' # 右侧:自己
else:
# 处在中线附近,用右缘兜底
right_edge = x + w
direction = 'out' if right_edge >= screen_center else 'in'
items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
# ---------- 排序 & 收尾 ----------
if items:
items.sort(key=lambda i: i.get('y', 0.0))
for it in items:
it.pop('y', None)
return items
@classmethod
def get_navbar_anchor_name(cls, session, timeout: float = 5) -> str:
"""从聊天页导航栏读取主播名称;找不到返回空字符串。"""
# 限制快照深度(可选)
try:
session.appium_settings({"snapshotMaxDepth": 22})
except Exception:
pass
def _text_of(el) -> str:
info = getattr(el, "info", {}) or {}
return (info.get("label") or info.get("name") or info.get("value") or "").strip()
def _clean_tail(s: str) -> str:
# 去掉末尾中英标点和空白TikTok 昵称右侧常带一个顿号/逗号,比如 “Alina
return re.sub(r"[,、,。.\s]+$", "", s).strip()
# ---- 关键修复:导航容器 ----
# 1) “返回”可能是 Button 也可能是 Other同时页面右上角有 “更多/举报” 按钮
BACK_ELEM = (
"//*[@type='XCUIElementTypeButton' or @type='XCUIElementTypeOther']"
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' "
" or @name='戻る' or @label='戻る']"
)
RIGHT_MENU_BTN = (
"//XCUIElementTypeButton"
"[@name='更多' or @label='更多' or @name='More' or @label='More' or "
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
" @name='報告' or @label='報告']"
)
# 从“返回”向上找到最近祖先 Other且该祖先内包含“更多/举报”按钮
NAV_CONTAINER = (
BACK_ELEM +
"/ancestor::XCUIElementTypeOther[ .//XCUIElementTypeOther or .//XCUIElementTypeButton ][ ."
+ RIGHT_MENU_BTN +
"][1]"
)
# ① 优先:在导航容器里找“可访问的 Other有文本且不包含子 Button
XPATH_TITLE_OTHER = (
NAV_CONTAINER +
"//XCUIElementTypeOther[@accessible='true' and count(.//XCUIElementTypeButton)=0 "
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)][1]"
)
# ② 退路:导航容器内第一个有文本的 StaticText
XPATH_TITLE_STATIC = (
NAV_CONTAINER +
"//XCUIElementTypeStaticText[string-length(@value)>0 or string-length(@label)>0 or string-length(@name)>0][1]"
)
# ③ 兜底:直接在“返回”与右侧菜单同一层级/附近范围内找可读的 Other适配部分机型结构差异
XPATH_FALLBACK_NEAR_BACK = (
BACK_ELEM +
"/ancestor::XCUIElementTypeOther[1]"
"//XCUIElementTypeOther[@accessible='true' and "
" (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)]"
"[count(.//XCUIElementTypeButton)=0][1]"
)
# ④ 兜底:昵称往往以顿号/逗号结尾(例如 “Alina利用这个规律匹配
XPATH_HINT_COMMA_END = (
NAV_CONTAINER +
"//*[ (contains(@name,'') or contains(@label,'') or contains(@value,'')) "
" and string-length(@name)+string-length(@label)+string-length(@value) < 64 ]"
"[1]"
)
# ---- 查询顺序:① -> ② -> ③ -> ④ ----
for xp, wait_s in [
(XPATH_TITLE_OTHER, timeout),
(XPATH_TITLE_STATIC, 1.0),
(XPATH_FALLBACK_NEAR_BACK, 1.0),
(XPATH_HINT_COMMA_END, 1.0),
]:
try:
q = session.xpath(xp)
if q.wait(wait_s):
txt = _clean_tail(_text_of(q.get()))
if txt:
return txt
except Exception:
pass
return ""
# 检查字符串中是否包含中文
@classmethod
def contains_chinese(cls, text):
"""
判断字符串中是否包含中文字符。
参数:
text (str): 要检测的字符串。
返回:
bool: 如果字符串中包含中文,返回 True否则返回 False。
"""
# 使用正则表达式匹配中文字符
pattern = re.compile(r'[\u4e00-\u9fff]')
return bool(pattern.search(text))
@classmethod
def is_language(cls, text: str) -> bool:
if not text:
return False
for ch in text:
if unicodedata.category(ch).startswith("L"):
return True
return False
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
"""读取为 list读取失败或不是 list 则返回空数组"""
if not file_path.exists():
return []
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception as e:
LogManager.error(f"[acList] 读取失败,将按空数组处理: {e}")
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@staticmethod
def _normalize_anchor_items(items):
"""
规范化为 [{...}]
- 允许传入:单个对象、对象数组、字符串(当 anchorId 用)
- 保留原有字段,不限制只存 anchorId/country
- 字符串输入 → {"anchorId": xxx}
"""
result = []
if items is None:
return result
if isinstance(items, dict):
aid = items.get("anchorId")
if aid:
obj = dict(items) # 保留所有字段
result.append(obj)
return result
if isinstance(items, list):
for it in items:
if isinstance(it, dict):
aid = it.get("anchorId")
if aid:
obj = dict(it)
result.append(obj)
elif isinstance(it, str):
result.append({"anchorId": it})
return result
if isinstance(items, str):
result.append({"anchorId": items})
return result
# -------- 追加(对象数组平铺追加) --------
@classmethod
def save_aclist_flat_append(cls, acList, filename="data/acList.json"):
"""
将 anchor 对象数组平铺追加到 JSON 文件(数组)中。
文件固定写到 项目根目录/data/acList.json
"""
# 找到当前文件所在目录,回退到项目根目录
root_dir = Path(__file__).resolve().parent.parent # 根据实际层级调整
log_dir = root_dir
log_dir.mkdir(parents=True, exist_ok=True) # 确保 log 目录存在
file_path = log_dir / filename
data = cls._read_json_list(file_path)
# 规范化输入,确保都是 {anchorId, country}
to_add = cls._normalize_anchor_items(acList)
if not to_add:
LogManager.info("[acList] 传入为空或不合规,跳过写入")
return
data.extend(to_add)
cls._write_json_list(file_path, data)
# LogManager.method_info(f"写入的路径是:{file_path}", "写入数据")
LogManager.info(f"[acList] 已追加 {len(to_add)} 条,当前总数={len(data)} -> {file_path}")
@classmethod
def pop_aclist_first(cls, filename="data/acList.json", mode="pop"):
"""
从 JSON 数组/对象(anchorList)中取出第一个 anchor 对象。
- mode="pop" : 取出并删除
- mode="move" : 取出并放到列表尾部
返回形如:{"anchorId": "...", "country": "...", ...}
"""
file_path = cls._resolve_path(filename)
if not file_path.exists():
return None
try:
raw = json.loads(file_path.read_text(encoding="utf-8-sig"))
except Exception as e:
LogManager.error(f"[pop_aclist_first] 读取失败: {e}")
return None
# 支持两种格式list 或 dict{anchorList:[...]}
if isinstance(raw, list):
data, wrapper = raw, None
elif isinstance(raw, dict) and isinstance(raw.get("anchorList"), list):
data, wrapper = raw["anchorList"], raw
else:
return None
if not data:
return None
# 取第一个
first = data.pop(0)
norm = cls._normalize_anchor_items(first)
first = norm[0] if norm else None
if first and mode == "move":
# 放到尾部
data.append(first)
# 写回
to_write = wrapper if wrapper is not None else data
file_path.write_text(
json.dumps(to_write, ensure_ascii=False, indent=2),
encoding="utf-8"
)
return first
@classmethod
def bulk_update_anchors(cls, updates, filename="data/acList.json", case_insensitive=False):
"""
批量修改(文件根必须是数组,沿用 _read_json_list 的约定)
- updates:
dict: {"id1": {...}, "id2": {...}}
list[dict]: [{"anchorId":"id1", ...}, {"anchorId":"id2", ...}]
- case_insensitive: True 时用小写比较 anchorId
返回: {"updated": <int>, "missing": [ids...], "file": "<实际命中的路径>"}
"""
def norm_id(x: str) -> str:
s = str(x).strip()
return s.lower() if case_insensitive else s
# ✅ 关键:使用你已有的 _resolve_path避免受 cwd 影响
file_path = cls._resolve_path(filename)
data = cls._read_json_list(file_path)
if not data:
return {"updated": 0, "missing": cls._collect_all_ids(updates), "file": str(file_path)}
# 1) 归一化 updates -> map[normalized_id] = patch
upd_map = {}
raw_ids = [] # 保留原始传入 id用于返回 missing 时回显
if isinstance(updates, dict):
for aid, patch in updates.items():
if aid and isinstance(patch, dict):
key = norm_id(aid)
raw_ids.append(str(aid))
patch = {k: v for k, v in patch.items() if k != "anchorId"}
if patch:
upd_map[key] = {**upd_map.get(key, {}), **patch}
elif isinstance(updates, list):
for it in updates:
if isinstance(it, dict) and it.get("anchorId"):
rid = str(it["anchorId"])
key = norm_id(rid)
raw_ids.append(rid)
patch = {k: v for k, v in it.items() if k != "anchorId"}
if patch:
upd_map[key] = {**upd_map.get(key, {}), **patch}
if not upd_map:
return {"updated": 0, "missing": [], "file": str(file_path)}
# 2) 建索引map[normalized_id] -> item
index = {}
for item in data:
if isinstance(item, dict) and "anchorId" in item:
key = norm_id(item.get("anchorId", ""))
if key:
index[key] = item
# 3) 执行更新
updated, seen = 0, set()
for key, patch in upd_map.items():
target = index.get(key)
if target is not None:
target.update(patch)
updated += 1
seen.add(key)
# 4) 写回
if updated > 0:
cls._write_json_list(file_path, data)
# 5) 计算未命中(按传入原始 ID 回显)
missing = []
for rid in raw_ids:
if norm_id(rid) not in seen:
missing.append(rid)
return {"updated": updated, "missing": missing, "file": str(file_path)}
@staticmethod
def _collect_all_ids(updates):
ids = []
if isinstance(updates, dict):
ids = [str(k) for k in updates.keys()]
elif isinstance(updates, list):
for it in updates:
if isinstance(it, dict) and it.get("anchorId"):
ids.append(str(it["anchorId"]))
return ids
@classmethod
def delete_anchors_by_ids(cls, ids: list[str], filename: str = "acList.json") -> int:
"""
根据 anchorId 列表,从项目根目录/data/acList.json 中删除匹配的 anchor。
- ids: 要删除的 anchorId 列表
- filename: 默认为 acList.json可以传文件名或绝对路径
返回:删除数量
"""
# 计算文件路径
root_dir = Path(__file__).resolve().parent.parent
if Path(filename).is_absolute():
file_path = Path(filename)
else:
file_path = root_dir / "data" / filename
if not file_path.exists():
return 0
# 读取 JSON 文件
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
return 0
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 读取失败: {e}")
return 0
before = len(data)
# 保留不在 ids 里的对象
data = [d for d in data if isinstance(d, dict) and d.get("anchorId") not in ids]
deleted = before - len(data)
# 写回 JSON 文件
try:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
return deleted
# -------- 查看第一个(取出但不删除) --------
@staticmethod
def _resolve_path(p) -> Path:
p = Path(p)
if p.is_absolute():
return p
# 以项目根目录 (iOSAI) 为基准 —— script 的上一级
base = Path(__file__).resolve().parents[1]
return (base / p).resolve()
@classmethod
def peek_aclist_first(cls, filename="data/acList.json"):
file_path = cls._resolve_path(filename)
if not file_path.exists():
print(f"[peek] 文件不存在: {file_path}")
return None
try:
raw = file_path.read_text(encoding="utf-8-sig").strip()
if not raw:
return None
data = json.loads(raw)
arr = data if isinstance(data, list) else data.get("anchorList") if isinstance(data, dict) else None
if not arr:
return None
first = arr[0]
norm = cls._normalize_anchor_items(first)
return norm[0] if norm else None
except Exception as e:
print(f"[peek] 读取失败: {e}")
return None
@staticmethod
def run_tidevice_command(udid, action, bundle_id, timeout=30):
"""
执行tidevice命令的辅助函数
:param udid: 设备UDID
:param action: 动作类型 ('kill''launch')
:param bundle_id: 应用的Bundle ID
:param timeout: 命令执行超时时间(秒)
:return: (bool) 成功返回True失败返回False
"""
# 构建命令列表
cmd = ["tidevice", "--udid", udid, action, bundle_id]
try:
# 执行命令并捕获输出
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
# 检查命令是否成功执行返回码为0通常表示成功
if result.returncode == 0:
LogManager.info(f"Successfully {action}ed {bundle_id} on device {udid}.")
return True
else:
# 记录错误信息
LogManager.error(f"Failed to {action} {bundle_id} on device {udid}. Error: {result.stderr}")
return False
except subprocess.TimeoutExpired:
# 处理命令执行超时
LogManager.error(f"Command 'tidevice {action}' timed out after {timeout} seconds for device {udid}.")
return False
except FileNotFoundError:
# 处理tidevice命令未找到的情况通常意味着tidevice未安装或不在PATH中
LogManager.error(
"The 'tidevice' command was not found. Please ensure it is installed and in your system PATH.")
return False
except Exception as e:
# 捕获其他可能异常
LogManager.error(f"An unexpected error occurred while trying to {action} the app: {e}")
return False
@classmethod
def kill_wda(cls, udid, bundle_id="com.yolozsAgent.wda.xctrunner"):
"""
杀死指定设备上的WDA应用
:param udid: 设备UDID
:param bundle_id: WDA的Bundle ID默认为 com.yolozsAgent.wda.xctrunner
:return: (bool) 成功返回True失败返回False
"""
return cls.run_tidevice_command(udid, "kill", bundle_id)
@classmethod
def launch_wda(cls, udid, bundle_id="com.yolozsAgent.wda.xctrunner", timeout=60):
"""
启动指定设备上的WDA应用
:param udid: 设备UDID
:param bundle_id: WDA的Bundle ID默认为 com.yolozsAgent.wda.xctrunner
:param timeout: 启动命令超时时间默认为60秒启动可能较慢
:return: (bool) 成功返回True失败返回False
"""
return cls.run_tidevice_command(udid, "launch", bundle_id, timeout)
@classmethod
def _screen_info(cls, udid: str):
try:
# 避免 c.home() 可能触发的阻塞,直接取 window_size
c = wda.USBClient(udid, wdaFunctionPort)
size = c.window_size()
print(f"[Screen] 成功获取屏幕 {int(size.width)}x{int(size.height)} {udid}")
return int(size.width), int(size.height), float(c.scale)
except Exception as e:
print(f"[Screen] 获取屏幕信息异常: {e} {udid}")
return 0, 0, 0.0