调整项目结构。修改工具类逻辑

This commit is contained in:
zw
2025-08-07 20:55:30 +08:00
parent 606a80a30f
commit 3f7c366278
7 changed files with 221 additions and 96 deletions

View File

@@ -6,6 +6,7 @@ from tidevice import Usbmux
from Entity.DeviceModel import DeviceModel
from Entity.Variables import tikTokApp, WdaAppBundleId
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
threadLock = threading.Lock()
@@ -71,7 +72,13 @@ class Deviceinfo(object):
# 连接设备
def connectDevice(self, identifier):
d = wda.USBClient(identifier, 8100)
try:
d = wda.USBClient(identifier, 8100)
LogManager.info("启动wda成功", identifier)
except Exception as e:
LogManager.error("启动wda失败。请检查wda是否正常", identifier)
return
d.app_start(WdaAppBundleId)
d.home()
time.sleep(2)
@@ -82,6 +89,8 @@ class Deviceinfo(object):
"id": identifier
})
# 转发设备端口
def relayDeviceScreenPort(self):
try:

View File

@@ -1,3 +1,5 @@
import time
from Module.DeviceInfo import Deviceinfo
from Module.FlaskSubprocessManager import FlaskSubprocessManager
from Utils.LogManager import LogManager
@@ -5,6 +7,7 @@ from Utils.LogManager import LogManager
if __name__ == "__main__":
LogManager.clearLogs()
time.sleep(1)
print("启动flask")
manager = FlaskSubprocessManager.get_instance()

View File

