From a2287b47d537c82dc5dadd1da75cee1f3f439916 Mon Sep 17 00:00:00 2001 From: zhangkai <2403741920@qq.com> Date: Thu, 11 Sep 2025 18:49:40 +0800 Subject: [PATCH] =?UTF-8?q?20250904-=E5=88=9D=E6=AD=A5=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=B7=B2=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/workspace.xml | 43 +++++++--- Utils/AiUtils.py | 205 ++++++++++++++++++++++---------------------- 2 files changed, 132 insertions(+), 116 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 70bd987..de1f87b 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,15 +5,10 @@ - - + + - - - - - - + + + + + + + + + + + + + + + + @@ -266,8 +285,8 @@ - - + + \ No newline at end of file diff --git a/Utils/AiUtils.py b/Utils/AiUtils.py index f288e83..2f2aa60 100644 --- a/Utils/AiUtils.py +++ b/Utils/AiUtils.py @@ -298,110 +298,7 @@ class AiUtils(object): print(f"btn:{btn}") return cls.findNumber(btn.label) - # @classmethod - # def extract_messages_from_xml(cls, xml: str): - # """ - # 仅返回当前屏幕中“可见的”聊天内容(含时间分隔) - # """ - # from lxml import etree - # root = etree.fromstring(xml.encode("utf-8")) - # items = [] - # - # # 屏幕宽度 - # app = root.xpath('/XCUIElementTypeApplication') - # screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0 - # - # # 找 Table 的可见范围 - # table = root.xpath('//XCUIElementTypeTable') - # if table: - # table = table[0] - # table_top = cls.parse_float(table, 'y', 0.0) - # table_h = cls.parse_float(table, 'height', 0.0) - # table_bottom = table_top + table_h - # else: - # table_top, table_bottom = 0.0, cls.parse_float(app[0], 'height', 736.0) if app else 736.0 - # - # def in_view(el) -> bool: - # """元素在聊天区内并且可见""" - # if el.get('visible') != 'true': - # return False - # y = cls.parse_float(el, 'y', -1e9) - # h = cls.parse_float(el, 'height', 0.0) - # by = y + h - # return not (by <= table_top or y >= table_bottom) - # - # # 时间分隔 - # for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'): - # if not in_view(t): - # continue - # txt = (t.get('label') or t.get('name') or t.get('value') or '').strip() - # if txt: - # items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y')}) - # - # # 消息气泡 - # EXCLUDES = {'Heart', 'Lol', 'ThumbsUp', '分享发布内容', '视频贴纸标签页', '双击发送表情'} - # - # # —— 新增:系统横幅/提示卡片过滤(只文本判断,最小改动)—— - # SYSTEM_BANNER_PATTERNS = [ - # r"回复时接收通知", r"开启私信通知", r"开启通知", - # r"Turn on (DM|message|direct message)?\s*notifications", - # r"Enable notifications", - # r"Get notified when .* replies", - # ] - # SYSTEM_BANNER_REGEX = re.compile("|".join(SYSTEM_BANNER_PATTERNS), re.IGNORECASE) - # - # msg_nodes = table.xpath( - # './/XCUIElementTypeCell[@visible="true"]' - # '//XCUIElementTypeOther[@visible="true" and (@name or @label) and not(ancestor::XCUIElementTypeCollectionView)]' - # ) if table is not None else [] - # - # for o in msg_nodes: - # # 这里补上 value,避免少数节点只在 value 上有文本时漏读 - # text = (o.get('label') or o.get('name') or o.get('value') or '').strip() - # if not text or text in EXCLUDES: - # continue - # # 命中 TikTok 自带的“开启通知/回复时接收通知”类提示 → 直接剔除 - # if SYSTEM_BANNER_REGEX.search(text): - # continue - # if not in_view(o): - # continue - # - # # 找所在 Cell - # cell = o.getparent() - # while cell is not None and cell.get('type') != 'XCUIElementTypeCell': - # cell = cell.getparent() - # - # x = cls.parse_float(o, 'x') - # y = cls.parse_float(o, 'y') - # w = cls.parse_float(o, 'width') - # right_edge = x + w - # - # direction = None - # # 头像位置判定 - # if cell is not None: - # avatar_btns = cell.xpath( - # './/XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]') - # if avatar_btns: - # ax = cls.parse_float(avatar_btns[0], 'x') - # direction = 'in' if ax < (screen_w / 2) else 'out' - # # 右对齐兜底 - # if direction is None: - # direction = 'out' if right_edge > (screen_w - 20) else 'in' - # - # items.append({'type': 'msg', 'dir': direction, 'text': text, 'y': y}) - # - # # 排序 & 清理 - # items.sort(key=lambda i: i['y']) - # for it in items: - # it.pop('y', None) - # return items - # - # @classmethod - # def parse_float(cls, el, attr, default=0.0): - # try: - # return float(el.get(attr, default)) - # except Exception: - # return default + @classmethod def extract_messages_from_xml(cls, xml: str): @@ -813,3 +710,103 @@ class AiUtils(object): except Exception as e: LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}") return deleted + + @staticmethod + def _get_data_path(filename: str) -> str: + """ + 根据文件名生成 data 目录下的完整路径 + """ + base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前项目根目录 + data_dir = os.path.join(base_dir, "data") + Path(data_dir).mkdir(parents=True, exist_ok=True) # 确保 data 目录存在 + return os.path.join(data_dir, filename) + + @staticmethod + def read_json(filename: str) -> dict: + """ + 读取 JSON 文件,返回字典 + 如果文件不存在,返回空字典 + """ + file_path = AiUtils._get_data_path(filename) + try: + if not os.path.exists(file_path): + return {} + with open(file_path, "r", encoding="utf-8") as f: + data = json.load(f) + return data if isinstance(data, dict) else {} + except Exception as e: + print(f"读取 JSON 文件失败: {e}") + return {} + + @staticmethod + def write_json(filename: str, data: dict, overwrite: bool = True) -> bool: + """ + 将字典写入 JSON 文件 + :param filename: 文件名(自动放在 data 目录下) + :param data: 要写入的字典 + :param overwrite: True=覆盖写,False=合并更新 + """ + file_path = AiUtils._get_data_path(filename) + try: + if not overwrite and os.path.exists(file_path): + # 读取旧数据 + with open(file_path, "r", encoding="utf-8") as f: + old_data = json.load(f) + if not isinstance(old_data, dict): + old_data = {} + # 合并 + old_data.update(data) + data = old_data + + # 覆盖写(写回最终的 data) + with open(file_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=4) + return True + except Exception as e: + print(f"写入 JSON 文件失败: {e}") + return False + + @staticmethod + def update_json(filename: str, new_data: dict) -> bool: + """ + 修改 JSON 文件: + - 如果 key 已存在,则修改其值 + - 如果 key 不存在,则新增 + """ + try: + # 读取旧数据(没有则为空字典) + data = AiUtils.read_json(filename) + if not isinstance(data, dict): + data = {} + + # 合并(有则修改,无则新增) + data.update(new_data) + + # 写回 + return AiUtils.write_json(filename, data) + except Exception as e: + print(f"更新 JSON 文件失败: {e}") + return False + + @staticmethod + def delete_json_key(filename: str, key: str) -> bool: + """ + 删除 JSON 文件中的某个 key + - 如果 key 存在则删除 + - 如果 key 不存在则不处理 + """ + try: + # 读取旧数据 + data = AiUtils.read_json(filename) + if not isinstance(data, dict): + data = {} + + # 删除指定 key + if key in data: + del data[key] + + # 写回文件 + return AiUtils.write_json(filename, data) + except Exception as e: + print(f"删除 JSON key 失败: {e}") + return False