11号晚临时提交

This commit is contained in:
zw
2025-08-11 22:06:48 +08:00
parent e009577cc9
commit 747126f1f8
8 changed files with 283 additions and 71 deletions

View File

@@ -10,6 +10,8 @@ WdaAppBundleId = "com.vv.wda.xctrunner"
anchorList: list[AnchorModel] = [] anchorList: list[AnchorModel] = []
# 线程锁 # 线程锁
anchorListLock = threading.Lock() anchorListLock = threading.Lock()
# 账号token
accountToken = None
# 安全删除数据 # 安全删除数据
def removeModelFromAnchorList(model: AnchorModel): def removeModelFromAnchorList(model: AnchorModel):

View File

@@ -14,7 +14,7 @@ from Entity.ResultData import ResultData
from Utils.ControlUtils import ControlUtils from Utils.ControlUtils import ControlUtils
from Utils.ThreadManager import ThreadManager from Utils.ThreadManager import ThreadManager
from script.ScriptManager import ScriptManager from script.ScriptManager import ScriptManager
from Entity.AnchorModel import AnchorModel from Entity.Variables import accountToken
from Entity.Variables import anchorList, addModelToAnchorList from Entity.Variables import anchorList, addModelToAnchorList
app = Flask(__name__) app = Flask(__name__)
@@ -68,6 +68,12 @@ def start_socket_listener():
listener_thread = threading.Thread(target=start_socket_listener, daemon=True) listener_thread = threading.Thread(target=start_socket_listener, daemon=True)
listener_thread.start() listener_thread.start()
@app.route('/passToken', methods=['POST'])
def passToken():
data = request.get_json()
accountToken = data['token']
return ResultData(data="").toJson()
# 获取设备列表 # 获取设备列表
@app.route('/deviceList', methods=['GET']) @app.route('/deviceList', methods=['GET'])
def deviceList(): def deviceList():
@@ -173,6 +179,19 @@ def growAccount():
ThreadManager.add(udid, thread, event) ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson() return ResultData(data="").toJson()
# 观看直播
@app.route("/watchLiveForGrowth", methods=['POST'])
def watchLiveForGrowth():
body = request.get_json()
udid = body.get("udid")
manager = ScriptManager()
event = threading.Event()
thread = threading.Thread(target=manager.watchLiveForGrowth, args=(udid, event))
thread.start()
# 添加到线程管理
ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson()
# 停止脚本 # 停止脚本
@app.route("/stopScript", methods=['POST']) @app.route("/stopScript", methods=['POST'])
def stopScript(): def stopScript():

View File

@@ -4,15 +4,14 @@ from Module.DeviceInfo import Deviceinfo
from Module.FlaskSubprocessManager import FlaskSubprocessManager from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager from Utils.LogManager import LogManager
# 项目入口
if __name__ == "__main__": if __name__ == "__main__":
# 清空日志
LogManager.clearLogs() LogManager.clearLogs()
time.sleep(1) time.sleep(1)
print("启动flask")
manager = FlaskSubprocessManager.get_instance() manager = FlaskSubprocessManager.get_instance()
manager.start() manager.start()
print("启动主线程")
info = Deviceinfo() info = Deviceinfo()
info.startDeviceListener() info.startDeviceListener()

View File

