临时提交

This commit is contained in:
zw
2025-08-08 22:08:10 +08:00
parent d7c98d1fc8
commit e009577cc9
9 changed files with 214 additions and 29 deletions

16
Entity/AnchorModel.py Normal file
View File

@@ -0,0 +1,16 @@
# 主播模型
class AnchorModel:
def __init__(self, anchorId= "", country= ""):
# 主播ID
self.anchorId = anchorId
# 主播国家
self.country = country
# 字典转模型
@classmethod
def dictToModel(cls, d):
model = AnchorModel()
model.anchorId = d.get('anchorId', "")
model.country = d.get('country', "")
return model

View File

@@ -1,4 +1,24 @@
# Tik Tok app bundle id import threading
from typing import Dict, Any
from Entity.AnchorModel import AnchorModel
tikTokApp = "com.zhiliaoapp.musically" tikTokApp = "com.zhiliaoapp.musically"
# wda apple bundle id # wda apple bundle id
WdaAppBundleId = "com.vv.wda.xctrunner" WdaAppBundleId = "com.vv.wda.xctrunner"
# 全局主播列表
anchorList: list[AnchorModel] = []
# 线程锁
anchorListLock = threading.Lock()
# 安全删除数据
def removeModelFromAnchorList(model: AnchorModel):
with anchorListLock:
anchorList.remove(model)
# 添加数据
def addModelToAnchorList(models: list[Dict[str, Any]]):
with anchorListLock:
for dic in models:
obj = AnchorModel.dictToModel(dic)
anchorList.append(obj)

View File

@@ -2,14 +2,20 @@ import json
import os import os
import socket import socket
import threading import threading
import warnings
from queue import Queue from queue import Queue
from typing import Any, Dict
import tidevice import tidevice
import wda import wda
from flask import Flask, request from flask import Flask, request
from flask_cors import CORS from flask_cors import CORS
from Entity.ResultData import ResultData from Entity.ResultData import ResultData
from Utils.ControlUtils import ControlUtils
from Utils.ThreadManager import ThreadManager from Utils.ThreadManager import ThreadManager
from script.ScriptManager import ScriptManager from script.ScriptManager import ScriptManager
from Entity.AnchorModel import AnchorModel
from Entity.Variables import anchorList, addModelToAnchorList
app = Flask(__name__) app = Flask(__name__)
CORS(app) CORS(app)
@@ -81,21 +87,8 @@ def deviceList():
def deviceAppList(): def deviceAppList():
param = request.get_json() param = request.get_json()
udid = param["udid"] udid = param["udid"]
t = tidevice.Device(udid) apps = ControlUtils.getDeviceAppList(udid)
return ResultData(data=apps).toJson()
# 获取已安装的应用列表
apps = []
for app in t.installation.iter_installed():
apps.append({
"name": app.get("CFBundleDisplayName", "Unknown"),
"bundleId": app.get("CFBundleIdentifier", "Unknown"),
"version": app.get("CFBundleShortVersionString", "Unknown"),
"path": app.get("Path", "Unknown")
})
# 筛选非系统级应用(过滤掉以 com.apple 开头的系统应用)
non_system_apps = [app for app in apps if not app["bundleId"].startswith("com.apple")]
return ResultData(data=non_system_apps).toJson()
# 打开指定app # 打开指定app
@app.route('/launchApp', methods=['POST']) @app.route('/launchApp', methods=['POST'])
@@ -188,12 +181,36 @@ def stopScript():
ThreadManager.stop(udid) ThreadManager.stop(udid)
return ResultData(data="").toJson() return ResultData(data="").toJson()
# AI聊天
@app.route('/aiChat', methods=['POST'])
def aiChat():
body = request.get_json()
udid = body.get("udid")
# 传递主播数据
@app.route('/passAnchorData', methods=['POST'])
def passAnchorData():
data: Dict[str, Any] = request.get_json()
# 设备列表
idList = data.get("deviceList", [])
# 主播列表
acList = data.get("anchorList", [])
# 是否需要回复
needReply = data.get("needReply", False)
# 添加主播数据
addModelToAnchorList(acList)
# 启动线程,执行脚本
for udid in idList:
manager = ScriptManager()
event = threading.Event()
# 启动脚本
thread = threading.Thread(target=manager.greetNewFollowers, args=(udid, needReply, event))
thread.start()
# 添加到线程管理
ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson()
# 添加临时数据
@app.route("/addTempAnchorData", methods=['POST'])
def addTempAnchorData():
data = request.get_json()
addModelToAnchorList(data)
return ResultData(data="").toJson()
if __name__ == '__main__': if __name__ == '__main__':
app.run("0.0.0.0", port=5000, debug=True, use_reloader=False) app.run("0.0.0.0", port=5000, debug=True, use_reloader=False)

View File

@@ -4,10 +4,8 @@ import re
import cv2 import cv2
import numpy as np import numpy as np
import wda import wda
from Utils.LogManager import LogManager from Utils.LogManager import LogManager
# 工具类 # 工具类
class AiUtils(object): class AiUtils(object):
@@ -225,3 +223,4 @@ class AiUtils(object):
print(e) print(e)
return None return None
# AiUtils.screenshot()

View File

