Files
iOSAI/script/ScriptManager.py
2025-08-14 13:49:28 +08:00

542 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import re
import threading
import time
from enum import Enum
import wda
import os
from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils
from Utils.LogManager import LogManager
from Entity.Variables import anchorList, removeModelFromAnchorList, prologueList, anchorWithSession
from Utils.Requester import Requester
# 脚本管理类
class ScriptManager():
# 单利对象
_instance = None # 类变量,用于存储单例实例
def __new__(cls):
# 如果实例不存在,则创建一个新实例
if cls._instance is None:
cls._instance = super(ScriptManager, cls).__new__(cls)
# 返回已存在的实例
return cls._instance
def __init__(self):
super().__init__()
self.initialized = True # 标记已初始化
# 养号
def growAccount(self, udid, event):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0})
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
# 假设此时有个开关
while not event.is_set():
try:
img = client.screenshot()
filePath = f"resources/{udid}/bgv.png"
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
print("保存了背景图")
time.sleep(1)
except Exception as e:
LogManager.error(e, udid)
print(e)
try:
# 判断视频类型
addX, addY = AiUtils.findImageInScreen("add", udid)
# 多次获取结果是否一致,如果有不一致的结果就切换视频
isSame = False
for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid)
if addX == tx and addY == ty:
isSame = True
time.sleep(1)
else:
isSame = False
break
# 如果找到普通视频
if addX > 0 and isSame:
needLike = random.randint(0, 10)
# 查找首页按钮
homeButton = AiUtils.findHomeButton(udid)
if homeButton:
print("有首页按钮,查看视频")
videoTime = random.randint(5, 15)
time.sleep(videoTime)
# 百分之三的概率点赞
if needLike < 3:
print("点赞")
ControlUtils.clickLike(session, udid)
print("继续观看视频")
videoTime = random.randint(10, 30)
time.sleep(videoTime)
print("准备划到下一个视频")
client.swipe_up()
else:
print("找不到首页按钮。出错了")
else:
nextTime = random.randint(1, 5)
time.sleep(nextTime)
client.swipe_up()
except Exception as e:
print(f"发生异常:{e}")
client.swipe_up()
# 观看直播
def watchLiveForGrowth(self, udid, event):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 15})
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 进入直播
live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
if live_button.exists:
live_button.click()
time.sleep(20)
size = session.window_size()
width, height = size.width, size.height
print(f"屏幕的宽高是:{width},{height}")
# 可选:重新拉起 session规避偶发 Stale 会话
session = client.session()
# session.appium_settings({"snapshotMaxDepth": 25})
while not event.is_set():
try:
time.sleep(3)
# 如果处于 PK分数条直接划走
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("✅ 当前是 PK跳过")
session.swipe_up()
continue
# 数直播显示区域窗口(主画面 + 连麦小窗)
count = AiUtils.count_add_by_xml(session)
print(f"检测到直播显示区域窗口数:{count}")
if count > 1:
print("❌ 多窗口(有人连麦/分屏),划走")
session.swipe_up()
continue
else:
print("✅ 单窗口(只有一个主播),(目前是20%概率)开始点赞")
# 点赞仍保留中途转PK的保护
if random.random() >= 0.89:
print("开始点赞")
for _ in range(random.randint(10, 30)):
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("❗ 中途开始 PK停止点赞并跳过")
session.swipe_up()
break
x = width // 3 + random.randint(-10, 10)
y = height // 3 + random.randint(10, 20)
print("双击坐标:", x, y)
session.double_tap(x, y)
print("--------------------------------------------")
time.sleep(random.randint(100, 300))
session.swipe_up()
except Exception as e:
print("循环异常,重试:", repr(e))
# 轻量恢复:重新获取 session避免因为快照或元素句柄失效卡死
try:
session = client.session()
except Exception:
time.sleep(2)
session = client.session()
# 关注打招呼以及回复主播消息
def greetNewFollowers(self, udid, needReply, event):
client = wda.USBClient(udid)
session = client.session()
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
# 返回上一步
def goBack(count):
for i in range(count):
ControlUtils.clickBack(session)
time.sleep(2)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set() or len(anchorList) > 0:
# 查找输入框
input = session.xpath('//XCUIElementTypeSearchField')
# 如果找到了输入框,就点击并且输入内容
input.click()
# 稍作停顿
time.sleep(1)
# 获取一个主播
anchor = anchorList[0]
aid = anchor.anchorId
anchorCountry = anchor.country
input.clear_text()
time.sleep(2)
# 输入主播id
input.set_text(aid + "\n")
# 切换UI查找深度
session.appium_settings({"snapshotMaxDepth": 25})
# 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
follow_button = session.xpath("//XCUIElementTypeButton[@traits='Button' and @index='1']")
time.sleep(2)
if follow_button.exists:
print(follow_button.bounds)
# session.appium_settings({"snapshotMaxDepth": 10})
print("找到关注按钮!")
x = follow_button.bounds.x - 100
y = follow_button.bounds.y
print(x, y)
client.click(x, y)
print("进入主播首页啦")
else:
goBack(1)
removeModelFromAnchorList(anchor)
print("未找到关注按钮")
continue
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 25})
time.sleep(2)
# 找到并点击第一个视频
cellClickResult = ControlUtils.clickFirstVideoFromDetailPage(session)
time.sleep(2)
# 观看主播视频
def viewAnchorVideo():
print("开始查看视频,并且重新调整查询深度")
session.appium_settings({"snapshotMaxDepth": 5})
count = 3
while count != 0:
time.sleep(5)
img = client.screenshot()
time.sleep(1)
filePath = f"resources/{udid}/bgv.png"
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
time.sleep(2)
# 查找add图标
r = ControlUtils.clickLike(session, udid)
# 点赞成功。
if r == True:
count -= 1
# 假装看几秒视频
time.sleep(5)
if count != 0:
client.swipe_up()
# 右滑返回
client.swipe_right()
# 如果打开视频失败。说明该主播没有视频
if cellClickResult == True:
# 观看主播视频
LogManager.info("去查看主播视频",udid)
viewAnchorVideo()
time.sleep(3)
LogManager.info("视频看完了,重置试图查询深度", udid)
session.appium_settings({"snapshotMaxDepth": 25})
# 点击关注按钮
followButton = AiUtils.getFollowButton(session)
if followButton is not None:
LogManager.info("找到关注按钮了", udid)
followButton.click()
else:
LogManager.info("没找到关注按钮", udid)
removeModelFromAnchorList(anchor)
goBack(3)
continue
time.sleep(2)
msgButton = AiUtils.getSendMesageButton(session)
time.sleep(2)
if msgButton is not None:
print("找到发消息按钮了")
# 进入聊天页面
msgButton.click()
else:
print("没有识别出发消息按钮")
removeModelFromAnchorList(anchor)
goBack(3)
continue
time.sleep(3)
# 查找聊天界面中的输入框节点
chatInput = session.xpath("//TextView")
if chatInput.exists:
print("找到输入框了, 准备发送一条打招呼消息")
# 准备打招呼的文案
# text = random.choice(prologueList)
text = "你好"
# 翻译成主播国家的语言
msg = Requester.translation(text, "法国")
# 准备发送一条信息
chatInput.click()
time.sleep(2)
# 发送消息
chatInput.set_text(msg + "\n")
time.sleep(1)
else:
print("无法发送信息")
# 接着下一个主播
removeModelFromAnchorList(anchor)
goBack(4)
else:
print(f"{anchor.anchorId}:该主播没有视频")
# 删除当前数据
removeModelFromAnchorList(anchor)
goBack(3)
continue
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
time.sleep(2)
if needReply:
print("如果需要回复主播消息。走此逻辑")
if AiUtils.getUnReadMsgCount(session) > 0:
# 执行回复消息逻辑
self.monitorMessages(session, udid)
homeButton = AiUtils.findHomeButton(udid)
if homeButton.exists:
homeButton.click()
else:
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
else:
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
# 检查未读消息并回复
def monitorMessages(self, session, udid):
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.closeTikTok(session, udid)
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 7})
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
# 如果收件箱有消息 则进行点击
if el.exists:
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if count:
el.click()
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 22})
while True:
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
print("count", count)
if not count:
break
# 新粉丝
xp_new_fan_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 活动
xp_activity_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 系统通知
xp_system_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 用户消息
xp_badge_numeric = (
'//XCUIElementTypeOther[@name="AWEIMChatListCellUnreadCountViewComponent"]'
'//XCUIElementTypeStaticText[@value and translate(@value,"0123456789","")=""]'
)
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("新粉丝未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
print("当前屏幕没有找到 新粉丝 未读徽标数字")
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("活动未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
print("当前屏幕没有找到 活动 未读徽标数字")
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("系统通知未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
print("当前屏幕没有找到 系统通知 未读徽标数字")
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("用户未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(3)
xml = session.source()
msgs = AiUtils.extract_messages_from_xml(xml)
# 检测出对方发的最后一条信息
last_msg_text = next(item['text'] for item in reversed(msgs) if item['type'] == 'msg')
# 向ai发送信息
# 获取主播的名称
anchor_name = AiUtils.get_navbar_anchor_name(session)
# 找到输入框
sel = session.xpath(
"//XCUIElementTypeTextView[@name='消息...' or @label='消息...' or @value='消息...']")
if anchor_name not in anchorWithSession:
# 如果是第一次发消息(没有sessionId的情况)
response = Requester.chatToAi({"msg": last_msg_text})
aiResult = response['result']
sessionId = response['session_id']
anchorWithSession[anchor_name] = sessionId
# 找到输入框输入ai返回出来的消息
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
else:
# 如果不是第一次发消息证明存储的有sessionId
sessionId = anchorWithSession[anchor_name]
response = Requester.chatToAi({"msg": last_msg_text, "sid": sessionId})
aiResult = response['result']
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
time.sleep(1)
# 返回
ControlUtils.clickBack(session)
except Exception:
print("当前屏幕没有找到 用户 未读徽标数字")
badge_text = None
def test(self, udid):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
print(client.source())
# manager = ScriptManager()
# manager.test("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
# manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")