Files
iOSAI/script/ScriptManager.py
2025-09-03 19:03:34 +08:00

790 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import re
import threading
import time
from enum import Enum
import wda
import os
from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils
from Utils.LogManager import LogManager
from Entity.Variables import anchorList, removeModelFromAnchorList, prologueList, anchorWithSession
from Utils.Requester import Requester
# 脚本管理类
class ScriptManager():
# 单利对象
_instance = None # 类变量,用于存储单例实例
def __new__(cls):
# 如果实例不存在,则创建一个新实例
if cls._instance is None:
cls._instance = super(ScriptManager, cls).__new__(cls)
# 返回已存在的实例
return cls._instance
def __init__(self):
super().__init__()
self.initialized = True # 标记已初始化
# 养号
def growAccount(self, udid, event, max_retries=None):
"""
自动养号方法,带重生机制
:param udid: 设备 UDID
:param event: 线程控制 Event
:param max_retries: 最大重试次数 (None 表示无限重试)
"""
retries = 10
while not event.is_set():
try:
# ========= 初始化 =========
client = wda.USBClient(udid)
session = client.session()
# 关闭并重新打开 TikTok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
LogManager.method_info("养号重启tiktok", "养号", udid)
AiUtils.makeUdidDir(udid)
# ========= 主循环 =========
while not event.is_set():
# 设置手机的节点深度为15,判断该页面是否正确
session.appium_settings({"snapshotMaxDepth": 15})
# 判断当前页面上是否有推荐按钮
# el = session(xpath='//XCUIElementTypeButton[@name="top_tabs_recomend"]')
# node = session.xpath(
# '//XCUIElementTypeButton[@name="top_tabs_recomend" or @name="推荐" or @label="推荐"]'
# )
el = session.xpath(
'//XCUIElementTypeButton[@name="top_tabs_recomend" or @name="推荐" or @label="推荐"]'
)
if not el.exists:
# 记录日志
LogManager.method_error("找不到推荐按钮,养号出现问题,重启养号功能", "养号", udid=udid)
# 手动的抛出异常 重启流程
raise Exception("找不到推荐按钮,养号出现问题,重启养号功能")
if el.value != "1":
LogManager.method_error("当前页面不是推荐页面,养号出现问题,重启养号功能", "养号", udid=udid)
raise Exception("当前页面不是推荐页面,养号出现问题,重启养号功能")
LogManager.method_info("当前页面是推荐页面,开始养号", "养号", udid=udid)
# 重新设置节点的深度,防止手机进行卡顿
session.appium_settings({"snapshotMaxDepth": 0})
# ---- 截图保存 ----
try:
img = client.screenshot()
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
resource_dir = os.path.join(base_dir, "resources", udid)
os.makedirs(resource_dir, exist_ok=True)
filePath = os.path.join(resource_dir, "bgv.png")
img.save(filePath)
LogManager.method_info(f"保存屏幕图像成功 -> {filePath}", "养号", udid)
print("保存了背景图:", filePath)
time.sleep(1)
except Exception as e:
LogManager.method_info(f"截图或保存失败,失败原因:{e}", "养号", udid)
raise Exception("截图或保存失败,重启养号功能")
# ---- 视频逻辑 ----
try:
addX, addY = AiUtils.findImageInScreen("add", udid)
isSame = False
for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid)
if addX == tx and addY == ty:
isSame = True
time.sleep(1)
else:
isSame = False
break
if addX > 0 and isSame:
needLike = random.randint(0, 100)
homeButton = AiUtils.findHomeButton(udid)
if homeButton:
LogManager.method_info("有首页按钮,查看视频", "养号", udid)
videoTime = random.randint(5, 15)
time.sleep(videoTime)
# 重置 session
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0})
if needLike < 23:
LogManager.method_info("进行点赞", "养号", udid)
ControlUtils.clickLike(session, udid)
LogManager.method_info("继续观看视频", "养号", udid)
videoTime = random.randint(10, 30)
time.sleep(videoTime)
LogManager.method_info("准备划到下一个视频", "养号", udid)
client.swipe_up()
else:
LogManager.method_error("找不到首页按钮。出错了", "养号", udid)
else:
nextTime = random.randint(1, 5)
time.sleep(nextTime)
client.swipe_up()
except Exception as e:
LogManager.method_error(f"刷视频过程出现错误,重试", "养号", udid)
raise e # 抛出给上层,触发重生机制
except Exception as e:
LogManager.method_error(f"[{udid}] 养号出现异常,将重启流程: {e}", "养号", udid)
retries += 1
time.sleep(3) # 等待后重生
# 观看直播
def watchLiveForGrowth(self, udid, event, max_retries=None):
import time, random, wda
retry_count = 0
backoff_sec = 10 # 异常后冷却,避免频繁重启
while not event.is_set():
try:
# —— 每次重启都新建 client/session ——
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 15})
# 1) 先关再开
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 2) 进入直播 (使用英文)
live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
if live_button.exists:
live_button.click()
else:
LogManager.method_error("无法找到直播间按钮 抛出异常 重新启动", "直播养号", udid)
# 抛出异常
raise Exception(f"找不到直播按钮,抛出异常 重新启动")
time.sleep(20)
live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
if live_button.exists:
continue
# 下滑一下
client.swipe_up()
# 3) 取分辨率;可选重建 session 规避句柄陈旧
size = session.window_size()
width, height = size.width, size.height
session = client.session()
# 4) 主循环:刷直播
while not event.is_set():
time.sleep(3)
# 找到一个看直播的时候肯定有的元素,当这个元素没有的时候,就代表当前的页面出现了问题
# 需要抛出异常,重启这个流程
el1 = session.xpath('//XCUIElementTypeOther[@name="GBLFeedRootViewComponent"]')
el2 = session.xpath('//XCUIElementTypeOther[@value="0%"]')
if not (el1.exists or el2.exists):
print("当前页面不是直播间,重启刷直播")
LogManager.method_error("当前页面不是直播间,重启刷直播", "直播养号", udid=udid)
raise Exception("当前页面不是直播间")
else:
print("当前页面是直播间,继续刷直播")
LogManager.method_info("当前页面是直播间,继续刷直播", "直播养号", udid=udid)
# PK 直接划走
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("✅ 当前是 PK跳过")
LogManager.method_info("✅ 当前是 PK跳过", "直播养号", udid=udid)
session.swipe_up()
continue
# 计算直播显示窗口数量(主画面+连麦小窗)
count = AiUtils.count_add_by_xml(session)
print(f"检测到直播显示区域窗口数:{count}")
if count > 1:
print("❌ 多窗口(有人连麦/分屏),划走")
LogManager.method_info("❌ 多窗口(有人连麦/分屏),划走", "直播养号", udid=udid)
session.swipe_up()
continue
else:
print("✅ 单窗口,(10%概率)开始点赞")
LogManager.method_info("✅ 单窗口,(3%概率)开始点赞", "直播养号", udid=udid)
# 随机点赞(仍保留中途保护)
if random.random() >= 0.97:
print("开始点赞")
LogManager.method_info("开始点赞", "直播养号", udid=udid)
for _ in range(random.randint(10, 30)):
# 中途转PK/连麦立即跳过
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists \
or AiUtils.count_add_by_xml(session) > 1:
print("❗ 中途发生 PK/连麦,跳过")
LogManager.method_info("❗ 中途发生 PK/连麦,跳过", "直播养号", udid=udid)
session.swipe_up()
break
x = width // 3 + random.randint(-10, 10)
y = height // 3 + random.randint(10, 20)
print("双击坐标:", x, y)
session.double_tap(x, y)
print("--------------------------------------------")
time.sleep(random.randint(180, 300))
session.swipe_up()
# 正常退出(外部 event 触发)
break
except Exception as e:
retry_count += 1
LogManager.method_error(f"watchLiveForGrowth 异常(第{retry_count}次):{repr(e)}", "直播养号", udid)
# 尝试轻量恢复一次,避免一些短暂性 session 失效
try:
client = wda.USBClient(udid)
_ = client.session()
except Exception:
pass
time.sleep(backoff_sec) # 冷却后整段流程重来
continue
"""
外层包装,出现异常自动重试、
关注打招呼以及回复主播消息
"""
def safe_greetNewFollowers(self, udid, needReply, event):
retries = 0
while not event.is_set():
try:
self.greetNewFollowers(udid, needReply, event)
return # 成功执行就退出
except Exception as e:
retries += 1
LogManager.method_error(f"greetNewFollowers 出现异常: {e},准备第 {retries} 次重试", "关注打招呼", udid)
time.sleep(3)
LogManager.method_error("greetNewFollowers 重试次数耗尽,任务终止", "关注打招呼", udid)
# 关注打招呼以及回复主播消息
def greetNewFollowers(self, udid, needReply, event):
client = wda.USBClient(udid)
session = client.session()
print(f"是否要自动回复消息:{needReply}")
LogManager.method_info(f"是否要自动回复消息:{needReply}", "关注打招呼", udid)
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
# 返回上一步
def goBack(count):
for i in range(count):
ControlUtils.clickBack(session)
time.sleep(2)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set() and len(anchorList) > 0:
# 获取一个主播,并删除
anchor = anchorList.pop(0)
aid = anchor.anchorId
anchorCountry = anchor.country
# 点击搜索按钮
ControlUtils.clickSearch(session)
# 强制刷新session
session.appium_settings({"snapshotMaxDepth": 15})
# 查找输入框
input = session.xpath('//XCUIElementTypeSearchField')
# 如果找到了输入框,就点击并且输入内容
if input.exists:
input.click()
# 稍作停顿
time.sleep(0.5)
else:
print(f"找不到输入框")
input.clear_text()
time.sleep(1)
# 输入主播id
input.set_text(aid + "\n")
# 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
session.appium_settings({"snapshotMaxDepth": 23})
try:
# 点击关注按钮
ControlUtils.clickFollow(session, aid)
except wda.WDAElementNotFoundError:
# 如果没有“关注”按钮,则不点击
print("未找到‘关注’按钮,跳过点击。")
LogManager.method_info("未找到‘关注’按钮,跳过点击。", "关注打招呼", udid)
goBack(2)
session.appium_settings({"snapshotMaxDepth": 15})
continue
time.sleep(2)
# 找到并点击第一个视频
cellClickResult, workCount = ControlUtils.clickFirstVideoFromDetailPage(session)
time.sleep(2)
# 观看主播视频
def viewAnchorVideo(workCount):
print("开始查看视频,并且重新调整查询深度")
session.appium_settings({"snapshotMaxDepth": 5})
if workCount > 3:
count = 3
else:
count = workCount
while count != 0:
time.sleep(5)
img = client.screenshot()
time.sleep(1)
# filePath = f"resources/{udid}/bgv.png"
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前脚本目录的上一级
filePath = os.path.join(base_dir, "resources", udid, "bgv.png")
dirPath = os.path.dirname(filePath)
if not os.path.exists(dirPath):
os.makedirs(dirPath)
img.save(filePath)
LogManager.method_info("保存屏幕图像成功", "关注打招呼", udid)
time.sleep(2)
# 查找add图标
r = ControlUtils.clickLike(session, udid)
# 点赞成功。
# if r == True:
count -= 1
# 随机看视频 15~30秒
time.sleep(random.randint(15, 30))
if count != 0:
client.swipe_up()
# 右滑返回
client.swipe_right()
# 如果打开视频失败。说明该主播没有视频
if cellClickResult == True:
# 观看主播视频
LogManager.method_info("去查看主播视频", "关注打招呼", udid)
viewAnchorVideo(workCount)
time.sleep(3)
LogManager.method_info("视频看完了,重置试图查询深度", "关注打招呼", udid)
session.appium_settings({"snapshotMaxDepth": 25})
# 向上滑动
session.swipe_down()
# 点击关注按钮
followButton = AiUtils.getFollowButton(session)
if followButton is not None:
LogManager.method_info("找到关注按钮了", "关注打招呼", udid)
followButton.click()
else:
LogManager.method_info("没找到关注按钮", "关注打招呼", udid)
time.sleep(1)
goBack(3)
session.appium_settings({"snapshotMaxDepth": 15})
continue
time.sleep(2)
msgButton = AiUtils.getSendMesageButton(session)
time.sleep(2)
if msgButton is not None:
LogManager.method_info("找到发消息按钮了", "关注打招呼", udid)
print("找到发消息按钮了")
# 进入聊天页面
msgButton.click()
else:
LogManager.method_info("没有识别出发消息按钮", "关注打招呼", udid)
print("没有识别出发消息按钮")
goBack(3)
session.appium_settings({"snapshotMaxDepth": 15})
continue
time.sleep(3)
# 查找聊天界面中的输入框节点
chatInput = session.xpath("//TextView")
if chatInput.exists:
print("找到输入框了, 准备发送一条打招呼消息")
LogManager.method_info("找到输入框了, 准备发送一条打招呼消息", "关注打招呼", udid)
# 准备打招呼的文案
text = random.choice(prologueList)
# 翻译成主播国家的语言
msg = Requester.translation(text, anchorCountry)
# 准备发送一条信息
chatInput.click()
time.sleep(2)
# 发送消息
chatInput.set_text(msg + "\n")
time.sleep(1)
else:
print("无法发送信息")
LogManager.method_info(f"给主播{anchor.anchorId} 发送消息失败", "关注打招呼", udid)
# 接着下一个主播
# removeModelFromAnchorList(anchor)
session.appium_settings({"snapshotMaxDepth": 15})
goBack(4)
else:
print(f"{anchor.anchorId}:该主播没有视频")
LogManager.method_info(f"{anchor.anchorId}:该主播没有视频", "关注打招呼", udid)
goBack(3)
session.appium_settings({"snapshotMaxDepth": 15})
continue
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
time.sleep(2)
print("即将要回复消息")
LogManager.method_info("即将要回复消息", "关注打招呼", udid)
if needReply:
print("如果需要回复主播消息。走此逻辑")
print("----------------------------------------------------------")
# if AiUtils.getUnReadMsgCount(session) > 0:
print("监控回复消息")
# 执行回复消息逻辑
self.monitorMessages(session, udid)
homeButton = AiUtils.findHomeButton(udid)
if homeButton.exists:
homeButton.click()
else:
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
print("重新创建wda会话 防止wda会话失效")
client = wda.USBClient(udid)
session = client.session()
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
# ControlUtils.clickSearch(session)
else:
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
# ControlUtils.clickSearch(session)
print("greetNewFollowers方法执行完毕")
# 检测消息
def replyMessages(self, udid, event):
client = wda.USBClient(udid)
session = client.session()
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
while not event.is_set():
try:
# 调用检测消息的方法
self.monitorMessages(session, udid)
except Exception as e:
LogManager.method_error(f"监控消息 出现异常: {e},重新启动监控直播", "检测消息", udid)
# 出现异常时,稍等再重启 TikTok 并重试
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
continue # 重新进入 while 循环,调用 monitorMessages
# 检查未读消息并回复
def monitorMessages(self, session, udid):
# 调整节点的深度为 7
session.appium_settings({"snapshotMaxDepth": 7})
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
# 如果收件箱有消息 则进行点击
if el.exists:
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if count:
el.click()
session.appium_settings({"snapshotMaxDepth": 25})
time.sleep(3)
while True:
info_count = 0
# 创建新的会话
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
print("el", el)
if not el.exists:
LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
raise Exception("当前页面找不到收件箱,重启")
# break
# 支持中文“收件箱”和英文“Inbox”
xpath_query = (
"//XCUIElementTypeStaticText"
"[@value='收件箱' or @label='收件箱' or @name='收件箱'"
" or @value='Inbox' or @label='Inbox' or @name='Inbox']"
)
# 查找所有收件箱节点
inbox_nodes = session.xpath(xpath_query).find_elements()
if len(inbox_nodes) < 2:
LogManager.method_error(f"当前页面不再收件箱页面,重启", "检测消息", udid)
raise Exception("当前页面不再收件箱页面,重启")
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if not count:
LogManager.method_info(f"当前收件箱的总数量{count}", "检测消息", udid)
break
# 新粉丝
xp_new_fan_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 活动
xp_activity_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 系统通知
xp_system_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 消息请求
xp_request_badge = (
"//XCUIElementTypeCell"
"[.//*[self::XCUIElementTypeLink or self::XCUIElementTypeStaticText]"
" [@name='消息请求' or @label='消息请求' or @value='消息请求']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 用户消息
xp_badge_numeric = (
'//XCUIElementTypeOther['
' @name="AWEIMChatListCellUnreadCountViewComponent"'
' or @name="TikTokIMImpl.InboxCellUnreadCountViewBuilder"'
']//XCUIElementTypeStaticText[@value and translate(@value,"0123456789","")=""]'
)
try:
# 如果 2 秒内找不到,会抛异常
user_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
val = (user_text.info.get("value") or
user_text.info.get("label") or
user_text.info.get("name"))
LogManager.method_info(f"用户未读数量:{val}", "检测消息", udid)
except Exception:
LogManager.method_warning("当前屏幕没有找到 用户 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 用户消息 未读徽标数字", udid)
user_text = None
info_count += 1
if user_text:
user_text.tap()
time.sleep(3)
xml = session.source()
msgs = AiUtils.extract_messages_from_xml(xml)
# 检测出对方发的最后一条信息
# last_msg_text = next(item['text'] for item in reversed(msgs) if item['type'] == 'msg')
text_list = ['What do you think of my live stream?',
'What do you think makes my streams special?',
'Do you think Im one of the most engaging streamers youve seen?']
last_msg_text = next((item['text'] for item in reversed(msgs) if item['type'] == 'msg'),
random.choice(text_list))
# 向ai发送信息
# 获取主播的名称
anchor_name = AiUtils.get_navbar_anchor_name(session)
# 找到输入框
sel = session.xpath("//TextView")
if anchor_name not in anchorWithSession:
# 如果是第一次发消息(没有sessionId的情况)
response = Requester.chatToAi({"msg": last_msg_text})
aiResult = response['result']
sessionId = response['session_id']
anchorWithSession[anchor_name] = sessionId
# 找到输入框输入ai返回出来的消息
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
else:
LogManager.method_error("找不到输入框,重启", "检测消息", udid)
raise Exception("找不到输入框,重启")
else:
# 如果不是第一次发消息证明存储的有sessionId
sessionId = anchorWithSession[anchor_name]
response = Requester.chatToAi({"msg": last_msg_text, "sid": sessionId})
aiResult = response['result']
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
LogManager.method_info(f"存储的sessionId:{anchorWithSession}", "检测消息", udid)
time.sleep(1)
# 返回
ControlUtils.clickBack(session)
# 重新回到收件箱页面后,强制刷新节点
session.appium_settings({"snapshotMaxDepth": 25})
time.sleep(1)
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"新粉丝未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 新粉丝 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 新粉丝 未读徽标数字", udid)
badge_text = None
info_count += 1
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"活动未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 活动 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 活动 未读徽标数字", udid)
badge_text = None
info_count += 1
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"系统通知未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 系统通知 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 系统通知 未读徽标数字", udid)
badge_text = None
info_count += 1
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_request_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"消息请求未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 消息请求 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 消息请求 未读徽标数字", udid)
badge_text = None
info_count += 1
# 双击收件箱 定位到消息的位置
if info_count == 5:
r = el.bounds # 可能是命名属性,也可能是 tuple
cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2))
cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2))
session.double_tap(cx, cy) # 可能抛异常:方法不存在
LogManager.method_info(f"双击收件箱 定位到信息", "检测消息", udid)
else:
return
else:
LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
raise Exception("当前页面找不到收件箱,重启")