Files
iOSAI/Utils/AiUtils.py

816 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
from pathlib import Path
import cv2
import numpy as np
import unicodedata
import wda
from Utils.LogManager import LogManager
import xml.etree.ElementTree as ET
import re, html
from lxml import etree
from wda import Client
# 工具类
class AiUtils(object):
# 在屏幕中找到对应的图片
@classmethod
def findImageInScreen(cls, target, udid):
try:
# 加载原始图像和模板图像
image_path = AiUtils.imagePathWithName(udid, "bgv") # 替换为你的图像路径
template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
# 读取图像和模板,确保它们都是单通道灰度图
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if image is None:
LogManager.error("加载背景图失败")
return -1, -1
if template is None:
LogManager.error("加载模板图失败")
return -1, -1
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 使用模板匹配方法
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.7 # 匹配度阈值,可以根据需要调整
loc = np.where(res >= threshold)
# 检查是否有匹配结果
if loc[0].size > 0:
# 取第一个匹配位置
pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
center_x = int(pt[0] + w // 2)
center_y = int(pt[1] + h // 2)
# print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
return center_x, center_y
else:
return -1, -1
except Exception as e:
LogManager.error(f"加载素材失败:{e}", udid)
print(e)
return -1, -1
# 使用正则查找字符串中的数字
@classmethod
def findNumber(cls, str):
# 使用正则表达式匹配数字
match = re.search(r'\d+', str)
if match:
return int(match.group()) # 将匹配到的数字转换为整数
return None # 如果没有找到数字,返回 None
# 选择截图
@classmethod
def screenshot(cls):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
session = client.session()
image = session.screenshot()
image_path = "screenshot.png"
image.save(image_path)
image = cv2.imread(image_path)
# 如果图像过大,缩小显示
scale_percent = 50 # 缩小比例
width = int(image.shape[1] * scale_percent / 100)
height = int(image.shape[0] * scale_percent / 100)
dim = (width, height)
resized_image = cv2.resize(image, dim, interpolation=cv2.INTER_AREA)
# 创建一个窗口并显示缩小后的图像
cv2.namedWindow("Image")
cv2.imshow("Image", resized_image)
print("请在图像上选择爱心图标区域然后按Enter键确认。")
# 使用selectROI函数手动选择区域
roi = cv2.selectROI("Image", resized_image, showCrosshair=True, fromCenter=False)
# 将ROI坐标按原始图像尺寸放大
x, y, w, h = roi
x = int(x * image.shape[1] / resized_image.shape[1])
y = int(y * image.shape[0] / resized_image.shape[0])
w = int(w * image.shape[1] / resized_image.shape[1])
h = int(h * image.shape[0] / resized_image.shape[0])
# 根据选择的ROI提取爱心图标
if w > 0 and h > 0: # 确保选择的区域有宽度和高度
heart_icon = image[y:y + h, x:x + w]
# 转换为HSV颜色空间
hsv = cv2.cvtColor(heart_icon, cv2.COLOR_BGR2HSV)
# 定义红色的HSV范围
lower_red1 = np.array([0, 120, 70])
upper_red1 = np.array([10, 255, 255])
lower_red2 = np.array([170, 120, 70])
upper_red2 = np.array([180, 255, 255])
# 创建掩模
mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
mask = mask1 + mask2
# 反转掩模,因为我们想要的是爱心图标,而不是背景
mask_inv = cv2.bitwise_not(mask)
# 应用掩模
heart_icon = cv2.bitwise_and(heart_icon, heart_icon, mask=mask_inv)
# 创建一个全透明的背景
height, width, channels = heart_icon.shape
roi = np.zeros((height, width, channels), dtype=np.uint8)
# 将爱心图标粘贴到透明背景上
for c in range(channels):
roi[:, :, c] = np.where(mask_inv == 255, heart_icon[:, :, c], roi[:, :, c])
# 图片名称
imgName = "temp.png"
# 保存结果
cv2.imwrite(imgName, roi)
# 显示结果
cv2.imshow("Heart Icon with Transparent Background", roi)
cv2.waitKey(0)
cv2.destroyAllWindows()
else:
print("未选择有效区域。")
# 根据名称获取图片完整地址
@classmethod
def imagePathWithName(cls, udid, name):
current_file_path = os.path.abspath(__file__)
# 获取当前文件所在的目录即script目录
current_dir = os.path.dirname(current_file_path)
# 由于script目录位于项目根目录下一级因此需要向上一级目录移动两次
project_root = os.path.abspath(os.path.join(current_dir, '..'))
# 构建资源文件的完整路径,向上两级目录,然后进入 resources 目录
resource_path = os.path.abspath(os.path.join(project_root, "resources", udid, name + ".png")).replace('/', '\\')
return resource_path
# 获取根目录
@classmethod
def getRootDir(cls):
current_file = os.path.abspath(__file__)
# 获取当前文件所在的目录
current_dir = os.path.dirname(current_file)
# 获取项目根目录(假设根目录是当前文件的父目录的父目录)
project_root = os.path.dirname(current_dir)
# 返回根目录
return project_root
# 创建一个以udid命名的目录
@classmethod
def makeUdidDir(cls, udid):
# 获取项目根目录
home = cls.getRootDir()
# 拼接 resources 目录的路径
resources_dir = os.path.join(home, "resources")
# 拼接 udid 目录的路径
udid_dir = os.path.join(resources_dir, udid)
# 检查 udid 目录是否存在,如果不存在则创建
if not os.path.exists(udid_dir):
try:
os.makedirs(udid_dir)
LogManager.info(f"目录 {udid_dir} 创建成功", udid)
print(f"目录 {udid_dir} 创建成功")
except Exception as e:
print(f"创建目录时出错: {e}")
LogManager.error(f"创建目录时出错: {e}", udid)
else:
LogManager.info(f"目录 {udid_dir} 已存在,跳过创建", udid)
print(f"目录 {udid_dir} 已存在,跳过创建")
# 查找首页按钮
# uuid 设备id
# click 是否点击该按钮
@classmethod
def findHomeButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
homeButton = session.xpath("//*[@label='首页']")
try:
if homeButton.label == "首页":
print("1.找到了")
return homeButton
else:
print("1.没找到")
return None
except Exception as e:
print(e)
return None
# 查找关闭按钮
@classmethod
def findLiveCloseButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
try:
if r.label == "关闭屏幕":
return r
else:
return None
except Exception as e:
print(e)
return None
# 获取直播间窗口数量
@classmethod
def count_add_by_xml(cls, session):
xml = session.source()
root = ET.fromstring(xml)
return sum(
1 for e in root.iter()
if e.get('type') in ('XCUIElementTypeButton', 'XCUIElementTypeImage')
and (e.get('name') == '添加' or e.get('label') == '添加')
and (e.get('visible') in (None, 'true'))
)
# 获取关注按钮
@classmethod
def getFollowButton(cls, session: Client):
# followButton = session.xpath("//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[2]/Other[1]/Other[1]/Other[3]/Other[1]/Other[1]/Button[1]")
followButton = session.xpath('//XCUIElementTypeButton[@name="关注" or @label="关注"]')
if followButton.exists:
print("2.关注找到了")
LogManager.info("2.关注找到了")
return followButton
else:
print("2.关注没找到")
print("2.关注没找到")
return None
# 查找发消息按钮
@classmethod
def getSendMesageButton(cls, session: Client):
msgButton = session.xpath(
'//XCUIElementTypeButton['
'(@name="发消息" or @label="发消息" or '
'@name="发送 👋" or @label="发送 👋" or '
'@name="消息" or @label="消息")'
' and @visible="true"]'
)
if msgButton.exists:
print("3.发消息按钮找到了")
LogManager.info("3.发消息按钮找到了")
return msgButton
else:
print("3.发消息按钮没找到")
LogManager.info("3.发消息按钮没找到")
return None
# 获取当前屏幕上的节点
@classmethod
def getCurrentScreenSource(cls):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
print(client.source())
# 查找app主页上的收件箱按钮
@classmethod
def getMsgBoxButton(cls, session: Client):
# box = session.xpath("//XCUIElementTypeButton[name='a11y_vo_inbox']")
box = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
if box.exists:
return box
else:
return None
# 获取收件箱中的未读消息数
@classmethod
def getUnReadMsgCount(cls, session: Client):
btn = cls.getMsgBoxButton(session)
print(f"btn:{btn}")
return cls.findNumber(btn.label)
# @classmethod
# def extract_messages_from_xml(cls, xml: str):
# """
# 仅返回当前屏幕中“可见的”聊天内容(含时间分隔)
# """
# from lxml import etree
# root = etree.fromstring(xml.encode("utf-8"))
# items = []
#
# # 屏幕宽度
# app = root.xpath('/XCUIElementTypeApplication')
# screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
#
# # 找 Table 的可见范围
# table = root.xpath('//XCUIElementTypeTable')
# if table:
# table = table[0]
# table_top = cls.parse_float(table, 'y', 0.0)
# table_h = cls.parse_float(table, 'height', 0.0)
# table_bottom = table_top + table_h
# else:
# table_top, table_bottom = 0.0, cls.parse_float(app[0], 'height', 736.0) if app else 736.0
#
# def in_view(el) -> bool:
# """元素在聊天区内并且可见"""
# if el.get('visible') != 'true':
# return False
# y = cls.parse_float(el, 'y', -1e9)
# h = cls.parse_float(el, 'height', 0.0)
# by = y + h
# return not (by <= table_top or y >= table_bottom)
#
# # 时间分隔
# for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
# if not in_view(t):
# continue
# txt = (t.get('label') or t.get('name') or t.get('value') or '').strip()
# if txt:
# items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y')})
#
# # 消息气泡
# EXCLUDES = {'Heart', 'Lol', 'ThumbsUp', '分享发布内容', '视频贴纸标签页', '双击发送表情'}
#
# # —— 新增:系统横幅/提示卡片过滤(只文本判断,最小改动)——
# SYSTEM_BANNER_PATTERNS = [
# r"回复时接收通知", r"开启私信通知", r"开启通知",
# r"Turn on (DM|message|direct message)?\s*notifications",
# r"Enable notifications",
# r"Get notified when .* replies",
# ]
# SYSTEM_BANNER_REGEX = re.compile("|".join(SYSTEM_BANNER_PATTERNS), re.IGNORECASE)
#
# msg_nodes = table.xpath(
# './/XCUIElementTypeCell[@visible="true"]'
# '//XCUIElementTypeOther[@visible="true" and (@name or @label) and not(ancestor::XCUIElementTypeCollectionView)]'
# ) if table is not None else []
#
# for o in msg_nodes:
# # 这里补上 value避免少数节点只在 value 上有文本时漏读
# text = (o.get('label') or o.get('name') or o.get('value') or '').strip()
# if not text or text in EXCLUDES:
# continue
# # 命中 TikTok 自带的“开启通知/回复时接收通知”类提示 → 直接剔除
# if SYSTEM_BANNER_REGEX.search(text):
# continue
# if not in_view(o):
# continue
#
# # 找所在 Cell
# cell = o.getparent()
# while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
# cell = cell.getparent()
#
# x = cls.parse_float(o, 'x')
# y = cls.parse_float(o, 'y')
# w = cls.parse_float(o, 'width')
# right_edge = x + w
#
# direction = None
# # 头像位置判定
# if cell is not None:
# avatar_btns = cell.xpath(
# './/XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]')
# if avatar_btns:
# ax = cls.parse_float(avatar_btns[0], 'x')
# direction = 'in' if ax < (screen_w / 2) else 'out'
# # 右对齐兜底
# if direction is None:
# direction = 'out' if right_edge > (screen_w - 20) else 'in'
#
# items.append({'type': 'msg', 'dir': direction, 'text': text, 'y': y})
#
# # 排序 & 清理
# items.sort(key=lambda i: i['y'])
# for it in items:
# it.pop('y', None)
# return items
#
# @classmethod
# def parse_float(cls, el, attr, default=0.0):
# try:
# return float(el.get(attr, default))
# except Exception:
# return default
@classmethod
def extract_messages_from_xml(cls, xml: str):
"""
解析 TikTok 聊天 XML返回当前屏幕可见的消息与时间分隔
[{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
兼容 Table / CollectionView / ScrollView过滤系统提示/底部工具栏;可见性使用“重叠可视+容差”。
"""
if not isinstance(xml, str) or not xml.strip():
return []
try:
root = etree.fromstring(xml.encode("utf-8"))
except Exception:
return []
# ---------- 小工具 ----------
def get_text(el):
s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
return html.unescape(s.strip())
def is_visible(el):
"""无 visible 属性按可见处理;有且为 'false' 才视为不可见。"""
v = el.get('visible')
return (v is None) or (v.lower() == 'true')
# ---------- 屏幕尺寸 ----------
app = root.xpath('/XCUIElementTypeApplication')
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
# ---------- 主容器探测(评分选择最像聊天区的容器) ----------
def pick_container():
cands = []
for xp, ctype in (
('//XCUIElementTypeTable', 'table'),
('//XCUIElementTypeCollectionView', 'collection'),
('//XCUIElementTypeScrollView', 'scroll'),
):
nodes = [n for n in root.xpath(xp) if is_visible(n)]
for n in nodes:
y = cls.parse_float(n, 'y', 0.0)
h = cls.parse_float(n, 'height', screen_h)
# Cell 数越多越像聊天列表;越靠中间越像
cells = n.xpath('.//XCUIElementTypeCell')
score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
cands.append((score, n, ctype))
if cands:
cands.sort(key=lambda t: t[0], reverse=True)
return cands[0][1], cands[0][2]
return None, None
container, container_type = pick_container()
# ---------- 可视区area_top, area_bot ----------
if container is not None:
area_top = cls.parse_float(container, 'y', 0.0)
area_h = cls.parse_float(container, 'height', screen_h)
area_bot = area_top + area_h
else:
# 顶栏底缘作为上边界(选最靠上的宽>200的块
blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
is_visible(n)]
area_top = 0.0
if blocks:
blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
b = blocks[0]
area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
# 输入框 TextView 顶边作为下边界
tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
if tvs:
tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
else:
area_bot = screen_h
if area_bot - area_top < 100:
area_top, area_bot = 0.0, screen_h
def in_view(el) -> bool:
if not is_visible(el):
return False
y = cls.parse_float(el, 'y', -1e9)
h = cls.parse_float(el, 'height', 0.0)
by = y + h
tol = 8.0 # 容差,避免边缘误判
return not (by <= area_top + tol or y >= area_bot - tol)
# ---------- 时间分隔Header ----------
items = []
for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
if not in_view(t):
continue
txt = get_text(t)
if txt:
items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
# ---------- 系统提示/横幅过滤 ----------
EXCLUDES_LITERAL = {
'Heart', 'Lol', 'ThumbsUp',
'分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
}
SYSTEM_PATTERNS = [
r"回复时接收通知", r"开启(私信)?通知", r"开启通知",
r"你打开了这个与 .* 的聊天。.*隐私",
r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
r"未发送$",
r"Turn on (DM|message|direct message)?\s*notifications",
r"Enable notifications",
r"Get notified when .* replies",
r"You opened this chat .* privacy",
r"Only \d+ message can be sent .* accepts .* request",
]
SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
# 排除底部贴纸/GIF/分享栏(通常是位于底部、较矮的一排 CollectionView
def is_toolbar_like(o) -> bool:
txt = get_text(o)
if txt in EXCLUDES_LITERAL:
return True
y = cls.parse_float(o, 'y', 0.0)
h = cls.parse_float(o, 'height', 0.0)
near_bottom = (area_bot - (y + h)) < 48
is_short = h <= 40
return near_bottom and is_short
# ---------- 收集消息候选 ----------
msg_nodes = []
if container is not None:
# 容器内优先找 Cell 下的文本节点Other/StaticText/TextView
cand = container.xpath(
'.//XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
'[@y and (@name or @label or @value)]'
)
for o in cand:
if not in_view(o):
continue
if is_toolbar_like(o):
continue
txt = get_text(o)
if not txt or SYSTEM_RE.search(txt):
continue
msg_nodes.append(o)
else:
# 全局兜底:排除直接挂在 CollectionView底部工具栏下的节点
cand = root.xpath(
'//XCUIElementTypeOther[@y and (@name or @label or @value)]'
' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
)
for o in cand:
p = o.getparent()
if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
continue
if not in_view(o) or is_toolbar_like(o):
continue
txt = get_text(o)
if not txt or SYSTEM_RE.search(txt):
continue
msg_nodes.append(o)
# ---------- 方向判定 & 组装 ----------
for o in msg_nodes:
txt = get_text(o)
if not txt or txt in EXCLUDES_LITERAL:
continue
# 找所在 Cell用于查头像
cell = o.getparent()
while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
cell = cell.getparent()
x = cls.parse_float(o, 'x', 0.0)
y = cls.parse_float(o, 'y', 0.0)
w = cls.parse_float(o, 'width', 0.0)
right_edge = x + w
direction = None
if cell is not None:
avatars = [a for a in cell.xpath(
'.//XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]'
) if is_visible(a)]
if avatars:
ax = cls.parse_float(avatars[0], 'x', 0.0)
direction = 'in' if ax < (screen_w / 2) else 'out'
if direction is None:
direction = 'out' if right_edge > (screen_w * 0.75) else 'in'
items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
# ---------- 排序 & 收尾 ----------
if items:
items.sort(key=lambda i: i.get('y', 0.0))
for it in items:
it.pop('y', None)
return items
@classmethod
def parse_float(cls, el, attr, default=0.0):
try:
v = el.get(attr)
if v is None:
return default
return float(v)
except Exception:
return default
# 从导航栏读取主播名称。找不到时返回空字符串
@classmethod
def get_navbar_anchor_name(cls, session, timeout: float = 2.0) -> str:
"""只从导航栏读取主播名称。找不到时返回空字符串。"""
# 可选:限制快照深度,提升解析速度/稳定性
try:
session.appium_settings({"snapshotMaxDepth": 22})
except Exception:
pass
def _text_of(el) -> str:
info = getattr(el, "info", {}) or {}
return (info.get("label") or info.get("name") or info.get("value") or "").strip()
def _clean_tail(s: str) -> str:
return re.sub(r"[,、,。.\s]+$", "", s).strip()
# 导航栏容器:从“返回”按钮向上找最近祖先,且该祖先内包含“更多/举报”(多语言兜底)
NAV_CONTAINER = (
"//XCUIElementTypeButton"
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' or @name='戻る' or @label='戻る']"
"/ancestor::XCUIElementTypeOther"
"[ .//XCUIElementTypeButton"
" [@name='更多' or @label='更多' or @name='More' or @label='More' or "
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
" @name='報告' or @label='報告']"
"][1]"
)
# ① 优先:可访问的 Other自身有文本且不含子 Button更贴近 TikTok 的实现
XPATH_TITLE_OTHER = (
NAV_CONTAINER +
"//XCUIElementTypeOther[@accessible='true' and count(.//XCUIElementTypeButton)=0 "
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)"
"][1]"
)
# ② 退路:第一个 StaticText
XPATH_TITLE_STATIC = (
NAV_CONTAINER +
"//XCUIElementTypeStaticText[string-length(@value)>0 or string-length(@label)>0 or string-length(@name)>0][1]"
)
# 尝试 ①
q = session.xpath(XPATH_TITLE_OTHER)
if q.wait(timeout):
t = _clean_tail(_text_of(q.get()))
if t:
return t
# 尝试 ②
q2 = session.xpath(XPATH_TITLE_STATIC)
if q2.wait(1.0):
t = _clean_tail(_text_of(q2.get()))
if t:
return t
return ""
# 检查字符串中是否包含中文
@classmethod
def contains_chinese(cls, text):
"""
判断字符串中是否包含中文字符。
参数:
text (str): 要检测的字符串。
返回:
bool: 如果字符串中包含中文,返回 True否则返回 False。
"""
# 使用正则表达式匹配中文字符
pattern = re.compile(r'[\u4e00-\u9fff]')
return bool(pattern.search(text))
@classmethod
def is_language(cls, text: str) -> bool:
if not text:
return False
for ch in text:
if unicodedata.category(ch).startswith("L"):
return True
return False
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
"""读取为 list读取失败或不是 list 则返回空数组"""
if not file_path.exists():
return []
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception as e:
LogManager.error(f"[acList] 读取失败,将按空数组处理: {e}")
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@staticmethod
def _normalize_anchor_items(items):
"""
规范化输入为 [{anchorId, country}] 的列表:
- 允许传入:单个对象、对象列表、字符串(当 anchorId 用)
- 过滤不合规项
"""
result = []
if items is None:
return result
if isinstance(items, dict):
# 单个对象
aid = items.get("anchorId")
if aid:
result.append({"anchorId": str(aid), "country": items.get("country", "")})
return result
if isinstance(items, list):
for it in items:
if isinstance(it, dict):
aid = it.get("anchorId")
if aid:
result.append({"anchorId": str(aid), "country": it.get("country", "")})
elif isinstance(it, str):
result.append({"anchorId": it, "country": ""})
return result
if isinstance(items, str):
result.append({"anchorId": items, "country": ""})
return result
# -------- 追加(对象数组平铺追加) --------
@classmethod
def save_aclist_flat_append(cls, acList, filename="log/acList.json"):
"""
将 anchor 对象数组平铺追加到 JSON 文件(数组)中。
期望 acList 形如:
[
{"anchorId": "ldn327_", "country": ""},
{"anchorId": "tianliang30", "country": ""}
]
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
# 规范化输入,确保都是 {anchorId, country}
to_add = cls._normalize_anchor_items(acList)
if not to_add:
LogManager.info("[acList] 传入为空或不合规,跳过写入")
return
data.extend(to_add)
cls._write_json_list(file_path, data)
LogManager.info(f"[acList] 已追加 {len(to_add)} 条,当前总数={len(data)} -> {file_path}")
# -------- 弹出(取一个删一个) --------
@classmethod
def pop_aclist_first(cls, filename="log/acList.json"):
"""
从 JSON 数组中取出第一个 anchor 对象,并删除它;为空或文件不存在返回 None。
返回形如:{"anchorId": "...", "country": "..."}
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not data:
return None
first = data.pop(0)
# 兜底保证结构
norm = cls._normalize_anchor_items(first)
first = norm[0] if norm else None
cls._write_json_list(file_path, data)
return first
@classmethod
def delete_anchors_by_ids(cls, ids: list[str], filename="log/acList.json") -> int:
"""
根据 anchorId 列表从 JSON 文件中删除匹配的 anchor。
返回删除数量。
"""
file_path = Path(filename)
if not file_path.exists():
return 0
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
return 0
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 读取失败: {e}")
return 0
before = len(data)
# 保留不在 ids 里的对象
data = [d for d in data if isinstance(d, dict) and d.get("anchorId") not in ids]
deleted = before - len(data)
try:
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
return deleted