@@ -1,3 +1,4 @@
import tidevice
from wda import Client from wda import Client
from Entity.Variables import tikTokApp from Entity.Variables import tikTokApp
@@ -7,13 +8,47 @@ from Utils.LogManager import LogManager
# 页面控制工具类 # 页面控制工具类
class ControlUtils(object): class ControlUtils(object):
# 获取设备上的app列表
@classmethod
def getDeviceAppList(self, udid):
device = tidevice.Device(udid)
# 获取已安装的应用列表
apps = []
for app in device.installation.iter_installed():
apps.append({
"name": app.get("CFBundleDisplayName", "Unknown"),
"bundleId": app.get("CFBundleIdentifier", "Unknown"),
"version": app.get("CFBundleShortVersionString", "Unknown"),
"path": app.get("Path", "Unknown")
})
# 筛选非系统级应用(过滤掉以 com.apple 开头的系统应用)
noSystemApps = [app for app in apps if not app["bundleId"].startswith("com.apple")]
return noSystemApps
# 打开Tik Tok # 打开Tik Tok
@classmethod @classmethod
def openTikTok(cls, session: Client): def openTikTok(cls, session: Client, udid):
apps = cls.getDeviceAppList(udid)
tk = ""
for app in apps:
if app.get("name", "") == "TikTok":
tk = app.get("bundleId", "")
currentApp = session.app_current() currentApp = session.app_current()
if currentApp != tikTokApp: if currentApp != tk:
session.app_start(tikTokApp) session.app_start(tikTokApp)
# 关闭Tik Tok
@classmethod
def closeTikTok(cls, session: Client, udid):
apps = cls.getDeviceAppList(udid)
tk = ""
for app in apps:
if app.get("name", "") == "TikTok":
tk = app.get("bundleId", "")
session.app_stop(tk)
# 返回 # 返回
@classmethod @classmethod
def clickBack(cls, session: Client): def clickBack(cls, session: Client):
@@ -63,5 +98,16 @@ class ControlUtils(object):
# return False # return False
# 点击搜索
@classmethod
def clickSearch(cls, session: Client):
obj = session.xpath("//*[@name='搜索']")
try:
if obj.label == "搜索":
return obj
else:
return None
except Exception as e:
print(e)
return None
# ControlUtils.backAction("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.0 MiB

After

Width:  |  Height:  |  Size: 1.8 MiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.9 KiB

After

Width:  |  Height:  |  Size: 7.3 KiB

BIN
screenshot.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

View File

@@ -1,10 +1,13 @@
import random import random
import threading
import time import time
from enum import Enum
import wda import wda
import os import os
from Utils.AiUtils import AiUtils from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils from Utils.ControlUtils import ControlUtils
from Utils.LogManager import LogManager from Utils.LogManager import LogManager
from Entity.Variables import anchorList, removeModelFromAnchorList
# 脚本管理类 # 脚本管理类
@@ -28,10 +31,14 @@ class ScriptManager():
def growAccount(self, udid, event): def growAccount(self, udid, event):
client = wda.USBClient(udid) client = wda.USBClient(udid)
session = client.session() session = client.session()
session.appium_settings({"snapshotMaxDepth": 15}) session.appium_settings({"snapshotMaxDepth": 0})
# 检测当前打开的是否是Tik Tok。如果不是就打开 # 先关闭Tik Tok
ControlUtils.openTikTok(session) ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3) time.sleep(3)
# 创建udid名称的目录 # 创建udid名称的目录
@@ -93,6 +100,86 @@ class ScriptManager():
print(f"发生异常:{e}") print(f"发生异常:{e}")
client.swipe_up() client.swipe_up()
# 观看直播
def viewLive(self):
pass
# 关注打招呼
def greetNewFollowers(self, udid, needReply, event):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 15})
# 先关闭Tik Tok
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
# 重新打开Tik Tok
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 点击搜索按钮
searchButton = ControlUtils.clickSearch(session)
if searchButton is not None:
searchButton.click()
else:
print("没找到搜索按钮")
return
# 搜索框
input = session.xpath('//XCUIElementTypeSearchField')
# 获取一个主播
anchor = anchorList[0]
aid = anchor.anchorId
# 如果找到了输入框,就点击并且输入内容
if input.exists:
input.click()
time.sleep(1)
input.set_text(aid + "\n")
# 切换UI查找深度
session.appium_settings({"snapshotMaxDepth": 25})
# 点击进入主播首页
session.xpath(
'(//XCUIElementTypeCollectionView[@name="TTKSearchCollectionComponent"]'
'//XCUIElementTypeCell'
'//XCUIElementTypeButton[following-sibling::XCUIElementTypeButton[@name="关注" or @label="关注"]])[1]'
).get(timeout=5).tap()
time.sleep(3)
# 向上划一点
r = client.swipe_up()
print(r)
# 分析页面节点
# print(session.source())
# 观看主播视频
def viewAnchorVideo():
pass
viewAnchorVideo()
else:
pass
# 功能待完善
# while not event.is_set() and len(anchorList) > 0:
# anchor = anchorList[0]
# aid = anchor.anchorId
#
# ControlUtils.clickSearch(session, udid)
#
# # 删除数据
# removeModelFromAnchorList(anchor)
# print(len(anchorList))
# manager = ScriptManager() # manager = ScriptManager()
# manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa") # manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")