同步代码。修复若干bug

This commit is contained in:
2025-11-06 22:26:27 +08:00
parent 9ed5602b86
commit a9c9e39143
2 changed files with 425 additions and 38 deletions

View File

@@ -1128,6 +1128,389 @@ class ScriptManager():
LogManager.method_info("TikTok 重启成功", "监控消息", udid)
continue # 重新进入 while 循环,调用 monitorMessages
# def monitorMessages(self, session, udid, event):
#
# LogManager.method_info("脚本开始执行中", "监控消息")
#
# # 调整节点的深度为 7
# session.appium_settings({"snapshotMaxDepth": 7})
#
# el = session.xpath(
# '//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
# ' | '
# '//XCUIElementTypeButton[contains(@name,"收件箱")]'
# ' | '
# '//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
# )
#
# # 如果收件箱有消息 则进行点击
# if el.exists:
# try:
# m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
# except Exception as e:
# LogManager.method_error(f"解析收件箱数量异常: {e}", "检测消息", udid)
#
# count = int(m.group(1)) if m else 0
#
# if count:
# el.click()
# session.appium_settings({"snapshotMaxDepth": 25})
# event.wait(timeout=3)
# while True:
# print("循环开始")
# info_count = 0
# # 创建新的会话
# el = session.xpath(
# '//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
# ' | '
# '//XCUIElementTypeButton[contains(@name,"收件箱")]'
# ' | '
# '//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
# )
#
# if not el.exists:
# LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
# raise Exception("当前页面找不到收件箱,重启")
# # break
#
# # 支持中文“收件箱”和英文“Inbox”
# xpath_query = (
# "//XCUIElementTypeStaticText"
# "[@value='收件箱' or @label='收件箱' or @name='收件箱'"
# " or @value='Inbox' or @label='Inbox' or @name='Inbox']"
# )
#
# # 查找所有收件箱节点
# inbox_nodes = session.xpath(xpath_query).find_elements()
# if len(inbox_nodes) < 2:
# LogManager.method_error(f"当前页面不再收件箱页面,重启", "检测消息", udid)
# raise Exception("当前页面不再收件箱页面,重启")
#
# m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
# count = int(m.group(1)) if m else 0
#
# if not count:
# LogManager.method_info(f"当前收件箱的总数量{count}", "检测消息", udid)
# break
#
# # 新粉丝
# xp_new_fan_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 活动
# xp_activity_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 系统通知
# xp_system_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 消息请求
# xp_request_badge = (
# "//XCUIElementTypeCell"
# "[.//*[self::XCUIElementTypeLink or self::XCUIElementTypeStaticText]"
# " [@name='消息请求' or @label='消息请求' or @value='消息请求']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 用户消息
# xp_badge_numeric = (
# "("
# # 你的两类未读容器组件 + 数字徽标value 纯数字)
# "//XCUIElementTypeOther["
# " @name='AWEIMChatListCellUnreadCountViewComponent'"
# " or @name='TikTokIMImpl.InboxCellUnreadCountViewBuilder'"
# "]//XCUIElementTypeStaticText[@value and translate(@value,'0123456789','')='']"
# ")/ancestor::XCUIElementTypeCell[1]"
# " | "
# # 兜底:任何在 CollectionView 下、value 纯数字的徽标 → 找其最近的 Cell
# "//XCUIElementTypeCollectionView//XCUIElementTypeStaticText"
# "[@value and translate(@value,'0123456789','')='']"
# "/ancestor::XCUIElementTypeCell[1]"
# )
#
# try:
# # 如果 2 秒内找不到,会抛异常
# user_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
# val = (user_text.info.get("value") or
# user_text.info.get("label") or
# user_text.info.get("name"))
# LogManager.method_info(f"用户未读数量:{val}", "检测消息", udid)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 用户 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 用户消息 未读徽标数字", udid)
# user_text = None
# info_count += 1
#
# if user_text:
#
# user_text.tap()
# event.wait(timeout=3)
#
#
#
# xml = session.source()
# time.sleep(1)
# msgs = AiUtils.extract_messages_from_xml(xml)
# time.sleep(1)
#
# last_in = None # 对方最后一句
# last_out = None # 我方最后一句
# attempt = 0 # 已试次数(首次算第 1 次)
#
# while attempt < 3:
# # 1. 本轮扫描
# for item in reversed(msgs):
# if item.get('type') != 'msg':
# continue
# if last_in is None and item['dir'] == 'in':
# last_in = item['text']
# if last_out is None and item['dir'] == 'out':
# last_out = item['text']
# if last_in and last_out:
# break
#
# # 2. 有一条为空就重试
# if not last_in or not last_out:
# attempt += 1
# if attempt == 3:
# break # 三次用完,放弃
# time.sleep(0.2)
# xml = session.source()
# msgs = AiUtils.extract_messages_from_xml(xml)
# continue
# else:
# break # 至少一条有内容,成功退出
#
# LogManager.method_info(f"检测到对方最后发送的消息:{last_in}", "检测消息", udid)
# LogManager.method_info(f"检测我发送的最后一条信息:{last_out}", "检测消息", udid)
#
#
# # 获取主播的名称
# # anchor_name = AiUtils.get_navbar_anchor_name(session)
# anchor_name = ""
# for _ in range(3): # 最多 2 次重试 + 1 次初始
# anchor_name = AiUtils.get_navbar_anchor_name(session)
# if anchor_name:
# break
# time.sleep(1)
#
# LogManager.method_info(f"获取主播的名称:{anchor_name}", "检测消息", udid)
# LogManager.method_info(f"获取主播最后发送的消息 即将翻译:{last_in}", "检测消息", udid)
#
# if last_in is not None:
# chinese_last_msg_text = Requester.translationToChinese(last_in)
# else:
# chinese_last_msg_text = ""
#
# # 进行判断,判断翻译后是否
#
# LogManager.method_info(f"翻译中文后的内容,交给前端进行展示:{chinese_last_msg_text}", "检测消息",
# udid)
#
# # 找到输入框
# last_data = [{
# "sender": anchor_name,
# "device": udid,
# "time": datetime.now().strftime("%Y-%m-%d %H:%M"),
# "text": chinese_last_msg_text,
# "status": 0
# }]
#
# LogManager.method_info(f"主播最后发送的数据,传递给前端进行记录:{chinese_last_msg_text}",
# "检测消息", udid)
#
# # 把主播的名称存储到c盘
# JsonUtils.append_json_items(last_data, "log/last_message.json")
#
# # 从C盘中读取数据
# anchorWithSession = IOSAIStorage.load()
#
# sel = session.xpath("//TextView")
# if anchor_name not in anchorWithSession:
#
# # 如果是第一次发消息(没有sessionId的情况)
# LogManager.method_info(f"第一次发消息:{anchor_name},没有记忆 开始请求ai", "检测消息", udid)
# LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
#
# if last_in is None:
# LogManager.method_info(f"检测不到对方发送的最后一条消息,发送一条打招呼", "检测消息",
# udid)
#
# text = "ok"
# if last_out:
# text = last_out
#
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
#
# LogManager.method_info(
# f"发送的消息检测不到对方发送的消息不走ai{text or '暂无数据'}", "检测消息",
# udid)
#
# sel.set_text(f"{text or '暂无数据'}\n")
# else:
# LogManager.method_error("找不到输入框,重启", "检测消息", udid)
# raise Exception("找不到输入框,重启")
# else:
# aiResult, sessionId = Requester.chatToAi({"query": last_in, "user": "1"})
# IOSAIStorage.save({anchor_name: sessionId}, mode="merge")
#
# # 找到输入框输入ai返回出来的消息
#
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
# LogManager.method_info(
# f"发送的消息检测到对方发送的消息进行走ai没记忆{aiResult or '暂无数据'}",
# "检测消息",
# udid)
# sel.set_text(f"{aiResult or '暂无数据'}\n")
# else:
# LogManager.method_error("找不到输入框,重启", "检测消息", udid)
# raise Exception("找不到输入框,重启")
# else:
# print("有记忆")
#
# LogManager.method_info(f"不是一次发消息:{anchor_name},有记忆", "检测消息", udid)
# # 如果不是第一次发消息证明存储的有sessionId
# sessionId = anchorWithSession[anchor_name]
#
# if last_in is None:
# last_in = "ok"
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
# LogManager.method_info(
# f"发送的消息检测到对方发送的消息进行走ai有记忆{last_in or '暂无数据'}",
# "检测消息",
# udid)
# sel.set_text(f"{last_in or '暂无数据'}\n")
# else:
#
# # TODO: user后续添加暂时写死
#
# LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
#
# aiResult, sessionId = Requester.chatToAi(
# {"query": last_in, "conversation_id": sessionId, "user": "1"})
#
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
# LogManager.method_info(
# f"发送的消息检测到对方发送的消息进行走ai有记忆{aiResult or '暂无数据'}",
# "检测消息",
# udid)
# sel.set_text(f"{aiResult or '暂无数据'}\n")
#
# LogManager.method_info(f"存储的sessionId:{anchorWithSession}", "检测消息", udid)
# event.wait(timeout=1)
#
# # 返回
# ControlUtils.clickBack(session)
#
# # 重新回到收件箱页面后,强制刷新节点
# session.appium_settings({"snapshotMaxDepth": 25})
# event.wait(timeout=1)
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
#
# LogManager.method_info(f"新粉丝未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 新粉丝 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 新粉丝 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
# LogManager.method_info(f"活动未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 活动 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 活动 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
# LogManager.method_info(f"系统通知未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 系统通知 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 系统通知 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_request_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
# LogManager.method_info(f"消息请求未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 消息请求 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 消息请求 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# # 双击收件箱 定位到消息的位置
# if info_count == 5:
# r = el.bounds # 可能是命名属性,也可能是 tuple
# cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2))
# cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2))
# session.double_tap(cx, cy) # 可能抛异常:方法不存在
# LogManager.method_info(f"双击收件箱 定位到信息", "检测消息", udid)
# else:
#
# return
# else:
# LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
# raise Exception("当前页面找不到收件箱,重启")
def monitorMessages(self, session, udid, event):
LogManager.method_info("脚本开始执行中", "监控消息")
@@ -1228,11 +1611,6 @@ class ScriptManager():
" or @name='TikTokIMImpl.InboxCellUnreadCountViewBuilder'"
"]//XCUIElementTypeStaticText[@value and translate(@value,'0123456789','')='']"
")/ancestor::XCUIElementTypeCell[1]"
" | "
# 兜底:任何在 CollectionView 下、value 纯数字的徽标 → 找其最近的 Cell
"//XCUIElementTypeCollectionView//XCUIElementTypeStaticText"
"[@value and translate(@value,'0123456789','')='']"
"/ancestor::XCUIElementTypeCell[1]"
)
try:
@@ -1248,13 +1626,13 @@ class ScriptManager():
user_text = None
info_count += 1
if user_text:
print("当前页面的节点", session.source())
if user_text:
print("点击进入用户主页")
user_text.tap()
event.wait(timeout=3)
xml = session.source()
time.sleep(1)
msgs = AiUtils.extract_messages_from_xml(xml)
@@ -1291,7 +1669,6 @@ class ScriptManager():
LogManager.method_info(f"检测到对方最后发送的消息:{last_in}", "检测消息", udid)
LogManager.method_info(f"检测我发送的最后一条信息:{last_out}", "检测消息", udid)
# 获取主播的名称
# anchor_name = AiUtils.get_navbar_anchor_name(session)
anchor_name = ""
@@ -1326,8 +1703,9 @@ class ScriptManager():
LogManager.method_info(f"主播最后发送的数据,传递给前端进行记录:{chinese_last_msg_text}",
"检测消息", udid)
# 把主播的名称存储到c盘
JsonUtils.append_json_items(last_data, "log/last_message.json")
if last_data != "" and last_data != "消息请求":
# 把主播的名称存储到c盘
JsonUtils.append_json_items(last_data, "log/last_message.json")
# 从C盘中读取数据
anchorWithSession = IOSAIStorage.load()
@@ -1362,7 +1740,12 @@ class ScriptManager():
raise Exception("找不到输入框,重启")
else:
aiResult, sessionId = Requester.chatToAi({"query": last_in, "user": "1"})
IOSAIStorage.save({anchor_name: sessionId}, mode="merge")
if anchor_name and anchor_name != "消息请求":
IOSAIStorage.save({anchor_name: sessionId}, mode="merge")
else:
LogManager.method_warning(f"跳过保存 sessionIdanchor_name 不合法: '{anchor_name}'",
"检测消息", udid)
# 找到输入框输入ai返回出来的消息
@@ -1383,37 +1766,40 @@ class ScriptManager():
LogManager.method_info(f"不是一次发消息:{anchor_name},有记忆", "检测消息", udid)
# 如果不是第一次发消息证明存储的有sessionId
sessionId = anchorWithSession[anchor_name]
if last_in is None:
last_in = "ok"
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai有记忆{last_in or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{last_in or '暂无数据'}\n")
else:
if anchor_name and anchor_name != "消息请求":
# TODO: user后续添加暂时写死
sessionId = anchorWithSession[anchor_name]
LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
if last_in is None:
last_in = "ok"
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai有记忆{last_in or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{last_in or '暂无数据'}\n")
else:
aiResult, sessionId = Requester.chatToAi(
{"query": last_in, "conversation_id": sessionId, "user": "1"})
# TODO: user后续添加暂时写死
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai有记忆{aiResult or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{aiResult or '暂无数据'}\n")
LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
aiResult, sessionId = Requester.chatToAi(
{"query": last_in, "conversation_id": sessionId, "user": "1"})
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai有记忆{aiResult or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{aiResult or '暂无数据'}\n")
LogManager.method_info(f"存储的sessionId:{anchorWithSession}", "检测消息", udid)
event.wait(timeout=1)
@@ -1512,6 +1898,7 @@ class ScriptManager():
LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
raise Exception("当前页面找不到收件箱,重启")
# 放在 ScriptManager 类外面或 utils 里
def interruptible_sleep(self, event: threading.Event, seconds: float, slice_: float = 1.0):
"""把一次长 sleep 拆成 1 秒一片,随时响应 event"""