1462 lines
57 KiB
Python
1462 lines
57 KiB
Python
import html
|
||
import json
|
||
import os
|
||
import re
|
||
import signal
|
||
import socket
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import xml.etree.ElementTree as ET
|
||
from pathlib import Path
|
||
import cv2
|
||
import numpy as np
|
||
import unicodedata
|
||
import wda
|
||
from lxml import etree
|
||
from wda import Client
|
||
|
||
from Entity.Variables import wdaFunctionPort
|
||
from Utils.LogManager import LogManager
|
||
|
||
|
||
# 工具类
|
||
class AiUtils(object):
|
||
|
||
# 在屏幕中找到对应的图片
|
||
# @classmethod
|
||
# def findImageInScreen(cls, target, udid):
|
||
# try:
|
||
# # 加载原始图像和模板图像
|
||
# image_path = AiUtils.imagePathWithName(udid, "bgv") # 替换为你的图像路径
|
||
# template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
|
||
#
|
||
# # 读取图像和模板,确保它们都是单通道灰度图
|
||
# image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
|
||
# template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
|
||
#
|
||
# if image is None:
|
||
# LogManager.error("加载背景图失败")
|
||
# return -1, -1
|
||
#
|
||
# if template is None:
|
||
# LogManager.error("加载模板图失败")
|
||
# return -1, -1
|
||
#
|
||
# # 获取模板的宽度和高度
|
||
# w, h = template.shape[::-1]
|
||
#
|
||
# # 使用模板匹配方法
|
||
# res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
|
||
# threshold = 0.7 # 匹配度阈值,可以根据需要调整
|
||
# loc = np.where(res >= threshold)
|
||
#
|
||
# # 检查是否有匹配结果
|
||
# if loc[0].size > 0:
|
||
# # 取第一个匹配位置
|
||
# pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
|
||
# center_x = int(pt[0] + w // 2)
|
||
# center_y = int(pt[1] + h // 2)
|
||
# # print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
|
||
# return center_x, center_y
|
||
# else:
|
||
# return -1, -1
|
||
# except Exception as e:
|
||
# LogManager.error(f"加载素材失败:{e}", udid)
|
||
# print(e)
|
||
# return -1, -1
|
||
|
||
@classmethod
|
||
def flask_port_free(cls,port):
|
||
"""无需 psutil 的版本,通过系统命令查 PID"""
|
||
|
||
def can_bind(p):
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
try:
|
||
s.bind(("0.0.0.0", p))
|
||
s.close()
|
||
return True
|
||
except OSError:
|
||
s.close()
|
||
return False
|
||
|
||
if can_bind(port):
|
||
return
|
||
|
||
print(f"[ensure_port_free] Port {port} is occupied. Searching PID...")
|
||
|
||
pids = set()
|
||
|
||
if sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
|
||
cmd = f"lsof -t -i:{port}"
|
||
proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||
for line in proc.stdout.splitlines():
|
||
if line.strip().isdigit():
|
||
pids.add(int(line.strip()))
|
||
|
||
elif sys.platform.startswith("win"):
|
||
cmd = f"netstat -ano | findstr :{port}"
|
||
proc = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||
for line in proc.stdout.splitlines():
|
||
parts = line.split()
|
||
if len(parts) >= 5 and parts[-1].isdigit():
|
||
pids.add(int(parts[-1]))
|
||
|
||
else:
|
||
raise RuntimeError("Unsupported platform for ensure_port_free")
|
||
|
||
for pid in pids:
|
||
try:
|
||
print(f"[ensure_port_free] Killing PID {pid}...")
|
||
os.kill(pid, signal.SIGKILL)
|
||
except Exception as e:
|
||
print(f"[ensure_port_free] Failed to kill PID {pid}: {e}")
|
||
|
||
time.sleep(0.3)
|
||
if not can_bind(port):
|
||
raise RuntimeError(f"[ensure_port_free] Port {port} still occupied after kill.")
|
||
|
||
@classmethod
|
||
def findImageInScreen(cls, target, udid):
|
||
try:
|
||
# 加载原始图像和模板图像
|
||
image_path = AiUtils.imagePathWithName(udid, "bgv")
|
||
template_path = AiUtils.imagePathWithName("", target)
|
||
|
||
# 读取图像和模板,确保它们都是单通道灰度图
|
||
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
|
||
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
|
||
|
||
if image is None:
|
||
LogManager.error("加载背景图失败", udid)
|
||
return -1, -1
|
||
if template is None:
|
||
|
||
LogManager.error("加载模板图失败", udid)
|
||
return -1, -1
|
||
|
||
# 获取模板的宽度和高度
|
||
w, h = template.shape[::-1]
|
||
|
||
# 模板匹配
|
||
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
|
||
threshold = 0.85
|
||
loc = np.where(res >= threshold)
|
||
# 放在 cv2.matchTemplate 之前
|
||
cv2.imwrite(f'/tmp/runtime_bg_{udid}.png', image)
|
||
cv2.imwrite(f'/tmp/runtime_tpl_{udid}.png', template)
|
||
print(f'>>> 设备{udid} 模板{target} 最高相似度:', cv2.minMaxLoc(res)[1])
|
||
# 安全取出第一个匹配点
|
||
matches = list(zip(*loc[::-1]))
|
||
if not matches:
|
||
return -1, -1
|
||
|
||
pt = matches[0]
|
||
center_x = int(pt[0] + w // 2)
|
||
center_y = int(pt[1] + h // 2)
|
||
return center_x, center_y
|
||
|
||
except Exception as e:
|
||
LogManager.error(f"加载素材失败:{e}", udid)
|
||
return -1, -1
|
||
|
||
# 使用正则查找字符串中的数字
|
||
@classmethod
|
||
def findNumber(cls, str):
|
||
# 使用正则表达式匹配数字
|
||
match = re.search(r'\d+', str)
|
||
if match:
|
||
return int(match.group()) # 将匹配到的数字转换为整数
|
||
return None # 如果没有找到数字,返回 None
|
||
|
||
# 选择截图
|
||
@classmethod
|
||
def screenshot(cls, session):
|
||
image = session.screenshot()
|
||
image_path = "screenshot.png"
|
||
image.save(image_path)
|
||
image = cv2.imread(image_path)
|
||
|
||
# 如果图像过大,缩小显示
|
||
scale_percent = 50 # 缩小比例
|
||
width = int(image.shape[1] * scale_percent / 100)
|
||
height = int(image.shape[0] * scale_percent / 100)
|
||
dim = (width, height)
|
||
resized_image = cv2.resize(image, dim, interpolation=cv2.INTER_AREA)
|
||
|
||
# 创建一个窗口并显示缩小后的图像
|
||
cv2.namedWindow("Image")
|
||
cv2.imshow("Image", resized_image)
|
||
print("请在图像上选择爱心图标区域,然后按Enter键确认。")
|
||
|
||
# 使用selectROI函数手动选择区域
|
||
roi = cv2.selectROI("Image", resized_image, showCrosshair=True, fromCenter=False)
|
||
|
||
# 将ROI坐标按原始图像尺寸放大
|
||
x, y, w, h = roi
|
||
x = int(x * image.shape[1] / resized_image.shape[1])
|
||
y = int(y * image.shape[0] / resized_image.shape[0])
|
||
w = int(w * image.shape[1] / resized_image.shape[1])
|
||
h = int(h * image.shape[0] / resized_image.shape[0])
|
||
|
||
# 根据选择的ROI提取爱心图标
|
||
if w > 0 and h > 0: # 确保选择的区域有宽度和高度
|
||
heart_icon = image[y:y + h, x:x + w]
|
||
|
||
# 转换为HSV颜色空间
|
||
hsv = cv2.cvtColor(heart_icon, cv2.COLOR_BGR2HSV)
|
||
|
||
# 定义红色的HSV范围
|
||
lower_red1 = np.array([0, 120, 70])
|
||
upper_red1 = np.array([10, 255, 255])
|
||
lower_red2 = np.array([170, 120, 70])
|
||
upper_red2 = np.array([180, 255, 255])
|
||
|
||
# 创建掩模
|
||
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
|
||
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
|
||
mask = mask1 + mask2
|
||
|
||
# 反转掩模,因为我们想要的是爱心图标,而不是背景
|
||
mask_inv = cv2.bitwise_not(mask)
|
||
|
||
# 应用掩模
|
||
heart_icon = cv2.bitwise_and(heart_icon, heart_icon, mask=mask_inv)
|
||
|
||
# 创建一个全透明的背景
|
||
height, width, channels = heart_icon.shape
|
||
roi = np.zeros((height, width, channels), dtype=np.uint8)
|
||
|
||
# 将爱心图标粘贴到透明背景上
|
||
for c in range(channels):
|
||
roi[:, :, c] = np.where(mask_inv == 255, heart_icon[:, :, c], roi[:, :, c])
|
||
|
||
# 图片名称
|
||
imgName = "temp.png"
|
||
|
||
# 保存结果
|
||
cv2.imwrite(imgName, roi)
|
||
|
||
# 显示结果
|
||
cv2.imshow("Heart Icon with Transparent Background", roi)
|
||
cv2.waitKey(0)
|
||
cv2.destroyAllWindows()
|
||
else:
|
||
print("未选择有效区域。")
|
||
|
||
# 根据名称获取图片完整地址
|
||
@classmethod
|
||
def imagePathWithName(cls, udid, name):
|
||
current_file_path = os.path.abspath(__file__)
|
||
# 获取当前文件所在的目录(即script目录)
|
||
current_dir = os.path.dirname(current_file_path)
|
||
# 由于script目录位于项目根目录下一级,因此需要向上一级目录移动两次
|
||
project_root = os.path.abspath(os.path.join(current_dir, '..'))
|
||
# 构建资源文件的完整路径,向上两级目录,然后进入 resources 目录
|
||
resource_path = os.path.abspath(os.path.join(project_root, "resources", udid, name + ".png")).replace('/', '\\')
|
||
return resource_path
|
||
|
||
# 获取根目录
|
||
@classmethod
|
||
def getRootDir(cls):
|
||
current_file = os.path.abspath(__file__)
|
||
# 获取当前文件所在的目录
|
||
current_dir = os.path.dirname(current_file)
|
||
# 获取项目根目录(假设根目录是当前文件的父目录的父目录)
|
||
project_root = os.path.dirname(current_dir)
|
||
# 返回根目录
|
||
return project_root
|
||
|
||
# 创建一个以udid命名的目录
|
||
@classmethod
|
||
def makeUdidDir(cls, udid):
|
||
# 获取项目根目录
|
||
home = cls.getRootDir()
|
||
# 拼接 resources 目录的路径
|
||
resources_dir = os.path.join(home, "resources")
|
||
# 拼接 udid 目录的路径
|
||
udid_dir = os.path.join(resources_dir, udid)
|
||
# 检查 udid 目录是否存在,如果不存在则创建
|
||
if not os.path.exists(udid_dir):
|
||
try:
|
||
os.makedirs(udid_dir)
|
||
LogManager.info(f"目录 {udid_dir} 创建成功", udid)
|
||
print(f"目录 {udid_dir} 创建成功")
|
||
except Exception as e:
|
||
print(f"创建目录时出错: {e}")
|
||
LogManager.error(f"创建目录时出错: {e}", udid)
|
||
else:
|
||
LogManager.info(f"目录 {udid_dir} 已存在,跳过创建", udid)
|
||
print(f"目录 {udid_dir} 已存在,跳过创建")
|
||
|
||
# 查找首页按钮
|
||
# uuid 设备id
|
||
# click 是否点击该按钮
|
||
@classmethod
|
||
def findHomeButton(cls, session):
|
||
session.appium_settings({"snapshotMaxDepth": 10})
|
||
homeButton = session.xpath("//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']")
|
||
try:
|
||
if homeButton.exists:
|
||
print("找到首页了")
|
||
return homeButton
|
||
else:
|
||
print("没找到首页")
|
||
return None
|
||
except Exception as e:
|
||
print(e)
|
||
return None
|
||
|
||
# 查找关闭按钮
|
||
@classmethod
|
||
def findLiveCloseButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
|
||
client = wda.USBClient(udid,wdaFunctionPort)
|
||
session = client.session()
|
||
session.appium_settings({"snapshotMaxDepth": 10})
|
||
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
|
||
try:
|
||
if r.label == "关闭屏幕":
|
||
return r
|
||
else:
|
||
return None
|
||
except Exception as e:
|
||
print(e)
|
||
return None
|
||
|
||
# 获取直播间窗口数量
|
||
@classmethod
|
||
def count_add_by_xml(cls, session):
|
||
xml = session.source()
|
||
root = ET.fromstring(xml)
|
||
return sum(
|
||
1 for e in root.iter()
|
||
if e.get('type') in ('XCUIElementTypeButton', 'XCUIElementTypeImage')
|
||
and (e.get('name') == '添加' or e.get('label') == '添加')
|
||
and (e.get('visible') in (None, 'true'))
|
||
)
|
||
|
||
# 获取关注按钮
|
||
@classmethod
|
||
def getFollowButton(cls, session: Client):
|
||
# followButton = session.xpath("//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[2]/Other[1]/Other[1]/Other[3]/Other[1]/Other[1]/Button[1]")
|
||
session.appium_settings({"snapshotMaxDepth": 20})
|
||
|
||
followButton = session.xpath('//XCUIElementTypeButton[@name="关注" or @label="关注"]')
|
||
|
||
if followButton.exists:
|
||
print("2.关注找到了")
|
||
LogManager.info("2.关注找到了")
|
||
return followButton
|
||
else:
|
||
print("2.关注没找到")
|
||
print("2.关注没找到")
|
||
return None
|
||
|
||
# 查找发消息按钮
|
||
@classmethod
|
||
def getSendMesageButton(cls, session: Client):
|
||
|
||
# msgButton = session.xpath(
|
||
# '//XCUIElementTypeButton['
|
||
# '(@name="发消息" or @label="发消息" or '
|
||
# '@name="发送 👋" or @label="发送 👋" or '
|
||
# '@name="消息" or @label="消息")'
|
||
# ' and @visible="true"]'
|
||
# )
|
||
|
||
msgButton = session.xpath(
|
||
'//XCUIElementTypeButton['
|
||
'(@name="发消息" or @label="发消息" or '
|
||
'@name="发送 👋" or @label="发送 👋" or '
|
||
'@name="消息" or @label="消息" or '
|
||
'@name="Message" or @label="Message")'
|
||
' and @visible="true"]'
|
||
)
|
||
|
||
if msgButton.exists:
|
||
print("3.发消息按钮找到了")
|
||
LogManager.info("3.发消息按钮找到了")
|
||
return msgButton
|
||
else:
|
||
print("3.发消息按钮没找到")
|
||
LogManager.info("3.发消息按钮没找到")
|
||
return None
|
||
|
||
# 获取当前屏幕上的节点
|
||
@classmethod
|
||
def getCurrentScreenSource(cls):
|
||
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa",wdaFunctionPort)
|
||
print(client.source())
|
||
|
||
# 查找app主页上的收件箱按钮
|
||
@classmethod
|
||
def getMsgBoxButton(cls, session: Client):
|
||
# box = session.xpath("//XCUIElementTypeButton[name='a11y_vo_inbox']")
|
||
box = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
|
||
if box.exists:
|
||
return box
|
||
else:
|
||
return None
|
||
|
||
# 获取收件箱中的未读消息数
|
||
@classmethod
|
||
def getUnReadMsgCount(cls, session: Client):
|
||
btn = cls.getMsgBoxButton(session)
|
||
print(f"btn:{btn}")
|
||
return cls.findNumber(btn.label)
|
||
|
||
@classmethod
|
||
def parse_float(cls, el, attr, default=0.0):
|
||
try:
|
||
return float(el.get(attr, default))
|
||
except Exception:
|
||
return default
|
||
|
||
# @classmethod
|
||
# def extract_messages_from_xml(cls, xml: str):
|
||
# """
|
||
# 解析 TikTok 聊天 XML,返回当前屏幕可见的消息与时间分隔:
|
||
# [{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
|
||
# 兼容 Table / CollectionView / ScrollView;过滤系统提示/底部工具栏;可见性使用“重叠可视+容差”。
|
||
# """
|
||
# if not isinstance(xml, str) or not xml.strip():
|
||
# return []
|
||
#
|
||
# try:
|
||
# root = etree.fromstring(xml.encode("utf-8"))
|
||
# except Exception:
|
||
# return []
|
||
#
|
||
# # ---------- 小工具 ----------
|
||
# def get_text(el):
|
||
# s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
|
||
# return html.unescape(s.strip())
|
||
#
|
||
# def is_visible(el):
|
||
# v = el.get('visible')
|
||
# return (v is None) or (v.lower() == 'true')
|
||
#
|
||
# def get_ancestor_cell(el):
|
||
# p = el
|
||
# while p is not None and p.get('type') != 'XCUIElementTypeCell':
|
||
# p = p.getparent()
|
||
# return p
|
||
#
|
||
# # ---------- 屏幕尺寸 ----------
|
||
# app = root.xpath('/XCUIElementTypeApplication')
|
||
# screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
|
||
# screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
|
||
#
|
||
# # ---------- 主容器探测 ----------
|
||
# def pick_container():
|
||
# cands = []
|
||
# for xp, ctype in (
|
||
# ('//XCUIElementTypeTable', 'table'),
|
||
# ('//XCUIElementTypeCollectionView', 'collection'),
|
||
# ('//XCUIElementTypeScrollView', 'scroll'),
|
||
# ):
|
||
# nodes = [n for n in root.xpath(xp) if is_visible(n)]
|
||
# for n in nodes:
|
||
# y = cls.parse_float(n, 'y', 0.0)
|
||
# h = cls.parse_float(n, 'height', screen_h)
|
||
# cells = n.xpath('.//XCUIElementTypeCell')
|
||
# score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
|
||
# cands.append((score, n, ctype))
|
||
# if cands:
|
||
# cands.sort(key=lambda t: t[0], reverse=True)
|
||
# return cands[0][1], cands[0][2]
|
||
# return None, None
|
||
#
|
||
# container, container_type = pick_container()
|
||
#
|
||
# # ---------- 可视区 ----------
|
||
# if container is not None:
|
||
# area_top = cls.parse_float(container, 'y', 0.0)
|
||
# area_h = cls.parse_float(container, 'height', screen_h)
|
||
# area_bot = area_top + area_h
|
||
# else:
|
||
# blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
|
||
# is_visible(n)]
|
||
# area_top = 0.0
|
||
# if blocks:
|
||
# blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
|
||
# b = blocks[0]
|
||
# area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
|
||
# tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
|
||
# if tvs:
|
||
# tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
|
||
# area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
|
||
# else:
|
||
# area_bot = screen_h
|
||
# if area_bot - area_top < 100:
|
||
# area_top, area_bot = 0.0, screen_h
|
||
#
|
||
# def in_view(el) -> bool:
|
||
# if not is_visible(el):
|
||
# return False
|
||
# y = cls.parse_float(el, 'y', -1e9)
|
||
# h = cls.parse_float(el, 'height', 0.0)
|
||
# by = y + h
|
||
# tol = 8.0
|
||
# return not (by <= area_top + tol or y >= area_bot - tol)
|
||
#
|
||
# # ---------- 时间分隔 ----------
|
||
# items = []
|
||
# for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
|
||
# if not in_view(t):
|
||
# continue
|
||
# txt = get_text(t)
|
||
# if txt:
|
||
# items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
|
||
#
|
||
# # ---------- 系统提示/横幅过滤 ----------
|
||
# EXCLUDES_LITERAL = {
|
||
# 'Heart', 'Lol', 'ThumbsUp',
|
||
# '分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
|
||
# '关注',
|
||
# }
|
||
# SYSTEM_PATTERNS = [
|
||
# r"消息请求已被接受。你们可以开始聊天了。",
|
||
# r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*",
|
||
# r"开启(私信)?通知", r"开启通知",
|
||
# r"你打开了这个与 .* 的聊天。.*隐私",
|
||
# r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
|
||
# r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
|
||
# r"未发送$",
|
||
# r"Turn on (DM|message|direct message)?\s*notifications",
|
||
# r"Enable notifications",
|
||
# r"Get notified when .* replies",
|
||
# r"You opened this chat .* privacy",
|
||
# r"Only \d+ message can be sent .* accepts .* request",
|
||
# r"此消息可能违反.*",
|
||
# r"无法发送",
|
||
# r"请告知我们"
|
||
# ]
|
||
# SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
|
||
#
|
||
# # ---------- 资料卡片(个人信息)剔除 ----------
|
||
# PROFILE_RE = re.compile(
|
||
# r"@[\w\.\-]+|粉丝|followers?|following|关注账号",
|
||
# re.IGNORECASE
|
||
# )
|
||
#
|
||
# def is_profile_cell(cell) -> bool:
|
||
# if cell is None:
|
||
# return False
|
||
# if cell.xpath(
|
||
# './/XCUIElementTypeButton[@name="关注" or @label="关注" or '
|
||
# 'contains(translate(@name,"FOLW","folw"),"follow") or '
|
||
# 'contains(translate(@label,"FOLW","folw"),"follow")]'
|
||
# ):
|
||
# return True
|
||
# texts = []
|
||
# for t in cell.xpath('.//*[@name or @label or @value]'):
|
||
# s = get_text(t)
|
||
# if s:
|
||
# texts.append(s)
|
||
# if len(texts) > 40:
|
||
# break
|
||
# joined = " ".join(texts)
|
||
# if PROFILE_RE.search(joined):
|
||
# return True
|
||
# cy = cls.parse_float(cell, 'y', 0.0)
|
||
# ch = cls.parse_float(cell, 'height', 0.0)
|
||
# if cy < area_top + 140 and ch >= 150:
|
||
# return True
|
||
# return False
|
||
#
|
||
# def is_toolbar_like(o) -> bool:
|
||
# txt = get_text(o)
|
||
# if txt in EXCLUDES_LITERAL:
|
||
# return True
|
||
# y = cls.parse_float(o, 'y', 0.0)
|
||
# h = cls.parse_float(o, 'height', 0.0)
|
||
# near_bottom = (area_bot - (y + h)) < 48
|
||
# is_short = h <= 40
|
||
# return near_bottom and is_short
|
||
#
|
||
# # ---------- 收集消息候选 ----------
|
||
# msg_nodes = []
|
||
# if container is not None:
|
||
# cand = container.xpath(
|
||
# './/XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
|
||
# '[@y and (@name or @label or @value)]'
|
||
# )
|
||
# for o in cand:
|
||
# if not in_view(o):
|
||
# continue
|
||
# if is_toolbar_like(o):
|
||
# continue
|
||
# cell = get_ancestor_cell(o)
|
||
# if is_profile_cell(cell):
|
||
# continue
|
||
# txt = get_text(o)
|
||
# if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||
# continue
|
||
# msg_nodes.append(o)
|
||
# else:
|
||
# cand = root.xpath(
|
||
# '//XCUIElementTypeOther[@y and (@name or @label or @value)]'
|
||
# ' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
|
||
# ' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
|
||
# )
|
||
# for o in cand:
|
||
# p = o.getparent()
|
||
# if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
|
||
# continue
|
||
# if not in_view(o) or is_toolbar_like(o):
|
||
# continue
|
||
# cell = get_ancestor_cell(o)
|
||
# if is_profile_cell(cell):
|
||
# continue
|
||
# txt = get_text(o)
|
||
# if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||
# continue
|
||
# msg_nodes.append(o)
|
||
#
|
||
# # ---------- 方向判定 & 组装(中心点法) ----------
|
||
# CENTER_MARGIN = max(12.0, screen_w * 0.02) # 中线容差
|
||
#
|
||
# for o in msg_nodes:
|
||
# txt = get_text(o)
|
||
# if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
|
||
# continue
|
||
#
|
||
# x = cls.parse_float(o, 'x', 0.0)
|
||
# y = cls.parse_float(o, 'y', 0.0)
|
||
# w = cls.parse_float(o, 'width', 0.0)
|
||
#
|
||
# center_x = x + w / 2.0
|
||
# screen_center = screen_w / 2.0
|
||
#
|
||
# if center_x < screen_center - CENTER_MARGIN:
|
||
# direction = 'in' # 左侧:对方
|
||
# elif center_x > screen_center + CENTER_MARGIN:
|
||
# direction = 'out' # 右侧:自己
|
||
# else:
|
||
# # 处在中线附近,用右缘兜底
|
||
# right_edge = x + w
|
||
# direction = 'out' if right_edge >= screen_center else 'in'
|
||
#
|
||
# items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
|
||
#
|
||
# # ---------- 排序 & 收尾 ----------
|
||
# if items:
|
||
# items.sort(key=lambda i: i.get('y', 0.0))
|
||
# for it in items:
|
||
# it.pop('y', None)
|
||
# return items
|
||
|
||
@staticmethod
|
||
def parse_float(el, key: str, default: float = 0.0) -> float:
|
||
"""稳健读取浮点属性"""
|
||
if el is None:
|
||
return default
|
||
v = el.get(key)
|
||
if v is None or v == "":
|
||
return default
|
||
try:
|
||
return float(v)
|
||
except Exception:
|
||
try:
|
||
# 某些抓取会出现 '20.0px' / '20,' 等
|
||
v2 = re.sub(r"[^\d\.\-]+", "", v)
|
||
return float(v2) if v2 else default
|
||
except Exception:
|
||
return default
|
||
|
||
@classmethod
|
||
def extract_messages_from_xml(cls, xml: str):
|
||
"""
|
||
解析 TikTok 聊天 XML,返回当前屏幕可见的消息与时间分隔:
|
||
[{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
|
||
兼容 Table / CollectionView / ScrollView;过滤系统提示/底部工具栏;
|
||
资料卡只过滤“资料区块”而非整 Cell;可见性使用“重叠可视+容差”。
|
||
"""
|
||
if not isinstance(xml, str) or not xml.strip():
|
||
return []
|
||
|
||
try:
|
||
root = etree.fromstring(xml.encode("utf-8"))
|
||
except Exception:
|
||
return []
|
||
|
||
# ---------- 小工具 ----------
|
||
def get_text(el):
|
||
s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
|
||
return html.unescape(s.strip())
|
||
|
||
def is_visible(el):
|
||
v = el.get('visible')
|
||
return (v is None) or (v.lower() == 'true')
|
||
|
||
def get_ancestor_cell(el):
|
||
p = el
|
||
while p is not None and p.get('type') != 'XCUIElementTypeCell':
|
||
p = p.getparent()
|
||
return p
|
||
|
||
def _bbox(el):
|
||
return (
|
||
cls.parse_float(el, 'x', 0.0),
|
||
cls.parse_float(el, 'y', 0.0),
|
||
cls.parse_float(el, 'width', 0.0),
|
||
cls.parse_float(el, 'height', 0.0),
|
||
)
|
||
|
||
# ---------- 屏幕尺寸 ----------
|
||
app = root.xpath('/XCUIElementTypeApplication')
|
||
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
|
||
screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
|
||
|
||
# ---------- 主容器探测 ----------
|
||
def pick_container():
|
||
cands = []
|
||
for xp, ctype in (
|
||
('//XCUIElementTypeTable', 'table'),
|
||
('//XCUIElementTypeCollectionView', 'collection'),
|
||
('//XCUIElementTypeScrollView', 'scroll'),
|
||
):
|
||
nodes = [n for n in root.xpath(xp) if is_visible(n)]
|
||
for n in nodes:
|
||
y = cls.parse_float(n, 'y', 0.0)
|
||
h = cls.parse_float(n, 'height', screen_h)
|
||
cells = n.xpath('.//XCUIElementTypeCell')
|
||
score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
|
||
cands.append((score, n, ctype))
|
||
if cands:
|
||
cands.sort(key=lambda t: t[0], reverse=True)
|
||
return cands[0][1], cands[0][2]
|
||
return None, None
|
||
|
||
container, container_type = pick_container()
|
||
|
||
# ---------- 可视区 ----------
|
||
if container is not None:
|
||
area_top = cls.parse_float(container, 'y', 0.0)
|
||
area_h = cls.parse_float(container, 'height', screen_h)
|
||
area_bot = area_top + area_h
|
||
else:
|
||
blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
|
||
is_visible(n)]
|
||
area_top = 0.0
|
||
if blocks:
|
||
blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
|
||
b = blocks[0]
|
||
area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
|
||
tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
|
||
if tvs:
|
||
tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
|
||
area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
|
||
else:
|
||
area_bot = screen_h
|
||
if area_bot - area_top < 100:
|
||
area_top, area_bot = 0.0, screen_h
|
||
|
||
def in_view(el) -> bool:
|
||
if not is_visible(el):
|
||
return False
|
||
y = cls.parse_float(el, 'y', -1e9)
|
||
h = cls.parse_float(el, 'height', 0.0)
|
||
by = y + h
|
||
tol = 8.0
|
||
return not (by <= area_top + tol or y >= area_bot - tol)
|
||
|
||
# ---------- 时间分隔 ----------
|
||
items = []
|
||
for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
|
||
if not in_view(t):
|
||
continue
|
||
txt = get_text(t)
|
||
if txt:
|
||
items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
|
||
|
||
# ---------- 系统提示/横幅过滤 ----------
|
||
EXCLUDES_LITERAL = {
|
||
'Heart', 'Lol', 'ThumbsUp',
|
||
'分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
|
||
'关注', # 注意:仅用于按钮/工具条等短元素,后续还会叠加区域过滤,避免误杀消息
|
||
}
|
||
SYSTEM_PATTERNS = [
|
||
r"消息请求已被接受。你们可以开始聊天了。",
|
||
r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*",
|
||
r"开启(私信)?通知", r"开启通知",
|
||
r"你打开了这个与 .* 的聊天。.*隐私",
|
||
r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
|
||
r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
|
||
r"未发送$",
|
||
r"Turn on (DM|message|direct message)?\s*notifications",
|
||
r"Enable notifications",
|
||
r"Get notified when .* replies",
|
||
r"You opened this chat .* privacy",
|
||
r"Only \d+ message can be sent .* accepts .* request",
|
||
r"此消息可能违反.*",
|
||
r"无法发送",
|
||
r"请告知我们",
|
||
r"已看过"
|
||
]
|
||
SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
|
||
|
||
# ---------- 资料卡片(个人信息)剔除:仅过滤“资料区块” ----------
|
||
PROFILE_RE = re.compile(
|
||
r"@[\w\.\-]+|粉丝|followers?|following|关注账号",
|
||
re.IGNORECASE
|
||
)
|
||
|
||
def is_profile_cell(cell) -> bool:
|
||
"""更严格:至少同时命中 >=2 个信号才认定为资料卡片 Cell。"""
|
||
if cell is None:
|
||
return False
|
||
|
||
has_follow_btn = bool(cell.xpath(
|
||
'.//XCUIElementTypeButton['
|
||
'@name="关注" or @label="关注" or '
|
||
'contains(translate(@name,"FOLW","folw"),"follow") or '
|
||
'contains(translate(@label,"FOLW","folw"),"follow")]'
|
||
))
|
||
|
||
has_view_profile = bool(cell.xpath(
|
||
'.//XCUIElementTypeButton['
|
||
'@name="查看主页" or @label="查看主页" or '
|
||
'contains(translate(@name,"VIEW PROFILE","view profile"),"view profile") or '
|
||
'contains(translate(@label,"VIEW PROFILE","view profile"),"view profile")]'
|
||
))
|
||
|
||
has_live_ended = bool(cell.xpath(
|
||
'.//XCUIElementTypeStaticText['
|
||
'@name="直播已结束" or @label="直播已结束" or '
|
||
'contains(translate(@name,"LIVE ENDED","live ended"),"live ended") or '
|
||
'contains(translate(@label,"LIVE ENDED","live ended"),"live ended")]'
|
||
))
|
||
|
||
cy = cls.parse_float(cell, 'y', 0.0)
|
||
ch = cls.parse_float(cell, 'height', 0.0)
|
||
looks_large_card = ch >= 180 # 大卡片外观
|
||
|
||
# 再做一次文本特征检查(防止仅一个“关注”误杀)
|
||
texts = []
|
||
for t in cell.xpath('.//*[@name or @label or @value]'):
|
||
s = get_text(t)
|
||
if s:
|
||
texts.append(s)
|
||
if len(texts) > 40:
|
||
break
|
||
joined = " ".join(texts)
|
||
has_profile_terms = bool(PROFILE_RE.search(joined))
|
||
|
||
# 命中信号计数(至少2个)
|
||
signals = sum([has_follow_btn, has_view_profile, has_live_ended, looks_large_card, has_profile_terms])
|
||
return signals >= 2
|
||
|
||
def profile_region_y_range(cell):
|
||
"""
|
||
在资料卡 Cell 内,估算“资料区块”的 y 范围(min_y, max_y)。
|
||
用关键元素(关注按钮 / 查看主页 / 直播已结束 / 短用户名)来圈定范围。
|
||
"""
|
||
if cell is None:
|
||
return None
|
||
|
||
key_nodes = []
|
||
key_nodes += cell.xpath('.//XCUIElementTypeButton[@name="关注" or @label="关注"]')
|
||
key_nodes += cell.xpath('.//XCUIElementTypeButton[@name="查看主页" or @label="查看主页"]')
|
||
key_nodes += cell.xpath('.//XCUIElementTypeStaticText[@name="直播已结束" or @label="直播已结束"]')
|
||
|
||
# 用户名/昵称:长度较短更像资料区标签
|
||
for t in cell.xpath('.//XCUIElementTypeStaticText[@name or @label]'):
|
||
s = (t.get('label') or t.get('name') or '') or ''
|
||
st = s.strip()
|
||
if st and len(st) <= 30:
|
||
key_nodes.append(t)
|
||
|
||
ys = []
|
||
for n in key_nodes:
|
||
_, y, _, h = _bbox(n)
|
||
ys += [y, y + h]
|
||
|
||
if not ys:
|
||
return None # 没有关键元素则不定义资料区
|
||
|
||
min_y, max_y = min(ys), max(ys)
|
||
pad = 12.0
|
||
return (min_y - pad, max_y + pad)
|
||
|
||
def belongs_to_profile_region(node, cell) -> bool:
|
||
"""判断候选 node 是否落在资料区块的 y 范围内"""
|
||
rng = profile_region_y_range(cell)
|
||
if not rng:
|
||
return False
|
||
_, y, _, h = _bbox(node)
|
||
ny1, ny2 = y, y + h
|
||
ry1, ry2 = rng
|
||
return not (ny2 < ry1 or ny1 > ry2) # 任意重叠即算属于资料区
|
||
|
||
def is_toolbar_like(o) -> bool:
|
||
# 在 Cell 里的元素绝不是底部工具条(避免误杀“hgh”这类贴着底部的最后一条消息)
|
||
if get_ancestor_cell(o) is not None:
|
||
return False
|
||
|
||
txt = get_text(o)
|
||
if txt in EXCLUDES_LITERAL:
|
||
return True
|
||
|
||
y = cls.parse_float(o, 'y', 0.0)
|
||
h = cls.parse_float(o, 'height', 0.0)
|
||
near_bottom = (area_bot - (y + h)) < 48
|
||
is_short = h <= 40
|
||
return near_bottom and is_short
|
||
|
||
# ---------- 收集消息候选 ----------
|
||
msg_nodes = []
|
||
if container is not None:
|
||
cand = container.xpath(
|
||
'.//XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
|
||
'[@y and (@name or @label or @value)]'
|
||
)
|
||
for o in cand:
|
||
if not in_view(o):
|
||
continue
|
||
if is_toolbar_like(o):
|
||
continue
|
||
cell = get_ancestor_cell(o)
|
||
# 仅在“资料卡 Cell 且节点位于资料区块范围内”时过滤
|
||
if is_profile_cell(cell) and belongs_to_profile_region(o, cell):
|
||
continue
|
||
txt = get_text(o)
|
||
if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||
continue
|
||
msg_nodes.append(o)
|
||
else:
|
||
cand = root.xpath(
|
||
'//XCUIElementTypeOther[@y and (@name or @label or @value)]'
|
||
' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
|
||
' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
|
||
)
|
||
for o in cand:
|
||
p = o.getparent()
|
||
if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
|
||
continue
|
||
if not in_view(o) or is_toolbar_like(o):
|
||
continue
|
||
cell = get_ancestor_cell(o)
|
||
if is_profile_cell(cell) and belongs_to_profile_region(o, cell):
|
||
continue
|
||
txt = get_text(o)
|
||
if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||
continue
|
||
msg_nodes.append(o)
|
||
|
||
# ---------- 方向判定 & 组装(中心点法) ----------
|
||
CENTER_MARGIN = max(12.0, screen_w * 0.02) # 中线容差
|
||
|
||
for o in msg_nodes:
|
||
txt = get_text(o)
|
||
if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
|
||
continue
|
||
|
||
x = cls.parse_float(o, 'x', 0.0)
|
||
y = cls.parse_float(o, 'y', 0.0)
|
||
w = cls.parse_float(o, 'width', 0.0)
|
||
|
||
center_x = x + w / 2.0
|
||
screen_center = screen_w / 2.0
|
||
|
||
if center_x < screen_center - CENTER_MARGIN:
|
||
direction = 'in' # 左侧:对方
|
||
elif center_x > screen_center + CENTER_MARGIN:
|
||
direction = 'out' # 右侧:自己
|
||
else:
|
||
# 处在中线附近,用右缘兜底
|
||
right_edge = x + w
|
||
direction = 'out' if right_edge >= screen_center else 'in'
|
||
|
||
items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
|
||
|
||
# ---------- 排序 & 收尾 ----------
|
||
if items:
|
||
items.sort(key=lambda i: i.get('y', 0.0))
|
||
for it in items:
|
||
it.pop('y', None)
|
||
return items
|
||
|
||
@classmethod
|
||
def get_navbar_anchor_name(cls, session, timeout: float = 5) -> str:
|
||
"""从聊天页导航栏读取主播名称;找不到返回空字符串。"""
|
||
|
||
# 限制快照深度(可选)
|
||
try:
|
||
session.appium_settings({"snapshotMaxDepth": 22})
|
||
except Exception:
|
||
pass
|
||
|
||
def _text_of(el) -> str:
|
||
info = getattr(el, "info", {}) or {}
|
||
return (info.get("label") or info.get("name") or info.get("value") or "").strip()
|
||
|
||
def _clean_tail(s: str) -> str:
|
||
# 去掉末尾中英标点和空白(TikTok 昵称右侧常带一个顿号/逗号,比如 “Alina,”)
|
||
return re.sub(r"[,、,。.\s]+$", "", s).strip()
|
||
|
||
# ---- 关键修复:导航容器 ----
|
||
# 1) “返回”可能是 Button 也可能是 Other;同时页面右上角有 “更多/举报” 按钮
|
||
BACK_ELEM = (
|
||
"//*[@type='XCUIElementTypeButton' or @type='XCUIElementTypeOther']"
|
||
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' "
|
||
" or @name='戻る' or @label='戻る']"
|
||
)
|
||
RIGHT_MENU_BTN = (
|
||
"//XCUIElementTypeButton"
|
||
"[@name='更多' or @label='更多' or @name='More' or @label='More' or "
|
||
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
|
||
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
|
||
" @name='報告' or @label='報告']"
|
||
)
|
||
# 从“返回”向上找到最近祖先 Other,且该祖先内包含“更多/举报”按钮
|
||
NAV_CONTAINER = (
|
||
BACK_ELEM +
|
||
"/ancestor::XCUIElementTypeOther[ .//XCUIElementTypeOther or .//XCUIElementTypeButton ][ ."
|
||
+ RIGHT_MENU_BTN +
|
||
"][1]"
|
||
)
|
||
|
||
# ① 优先:在导航容器里找“可访问的 Other(有文本且不包含子 Button)”
|
||
XPATH_TITLE_OTHER = (
|
||
NAV_CONTAINER +
|
||
"//XCUIElementTypeOther[@accessible='true' and count(.//XCUIElementTypeButton)=0 "
|
||
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)][1]"
|
||
)
|
||
|
||
# ② 退路:导航容器内第一个有文本的 StaticText
|
||
XPATH_TITLE_STATIC = (
|
||
NAV_CONTAINER +
|
||
"//XCUIElementTypeStaticText[string-length(@value)>0 or string-length(@label)>0 or string-length(@name)>0][1]"
|
||
)
|
||
|
||
# ③ 兜底:直接在“返回”与右侧菜单同一层级/附近范围内找可读的 Other(适配部分机型结构差异)
|
||
XPATH_FALLBACK_NEAR_BACK = (
|
||
BACK_ELEM +
|
||
"/ancestor::XCUIElementTypeOther[1]"
|
||
"//XCUIElementTypeOther[@accessible='true' and "
|
||
" (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)]"
|
||
"[count(.//XCUIElementTypeButton)=0][1]"
|
||
)
|
||
|
||
# ④ 兜底:昵称往往以顿号/逗号结尾(例如 “Alina,”);利用这个规律匹配
|
||
XPATH_HINT_COMMA_END = (
|
||
NAV_CONTAINER +
|
||
"//*[ (contains(@name,',') or contains(@label,',') or contains(@value,',')) "
|
||
" and string-length(@name)+string-length(@label)+string-length(@value) < 64 ]"
|
||
"[1]"
|
||
)
|
||
|
||
# ---- 查询顺序:① -> ② -> ③ -> ④ ----
|
||
for xp, wait_s in [
|
||
(XPATH_TITLE_OTHER, timeout),
|
||
(XPATH_TITLE_STATIC, 1.0),
|
||
(XPATH_FALLBACK_NEAR_BACK, 1.0),
|
||
(XPATH_HINT_COMMA_END, 1.0),
|
||
]:
|
||
try:
|
||
q = session.xpath(xp)
|
||
if q.wait(wait_s):
|
||
txt = _clean_tail(_text_of(q.get()))
|
||
if txt:
|
||
return txt
|
||
except Exception:
|
||
pass
|
||
|
||
return ""
|
||
|
||
# 检查字符串中是否包含中文
|
||
@classmethod
|
||
def contains_chinese(cls, text):
|
||
"""
|
||
判断字符串中是否包含中文字符。
|
||
参数:
|
||
text (str): 要检测的字符串。
|
||
|
||
返回:
|
||
bool: 如果字符串中包含中文,返回 True;否则返回 False。
|
||
"""
|
||
# 使用正则表达式匹配中文字符
|
||
pattern = re.compile(r'[\u4e00-\u9fff]')
|
||
return bool(pattern.search(text))
|
||
|
||
@classmethod
|
||
def is_language(cls, text: str) -> bool:
|
||
if not text:
|
||
return False
|
||
for ch in text:
|
||
if unicodedata.category(ch).startswith("L"):
|
||
return True
|
||
return False
|
||
|
||
@classmethod
|
||
def _read_json_list(cls, file_path: Path) -> list:
|
||
"""读取为 list;读取失败或不是 list 则返回空数组"""
|
||
if not file_path.exists():
|
||
return []
|
||
try:
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
return data if isinstance(data, list) else []
|
||
except Exception as e:
|
||
LogManager.error(f"[acList] 读取失败,将按空数组处理: {e}")
|
||
return []
|
||
|
||
@classmethod
|
||
def _write_json_list(cls, file_path: Path, data: list) -> None:
|
||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||
with open(file_path, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
@staticmethod
|
||
def _normalize_anchor_items(items):
|
||
"""
|
||
规范化为 [{...}]:
|
||
- 允许传入:单个对象、对象数组、字符串(当 anchorId 用)
|
||
- 保留原有字段,不限制只存 anchorId/country
|
||
- 字符串输入 → {"anchorId": xxx}
|
||
"""
|
||
result = []
|
||
if items is None:
|
||
return result
|
||
|
||
if isinstance(items, dict):
|
||
aid = items.get("anchorId")
|
||
if aid:
|
||
obj = dict(items) # 保留所有字段
|
||
result.append(obj)
|
||
return result
|
||
|
||
if isinstance(items, list):
|
||
for it in items:
|
||
if isinstance(it, dict):
|
||
aid = it.get("anchorId")
|
||
if aid:
|
||
obj = dict(it)
|
||
result.append(obj)
|
||
elif isinstance(it, str):
|
||
result.append({"anchorId": it})
|
||
return result
|
||
|
||
if isinstance(items, str):
|
||
result.append({"anchorId": items})
|
||
return result
|
||
|
||
# -------- 追加(对象数组平铺追加) --------
|
||
@classmethod
|
||
def save_aclist_flat_append(cls, acList, filename="data/acList.json"):
|
||
"""
|
||
将 anchor 对象数组平铺追加到 JSON 文件(数组)中。
|
||
文件固定写到 项目根目录/data/acList.json
|
||
"""
|
||
|
||
# 找到当前文件所在目录,回退到项目根目录
|
||
root_dir = Path(__file__).resolve().parent.parent # 根据实际层级调整
|
||
log_dir = root_dir
|
||
log_dir.mkdir(parents=True, exist_ok=True) # 确保 log 目录存在
|
||
|
||
file_path = log_dir / filename
|
||
|
||
data = cls._read_json_list(file_path)
|
||
|
||
# 规范化输入,确保都是 {anchorId, country}
|
||
to_add = cls._normalize_anchor_items(acList)
|
||
if not to_add:
|
||
LogManager.info("[acList] 传入为空或不合规,跳过写入")
|
||
return
|
||
|
||
data.extend(to_add)
|
||
cls._write_json_list(file_path, data)
|
||
# LogManager.method_info(f"写入的路径是:{file_path}", "写入数据")
|
||
LogManager.info(f"[acList] 已追加 {len(to_add)} 条,当前总数={len(data)} -> {file_path}")
|
||
|
||
@classmethod
|
||
def pop_aclist_first(cls, filename="data/acList.json", mode="pop"):
|
||
"""
|
||
从 JSON 数组/对象(anchorList)中取出第一个 anchor 对象。
|
||
- mode="pop" : 取出并删除
|
||
- mode="move" : 取出并放到列表尾部
|
||
返回形如:{"anchorId": "...", "country": "...", ...}
|
||
"""
|
||
file_path = cls._resolve_path(filename)
|
||
|
||
|
||
if not file_path.exists():
|
||
return None
|
||
|
||
try:
|
||
raw = json.loads(file_path.read_text(encoding="utf-8-sig"))
|
||
except Exception as e:
|
||
LogManager.error(f"[pop_aclist_first] 读取失败: {e}")
|
||
return None
|
||
|
||
# 支持两种格式:list 或 dict{anchorList:[...]}
|
||
if isinstance(raw, list):
|
||
data, wrapper = raw, None
|
||
elif isinstance(raw, dict) and isinstance(raw.get("anchorList"), list):
|
||
data, wrapper = raw["anchorList"], raw
|
||
else:
|
||
return None
|
||
|
||
if not data:
|
||
return None
|
||
|
||
# 取第一个
|
||
first = data.pop(0)
|
||
norm = cls._normalize_anchor_items(first)
|
||
first = norm[0] if norm else None
|
||
|
||
if first and mode == "move":
|
||
# 放到尾部
|
||
data.append(first)
|
||
|
||
# 写回
|
||
to_write = wrapper if wrapper is not None else data
|
||
file_path.write_text(
|
||
json.dumps(to_write, ensure_ascii=False, indent=2),
|
||
encoding="utf-8"
|
||
)
|
||
return first
|
||
|
||
@classmethod
|
||
def bulk_update_anchors(cls, updates, filename="data/acList.json", case_insensitive=False):
|
||
"""
|
||
批量修改(文件根必须是数组,沿用 _read_json_list 的约定)
|
||
- updates:
|
||
dict: {"id1": {...}, "id2": {...}}
|
||
list[dict]: [{"anchorId":"id1", ...}, {"anchorId":"id2", ...}]
|
||
- case_insensitive: True 时用小写比较 anchorId
|
||
返回: {"updated": <int>, "missing": [ids...], "file": "<实际命中的路径>"}
|
||
"""
|
||
|
||
def norm_id(x: str) -> str:
|
||
s = str(x).strip()
|
||
return s.lower() if case_insensitive else s
|
||
|
||
# ✅ 关键:使用你已有的 _resolve_path,避免受 cwd 影响
|
||
file_path = cls._resolve_path(filename)
|
||
|
||
data = cls._read_json_list(file_path)
|
||
if not data:
|
||
return {"updated": 0, "missing": cls._collect_all_ids(updates), "file": str(file_path)}
|
||
|
||
# 1) 归一化 updates -> map[normalized_id] = patch
|
||
upd_map = {}
|
||
raw_ids = [] # 保留原始传入 id,用于返回 missing 时回显
|
||
if isinstance(updates, dict):
|
||
for aid, patch in updates.items():
|
||
if aid and isinstance(patch, dict):
|
||
key = norm_id(aid)
|
||
raw_ids.append(str(aid))
|
||
patch = {k: v for k, v in patch.items() if k != "anchorId"}
|
||
if patch:
|
||
upd_map[key] = {**upd_map.get(key, {}), **patch}
|
||
elif isinstance(updates, list):
|
||
for it in updates:
|
||
if isinstance(it, dict) and it.get("anchorId"):
|
||
rid = str(it["anchorId"])
|
||
key = norm_id(rid)
|
||
raw_ids.append(rid)
|
||
patch = {k: v for k, v in it.items() if k != "anchorId"}
|
||
if patch:
|
||
upd_map[key] = {**upd_map.get(key, {}), **patch}
|
||
|
||
if not upd_map:
|
||
return {"updated": 0, "missing": [], "file": str(file_path)}
|
||
|
||
# 2) 建索引:map[normalized_id] -> item
|
||
index = {}
|
||
for item in data:
|
||
if isinstance(item, dict) and "anchorId" in item:
|
||
key = norm_id(item.get("anchorId", ""))
|
||
if key:
|
||
index[key] = item
|
||
|
||
# 3) 执行更新
|
||
updated, seen = 0, set()
|
||
for key, patch in upd_map.items():
|
||
target = index.get(key)
|
||
if target is not None:
|
||
target.update(patch)
|
||
updated += 1
|
||
seen.add(key)
|
||
|
||
# 4) 写回
|
||
if updated > 0:
|
||
cls._write_json_list(file_path, data)
|
||
|
||
# 5) 计算未命中(按传入原始 ID 回显)
|
||
missing = []
|
||
for rid in raw_ids:
|
||
if norm_id(rid) not in seen:
|
||
missing.append(rid)
|
||
|
||
return {"updated": updated, "missing": missing, "file": str(file_path)}
|
||
|
||
@staticmethod
|
||
def _collect_all_ids(updates):
|
||
ids = []
|
||
if isinstance(updates, dict):
|
||
ids = [str(k) for k in updates.keys()]
|
||
elif isinstance(updates, list):
|
||
for it in updates:
|
||
if isinstance(it, dict) and it.get("anchorId"):
|
||
ids.append(str(it["anchorId"]))
|
||
return ids
|
||
|
||
@classmethod
|
||
def delete_anchors_by_ids(cls, ids: list[str], filename: str = "acList.json") -> int:
|
||
"""
|
||
根据 anchorId 列表,从项目根目录/data/acList.json 中删除匹配的 anchor。
|
||
- ids: 要删除的 anchorId 列表
|
||
- filename: 默认为 acList.json,可以传文件名或绝对路径
|
||
返回:删除数量
|
||
"""
|
||
# 计算文件路径
|
||
root_dir = Path(__file__).resolve().parent.parent
|
||
if Path(filename).is_absolute():
|
||
file_path = Path(filename)
|
||
else:
|
||
file_path = root_dir / "data" / filename
|
||
|
||
if not file_path.exists():
|
||
return 0
|
||
|
||
# 读取 JSON 文件
|
||
try:
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
if not isinstance(data, list):
|
||
return 0
|
||
except Exception as e:
|
||
LogManager.error(f"[delete_anchors_by_ids] 读取失败: {e}")
|
||
return 0
|
||
|
||
before = len(data)
|
||
|
||
# 保留不在 ids 里的对象
|
||
data = [d for d in data if isinstance(d, dict) and d.get("anchorId") not in ids]
|
||
deleted = before - len(data)
|
||
|
||
# 写回 JSON 文件
|
||
try:
|
||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||
with open(file_path, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
except Exception as e:
|
||
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
|
||
|
||
return deleted
|
||
|
||
# -------- 查看第一个(取出但不删除) --------
|
||
@staticmethod
|
||
def _resolve_path(p) -> Path:
|
||
p = Path(p)
|
||
if p.is_absolute():
|
||
return p
|
||
# 以项目根目录 (iOSAI) 为基准 —— script 的上一级
|
||
base = Path(__file__).resolve().parents[1]
|
||
return (base / p).resolve()
|
||
|
||
@classmethod
|
||
def peek_aclist_first(cls, filename="data/acList.json"):
|
||
file_path = cls._resolve_path(filename)
|
||
if not file_path.exists():
|
||
print(f"[peek] 文件不存在: {file_path}")
|
||
return None
|
||
try:
|
||
raw = file_path.read_text(encoding="utf-8-sig").strip()
|
||
if not raw:
|
||
return None
|
||
data = json.loads(raw)
|
||
arr = data if isinstance(data, list) else data.get("anchorList") if isinstance(data, dict) else None
|
||
if not arr:
|
||
return None
|
||
first = arr[0]
|
||
norm = cls._normalize_anchor_items(first)
|
||
return norm[0] if norm else None
|
||
except Exception as e:
|
||
print(f"[peek] 读取失败: {e}")
|
||
return None
|
||
|
||
@staticmethod
|
||
def run_tidevice_command(udid, action, bundle_id, timeout=30):
|
||
"""
|
||
执行tidevice命令的辅助函数
|
||
|
||
:param udid: 设备UDID
|
||
:param action: 动作类型 ('kill' 或 'launch')
|
||
:param bundle_id: 应用的Bundle ID
|
||
:param timeout: 命令执行超时时间(秒)
|
||
:return: (bool) 成功返回True,失败返回False
|
||
"""
|
||
# 构建命令列表
|
||
cmd = ["tidevice", "--udid", udid, action, bundle_id]
|
||
try:
|
||
# 执行命令并捕获输出
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout
|
||
)
|
||
# 检查命令是否成功执行(返回码为0通常表示成功)
|
||
if result.returncode == 0:
|
||
LogManager.info(f"Successfully {action}ed {bundle_id} on device {udid}.")
|
||
return True
|
||
else:
|
||
# 记录错误信息
|
||
LogManager.error(f"Failed to {action} {bundle_id} on device {udid}. Error: {result.stderr}")
|
||
return False
|
||
|
||
except subprocess.TimeoutExpired:
|
||
# 处理命令执行超时
|
||
LogManager.error(f"Command 'tidevice {action}' timed out after {timeout} seconds for device {udid}.")
|
||
return False
|
||
except FileNotFoundError:
|
||
# 处理tidevice命令未找到的情况(通常意味着tidevice未安装或不在PATH中)
|
||
LogManager.error(
|
||
"The 'tidevice' command was not found. Please ensure it is installed and in your system PATH.")
|
||
return False
|
||
except Exception as e:
|
||
# 捕获其他可能异常
|
||
LogManager.error(f"An unexpected error occurred while trying to {action} the app: {e}")
|
||
return False
|
||
|
||
@classmethod
|
||
def kill_wda(cls, udid, bundle_id="com.yolozsAgent.wda.xctrunner"):
|
||
"""
|
||
杀死指定设备上的WDA应用
|
||
|
||
:param udid: 设备UDID
|
||
:param bundle_id: WDA的Bundle ID,默认为 com.yolozsAgent.wda.xctrunner
|
||
:return: (bool) 成功返回True,失败返回False
|
||
"""
|
||
return cls.run_tidevice_command(udid, "kill", bundle_id)
|
||
|
||
@classmethod
|
||
def launch_wda(cls, udid, bundle_id="com.yolozsAgent.wda.xctrunner", timeout=60):
|
||
"""
|
||
启动指定设备上的WDA应用
|
||
|
||
:param udid: 设备UDID
|
||
:param bundle_id: WDA的Bundle ID,默认为 com.yolozsAgent.wda.xctrunner
|
||
:param timeout: 启动命令超时时间,默认为60秒(启动可能较慢)
|
||
:return: (bool) 成功返回True,失败返回False
|
||
"""
|
||
return cls.run_tidevice_command(udid, "launch", bundle_id, timeout)
|
||
|
||
|
||
|
||
@classmethod
|
||
def _screen_info(cls, udid: str):
|
||
try:
|
||
# 避免 c.home() 可能触发的阻塞,直接取 window_size
|
||
c = wda.USBClient(udid, wdaFunctionPort)
|
||
size = c.window_size()
|
||
print(f"[Screen] 成功获取屏幕 {int(size.width)}x{int(size.height)} {udid}")
|
||
return int(size.width), int(size.height), float(c.scale)
|
||
except Exception as e:
|
||
print(f"[Screen] 获取屏幕信息异常: {e} {udid}")
|
||
return 0, 0, 0.0 |