20250904-初步功能已完成

This commit is contained in:
2025-09-11 18:49:40 +08:00
parent fcbcc2f6b6
commit a2287b47d5
2 changed files with 132 additions and 116 deletions

View File

@@ -298,110 +298,7 @@ class AiUtils(object):
print(f"btn:{btn}")
return cls.findNumber(btn.label)
# @classmethod
# def extract_messages_from_xml(cls, xml: str):
# """
# 仅返回当前屏幕中“可见的”聊天内容(含时间分隔)
# """
# from lxml import etree
# root = etree.fromstring(xml.encode("utf-8"))
# items = []
#
# # 屏幕宽度
# app = root.xpath('/XCUIElementTypeApplication')
# screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
#
# # 找 Table 的可见范围
# table = root.xpath('//XCUIElementTypeTable')
# if table:
# table = table[0]
# table_top = cls.parse_float(table, 'y', 0.0)
# table_h = cls.parse_float(table, 'height', 0.0)
# table_bottom = table_top + table_h
# else:
# table_top, table_bottom = 0.0, cls.parse_float(app[0], 'height', 736.0) if app else 736.0
#
# def in_view(el) -> bool:
# """元素在聊天区内并且可见"""
# if el.get('visible') != 'true':
# return False
# y = cls.parse_float(el, 'y', -1e9)
# h = cls.parse_float(el, 'height', 0.0)
# by = y + h
# return not (by <= table_top or y >= table_bottom)
#
# # 时间分隔
# for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
# if not in_view(t):
# continue
# txt = (t.get('label') or t.get('name') or t.get('value') or '').strip()
# if txt:
# items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y')})
#
# # 消息气泡
# EXCLUDES = {'Heart', 'Lol', 'ThumbsUp', '分享发布内容', '视频贴纸标签页', '双击发送表情'}
#
# # —— 新增:系统横幅/提示卡片过滤(只文本判断,最小改动)——
# SYSTEM_BANNER_PATTERNS = [
# r"回复时接收通知", r"开启私信通知", r"开启通知",
# r"Turn on (DM|message|direct message)?\s*notifications",
# r"Enable notifications",
# r"Get notified when .* replies",
# ]
# SYSTEM_BANNER_REGEX = re.compile("|".join(SYSTEM_BANNER_PATTERNS), re.IGNORECASE)
#
# msg_nodes = table.xpath(
# './/XCUIElementTypeCell[@visible="true"]'
# '//XCUIElementTypeOther[@visible="true" and (@name or @label) and not(ancestor::XCUIElementTypeCollectionView)]'
# ) if table is not None else []
#
# for o in msg_nodes:
# # 这里补上 value避免少数节点只在 value 上有文本时漏读
# text = (o.get('label') or o.get('name') or o.get('value') or '').strip()
# if not text or text in EXCLUDES:
# continue
# # 命中 TikTok 自带的“开启通知/回复时接收通知”类提示 → 直接剔除
# if SYSTEM_BANNER_REGEX.search(text):
# continue
# if not in_view(o):
# continue
#
# # 找所在 Cell
# cell = o.getparent()
# while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
# cell = cell.getparent()
#
# x = cls.parse_float(o, 'x')
# y = cls.parse_float(o, 'y')
# w = cls.parse_float(o, 'width')
# right_edge = x + w
#
# direction = None
# # 头像位置判定
# if cell is not None:
# avatar_btns = cell.xpath(
# './/XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]')
# if avatar_btns:
# ax = cls.parse_float(avatar_btns[0], 'x')
# direction = 'in' if ax < (screen_w / 2) else 'out'
# # 右对齐兜底
# if direction is None:
# direction = 'out' if right_edge > (screen_w - 20) else 'in'
#
# items.append({'type': 'msg', 'dir': direction, 'text': text, 'y': y})
#
# # 排序 & 清理
# items.sort(key=lambda i: i['y'])
# for it in items:
# it.pop('y', None)
# return items
#
# @classmethod
# def parse_float(cls, el, attr, default=0.0):
# try:
# return float(el.get(attr, default))
# except Exception:
# return default
@classmethod
def extract_messages_from_xml(cls, xml: str):
@@ -813,3 +710,103 @@ class AiUtils(object):
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
return deleted
@staticmethod
def _get_data_path(filename: str) -> str:
"""
根据文件名生成 data 目录下的完整路径
"""
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前项目根目录
data_dir = os.path.join(base_dir, "data")
Path(data_dir).mkdir(parents=True, exist_ok=True) # 确保 data 目录存在
return os.path.join(data_dir, filename)
@staticmethod
def read_json(filename: str) -> dict:
"""
读取 JSON 文件,返回字典
如果文件不存在,返回空字典
"""
file_path = AiUtils._get_data_path(filename)
try:
if not os.path.exists(file_path):
return {}
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except Exception as e:
print(f"读取 JSON 文件失败: {e}")
return {}
@staticmethod
def write_json(filename: str, data: dict, overwrite: bool = True) -> bool:
"""
将字典写入 JSON 文件
:param filename: 文件名(自动放在 data 目录下)
:param data: 要写入的字典
:param overwrite: True=覆盖写False=合并更新
"""
file_path = AiUtils._get_data_path(filename)
try:
if not overwrite and os.path.exists(file_path):
# 读取旧数据
with open(file_path, "r", encoding="utf-8") as f:
old_data = json.load(f)
if not isinstance(old_data, dict):
old_data = {}
# 合并
old_data.update(data)
data = old_data
# 覆盖写(写回最终的 data
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
print(f"写入 JSON 文件失败: {e}")
return False
@staticmethod
def update_json(filename: str, new_data: dict) -> bool:
"""
修改 JSON 文件:
- 如果 key 已存在,则修改其值
- 如果 key 不存在,则新增
"""
try:
# 读取旧数据(没有则为空字典)
data = AiUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
# 合并(有则修改,无则新增)
data.update(new_data)
# 写回
return AiUtils.write_json(filename, data)
except Exception as e:
print(f"更新 JSON 文件失败: {e}")
return False
@staticmethod
def delete_json_key(filename: str, key: str) -> bool:
"""
删除 JSON 文件中的某个 key
- 如果 key 存在则删除
- 如果 key 不存在则不处理
"""
try:
# 读取旧数据
data = AiUtils.read_json(filename)
if not isinstance(data, dict):
data = {}
# 删除指定 key
if key in data:
del data[key]
# 写回文件
return AiUtils.write_json(filename, data)
except Exception as e:
print(f"删除 JSON key 失败: {e}")
return False