@@ -3,6 +3,9 @@ import os
import re
import cv2
import numpy as np
import wda
from Utils.LogManager import LogManager
# 工具类
@@ -10,42 +13,47 @@ class AiUtils(object):
# 在屏幕中找到对应的图片
@classmethod
def findImageInScreen(cls, target):
# 加载原始图像和模板图像
image_path = AiUtils.imagePathWithName("bgv") # 替换为你的图像路径
template_path = AiUtils.imagePathWithName(target) # 替换为你的模板路径
def findImageInScreen(cls, target, udid):
try:
# 加载原始图像和模板图像
image_path = AiUtils.imagePathWithName(udid,"bgv") # 替换为你的图像路径
template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
# 读取图像和模板,确保它们都是单通道灰度图
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
# 读取图像和模板,确保它们都是单通道灰度图
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if image is None:
raise ValueError("背景图无法加载")
if image is None:
LogManager.error("加载背景图失败")
return -1, -1
if template is None:
raise ValueError("模板图无法加载")
if template is None:
LogManager.error("加载模板图失败")
return -1, -1
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 使用模板匹配方法
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.7 # 匹配度阈值,可以根据需要调整
loc = np.where(res >= threshold)
# 检查是否有匹配结果
if loc[0].size > 0:
# 取第一个匹配位置
pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
center_x = int(pt[0] + w // 2)
center_y = int(pt[1] + h // 2)
# print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
return center_x, center_y
else:
print("未找到匹配的目标")
return -1, -1
# 使用模板匹配方法
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.7 # 匹配度阈值,可以根据需要调整
loc = np.where(res >= threshold)
# 检查是否有匹配结果
if loc[0].size > 0:
print("匹配到目标了")
# 取第一个匹配位置
pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
center_x = int(pt[0] + w // 2)
center_y = int(pt[1] + h // 2)
# print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
return center_x, center_y
else:
print("未找到匹配的目标")
return -1, -1
except Exception as e:
LogManager.error(f"加载素材失败:{e}", udid)
print(e)
# 使用正则查找字符串中的数字
@classmethod
@@ -60,7 +68,11 @@ class AiUtils(object):
# 选择截图
@classmethod
def screenshot(cls):
image_path = AiUtils.imagePathWithName("bgv") # 替换为你的图像路径
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
session = client.session()
image = session.screenshot()
image_path = "screenshot.png"
image.save(image_path)
image = cv2.imread(image_path)
# 如果图像过大,缩小显示
@@ -133,15 +145,84 @@ class AiUtils(object):
# 根据名称获取图片完整地址
@classmethod
def imagePathWithName(cls, name):
def imagePathWithName(cls, udid, name):
current_file_path = os.path.abspath(__file__)
# 获取当前文件所在的目录即script目录
current_dir = os.path.dirname(current_file_path)
# 由于script目录位于项目根目录下一级因此需要向上一级目录移动两次
project_root = os.path.abspath(os.path.join(current_dir, '..'))
# 构建资源文件的完整路径,向上两级目录,然后进入 resources 目录
resource_path = os.path.abspath(os.path.join(project_root, 'resources', name + ".png")).replace('/', '\\')
resource_path = os.path.abspath(os.path.join(project_root, "resources", udid, name + ".png")).replace('/', '\\')
return resource_path
# 获取根目录
@classmethod
def getRootDir(cls):
current_file = os.path.abspath(__file__)
# 获取当前文件所在的目录
current_dir = os.path.dirname(current_file)
# 获取项目根目录(假设根目录是当前文件的父目录的父目录)
project_root = os.path.dirname(current_dir)
# 返回根目录
return project_root
# 创建一个以udid命名的目录
@classmethod
def makeUdidDir(cls, udid):
# 获取项目根目录
home = cls.getRootDir()
# 拼接 resources 目录的路径
resources_dir = os.path.join(home, "resources")
# 拼接 udid 目录的路径
udid_dir = os.path.join(resources_dir, udid)
# 检查 udid 目录是否存在,如果不存在则创建
if not os.path.exists(udid_dir):
try:
os.makedirs(udid_dir)
LogManager.info(f"目录 {udid_dir} 创建成功", udid)
print(f"目录 {udid_dir} 创建成功")
except Exception as e:
print(f"创建目录时出错: {e}")
LogManager.error(f"创建目录时出错: {e}", udid)
else:
LogManager.info(f"目录 {udid_dir} 已存在,跳过创建", udid)
print(f"目录 {udid_dir} 已存在,跳过创建")
# 查找首页按钮
# uuid 设备id
# click 是否点击该按钮
@classmethod
def findHomeButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
homeButton = session.xpath("//*[@label='首页']")
try:
if homeButton.label == "首页":
print("找到了")
return homeButton
else:
print("没找到")
return None
except Exception as e:
print(e)
return None
# 查找关闭按钮
@classmethod
def findCloseButton(cls,udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 30})
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
try:
if r.label == "关闭屏幕":
return r
else:
return None
except Exception as e:
print(e)
return None
# AiUtils.screenshot()

View File

@@ -26,14 +26,13 @@ class ControlUtils(object):
return False
else:
print("本页面无法返回")
LogManager.info("没有返回按钮")
return False
# 点赞
@classmethod
def clickLike(cls, session: Client):
def clickLike(cls, session: Client, udid):
scale = session.scale
x, y = AiUtils.findImageInScreen("add")
x, y = AiUtils.findImageInScreen(udid,"add")
print(x,y)
if x > -1:
print("找到目标了")

View File

@@ -1,71 +1,78 @@
import logging
import os
import shutil
class LogManager:
# 获取项目根目录
projectRoot = os.path.dirname(os.path.dirname(__file__))
logDir = os.path.join(projectRoot, "log")
infoLogFile = os.path.join(logDir, "info.log")
warningLogFile = os.path.join(logDir, "warning.log")
errorLogFile = os.path.join(logDir, "error.log")
# 类变量,存储日志记录器
_infoLogger = None
_warningLogger = None
_errorLogger = None
_loggers = {}
@classmethod
def _setupLogger(cls, name, logFile, level=logging.INFO):
def _setupLogger(cls, udid, name, logName, level=logging.INFO):
"""设置日志记录器"""
os.makedirs(cls.logDir, exist_ok=True) # 确保日志目录存在
logger = logging.getLogger(name)
deviceLogDir = os.path.join(cls.logDir, udid)
os.makedirs(deviceLogDir, exist_ok=True) # 确保日志目录存在
logFile = os.path.join(deviceLogDir, logName)
logger = logging.getLogger(f"{udid}_{name}")
logger.setLevel(level)
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
return logger
@classmethod
def _initializeLoggers(cls):
"""初始化所有日志记录器"""
if not cls._infoLogger:
cls._infoLogger = cls._setupLogger("infoLogger", cls.infoLogFile, level=logging.INFO)
if not cls._warningLogger:
cls._warningLogger = cls._setupLogger("warningLogger", cls.warningLogFile, level=logging.WARNING)
if not cls._errorLogger:
cls._errorLogger = cls._setupLogger("errorLogger", cls.errorLogFile, level=logging.ERROR)
def _getLogger(cls, udid, name, logName, level=logging.INFO):
"""获取或初始化日志记录器"""
if udid not in cls._loggers:
cls._loggers[udid] = {}
if name not in cls._loggers[udid]:
cls._loggers[udid][name] = cls._setupLogger(udid, name, logName, level)
return cls._loggers[udid][name]
@classmethod
def info(cls, text):
def info(cls, text, udid):
"""记录 INFO 级别的日志"""
cls._initializeLoggers()
cls._infoLogger.info(text)
logger = cls._getLogger(udid, "infoLogger", "info.log", level=logging.INFO)
logger.info(f"[{udid}] {text}")
@classmethod
def warning(cls, text):
def warning(cls, text, udid):
"""记录 WARNING 级别的日志"""
cls._initializeLoggers()
cls._warningLogger.warning(text)
logger = cls._getLogger(udid, "warningLogger", "warning.log", level=logging.WARNING)
logger.warning(f"[{udid}] {text}")
@classmethod
def error(cls, text):
def error(cls, text, udid):
"""记录 ERROR 级别的日志"""
cls._initializeLoggers()
cls._errorLogger.error(text)
logger = cls._getLogger(udid, "errorLogger", "error.log", level=logging.ERROR)
logger.error(f"[{udid}] {text}")
@classmethod
def clearLogs(cls):
"""删除所有日志文件"""
"""清空整个 log 目录下的所有内容"""
print("开始清空日志...")
# 关闭所有日志记录器的处理器
for logger in [cls._infoLogger, cls._warningLogger, cls._errorLogger]:
if logger:
for udid in cls._loggers:
for name in cls._loggers[udid]:
logger = cls._loggers[udid][name]
for handler in logger.handlers[:]: # 使用切片避免在迭代时修改列表
handler.close()
logger.removeHandler(handler)
print(f"关闭了 {udid}_{name} 的处理器")
# 删除日志文件
for logFile in [cls.infoLogFile, cls.warningLogFile, cls.errorLogFile]:
if os.path.exists(logFile):
os.remove(logFile) # 删除文件
print("所有日志文件已删除")
# 删除整个 log 目录
if os.path.exists(cls.logDir):
shutil.rmtree(cls.logDir) # 删除目录及其所有内容
print(f"删除了 {cls.logDir}")
os.makedirs(cls.logDir, exist_ok=True) # 重新创建空的 log 目录
print(f"重新创建了 {cls.logDir}")
else:
print(f"{cls.logDir} 不存在,无需删除")
print("日志清空完成")

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

View File

@@ -1,9 +1,10 @@
import random
import time
import wda
import os
from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils
from Utils.LogManager import LogManager
# 脚本管理类
@@ -33,39 +34,64 @@ class ScriptManager():
ControlUtils.openTikTok(session)
time.sleep(3)
# 创建udid名称的目录
AiUtils.makeUdidDir(udid)
# 假设此时有个开关
while not event.is_set():
try:
img = client.screenshot()
filePath = "resources/bgv.png"
filePath = f"resources/{udid}/bgv.png"
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
print("保存了背景图")
time.sleep(1)
except Exception as e:
LogManager.error(e, udid)
print(e)
# 判断视频类型。普通视频、广告、直播,只有普通视频停留。其他视频全部划走
addX, addY = AiUtils.findImageInScreen("add")
# 如果找到普通视频
if addX > 0:
needLike = random.randint(0, 10)
try:
# 判断视频类型
addX, addY = AiUtils.findImageInScreen("add", udid)
isSame = False
for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid)
if addX == tx and addY == ty:
isSame = True
time.sleep(1)
else:
isSame = False
break
# 随机时间间隔
videoTime = random.randint(2,5)
time.sleep(videoTime)
# 如果找到普通视频
if addX > 0 and isSame:
needLike = random.randint(0, 10)
# 查找首页按钮
homeButton = AiUtils.findHomeButton(udid)
if homeButton:
print("查看视频")
videoTime = random.randint(5, 15)
time.sleep(videoTime)
# 百分之三的概率点赞
if needLike < 3:
ControlUtils.clickLike(session)
# 百分之三的概率点赞
if needLike < 3:
print("点赞")
ControlUtils.clickLike(session, udid)
# 随机时间间隔
# videoTime = random.randint(10,30)
# time.sleep(videoTime)
session.swipe_up()
else:
# 随机时间
nextTime = random.randint(1,5)
time.sleep(nextTime)
session.swipe_up()
continue
print("继续观看视频")
videoTime = random.randint(10, 30)
time.sleep(videoTime)
print("准备划到下一个视频")
client.swipe_up()
else:
print("目前没有首页按钮。需要另外处理")
else:
nextTime = random.randint(1, 5)
time.sleep(nextTime)
client.swipe_up()
except Exception as e:
print(f"发生异常:{e}")
client.swipe_up()
# manager = ScriptManager()
# manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")