@@ -5,6 +5,7 @@ import cv2
import numpy as np import numpy as np
import wda import wda
from Utils.LogManager import LogManager from Utils.LogManager import LogManager
import xml.etree.ElementTree as ET
# 工具类 # 工具类
class AiUtils(object): class AiUtils(object):
@@ -209,10 +210,10 @@ class AiUtils(object):
# 查找关闭按钮 # 查找关闭按钮
@classmethod @classmethod
def findCloseButton(cls,udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"): def findLiveCloseButton(cls,udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid) client = wda.USBClient(udid)
session = client.session() session = client.session()
session.appium_settings({"snapshotMaxDepth": 0}) session.appium_settings({"snapshotMaxDepth": 10})
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']") r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
try: try:
if r.label == "关闭屏幕": if r.label == "关闭屏幕":
@@ -223,4 +224,22 @@ class AiUtils(object):
print(e) print(e)
return None return None
# AiUtils.screenshot() # 获取直播间窗口数量
@classmethod
def count_add_by_xml(cls, session):
xml = session.source()
root = ET.fromstring(xml)
return sum(
1 for e in root.iter()
if e.get('type') in ('XCUIElementTypeButton', 'XCUIElementTypeImage')
and (e.get('name') == '添加' or e.get('label') == '添加')
and (e.get('visible') in (None, 'true'))
)
# 获取当前屏幕上的节点
@classmethod
def getCurrentScreenSource(cls):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
print(client.source())
# AiUtils.getCurrentScreenSource()

View File

@@ -52,6 +52,7 @@ class ControlUtils(object):
# 返回 # 返回
@classmethod @classmethod
def clickBack(cls, session: Client): def clickBack(cls, session: Client):
try:
x, y = AiUtils.findImageInScreen("back") x, y = AiUtils.findImageInScreen("back")
if x > 0: if x > 0:
r = session.swipe_right() r = session.swipe_right()
@@ -62,24 +63,23 @@ class ControlUtils(object):
else: else:
print("本页面无法返回") print("本页面无法返回")
return False return False
except Exception as e:
print(e)
return False
# 点赞 # 点赞
@classmethod @classmethod
def clickLike(cls, session: Client, udid): def clickLike(cls, session: Client, udid):
scale = session.scale scale = session.scale
x, y = AiUtils.findImageInScreen(udid,"add") x, y = AiUtils.findImageInScreen("add",udid)
print(x, y) print(x, y)
if x > -1: if x > -1:
print("找到目标") print("点赞")
r = session.click(x // scale, y // scale + 50) session.click(x // scale, y // scale + 50)
if r.status == 0:
return True return True
else:
return False
else: else:
print("没有找到目标") print("没有找到目标")
return False return False
# 点击评论 # 点击评论
# @classmethod # @classmethod
# def clickComment(cls, session: Client): # def clickComment(cls, session: Client):
@@ -103,11 +103,46 @@ class ControlUtils(object):
def clickSearch(cls, session: Client): def clickSearch(cls, session: Client):
obj = session.xpath("//*[@name='搜索']") obj = session.xpath("//*[@name='搜索']")
try: try:
if obj.label == "搜索": if obj.exists:
return obj obj.click()
else: return True
return None
except Exception as e: except Exception as e:
print(e) print(e)
return None return False
# 获取主播详情页的第一个视频
@classmethod
def clickFirstVideoFromDetailPage(cls, session: Client):
videoCell = session.xpath('//Window/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[1]/ScrollView[1]/Other[1]/CollectionView[1]/Cell[2]')
if videoCell.exists:
videoCell.click()
# 点击视频
return True
else:
return False
# 获取关注按钮
@classmethod
def clickFollowButton(cls, session: Client):
followButton = session.xpath('//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[2]/Other[1]/Other[1]/Other[3]/Other[1]/Other[1]/Button[1]')
if followButton.exists:
print("找到了")
followButton.click()
return True
else:
print("没找到")
return False
# 查找发消息按钮
@classmethod
def clickSendMesageButton(cls, session: Client):
msgButton = session.xpath("//Window[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[2]/Other[1]/Other[1]/Other[3]/Other[1]/Other[1]")
if msgButton.exists:
print("找到了")
print(msgButton.name)
msgButton.click()
return True
else:
print("没找到")
return False

7
Utils/Requester.py Normal file
View File

@@ -0,0 +1,7 @@
from Entity.Variables import accountToken
class Requester():
@classmethod
def printToken(cls):
print(accountToken)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.8 MiB

After

Width:  |  Height:  |  Size: 1.6 MiB

View File

@@ -60,6 +60,7 @@ class ScriptManager():
try: try:
# 判断视频类型 # 判断视频类型
addX, addY = AiUtils.findImageInScreen("add", udid) addX, addY = AiUtils.findImageInScreen("add", udid)
# 多次获取结果是否一致,如果有不一致的结果就切换视频
isSame = False isSame = False
for i in range(2): for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid) tx, ty = AiUtils.findImageInScreen("add", udid)
@@ -102,10 +103,82 @@ class ScriptManager():
# 观看直播 # 观看直播
def viewLive(self): def watchLiveForGrowth(self, udid, event):
pass client = wda.USBClient(udid)
session = client.session()
# 关注打招呼 session.appium_settings({"snapshotMaxDepth": 15})
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 进入直播
live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]')
if live_button.exists:
live_button.click()
time.sleep(20)
size = session.window_size()
width, height = size.width, size.height
print(f"屏幕的宽高是:{width},{height}")
# 可选:重新拉起 session规避偶发 Stale 会话
session = client.session()
# session.appium_settings({"snapshotMaxDepth": 25})
while not event.is_set():
try:
time.sleep(3)
# 如果处于 PK分数条直接划走
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("✅ 当前是 PK跳过")
session.swipe_up()
continue
# 数直播显示区域窗口(主画面 + 连麦小窗)
count = AiUtils.count_add_by_xml(session)
print(f"检测到直播显示区域窗口数:{count}")
if count > 1:
print("❌ 多窗口(有人连麦/分屏),划走")
session.swipe_up()
continue
else:
print("✅ 单窗口(只有一个主播),(目前是20%概率)开始点赞")
# 点赞仍保留中途转PK的保护
if random.random() >= 0.89:
print("开始点赞")
for _ in range(random.randint(10, 30)):
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("❗ 中途开始 PK停止点赞并跳过")
session.swipe_up()
break
x = width // 3 + random.randint(-10, 10)
y = height // 3 + random.randint(10, 20)
print("双击坐标:", x, y)
session.double_tap(x, y)
print("--------------------------------------------")
time.sleep(random.randint(100, 300))
session.swipe_up()
except Exception as e:
print("循环异常,重试:", repr(e))
# 轻量恢复:重新获取 session避免因为快照或元素句柄失效卡死
try:
session = client.session()
except Exception:
time.sleep(2)
session = client.session()
# 关注打招呼以及回复主播消息
def greetNewFollowers(self, udid, needReply, event): def greetNewFollowers(self, udid, needReply, event):
client = wda.USBClient(udid) client = wda.USBClient(udid)
session = client.session() session = client.session()
@@ -120,65 +193,123 @@ class ScriptManager():
time.sleep(3) time.sleep(3)
# 点击搜索按钮 # 点击搜索按钮
searchButton = ControlUtils.clickSearch(session) ControlUtils.clickSearch(session)
if searchButton is not None:
searchButton.click()
else:
print("没找到搜索按钮")
return
# 搜索框 # 搜索框
input = session.xpath('//XCUIElementTypeSearchField') input = session.xpath('//XCUIElementTypeSearchField')
# 如果找到了输入框,就点击并且输入内容
input.click()
# 稍作停顿
time.sleep(1)
# 返回上一步
def goBack():
ControlUtils.clickBack(session)
# 搜索框
input = session.xpath('//XCUIElementTypeSearchField')
# 如果找到了输入框,就点击并且输入内容
input.click()
# 稍作停顿
time.sleep(1)
# 删除数据
removeModelFromAnchorList(anchor)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set() or len(anchorList) > 0:
# 获取一个主播 # 获取一个主播
anchor = anchorList[0] anchor = anchorList[0]
aid = anchor.anchorId aid = anchor.anchorId
# 如果找到了输入框,就点击并且输入内容 try:
if input.exists: input.clear_text()
input.click() except Exception as e:
time.sleep(1) print(e)
time.sleep(2)
# 输入主播id
input.set_text(aid + "\n") input.set_text(aid + "\n")
# 切换UI查找深度 # 切换UI查找深度
session.appium_settings({"snapshotMaxDepth": 25}) session.appium_settings({"snapshotMaxDepth": 25})
# 点击进入主播首页 # 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
session.xpath( follow_button = session.xpath(
'(//XCUIElementTypeCollectionView[@name="TTKSearchCollectionComponent"]' '//XCUIElementTypeCell[@index="1"]//XCUIElementTypeButton[@index="1"]').get()
'//XCUIElementTypeCell' if follow_button:
'//XCUIElementTypeButton[following-sibling::XCUIElementTypeButton[@name="关注" or @label="关注"]])[1]' print("找到关注按钮!")
).get(timeout=5).tap() x = follow_button.bounds.x - 200
y = follow_button.bounds.y
client.click(x, y)
else:
print("未找到关注按钮")
time.sleep(3) time.sleep(3)
# 向上划一点 # 找到并点击第一个视频
r = client.swipe_up() cellClickResult = ControlUtils.clickFirstVideoFromDetailPage(session)
print(r)
# 分析页面节点
# print(session.source())
# 观看主播视频 # 观看主播视频
def viewAnchorVideo(): def viewAnchorVideo():
pass print("开始查看视频")
count = 5
viewAnchorVideo() while count > 1:
print("条件满足,继续查看")
img = client.screenshot()
time.sleep(1)
filePath = f"resources/{udid}/bgv.png"
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
print("保存了背景图")
time.sleep(1)
addX, addY = AiUtils.findImageInScreen("add", udid)
if addX != -1:
r = ControlUtils.clickLike(session, udid)
# 如果点了赞,总数量-1 否则划下一个视频,无法点赞的原因很多。指不定是什么原因。所以直接下一个视频比较好
if r:
count -= 1
else: else:
pass client.swipe_up()
# 功能待完善 else:
client.swipe_up()
continue
# 假装看几秒视频
time.sleep(10)
client.swipe_up()
if count == 1:
break
# 点击第一个视频后的逻辑
if cellClickResult:
print("点击了视频")
session.appium_settings({"snapshotMaxDepth": 5})
print("重新设置了匹配深度")
viewAnchorVideo()
# 视频看完需要点关注。
ControlUtils.clickFollowButton(session)
time.sleep(1)
ControlUtils.clickFollowButton(session)
# 发送一条信息
chatInput = session.xpath("//*[className='XCUIElementTypeTextView']")
if chatInput.exists:
print("找到了")
else:
print("没找到")
# 清除数据
removeModelFromAnchorList(anchor)
# 流程结束
client.swipe_left()
time.sleep(1)
else:
print("获取该主播第一个视频失败")
goBack()
# while not event.is_set() and len(anchorList) > 0:
# anchor = anchorList[0]
# aid = anchor.anchorId
#
# ControlUtils.clickSearch(session, udid)
#
# # 删除数据
# removeModelFromAnchorList(anchor)
# print(len(anchorList))
# manager = ScriptManager() # manager = ScriptManager()
# manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa") # manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")