Files
iOSAI/script/ScriptManager.py

2175 lines
103 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import re
import threading
import time
from pathlib import Path
import wda
import os
from datetime import datetime
from Entity import Variables
from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils
from Utils.CountryEnum import CountryLanguageMapper
from Utils.IOSAIStorage import IOSAIStorage
from Utils.JsonUtils import JsonUtils
from Utils.LogManager import LogManager
from Entity.Variables import anchorList
from Utils.OCRUtils import OCRUtils
from Utils.Requester import Requester
import Entity.Variables as ev
from Utils.TencentOCRUtils import TencentOCR
# 脚本管理类
class ScriptManager():
# # 单利对象
# _instance = None # 类变量,用于存储单例实例
#
# def __new__(cls):
# # 如果实例不存在,则创建一个新实例
# if cls._instance is None:
# cls._instance = super(ScriptManager, cls).__new__(cls)
# # 返回已存在的实例
# return cls._instance
_device_cache = {}
_cache_lock = threading.Lock() # 线程安全锁(可选,如果你有多线程)
@classmethod
def get_screen_info(cls, udid: str):
# 如果缓存中没有该设备的信息,则获取并缓存
if udid not in cls._device_cache:
with cls._cache_lock: # 防止并发写入
if udid not in cls._device_cache: # 双重检查
cls._device_cache[udid] = AiUtils._screen_info(udid)
return cls._device_cache[udid]
def __init__(self):
super().__init__()
# 初始化获取模版所在的地址
current_dir = Path(__file__).resolve().parent
# 项目根目录(假设你的类文件在项目的子目录里,比如 Module/OCR/OCRUtils.py
self.lock = threading.Lock()
project_root = current_dir.parent # 如果你确定这个文件就在项目根目录下,可省略这行
# resources 文件夹路径
# 获取相应的模板的地址
self.resources_dir = project_root / "resources"
# === 2. @2x 素材 (scale=2) ===
self.comment_dir2 = self.resources_dir / "comment2.png"
self.comment_add_dir_2x = self.resources_dir / "insert_comment2x.png"
self.comment_add_dir2_2x = self.resources_dir / "insert_comment2x.png"
# === 3. @3x 素材 (scale=3) ===
self.comment_dir = self.resources_dir / "comment.png"
self.comment_add_dir = self.resources_dir / "insert_comment.png"
self.comment_add_dir2 = self.resources_dir / "insert_comment2.png"
self.initialized = True # 标记已初始化
def _pick_template(self, scale: float):
"""
scale≈3 -> 返回 @3x 素材,除 3
其余默认 @2x 素材,除 2
"""
if abs(scale - 3.0) < 0.3: # 3x
return (self.comment_dir,
self.comment_add_dir,
self.comment_add_dir2, # 3x 兜底
3) # 除数
else: # 2x默认
return (self.comment_dir2,
self.comment_add_dir_2x,
self.comment_add_dir2_2x, # 2x 兜底
2) # 除数
def comment_flow(self, filePath, session, udid, recomend_cx, recomend_cy):
width, height, scale = self.get_screen_info(udid)
# 取当前分辨率的三张图 + 除数
comment_tpl, add_tpl, add_fb_tpl, div = self._pick_template(scale)
# ① 点评论按钮
coord = OCRUtils.find_template(str(comment_tpl), filePath)
LogManager.method_info(f"使用的模板路径是:{str(comment_tpl)}", "养号", udid)
if not coord:
print("无法检测到评论按钮")
LogManager.method_info("无法检测到评论按钮", "养号", udid)
return
cx, cy = coord[0]
session.click(int(cx / div), int(cy / div))
LogManager.method_info(f"点击评论坐标:{int(cx / div)}, {int(cy / div)}", "养号", udid)
time.sleep(2)
# ② 重新截图(防止键盘弹起)
img = session.screenshot()
filePath = os.path.join(os.path.dirname(filePath), "bgv_comment.png")
img.save(filePath)
# ③ 随机评论语
single_comment = random.choice(Variables.commentList) if Variables.commentList else "评论没有导入数据"
# ④ 找「添加评论」按钮
coord2 = OCRUtils.find_template(str(add_tpl), filePath)
click_count = False
if coord2: # 方案 1 命中
cx2, cy2 = coord2[0]
session.tap(int(cx2 / div), int(cy2 / div))
session.send_keys(f"{single_comment}\n")
click_count = True
LogManager.method_info("评论成功方案1", "养号", udid)
else: # 方案 2 兜底
time.sleep(1)
img = session.screenshot()
img.save(filePath)
coord3 = OCRUtils.find_template(str(add_fb_tpl), filePath)
if coord3:
cx3, cy3 = coord3[0]
session.tap(int(cx3 / div), int(cy3 / div))
session.send_keys(f"{single_comment}\n")
click_count = True
LogManager.method_info("评论成功方案2", "养号", udid)
# ⑤ 返回 / 取消
# tap_x = int(recomend_cx) if recomend_cx else 100
# tap_y = int(recomend_cy) if recomend_cy else 100
time.sleep(1)
session.tap(100, 100)
if not click_count: # 兜底多点一次
time.sleep(1)
session.tap(100, 100)
# 养号
def growAccount(self, udid, isComment, event, is_monitoring=False):
LogManager.method_info(f"调用刷视频", "养号", udid)
# ========= 初始化 =========
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
AiUtils.makeUdidDir(udid)
while not event.is_set():
try:
if not is_monitoring:
LogManager.method_info(f"开始养号重启tiktok", "养号", udid)
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=1)
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
recomend_cx = 0
recomend_cy = 0
# ========= 主循环 =========
while not event.is_set():
# 设置手机的节点深度为15,判断该页面是否正确
session.appium_settings({"snapshotMaxDepth": 15})
el = session.xpath(
'//XCUIElementTypeButton[@name="top_tabs_recomend" or @name="推荐" or @label="推荐"]'
)
# 获取推荐按钮所在的坐标
if el.exists:
bounds = el.bounds # 返回 [x, y, width, height]
recomend_cx = bounds[0] + bounds[2] // 2
recomend_cy = bounds[1] + bounds[3] // 2
if not el.exists:
# 记录日志
LogManager.method_error("找不到推荐按钮,养号出现问题,重启养号功能", "养号", udid=udid)
# 手动的抛出异常 重启流程
raise Exception("找不到推荐按钮,养号出现问题,重启养号功能")
if el.value != "1":
LogManager.method_error("当前页面不是推荐页面,养号出现问题,重启养号功能", "养号", udid=udid)
raise Exception("当前页面不是推荐页面,养号出现问题,重启养号功能")
LogManager.method_info("当前页面是推荐页面,开始养号", "养号", udid=udid)
# 重新设置节点的深度,防止手机进行卡顿
session.appium_settings({"snapshotMaxDepth": 0})
# ---- 截图保存 ----
try:
img = client.screenshot()
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
resource_dir = os.path.join(base_dir, "resources", udid)
os.makedirs(resource_dir, exist_ok=True)
filePath = os.path.join(resource_dir, "bgv.png")
img.save(filePath)
LogManager.method_info(f"保存屏幕图像成功 -> {filePath}", "养号", udid)
print("保存了背景图:", filePath)
event.wait(timeout=1)
except Exception as e:
LogManager.method_info(f"截图或保存失败,失败原因:{e}", "养号", udid)
raise Exception("截图或保存失败,重启养号功能")
# ---- 视频逻辑 ----
try:
width, height, scale = self.get_screen_info(udid)
if scale == 3.0:
addX, addY = AiUtils.findImageInScreen("add", udid)
else:
addX, addY = AiUtils.findImageInScreen("like1", udid)
isSame = False
for i in range(2):
if scale == 3.0:
tx, ty = AiUtils.findImageInScreen("add", udid)
else:
tx, ty = AiUtils.findImageInScreen("like1", udid)
if addX == tx and addY == ty:
isSame = True
event.wait(timeout=1)
else:
isSame = False
# break
if addX > 0 and isSame:
needLike = random.randint(0, 100)
homeButton = AiUtils.findHomeButton(session)
if homeButton:
LogManager.method_info("有首页按钮,查看视频", "养号", udid)
videoTime = random.randint(5, 15)
LogManager.method_info("准备停止脚本", method="task")
for _ in range(videoTime): # 0.2 秒一片
if event.is_set():
LogManager.method_info("停止脚本中", method="task")
break
event.wait(timeout=1)
LogManager.method_info("停止脚本成功", method="task")
# 重置 session
session.appium_settings({"snapshotMaxDepth": 0})
if needLike < 25:
LogManager.method_info("进行点赞", "养号", udid)
ControlUtils.clickLike(session, udid)
LogManager.method_info("继续观看视频", "养号", udid)
LogManager.method_info("准备划到下一个视频", "养号", udid)
else:
LogManager.method_error("找不到首页按钮。出错了", "养号", udid)
if isComment and random.random() > 0.70:
self.comment_flow(filePath, session, udid, recomend_cx, recomend_cy)
event.wait(timeout=2)
home = AiUtils.findHomeButton(session)
if not home:
raise Exception("没有找到首页按钮,重置")
videoTime = random.randint(15, 30)
for _ in range(videoTime):
if event.is_set():
break
event.wait(timeout=1)
ControlUtils.swipe_up(client)
# 如果is_monitoring 为False 则说明和监控消息没有联动,反正 则证明和监控消息进行联动
if is_monitoring:
# 监控消息按钮,判断是否有消息
el = session.xpath(
'//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
' | '
'//XCUIElementTypeButton[contains(@name,"收件箱")]'
' | '
'//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
)
# 判断收件箱是否有消息
if el.exists:
try:
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
except Exception as e:
LogManager.method_error(f"解析收件箱数量异常: {e}", "检测消息", udid)
count = int(m.group(1)) if m else 0
if count:
break
else:
continue
except Exception as e:
print("刷视频脚本有错误:错误内容:", e)
LogManager.method_error(f"刷视频过程出现错误,重试", "养号", udid)
raise e # 抛出给上层,触发重生机制
except Exception as e:
print("刷视频遇到错误了。错误内容:", e)
LogManager.method_error(f"[{udid}] 养号出现异常,将重启流程: {e}", "养号", udid)
event.wait(timeout=3)
# 观看直播
def watchLiveForGrowth(self, udid, event, max_retries=None):
import random, wda
retry_count = 0
backoff_sec = 10
# —— 每次重启都新建 client/session ——
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
while not event.is_set():
try:
# 1) 先关再开
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=1)
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
session.appium_settings({"snapshotMaxDepth": 15})
# 2) 进入直播 (使用英文)
live_button = session(
xpath='//XCUIElementTypeButton[@name="LIVE" or @label="LIVE" or @name="直播" or @label="直播"] '
'| //XCUIElementTypeOther[@name="LIVE" or @label="LIVE" or @name="直播" or @label="直播"]'
)
if live_button.exists:
live_button.click()
else:
LogManager.method_error("无法找到直播间按钮 抛出异常 重新启动", "直播养号", udid)
# 抛出异常
raise Exception(f"找不到直播按钮,抛出异常 重新启动")
waitTime = random.randint(15, 20)
for _ in range(waitTime): # 0.2 秒一片
if event.is_set():
break
event.wait(timeout=1)
# live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
live_button = session(
xpath='//XCUIElementTypeButton[@name="LIVE" or @label="LIVE" or @name="直播" or @label="直播"] '
'| //XCUIElementTypeOther[@name="LIVE" or @label="LIVE" or @name="直播" or @label="直播"]'
)
if live_button.exists:
continue
# 下滑一下
ControlUtils.swipe_up(client)
# 3) 取分辨率;可选重建 session 规避句柄陈旧
size = session.window_size()
width, height = size.width, size.height
# 4) 主循环:刷直播
while not event.is_set():
event.wait(timeout=3)
# 找到一个看直播的时候肯定有的元素,当这个元素没有的时候,就代表当前的页面出现了问题
# 需要抛出异常,重启这个流程
el1 = session.xpath('//XCUIElementTypeOther[@name="GBLFeedRootViewComponent"]')
el2 = session.xpath('//XCUIElementTypeOther[@value="0%"]')
if not (el1.exists or el2.exists):
print("当前页面不是直播间,重启刷直播")
LogManager.method_error("当前页面不是直播间,重启刷直播", "直播养号", udid=udid)
raise Exception("当前页面不是直播间")
else:
print("当前页面是直播间,继续刷直播")
LogManager.method_info("当前页面是直播间,继续刷直播", "直播养号", udid=udid)
# PK 直接划走
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("✅ 当前是 PK跳过")
LogManager.method_info("✅ 当前是 PK跳过", "直播养号", udid=udid)
ControlUtils.swipe_up(client)
continue
# 计算直播显示窗口数量(主画面+连麦小窗)
count = AiUtils.count_add_by_xml(session)
print(f"检测到直播显示区域窗口数:{count}")
if count > 1:
print("❌ 多窗口(有人连麦/分屏),划走")
LogManager.method_info("❌ 多窗口(有人连麦/分屏),划走", "直播养号", udid=udid)
ControlUtils.swipe_up(client)
continue
else:
print("✅ 单窗口,(10%概率)开始点赞")
LogManager.method_info("✅ 单窗口,(3%概率)开始点赞", "直播养号", udid=udid)
# 随机点赞(仍保留中途保护)
if random.random() >= 0.97:
print("开始点赞")
LogManager.method_info("开始点赞", "直播养号", udid=udid)
for _ in range(random.randint(10, 30)):
# 中途转PK/连麦立即跳过
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists \
or AiUtils.count_add_by_xml(session) > 1:
print("❗ 中途发生 PK/连麦,跳过")
LogManager.method_info("❗ 中途发生 PK/连麦,跳过", "直播养号", udid=udid)
ControlUtils.swipe_up(client)
break
x = width // 3 + random.randint(-10, 10)
y = height // 3 + random.randint(10, 20)
print("双击坐标:", x, y)
session.double_tap(x, y)
print("--------------------------------------------")
# 换成
total_seconds = random.randint(300, 600)
for _ in range(total_seconds): # 0.2 秒一片
if event.is_set():
break
event.wait(timeout=1)
ControlUtils.swipe_up(client)
# 正常退出(外部 event 触发)
break
except Exception as e:
retry_count += 1
LogManager.method_error(f"watchLiveForGrowth 异常(第{retry_count}次):{repr(e)}", "直播养号", udid)
# 尝试轻量恢复一次,避免一些短暂性 session 失效
try:
client = wda.USBClient(udid, ev.wdaFunctionPort)
_ = client.session()
except Exception:
pass
# 冷却后整段流程重来
for _ in range(backoff_sec): # 0.2 秒一片
if event.is_set():
break
event.wait(timeout=1)
continue
"""
外层包装,出现异常自动重试、
关注打招呼以及回复主播消息
"""
def safe_greetNewFollowers(self, udid, needReply, isComment, needTranslate, event):
retries = 0
while not event.is_set():
try:
self.greetNewFollowers(udid, needReply, isComment, needTranslate, event)
except Exception as e:
retries += 1
LogManager.method_error(f"greetNewFollowers 出现异常: {e},准备第 {retries} 次重试", "关注打招呼", udid)
event.wait(timeout=3)
if event.is_set():
LogManager.method_info("外层 while 检测到停止,即将 break", "关注打招呼", udid)
break
print("任务终止")
LogManager.method_error("greetNewFollowers任务终止", "关注打招呼", udid)
# 关注打招呼
def greetNewFollowers(self, udid, needReply, isComment, needTranslate, event):
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
print(f"是否要自动回复消息:{needReply}")
LogManager.method_info(f"是否要自动回复消息:{needReply}", "关注打招呼", udid)
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
LogManager.method_info(f"重启tiktok", "关注打招呼", udid)
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
session.appium_settings({"waitForQuiescence": False})
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
# 返回上一步
def goBack(count):
for i in range(count):
LogManager.method_info(f"返回上一步", "关注打招呼", udid)
session.appium_settings({"snapshotMaxDepth": 15})
ControlUtils.clickBack(session)
event.wait(timeout=2)
LogManager.method_info(f"循环条件1:{not event.is_set()}", "关注打招呼", udid)
LogManager.method_info(f"循环条件2:{len(anchorList) > 0}", "关注打招呼", udid)
LogManager.method_info(f"循环条件3:{not event.is_set() and len(anchorList) > 0}", "关注打招呼", udid)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set():
print(f"关注打招呼开始循环,设备是:{udid}")
session.appium_settings({"waitForQuiescence": False})
LogManager.method_info("=== 外层 while 新一轮 ===", "关注打招呼", udid)
if event.is_set():
break
with self.lock:
# 获取一个主播,
LogManager.method_info(f"开始获取数据", "关注打招呼", udid)
# 获取一个主播,
result = AiUtils.peek_aclist_first()
LogManager.method_info(f"数据是:{result}", "关注打招呼", udid)
state = result.get("state", 0)
if not state:
LogManager.method_info(f"当前主播的状态是:{state} 不通行,取出数据移到列表尾部 继续下一个",
"关注打招呼",
udid)
AiUtils.pop_aclist_first(mode="move")
continue
# 并删除
anchor = AiUtils.pop_aclist_first()
LogManager.method_info(f"当前主播的状态是:{state} 通行,取出数据删除", "关注打招呼", udid)
if not anchor:
LogManager.method_info(f"数据库中的数据不足", "关注打招呼", udid)
if not self.interruptible_sleep(event, 30):
continue
aid = anchor.get("anchorId", "")
anchorCountry = anchor.get("country", "")
LogManager.method_info(f"主播的数据,用户名:{aid},国家:{anchorCountry}", "关注打招呼", udid)
# 点击搜索按钮
ControlUtils.clickSearch(session)
LogManager.method_info(f"点击搜索按钮", "关注打招呼", udid)
# 强制刷新session
session.appium_settings({"snapshotMaxDepth": 15})
# 查找输入框
input = session.xpath('//XCUIElementTypeSearchField')
if event.is_set():
break
# 如果找到了输入框,就点击并且输入内容
if input.exists:
input.click()
# 稍作停顿
event.wait(timeout=0.5)
else:
print(f"找不到输入框")
raise Exception("找不到输入框")
input = session.xpath('//XCUIElementTypeSearchField')
if input.exists:
input.clear_text()
event.wait(timeout=1)
# 输入主播id
LogManager.method_info(f"输入主播id:{aid or '暂无数据'}", "关注打招呼", udid)
input.set_text(f"{aid or '暂无数据'}\n")
# 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
session.appium_settings({"snapshotMaxDepth": 25})
try:
if event.is_set():
break
# 点击进入首页
ControlUtils.clickFollow(session, aid)
if event.is_set():
break
LogManager.method_info("点击进入主播首页", "关注打招呼", udid)
except wda.WDAElementNotFoundError:
LogManager.method_info("未找到进入主播首页的按钮,使用第二个方案。", "关注打招呼", udid)
enter_room = ControlUtils.userClickProfile(session, aid)
if not enter_room:
goBack(2)
session.appium_settings({"snapshotMaxDepth": 15})
continue
event.wait(timeout=5)
# 找到并点击第一个视频
if event.is_set():
break
cellClickResult, workCount = ControlUtils.clickFirstVideoFromDetailPage(session)
LogManager.method_info(f"是否有视频:{cellClickResult} 作品的数量为:{workCount}", "关注打招呼", udid)
LogManager.method_info(f"点击第一个视频", "关注打招呼", udid)
event.wait(timeout=2)
# 观看主播视频
def viewAnchorVideo(workCount):
print("开始查看视频,并且重新调整查询深度")
LogManager.method_info("开始查看视频调整节点深度为5", "关注打招呼", udid)
session.appium_settings({"snapshotMaxDepth": 5})
if workCount > 3:
count = 3
else:
count = workCount
while count != 0:
if event.is_set():
break
LogManager.method_info("准备停止脚本", method="task")
for _ in range(5):
LogManager.method_info("停止脚本中", method="task")
if event.is_set():
break
event.wait(timeout=1)
LogManager.method_info("停止脚本成功", method="task")
img = client.screenshot()
event.wait(timeout=1)
# filePath = f"resources/{udid}/bgv.png"
base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) # 当前脚本目录的上一级
filePath = os.path.join(base_dir, "resources", udid, "bgv.png")
dirPath = os.path.dirname(filePath)
if not os.path.exists(dirPath):
os.makedirs(dirPath)
img.save(filePath)
LogManager.method_info("保存屏幕图像成功", "关注打招呼", udid)
event.wait(timeout=2)
# 查找add图标
LogManager.method_info("开始点击点赞", "关注打招呼", udid)
ControlUtils.clickLike(session, udid)
count -= 1
LogManager.method_info("准备停止脚本", method="task")
# 随机看视频 15~30秒
for _ in range(random.randint(15, 30)):
LogManager.method_info("停止脚本中", method="task")
if event.is_set():
break
event.wait(timeout=1)
LogManager.method_info("停止脚本成功", method="task")
LogManager.method_info(f"是否进行评论:{isComment}", "关注打招呼", udid)
# 使用OCR进行评论
if isComment:
LogManager.method_info("调用方法进行评论", "关注打招呼", udid)
self.comment_flow(filePath, session, udid, 100, 100)
event.wait(timeout=2)
session.appium_settings({"snapshotMaxDepth": 12})
LogManager.method_info(f"检查当前是否为视频页面", "关注打招呼", udid)
# 最多尝试 3 次(第一次 + 再试两次)
for attempt in range(3):
is_back_enabled = ControlUtils.isClickBackEnabled(session)
if is_back_enabled: # 成功就立即跳出
break
# 失败日志
LogManager.method_info(f"返回失败,第 {attempt + 1} 次检查失败", "关注打招呼", udid)
# 最后一次失败不再点击,直接抛异常
if attempt == 2:
LogManager.method_info("返回失败,重启", "关注打招呼", udid)
raise Exception("返回失败,出现问题")
# 前两次失败:点一下再等 1 秒,进入下一次循环
session.tap(100, 100)
time.sleep(1)
if count != 0:
ControlUtils.swipe_up(client)
# 右滑返回
# client.swipe_right()
back_btn = ControlUtils.clickBack(session)
if not back_btn:
print("返回失败,出现问题")
raise Exception("返回失败,出现问题")
if event.is_set():
LogManager.method_info("viewAnchorVideo 检测到停止,提前退出", "关注打招呼", udid)
return
if event.is_set():
break
# 如果打开视频失败。说明该主播没有视频
if cellClickResult == True:
# 观看主播视频
LogManager.method_info("去查看主播视频", "关注打招呼", udid)
viewAnchorVideo(workCount)
event.wait(timeout=3)
LogManager.method_info("视频看完了,重置试图查询深度", "关注打招呼", udid)
session.appium_settings({"snapshotMaxDepth": 25})
event.wait(timeout=0.5)
# 向上滑动
ControlUtils.swipe_down(udid)
event.wait(timeout=2)
if event.is_set():
break
msgButton = AiUtils.getSendMesageButton(session)
event.wait(timeout=2)
if msgButton.exists:
# 进入聊天页面
msgButton.click()
LogManager.method_info("找到发消息按钮了", "关注打招呼", udid)
print("找到发消息按钮了")
else:
LogManager.method_info("没有识别出发消息按钮", "关注打招呼", udid)
print("没有识别出发消息按钮")
goBack(3)
session.appium_settings({"snapshotMaxDepth": 15})
continue
event.wait(timeout=3)
# 查找聊天界面中的输入框节点
chatInput = session.xpath("//TextView")
if chatInput.exists:
print("找到输入框了, 准备发送一条打招呼消息")
LogManager.method_info("找到输入框了, 准备发送一条打招呼消息", "关注打招呼", udid)
# LogManager.method_info(f"传递的打招呼的数据:{ev.prologueList}", "关注打招呼", udid)
# 取出国家进行对应国家语言代码
anchorCountry_code = CountryLanguageMapper.get_language_code(anchorCountry)
LogManager.method_info(f"获取的语言代码是:{ev.prologueList}", "关注打招呼", udid)
LogManager.method_info(f"存储的打招呼语句是:{ev.prologueList}", "关注打招呼", udid)
# 判断对应的语言代码是否在传入的字典中
if anchorCountry_code in ev.prologueList:
LogManager.method_info(f"在存储的字典中 打招呼语句是:{ev.prologueList}", "关注打招呼", udid)
# 进行原本的进行传入
privateMessageList = ev.prologueList[anchorCountry_code]
text = random.choice(privateMessageList)
msg = text
else:
LogManager.method_info(f"不在存储的字典中 打招呼语句是:{ev.prologueList}", "关注打招呼", udid)
# 需要翻译
privateMessageList = ev.prologueList['yolo']
# 准备打招呼的文案
text = random.choice(privateMessageList)
LogManager.method_info(f"取出打招呼的数据,{text}", "关注打招呼",
udid)
if needTranslate:
LogManager.method_info(f"需要翻译:{text},参数为:国家为{anchorCountry}, 即将进行翻译",
"关注打招呼", udid)
msg = Requester.translation(text, anchorCountry)
LogManager.method_info(f"翻译成功:{msg}, ", "关注打招呼", udid)
else:
msg = text
LogManager.method_info(f"即将发送的私信内容:{msg}", "关注打招呼", udid)
if event.is_set():
break
# 准备发送一条信息
chatInput = session.xpath("//TextView")
if chatInput.exists:
chatInput.click()
LogManager.method_info(f"即将发送的私信内容:{msg or '暂无数据'}", "关注打招呼", udid)
chatInput.set_text(f"{msg or '暂无数据'}\n")
event.wait(timeout=2)
# 发送消息
# input.set_text(f"{aid or '暂无数据'}\n")
event.wait(timeout=1)
else:
print("无法发送信息")
LogManager.method_info(f"给主播{aid} 发送消息失败", "关注打招呼", udid)
# 接着下一个主播
goBack(1)
# 点击关注按钮
# followButton = AiUtils.getFollowButton(session).get(timeout=5)
# if followButton is not None:
# # LogManager.method_info("找到关注按钮了", "关注打招呼", udid)
# # followButton.click()
# x, y, w, h = followButton.bounds
# cx = int(x + w / 2)
# cy = int(y + h / 2)
# # 随机偏移 ±5 px可自己改范围
# cx += random.randint(-5, 5)
# cy += random.randint(-5, 5)
#
# session.click(cx, cy)
#
# else:
# LogManager.method_info("没找到关注按钮", "关注打招呼", udid)
# time.sleep(1)
# goBack(4)
# session.appium_settings({"snapshotMaxDepth": 15})
# continue
if event.is_set():
break
session.appium_settings({"snapshotMaxDepth": 15})
goBack(3)
else:
print(f"{aid}:该主播没有视频")
LogManager.method_info(f"{aid}:该主播没有视频", "关注打招呼", udid)
goBack(3)
session.appium_settings({"snapshotMaxDepth": 15})
continue
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
event.wait(timeout=2)
print("即将要回复消息")
LogManager.method_info("即将要回复消息", "关注打招呼", udid)
LogManager.method_info(f"是否需要进行监控消息:{needReply}", "监控消息")
if event.is_set():
break
if needReply:
print("如果需要回复主播消息。走此逻辑")
print("----------------------------------------------------------")
LogManager.method_info(f"进入准备监控消息方法", "监控消息")
# 执行回复消息逻辑
self.monitorMessages(session, udid, event)
homeButton = AiUtils.findHomeButton(session)
if homeButton.exists:
homeButton.click()
else:
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=2)
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
else:
session.appium_settings({"snapshotMaxDepth": 15})
print("任务终止2")
print("greetNewFollowers方法执行完毕")
def safe_followAndGreetUnion(self, udid, needReply, needTranslate, event):
retries = 0
while not event.is_set():
try:
self.followAndGreetUnion(udid, needReply, needTranslate, event)
except Exception as e:
retries += 1
LogManager.method_error(f"greetNewFollowers 出现异常: {e},准备第 {retries} 次重试", "关注打招呼", udid)
event.wait(timeout=3)
if event.is_set():
LogManager.method_info("外层 while 检测到停止,即将 break", "关注打招呼", udid)
break
LogManager.method_error("greetNewFollowers 重试次数耗尽,任务终止", "关注打招呼", udid)
# 关注打招呼以及回复主播消息(联盟号)
def followAndGreetUnion(self, udid, needReply, needTranslate, event):
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
print(f"是否要自动回复消息:{needReply}")
LogManager.method_info(f"是否要自动回复消息:{needReply}", "关注打招呼(联盟号)", udid)
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
LogManager.method_info(f"重启tiktok", "关注打招呼(联盟号)", udid)
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
# 返回上一步
def goBack(count):
for i in range(count):
LogManager.method_info(f"返回上一步", "关注打招呼(联盟号)", udid)
session.appium_settings({"snapshotMaxDepth": 15})
ControlUtils.clickBack(session)
event.wait(timeout=2)
LogManager.method_info(f"循环条件1:{not event.is_set()}", "关注打招呼(联盟号)", udid)
LogManager.method_info(f"循环条件2:{len(anchorList) > 0}", "关注打招呼(联盟号)", udid)
LogManager.method_info(f"循环条件3:{not event.is_set() and len(anchorList) > 0}", "关注打招呼(联盟号)", udid)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set():
LogManager.method_info("=== 外层 while 新一轮 ===", "关注打招呼(联盟号)", udid)
if event.is_set():
break
with self.lock:
# 获取一个主播,
LogManager.method_info(f"开始获取数据", "关注打招呼(联盟号)", udid)
# 获取一个主播,
result = AiUtils.peek_aclist_first()
LogManager.method_info(f"数据是:{result}", "关注打招呼(联盟号)", udid)
state = result.get("state", 0)
if not state:
LogManager.method_info(f"当前主播的状态是:{state} 不通行,取出数据移到列表尾部 继续下一个",
"关注打招呼(联盟号)",
udid)
AiUtils.pop_aclist_first(mode="move")
continue
# 并删除
anchor = AiUtils.pop_aclist_first()
LogManager.method_info(f"当前主播的状态是:{state} 通行,取出数据删除", "关注打招呼(联盟号)", udid)
if not anchor:
LogManager.method_info(f"数据库中的数据不足", "关注打招呼(联盟号)", udid)
if not self.interruptible_sleep(event, 30):
continue
aid = anchor.get("anchorId", "")
anchorCountry = anchor.get("country", "")
LogManager.method_info(f"主播的数据,用户名:{aid},国家:{anchorCountry}", "关注打招呼(联盟号)", udid)
# 点击搜索按钮
ControlUtils.clickSearch(session)
LogManager.method_info(f"点击搜索按钮", "关注打招呼(联盟号)", udid)
# 强制刷新session
session.appium_settings({"snapshotMaxDepth": 15})
# 查找输入框
input = session.xpath('//XCUIElementTypeSearchField')
# 如果找到了输入框,就点击并且输入内容
if input.exists:
input.click()
# 稍作停顿
event.wait(timeout=0.5)
else:
print(f"找不到输入框")
raise Exception("找不到输入框")
input = session.xpath('//XCUIElementTypeSearchField')
if input.exists:
input.clear_text()
event.wait(timeout=1)
# 输入主播id
LogManager.method_info(f"搜索主播名称:{aid or '暂无数据'}", "关注打招呼(联盟号)", udid)
input.set_text(f"{aid or '暂无数据'}\n")
# 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
session.appium_settings({"snapshotMaxDepth": 25})
try:
# 点击进入首页
ControlUtils.clickFollow(session, aid)
LogManager.method_info("点击进入主播首页", "关注打招呼(联盟号)", udid)
except wda.WDAElementNotFoundError:
LogManager.method_info("未找到进入主播首页的按钮,使用第二个方案。", "关注打招呼(联盟号)", udid)
enter_room = ControlUtils.userClickProfile(session, aid)
if not enter_room:
goBack(2)
session.appium_settings({"snapshotMaxDepth": 15})
continue
event.wait(timeout=2)
session.appium_settings({"snapshotMaxDepth": 25})
event.wait(timeout=0.5)
# 向上滑动
ControlUtils.swipe_down(udid)
event.wait(timeout=2)
msgButton = AiUtils.getSendMesageButton(session)
event.wait(timeout=2)
if msgButton.exists:
# 进入聊天页面
msgButton.click()
LogManager.method_info("找到发消息按钮了", "关注打招呼(联盟号)", udid)
print("找到发消息按钮了")
else:
LogManager.method_info("没有识别出发消息按钮", "关注打招呼(联盟号)", udid)
print("没有识别出发消息按钮")
goBack(3)
session.appium_settings({"snapshotMaxDepth": 15})
continue
event.wait(timeout=3)
# 查找聊天界面中的输入框节点
chatInput = session.xpath("//TextView")
if chatInput.exists:
print("找到输入框了, 准备发送一条打招呼消息")
LogManager.method_info("找到输入框了, 准备发送一条打招呼消息", "关注打招呼(联盟号)", udid)
# 取出国家进行对应国家语言代码
anchorCountry_code = CountryLanguageMapper.get_language_code(anchorCountry)
print(anchorCountry_code)
print("存储的是:", ev.prologueList)
# 判断对应的语言代码是否在传入的字典中
if anchorCountry_code in ev.prologueList:
# 进行原本的进行传入
privateMessageList = ev.prologueList[anchorCountry_code]
text = random.choice(privateMessageList)
msg = text
else:
# 从yolo中拿取
privateMessageList = ev.prologueList['yolo']
text = random.choice(privateMessageList)
if needTranslate:
# 翻译成主播国家的语言
LogManager.method_info(f"需要翻译:{text},参数为:国家为{anchorCountry}, 即将进行翻译",
"关注打招呼(联盟号)", udid)
msg = Requester.translation(text, anchorCountry)
LogManager.method_info(f"翻译成功:{msg}, ", "关注打招呼(联盟号)", udid)
else:
msg = text
LogManager.method_info(f"即将发送的私信内容:{msg}", "关注打招呼(联盟号)", udid)
# 准备发送一条信息
chatInput = session.xpath("//TextView")
if chatInput.exists:
chatInput.click()
LogManager.method_info(f"即将发送的私信内容:{msg or '暂无数据'}", "关注打招呼(联盟号)", udid)
chatInput.set_text(f"{msg or '暂无数据'}\n")
event.wait(timeout=2)
# 发送消息
# input.set_text(f"{aid or '暂无数据'}\n")
event.wait(timeout=1)
else:
print("无法发送信息")
LogManager.method_info(f"给主播{aid} 发送消息失败", "关注打招呼(联盟号)", udid)
# 接着下一个主播
goBack(1)
# 点击关注按钮
# followButton = AiUtils.getFollowButton(session).get(timeout=5)
# if followButton is not None:
# # LogManager.method_info("找到关注按钮了", "关注打招呼", udid)
# # followButton.click()
# x, y, w, h = followButton.bounds
# cx = int(x + w / 2)
# cy = int(y + h / 2)
# # 随机偏移 ±5 px可自己改范围
# cx += random.randint(-5, 5)
# cy += random.randint(-5, 5)
#
# ControlUtils.tap_mini_cluster(cx, cy, session)
#
# else:
# LogManager.method_info("没找到关注按钮", "关注打招呼", udid)
# time.sleep(1)
# goBack(4)
# session.appium_settings({"snapshotMaxDepth": 15})
# continue
session.appium_settings({"snapshotMaxDepth": 15})
goBack(3)
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
event.wait(timeout=2)
print("即将要回复消息")
LogManager.method_info("即将要回复消息", "关注打招呼(联盟号)", udid)
if needReply:
print("如果需要回复主播消息。走此逻辑")
print("----------------------------------------------------------")
print("监控回复消息")
# 执行回复消息逻辑
self.monitorMessages(session, udid, event)
homeButton = AiUtils.findHomeButton(session)
if homeButton.exists:
homeButton.click()
else:
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=2)
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
print("重新创建wda会话 防止wda会话失效")
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
else:
session.appium_settings({"snapshotMaxDepth": 15})
print("greetNewFollowers方法执行完毕")
# 检测消息
def replyMessages(self, udid, event):
try:
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
except Exception as e:
LogManager.method_error(f"创建wda会话异常: {e}", "检测消息", udid)
return
LogManager.method_info("开始重启tiktok", "监控消息")
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=2)
# time.sleep(1)
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
# time.sleep(1)
LogManager.method_info("重启tiktok成功", "监控消息")
while not event.is_set():
try:
# 调用检测消息的方法
self.monitorMessages(session, udid, event)
except Exception as e:
LogManager.method_error(f"监控消息 出现异常: {e},重新启动监控直播", "检测消息", udid)
LogManager.method_info(f"出现异常时,稍等再重启 TikTok 并重试 异常是: {e}", "监控消息", udid)
LogManager.method_info(f"出现异常重新创建wda", "监控消息", udid)
LogManager.method_info(f"重启 TikTok", "监控消息", udid)
# 出现异常时,稍等再重启 TikTok 并重试
ControlUtils.closeTikTok(session, udid)
event.wait(timeout=2)
# time.sleep(1)
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
# time.sleep(1)
LogManager.method_info("TikTok 重启成功", "监控消息", udid)
continue # 重新进入 while 循环,调用 monitorMessages
# def monitorMessages(self, session, udid, event):
#
# LogManager.method_info("脚本开始执行中", "监控消息")
#
# # 调整节点的深度为 7
# session.appium_settings({"snapshotMaxDepth": 7})
#
# el = session.xpath(
# '//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
# ' | '
# '//XCUIElementTypeButton[contains(@name,"收件箱")]'
# ' | '
# '//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
# )
#
# # 如果收件箱有消息 则进行点击
# if el.exists:
# try:
# m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
# except Exception as e:
# LogManager.method_error(f"解析收件箱数量异常: {e}", "检测消息", udid)
#
# count = int(m.group(1)) if m else 0
#
# if count:
# el.click()
# session.appium_settings({"snapshotMaxDepth": 25})
# event.wait(timeout=3)
# while True:
# print("循环开始")
# info_count = 0
# # 创建新的会话
# el = session.xpath(
# '//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
# ' | '
# '//XCUIElementTypeButton[contains(@name,"收件箱")]'
# ' | '
# '//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
# )
#
# if not el.exists:
# LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
# raise Exception("当前页面找不到收件箱,重启")
# # break
#
# # 支持中文“收件箱”和英文“Inbox”
# xpath_query = (
# "//XCUIElementTypeStaticText"
# "[@value='收件箱' or @label='收件箱' or @name='收件箱'"
# " or @value='Inbox' or @label='Inbox' or @name='Inbox']"
# )
#
# # 查找所有收件箱节点
# inbox_nodes = session.xpath(xpath_query).find_elements()
# if len(inbox_nodes) < 2:
# LogManager.method_error(f"当前页面不再收件箱页面,重启", "检测消息", udid)
# raise Exception("当前页面不再收件箱页面,重启")
#
# m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
# count = int(m.group(1)) if m else 0
#
# if not count:
# LogManager.method_info(f"当前收件箱的总数量{count}", "检测消息", udid)
# break
#
# # 新粉丝
# xp_new_fan_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 活动
# xp_activity_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 系统通知
# xp_system_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 消息请求
# xp_request_badge = (
# "//XCUIElementTypeCell"
# "[.//*[self::XCUIElementTypeLink or self::XCUIElementTypeStaticText]"
# " [@name='消息请求' or @label='消息请求' or @value='消息请求']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
#
# # 用户消息
# xp_badge_numeric = (
# "("
# # 你的两类未读容器组件 + 数字徽标value 纯数字)
# "//XCUIElementTypeOther["
# " @name='AWEIMChatListCellUnreadCountViewComponent'"
# " or @name='TikTokIMImpl.InboxCellUnreadCountViewBuilder'"
# "]//XCUIElementTypeStaticText[@value and translate(@value,'0123456789','')='']"
# ")/ancestor::XCUIElementTypeCell[1]"
# " | "
# # 兜底:任何在 CollectionView 下、value 纯数字的徽标 → 找其最近的 Cell
# "//XCUIElementTypeCollectionView//XCUIElementTypeStaticText"
# "[@value and translate(@value,'0123456789','')='']"
# "/ancestor::XCUIElementTypeCell[1]"
# )
#
# try:
# # 如果 2 秒内找不到,会抛异常
# user_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
# val = (user_text.info.get("value") or
# user_text.info.get("label") or
# user_text.info.get("name"))
# LogManager.method_info(f"用户未读数量:{val}", "检测消息", udid)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 用户 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 用户消息 未读徽标数字", udid)
# user_text = None
# info_count += 1
#
# if user_text:
#
# user_text.tap()
# event.wait(timeout=3)
#
#
#
# xml = session.source()
# time.sleep(1)
# msgs = AiUtils.extract_messages_from_xml(xml)
# time.sleep(1)
#
# last_in = None # 对方最后一句
# last_out = None # 我方最后一句
# attempt = 0 # 已试次数(首次算第 1 次)
#
# while attempt < 3:
# # 1. 本轮扫描
# for item in reversed(msgs):
# if item.get('type') != 'msg':
# continue
# if last_in is None and item['dir'] == 'in':
# last_in = item['text']
# if last_out is None and item['dir'] == 'out':
# last_out = item['text']
# if last_in and last_out:
# break
#
# # 2. 有一条为空就重试
# if not last_in or not last_out:
# attempt += 1
# if attempt == 3:
# break # 三次用完,放弃
# time.sleep(0.2)
# xml = session.source()
# msgs = AiUtils.extract_messages_from_xml(xml)
# continue
# else:
# break # 至少一条有内容,成功退出
#
# LogManager.method_info(f"检测到对方最后发送的消息:{last_in}", "检测消息", udid)
# LogManager.method_info(f"检测我发送的最后一条信息:{last_out}", "检测消息", udid)
#
#
# # 获取主播的名称
# # anchor_name = AiUtils.get_navbar_anchor_name(session)
# anchor_name = ""
# for _ in range(3): # 最多 2 次重试 + 1 次初始
# anchor_name = AiUtils.get_navbar_anchor_name(session)
# if anchor_name:
# break
# time.sleep(1)
#
# LogManager.method_info(f"获取主播的名称:{anchor_name}", "检测消息", udid)
# LogManager.method_info(f"获取主播最后发送的消息 即将翻译:{last_in}", "检测消息", udid)
#
# if last_in is not None:
# chinese_last_msg_text = Requester.translationToChinese(last_in)
# else:
# chinese_last_msg_text = ""
#
# # 进行判断,判断翻译后是否
#
# LogManager.method_info(f"翻译中文后的内容,交给前端进行展示:{chinese_last_msg_text}", "检测消息",
# udid)
#
# # 找到输入框
# last_data = [{
# "sender": anchor_name,
# "device": udid,
# "time": datetime.now().strftime("%Y-%m-%d %H:%M"),
# "text": chinese_last_msg_text,
# "status": 0
# }]
#
# LogManager.method_info(f"主播最后发送的数据,传递给前端进行记录:{chinese_last_msg_text}",
# "检测消息", udid)
#
# # 把主播的名称存储到c盘
# JsonUtils.append_json_items(last_data, "log/last_message.json")
#
# # 从C盘中读取数据
# anchorWithSession = IOSAIStorage.load()
#
# sel = session.xpath("//TextView")
# if anchor_name not in anchorWithSession:
#
# # 如果是第一次发消息(没有sessionId的情况)
# LogManager.method_info(f"第一次发消息:{anchor_name},没有记忆 开始请求ai", "检测消息", udid)
# LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
#
# if last_in is None:
# LogManager.method_info(f"检测不到对方发送的最后一条消息,发送一条打招呼", "检测消息",
# udid)
#
# text = "ok"
# if last_out:
# text = last_out
#
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
#
# LogManager.method_info(
# f"发送的消息检测不到对方发送的消息不走ai{text or '暂无数据'}", "检测消息",
# udid)
#
# sel.set_text(f"{text or '暂无数据'}\n")
# else:
# LogManager.method_error("找不到输入框,重启", "检测消息", udid)
# raise Exception("找不到输入框,重启")
# else:
# aiResult, sessionId = Requester.chatToAi({"query": last_in, "user": "1"})
# IOSAIStorage.save({anchor_name: sessionId}, mode="merge")
#
# # 找到输入框输入ai返回出来的消息
#
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
# LogManager.method_info(
# f"发送的消息检测到对方发送的消息进行走ai没记忆{aiResult or '暂无数据'}",
# "检测消息",
# udid)
# sel.set_text(f"{aiResult or '暂无数据'}\n")
# else:
# LogManager.method_error("找不到输入框,重启", "检测消息", udid)
# raise Exception("找不到输入框,重启")
# else:
# print("有记忆")
#
# LogManager.method_info(f"不是一次发消息:{anchor_name},有记忆", "检测消息", udid)
# # 如果不是第一次发消息证明存储的有sessionId
# sessionId = anchorWithSession[anchor_name]
#
# if last_in is None:
# last_in = "ok"
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
# LogManager.method_info(
# f"发送的消息检测到对方发送的消息进行走ai有记忆{last_in or '暂无数据'}",
# "检测消息",
# udid)
# sel.set_text(f"{last_in or '暂无数据'}\n")
# else:
#
# # TODO: user后续添加暂时写死
#
# LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
#
# aiResult, sessionId = Requester.chatToAi(
# {"query": last_in, "conversation_id": sessionId, "user": "1"})
#
# if sel.exists:
# sel.click() # 聚焦
# event.wait(timeout=1)
# sel.clear_text()
# LogManager.method_info(
# f"发送的消息检测到对方发送的消息进行走ai有记忆{aiResult or '暂无数据'}",
# "检测消息",
# udid)
# sel.set_text(f"{aiResult or '暂无数据'}\n")
#
# LogManager.method_info(f"存储的sessionId:{anchorWithSession}", "检测消息", udid)
# event.wait(timeout=1)
#
# # 返回
# ControlUtils.clickBack(session)
#
# # 重新回到收件箱页面后,强制刷新节点
# session.appium_settings({"snapshotMaxDepth": 25})
# event.wait(timeout=1)
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
#
# LogManager.method_info(f"新粉丝未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 新粉丝 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 新粉丝 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
# LogManager.method_info(f"活动未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 活动 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 活动 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
# LogManager.method_info(f"系统通知未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 系统通知 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 系统通知 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# try:
# # 如果 2 秒内找不到,会抛异常
# badge_text = session.xpath(xp_request_badge).get(timeout=2.0)
# val = (badge_text.info.get("value") or
# badge_text.info.get("label") or
# badge_text.info.get("name"))
# LogManager.method_info(f"消息请求未读数量:{val}", "检测消息", udid)
# if badge_text:
# badge_text.tap()
# event.wait(timeout=1)
# ControlUtils.clickBack(session)
# event.wait(timeout=1)
# except Exception:
# LogManager.method_warning("当前屏幕没有找到 消息请求 未读徽标数字", "检测消息", udid)
# print("当前屏幕没有找到 消息请求 未读徽标数字", udid)
# badge_text = None
# info_count += 1
#
# # 双击收件箱 定位到消息的位置
# if info_count == 5:
# r = el.bounds # 可能是命名属性,也可能是 tuple
# cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2))
# cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2))
# session.double_tap(cx, cy) # 可能抛异常:方法不存在
# LogManager.method_info(f"双击收件箱 定位到信息", "检测消息", udid)
# else:
#
# return
# else:
# LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
# raise Exception("当前页面找不到收件箱,重启")
def monitorMessages(self, session, udid, event):
LogManager.method_info("脚本开始执行中", "监控消息")
# 调整节点的深度为 7
session.appium_settings({"snapshotMaxDepth": 7})
el = session.xpath(
'//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
' | '
'//XCUIElementTypeButton[contains(@name,"收件箱")]'
' | '
'//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
)
# 如果收件箱有消息 则进行点击
if el.exists:
try:
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
except Exception as e:
LogManager.method_error(f"解析收件箱数量异常: {e}", "检测消息", udid)
count = int(m.group(1)) if m else 0
if count:
el.click()
session.appium_settings({"snapshotMaxDepth": 25})
event.wait(timeout=3)
while True:
print("循环开始")
info_count = 0
# 创建新的会话
el = session.xpath(
'//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
' | '
'//XCUIElementTypeButton[contains(@name,"收件箱")]'
' | '
'//XCUIElementTypeButton[.//XCUIElementTypeStaticText[@value="收件箱"]]'
)
if not el.exists:
LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
raise Exception("当前页面找不到收件箱,重启")
# break
# 支持中文“收件箱”和英文“Inbox”
xpath_query = (
"//XCUIElementTypeStaticText"
"[@value='收件箱' or @label='收件箱' or @name='收件箱'"
" or @value='Inbox' or @label='Inbox' or @name='Inbox']"
)
# 查找所有收件箱节点
inbox_nodes = session.xpath(xpath_query).find_elements()
if len(inbox_nodes) < 2:
LogManager.method_error(f"当前页面不再收件箱页面,重启", "检测消息", udid)
raise Exception("当前页面不再收件箱页面,重启")
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if not count:
LogManager.method_info(f"当前收件箱的总数量{count}", "检测消息", udid)
break
# 新粉丝
xp_new_fan_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 活动
# xp_activity_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
xp_activity_badge = (
"//XCUIElementTypeLink[@name='活动']/ancestor::XCUIElementTypeCell[1]"
"//XCUIElementTypeStaticText["
" @value and "
" (translate(@value,'0123456789','')='' or @value='99+')"
"]"
)
# 系统通知
xp_system_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 消息请求
xp_request_badge = (
"//XCUIElementTypeCell"
"[.//*[self::XCUIElementTypeLink or self::XCUIElementTypeStaticText]"
" [@name='消息请求' or @label='消息请求' or @value='消息请求']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 用户消息
xp_badge_numeric = (
"("
# 你的两类未读容器组件 + 数字徽标value 纯数字)
"//XCUIElementTypeOther["
" @name='AWEIMChatListCellUnreadCountViewComponent'"
" or @name='TikTokIMImpl.InboxCellUnreadCountViewBuilder'"
"]//XCUIElementTypeStaticText[@value and translate(@value,'0123456789','')='']"
")/ancestor::XCUIElementTypeCell[1]"
)
try:
# 如果 2 秒内找不到,会抛异常
user_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
val = (user_text.info.get("value") or
user_text.info.get("label") or
user_text.info.get("name"))
LogManager.method_info(f"用户未读数量:{val}", "检测消息", udid)
except Exception:
LogManager.method_warning("当前屏幕没有找到 用户 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 用户消息 未读徽标数字", udid)
user_text = None
info_count += 1
if user_text:
print("点击进入用户主页")
user_text.tap()
event.wait(timeout=3)
xml = session.source()
time.sleep(1)
msgs = AiUtils.extract_messages_from_xml(xml)
time.sleep(1)
last_in = None # 对方最后一句
last_out = None # 我方最后一句
attempt = 0 # 已试次数(首次算第 1 次)
while attempt < 3:
# 1. 本轮扫描
for item in reversed(msgs):
if item.get('type') != 'msg':
continue
if last_in is None and item['dir'] == 'in':
last_in = item['text']
if last_out is None and item['dir'] == 'out':
last_out = item['text']
if last_in and last_out:
break
# 2. 有一条为空就重试
if not last_in or not last_out:
attempt += 1
if attempt == 3:
break # 三次用完,放弃
time.sleep(0.2)
xml = session.source()
msgs = AiUtils.extract_messages_from_xml(xml)
continue
else:
break # 至少一条有内容,成功退出
LogManager.method_info(f"检测到对方最后发送的消息:{last_in}", "检测消息", udid)
LogManager.method_info(f"检测我发送的最后一条信息:{last_out}", "检测消息", udid)
# 获取主播的名称
# anchor_name = AiUtils.get_navbar_anchor_name(session)
anchor_name = ""
for _ in range(3): # 最多 2 次重试 + 1 次初始
anchor_name = AiUtils.get_navbar_anchor_name(session)
if anchor_name:
break
time.sleep(1)
LogManager.method_info(f"获取主播的名称:{anchor_name}", "检测消息", udid)
LogManager.method_info(f"获取主播最后发送的消息 即将翻译:{last_in}", "检测消息", udid)
chinese_last_msg_text = ""
if last_in is not None:
for attempt in range(3):
chinese_last_msg_text = Requester.translationToChinese(last_in)
if chinese_last_msg_text: # 非空则跳出循环
break
else:
chinese_last_msg_text = ""
# 进行判断,判断翻译后是否
LogManager.method_info(f"翻译中文后的内容,交给前端进行展示:{chinese_last_msg_text}", "检测消息",
udid)
# 找到输入框
last_data = [{
"sender": anchor_name,
"device": udid,
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"text": chinese_last_msg_text,
"status": 0
}]
LogManager.method_info(f"主播最后发送的数据,传递给前端进行记录:{chinese_last_msg_text}",
"检测消息", udid)
if last_data != "" and last_data != "消息请求":
# 把主播的名称存储到c盘
JsonUtils.append_json_items(last_data, "log/last_message.json")
# 从C盘中读取数据
anchorWithSession = IOSAIStorage.load()
sel = session.xpath("//TextView")
if anchor_name not in anchorWithSession:
# 如果是第一次发消息(没有sessionId的情况)
LogManager.method_info(f"第一次发消息:{anchor_name},没有记忆 开始请求ai", "检测消息", udid)
LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
if last_in is None:
LogManager.method_info(f"检测不到对方发送的最后一条消息,发送一条打招呼", "检测消息",
udid)
text = "ok"
if last_out and last_out != "已看过":
text = last_out
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测不到对方发送的消息不走ai{text or '暂无数据'}", "检测消息",
udid)
sel.set_text(f"{text or '暂无数据'}\n")
else:
LogManager.method_error("找不到输入框,重启", "检测消息", udid)
raise Exception("找不到输入框,重启")
else:
aiResult, sessionId = Requester.chatToAi({"query": last_in, "user": "1"})
if anchor_name and anchor_name != "消息请求":
IOSAIStorage.save({anchor_name: sessionId}, mode="merge")
else:
LogManager.method_warning(f"跳过保存 sessionIdanchor_name 不合法: '{anchor_name}'",
"检测消息", udid)
# 找到输入框输入ai返回出来的消息
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai没记忆{aiResult or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{aiResult or '暂无数据'}\n")
else:
LogManager.method_error("找不到输入框,重启", "检测消息", udid)
raise Exception("找不到输入框,重启")
else:
print("有记忆")
LogManager.method_info(f"不是一次发消息:{anchor_name},有记忆", "检测消息", udid)
# 如果不是第一次发消息证明存储的有sessionId
if anchor_name and anchor_name != "消息请求":
sessionId = anchorWithSession[anchor_name]
if last_in is None:
last_in = "ok"
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai有记忆{last_in or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{last_in or '暂无数据'}\n")
else:
# TODO: user后续添加暂时写死
LogManager.method_info(f"向ai发送的参数: 文本为:{last_in}", "检测消息", udid)
aiResult, sessionId = Requester.chatToAi(
{"query": last_in, "conversation_id": sessionId, "user": "1"})
if sel.exists:
sel.click() # 聚焦
event.wait(timeout=1)
sel.clear_text()
LogManager.method_info(
f"发送的消息检测到对方发送的消息进行走ai有记忆{aiResult or '暂无数据'}",
"检测消息",
udid)
sel.set_text(f"{aiResult or '暂无数据'}\n")
LogManager.method_info(f"存储的sessionId:{anchorWithSession}", "检测消息", udid)
event.wait(timeout=1)
# 返回
ControlUtils.clickBack(session)
# 重新回到收件箱页面后,强制刷新节点
session.appium_settings({"snapshotMaxDepth": 25})
event.wait(timeout=1)
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"新粉丝未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
event.wait(timeout=1)
ControlUtils.clickBack(session)
event.wait(timeout=1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 新粉丝 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 新粉丝 未读徽标数字", udid)
badge_text = None
info_count += 1
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"活动未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
event.wait(timeout=1)
ControlUtils.clickBack(session)
event.wait(timeout=1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 活动 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 活动 未读徽标数字", udid)
badge_text = None
info_count += 1
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"系统通知未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
event.wait(timeout=1)
ControlUtils.clickBack(session)
event.wait(timeout=1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 系统通知 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 系统通知 未读徽标数字", udid)
badge_text = None
info_count += 1
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_request_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
LogManager.method_info(f"消息请求未读数量:{val}", "检测消息", udid)
if badge_text:
badge_text.tap()
event.wait(timeout=1)
ControlUtils.clickBack(session)
event.wait(timeout=1)
except Exception:
LogManager.method_warning("当前屏幕没有找到 消息请求 未读徽标数字", "检测消息", udid)
print("当前屏幕没有找到 消息请求 未读徽标数字", udid)
badge_text = None
info_count += 1
# 双击收件箱 定位到消息的位置
if info_count == 5:
r = el.bounds # 可能是命名属性,也可能是 tuple
cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2))
cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2))
session.double_tap(cx, cy) # 可能抛异常:方法不存在
LogManager.method_info(f"双击收件箱 定位到信息", "检测消息", udid)
else:
return
else:
LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
raise Exception("当前页面找不到收件箱,重启")
# 放在 ScriptManager 类外面或 utils 里
def interruptible_sleep(self, event: threading.Event, seconds: float, slice_: float = 1.0):
"""把一次长 sleep 拆成 1 秒一片,随时响应 event"""
left = seconds
while left > 0 and not event.is_set():
timeout = min(slice_, left)
event.wait(timeout=timeout)
left -= timeout
return not event.is_set() # 返回 True 表示正常睡完False 被中断
# 切换账号工具方法
def _norm_txt(self, s: str) -> str:
return (s or "").strip()
def _dedup_preserve_order(self, seq):
seen = set()
out = []
for x in seq:
if x not in seen:
out.append(x)
seen.add(x)
return out
# 切换账号
def changeAccount(self, udid):
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
width, height, scale = self.get_screen_info(udid)
count = 0
while count <= 5:
try:
# 重启打开
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
ControlUtils.openTikTok(session, udid)
LogManager.method_info("开始进行切换账号", "切换账号", udid)
LogManager.method_info("重启进行切换账号", "切换账号", udid)
#
# 打开个人主页
session.appium_settings({"snapshotMaxDepth": 15})
user_home = session.xpath('//XCUIElementTypeButton[@name="a11y_vo_profile" or @label="主页"]')
LogManager.method_info("检测主页按钮", "切换账号", udid)
if user_home.exists:
user_home.click()
LogManager.method_info("进入主页成功", "切换账号", udid)
else:
LogManager.method_info("未检测到主页按钮,后续流程可能失败", "切换账号", udid)
count += 1
continue
# 点击“切换账号”按钮(能点就点;点不到再走 OCR
session.appium_settings({"snapshotMaxDepth": 25})
switch_btn = session.xpath(
'//XCUIElementTypeButton['
'@name="Switch accounts" or @label="Switch accounts" or '
'@name="切换账号" or @label="切换账号" or '
'@name="切換帳號" or @label="切換帳號"'
']'
)
if switch_btn.exists:
switch_btn.click()
LogManager.method_info("已点击“切换账号”", "切换账号", udid)
else:
LogManager.method_info("未检测到“切换账号”按钮,转入 OCR 兜底", "切换账号", udid)
# 截图 → OCR
try:
img = client.screenshot()
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
res_dir = os.path.join(base_dir, "resources", udid)
os.makedirs(res_dir, exist_ok=True)
file_path = os.path.join(res_dir, "account_list.png")
img.save(file_path)
time.sleep(1)
LogManager.method_info(f"保存屏幕图像成功 -> {file_path}", "切换账号", udid)
except Exception as e:
LogManager.method_info(f"截图或保存失败, 原因:{e}", "切换账号", udid)
count += 1
continue
image_path = AiUtils.imagePathWithName(udid, "account_list")
ocr_json = TencentOCR.recognize(image_path=image_path, action="GeneralBasicOCR")
LogManager.method_info(f"OCR 结果: {ocr_json}", "切换账号", udid)
# 提取“切换账号”与“添加账号”之间的用户名候选(只看像用户名的)
items = TencentOCR.slice_texts_between(ocr_json, "切换账号", "添加账号", username_like=True)
# 稳定排序Y 再 X
items_sorted = sorted(
items,
key=lambda d: (d.get("ItemPolygon", {}).get("Y", 0),
d.get("ItemPolygon", {}).get("X", 0))
)
# 规范化 & 去重保序
def _norm_txt(s: str) -> str:
return (s or "").strip().lstrip("@")
def _dedup(seq):
seen, out = set(), []
for x in seq:
if x and x not in seen:
seen.add(x)
out.append(x)
return out
usernames_from_ocr = _dedup([_norm_txt(d.get("DetectedText", "")) for d in items_sorted])
LogManager.method_info(f"OCR 提取候选账号(排序后): {usernames_from_ocr}", "切换账号", udid)
if not usernames_from_ocr:
LogManager.method_info("OCR 未发现任何账号,无法切换", "切换账号", udid)
count += 1
continue
# —— 读取与更新轮询状态 —— #
state_path = f"{udid}/accountState.json"
state = IOSAIStorage.load(state_path) or {"accounts": [], "idx": 0}
old_accounts = state.get("accounts") or []
# 只有“集合真的变化”才更新 accounts否则保持旧顺序不动保证轮询不抖动
if set(old_accounts) != set(usernames_from_ocr):
merged = [a for a in old_accounts if a in usernames_from_ocr] + \
[u for u in usernames_from_ocr if u not in old_accounts]
state["accounts"] = merged
state["idx"] = 0 # 集合变化才重置
LogManager.method_info(f"账号集合变化,合并后顺序: {merged},重置 idx=0", "切换账号", udid)
else:
if not old_accounts:
state["accounts"] = usernames_from_ocr
state["idx"] = 0
accounts = state["accounts"]
if not accounts:
LogManager.method_info("账号列表为空", "切换账号", udid)
count += 1
continue
# —— 核心轮询1→2→0→1→2→… —— #
n = len(accounts)
try:
idx = int(state.get("idx", 0)) % n
except Exception:
idx = 0
next_idx = (idx + 1) % n # 本次选择“下一位”
target_account = accounts[next_idx]
# 立刻推进并落盘:保存为 next_idx下一次会从它的下一位继续
state["idx"] = next_idx
IOSAIStorage.save(state, state_path)
LogManager.method_info(
f"本次切换到账号: {target_account} (use={next_idx}, next={(next_idx + 1) % n})",
"切换账号",
udid
)
print(f"本次切换到账号: {target_account} (use={next_idx}, next={(next_idx + 1) % n})", "切换账号", udid)
# 在同一份 OCR 结果里定位该用户名的坐标并点击
result = TencentOCR.find_last_name_bbox(ocr_json, target_account)
if not result:
LogManager.method_info(f"OCR 未找到目标账号文本: {target_account}", "切换账号", udid)
count += 1
continue
center_x = result["center"]["x"]
center_y = result["center"]["y"]
# 随机偏移(增强拟人)
num = random.randint(-10, 10)
# 分辨率/坐标映射(按你设备比例;你原来是 /3
if scale == 3.0:
tap_x = int((center_x + num) / 3)
tap_y = int((center_y + num) / 3)
else:
tap_x = int((center_x + num) / 2)
tap_y = int((center_y + num) / 2)
LogManager.method_info(f"点击坐标: ({tap_x}, {tap_y}),账号: {target_account}", "切换账号", udid)
session.tap(tap_x, tap_y)
time.sleep(5)
return 200, "成功"
except Exception as e:
LogManager.method_error(f"切换账号失败, 错误: {e}", "切换账号", udid)
count += 1
return 500, "失败"
def test(self):
# 找到输入框
anchor_name = "sss"
udid = "sss"
last_data = [{
"sender": anchor_name,
"device": udid,
"time": datetime.now().strftime("%Y-%m-%d %H:%M"),
"text": "哈哈哈",
"status": 0
}]
print(last_data)
JsonUtils.append_json_items(last_data, "log/last_message.json")