diff --git a/Module/DeviceInfo.py b/Module/DeviceInfo.py index d8e21a6..b0e8c85 100644 --- a/Module/DeviceInfo.py +++ b/Module/DeviceInfo.py @@ -6,6 +6,7 @@ from tidevice import Usbmux from Entity.DeviceModel import DeviceModel from Entity.Variables import tikTokApp, WdaAppBundleId from Module.FlaskSubprocessManager import FlaskSubprocessManager +from Utils.LogManager import LogManager threadLock = threading.Lock() @@ -71,7 +72,13 @@ class Deviceinfo(object): # 连接设备 def connectDevice(self, identifier): - d = wda.USBClient(identifier, 8100) + try: + d = wda.USBClient(identifier, 8100) + LogManager.info("启动wda成功", identifier) + except Exception as e: + LogManager.error("启动wda失败。请检查wda是否正常", identifier) + return + d.app_start(WdaAppBundleId) d.home() time.sleep(2) @@ -82,6 +89,8 @@ class Deviceinfo(object): "id": identifier }) + + # 转发设备端口 def relayDeviceScreenPort(self): try: diff --git a/Module/Main.py b/Module/Main.py index 0818a43..9a7d82c 100644 --- a/Module/Main.py +++ b/Module/Main.py @@ -1,3 +1,5 @@ +import time + from Module.DeviceInfo import Deviceinfo from Module.FlaskSubprocessManager import FlaskSubprocessManager from Utils.LogManager import LogManager @@ -5,6 +7,7 @@ from Utils.LogManager import LogManager if __name__ == "__main__": LogManager.clearLogs() + time.sleep(1) print("启动flask") manager = FlaskSubprocessManager.get_instance() diff --git a/Utils/AiUtils.py b/Utils/AiUtils.py index 53d4e8c..55f5f33 100644 --- a/Utils/AiUtils.py +++ b/Utils/AiUtils.py @@ -3,6 +3,9 @@ import os import re import cv2 import numpy as np +import wda + +from Utils.LogManager import LogManager # 工具类 @@ -10,42 +13,47 @@ class AiUtils(object): # 在屏幕中找到对应的图片 @classmethod - def findImageInScreen(cls, target): - # 加载原始图像和模板图像 - image_path = AiUtils.imagePathWithName("bgv") # 替换为你的图像路径 - template_path = AiUtils.imagePathWithName(target) # 替换为你的模板路径 + def findImageInScreen(cls, target, udid): + try: + # 加载原始图像和模板图像 + image_path = AiUtils.imagePathWithName(udid,"bgv") # 替换为你的图像路径 + template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径 - # 读取图像和模板,确保它们都是单通道灰度图 - image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) - template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE) + # 读取图像和模板,确保它们都是单通道灰度图 + image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) + template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE) - if image is None: - raise ValueError("背景图无法加载") + if image is None: + LogManager.error("加载背景图失败") + return -1, -1 - if template is None: - raise ValueError("模板图无法加载") + if template is None: + LogManager.error("加载模板图失败") + return -1, -1 + # 获取模板的宽度和高度 + w, h = template.shape[::-1] - # 获取模板的宽度和高度 - w, h = template.shape[::-1] - - # 使用模板匹配方法 - res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED) - threshold = 0.7 # 匹配度阈值,可以根据需要调整 - loc = np.where(res >= threshold) - - # 检查是否有匹配结果 - if loc[0].size > 0: - # 取第一个匹配位置 - pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标 - center_x = int(pt[0] + w // 2) - center_y = int(pt[1] + h // 2) - # print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})") - return center_x, center_y - else: - print("未找到匹配的目标") - return -1, -1 + # 使用模板匹配方法 + res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED) + threshold = 0.7 # 匹配度阈值,可以根据需要调整 + loc = np.where(res >= threshold) + # 检查是否有匹配结果 + if loc[0].size > 0: + print("匹配到目标了") + # 取第一个匹配位置 + pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标 + center_x = int(pt[0] + w // 2) + center_y = int(pt[1] + h // 2) + # print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})") + return center_x, center_y + else: + print("未找到匹配的目标") + return -1, -1 + except Exception as e: + LogManager.error(f"加载素材失败:{e}", udid) + print(e) # 使用正则查找字符串中的数字 @classmethod @@ -60,7 +68,11 @@ class AiUtils(object): # 选择截图 @classmethod def screenshot(cls): - image_path = AiUtils.imagePathWithName("bgv") # 替换为你的图像路径 + client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa") + session = client.session() + image = session.screenshot() + image_path = "screenshot.png" + image.save(image_path) image = cv2.imread(image_path) # 如果图像过大,缩小显示 @@ -133,15 +145,84 @@ class AiUtils(object): # 根据名称获取图片完整地址 @classmethod - def imagePathWithName(cls, name): + def imagePathWithName(cls, udid, name): current_file_path = os.path.abspath(__file__) # 获取当前文件所在的目录(即script目录) current_dir = os.path.dirname(current_file_path) # 由于script目录位于项目根目录下一级,因此需要向上一级目录移动两次 project_root = os.path.abspath(os.path.join(current_dir, '..')) # 构建资源文件的完整路径,向上两级目录,然后进入 resources 目录 - resource_path = os.path.abspath(os.path.join(project_root, 'resources', name + ".png")).replace('/', '\\') + resource_path = os.path.abspath(os.path.join(project_root, "resources", udid, name + ".png")).replace('/', '\\') return resource_path + # 获取根目录 + @classmethod + def getRootDir(cls): + current_file = os.path.abspath(__file__) + # 获取当前文件所在的目录 + current_dir = os.path.dirname(current_file) + # 获取项目根目录(假设根目录是当前文件的父目录的父目录) + project_root = os.path.dirname(current_dir) + # 返回根目录 + return project_root + + + # 创建一个以udid命名的目录 + @classmethod + def makeUdidDir(cls, udid): + # 获取项目根目录 + home = cls.getRootDir() + # 拼接 resources 目录的路径 + resources_dir = os.path.join(home, "resources") + # 拼接 udid 目录的路径 + udid_dir = os.path.join(resources_dir, udid) + # 检查 udid 目录是否存在,如果不存在则创建 + if not os.path.exists(udid_dir): + try: + os.makedirs(udid_dir) + LogManager.info(f"目录 {udid_dir} 创建成功", udid) + print(f"目录 {udid_dir} 创建成功") + except Exception as e: + print(f"创建目录时出错: {e}") + LogManager.error(f"创建目录时出错: {e}", udid) + else: + LogManager.info(f"目录 {udid_dir} 已存在,跳过创建", udid) + print(f"目录 {udid_dir} 已存在,跳过创建") + + # 查找首页按钮 + # uuid 设备id + # click 是否点击该按钮 + @classmethod + def findHomeButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"): + client = wda.USBClient(udid) + session = client.session() + session.appium_settings({"snapshotMaxDepth": 10}) + homeButton = session.xpath("//*[@label='首页']") + try: + if homeButton.label == "首页": + print("找到了") + return homeButton + else: + print("没找到") + return None + except Exception as e: + print(e) + return None + + + # 查找关闭按钮 + @classmethod + def findCloseButton(cls,udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"): + client = wda.USBClient(udid) + session = client.session() + session.appium_settings({"snapshotMaxDepth": 30}) + r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']") + try: + if r.label == "关闭屏幕": + return r + else: + return None + except Exception as e: + print(e) + return None -# AiUtils.screenshot() \ No newline at end of file diff --git a/Utils/ControlUtils.py b/Utils/ControlUtils.py index 0b23f25..682be17 100644 --- a/Utils/ControlUtils.py +++ b/Utils/ControlUtils.py @@ -26,14 +26,13 @@ class ControlUtils(object): return False else: print("本页面无法返回") - LogManager.info("没有返回按钮") return False # 点赞 @classmethod - def clickLike(cls, session: Client): + def clickLike(cls, session: Client, udid): scale = session.scale - x, y = AiUtils.findImageInScreen("add") + x, y = AiUtils.findImageInScreen(udid,"add") print(x,y) if x > -1: print("找到目标了") diff --git a/Utils/LogManager.py b/Utils/LogManager.py index cc74e99..8b3f209 100644 --- a/Utils/LogManager.py +++ b/Utils/LogManager.py @@ -1,71 +1,78 @@ import logging import os +import shutil + class LogManager: # 获取项目根目录 projectRoot = os.path.dirname(os.path.dirname(__file__)) logDir = os.path.join(projectRoot, "log") - infoLogFile = os.path.join(logDir, "info.log") - warningLogFile = os.path.join(logDir, "warning.log") - errorLogFile = os.path.join(logDir, "error.log") # 类变量,存储日志记录器 - _infoLogger = None - _warningLogger = None - _errorLogger = None + _loggers = {} @classmethod - def _setupLogger(cls, name, logFile, level=logging.INFO): + def _setupLogger(cls, udid, name, logName, level=logging.INFO): """设置日志记录器""" - os.makedirs(cls.logDir, exist_ok=True) # 确保日志目录存在 - logger = logging.getLogger(name) + deviceLogDir = os.path.join(cls.logDir, udid) + os.makedirs(deviceLogDir, exist_ok=True) # 确保日志目录存在 + logFile = os.path.join(deviceLogDir, logName) + logger = logging.getLogger(f"{udid}_{name}") logger.setLevel(level) fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8") - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S") fileHandler.setFormatter(formatter) logger.addHandler(fileHandler) return logger @classmethod - def _initializeLoggers(cls): - """初始化所有日志记录器""" - if not cls._infoLogger: - cls._infoLogger = cls._setupLogger("infoLogger", cls.infoLogFile, level=logging.INFO) - if not cls._warningLogger: - cls._warningLogger = cls._setupLogger("warningLogger", cls.warningLogFile, level=logging.WARNING) - if not cls._errorLogger: - cls._errorLogger = cls._setupLogger("errorLogger", cls.errorLogFile, level=logging.ERROR) + def _getLogger(cls, udid, name, logName, level=logging.INFO): + """获取或初始化日志记录器""" + if udid not in cls._loggers: + cls._loggers[udid] = {} + if name not in cls._loggers[udid]: + cls._loggers[udid][name] = cls._setupLogger(udid, name, logName, level) + return cls._loggers[udid][name] @classmethod - def info(cls, text): + def info(cls, text, udid): """记录 INFO 级别的日志""" - cls._initializeLoggers() - cls._infoLogger.info(text) + logger = cls._getLogger(udid, "infoLogger", "info.log", level=logging.INFO) + logger.info(f"[{udid}] {text}") @classmethod - def warning(cls, text): + def warning(cls, text, udid): """记录 WARNING 级别的日志""" - cls._initializeLoggers() - cls._warningLogger.warning(text) + logger = cls._getLogger(udid, "warningLogger", "warning.log", level=logging.WARNING) + logger.warning(f"[{udid}] {text}") @classmethod - def error(cls, text): + def error(cls, text, udid): """记录 ERROR 级别的日志""" - cls._initializeLoggers() - cls._errorLogger.error(text) + logger = cls._getLogger(udid, "errorLogger", "error.log", level=logging.ERROR) + logger.error(f"[{udid}] {text}") @classmethod def clearLogs(cls): - """删除所有日志文件""" + """清空整个 log 目录下的所有内容""" + print("开始清空日志...") + # 关闭所有日志记录器的处理器 - for logger in [cls._infoLogger, cls._warningLogger, cls._errorLogger]: - if logger: + for udid in cls._loggers: + for name in cls._loggers[udid]: + logger = cls._loggers[udid][name] for handler in logger.handlers[:]: # 使用切片避免在迭代时修改列表 handler.close() logger.removeHandler(handler) + print(f"关闭了 {udid}_{name} 的处理器") - # 删除日志文件 - for logFile in [cls.infoLogFile, cls.warningLogFile, cls.errorLogFile]: - if os.path.exists(logFile): - os.remove(logFile) # 删除文件 - print("所有日志文件已删除") \ No newline at end of file + # 删除整个 log 目录 + if os.path.exists(cls.logDir): + shutil.rmtree(cls.logDir) # 删除目录及其所有内容 + print(f"删除了 {cls.logDir}") + os.makedirs(cls.logDir, exist_ok=True) # 重新创建空的 log 目录 + print(f"重新创建了 {cls.logDir}") + else: + print(f"{cls.logDir} 不存在,无需删除") + print("日志清空完成") \ No newline at end of file diff --git a/resources/eca000fcb6f55d7ed9b4c524055214c26a7de7aa/bgv.png b/resources/eca000fcb6f55d7ed9b4c524055214c26a7de7aa/bgv.png new file mode 100644 index 0000000..934012d Binary files /dev/null and b/resources/eca000fcb6f55d7ed9b4c524055214c26a7de7aa/bgv.png differ diff --git a/script/ScriptManager.py b/script/ScriptManager.py index 14301d6..df14e3e 100644 --- a/script/ScriptManager.py +++ b/script/ScriptManager.py @@ -1,9 +1,10 @@ import random import time import wda - +import os from Utils.AiUtils import AiUtils from Utils.ControlUtils import ControlUtils +from Utils.LogManager import LogManager # 脚本管理类 @@ -33,39 +34,64 @@ class ScriptManager(): ControlUtils.openTikTok(session) time.sleep(3) + # 创建udid名称的目录 + AiUtils.makeUdidDir(udid) + # 假设此时有个开关 while not event.is_set(): try: img = client.screenshot() - filePath = "resources/bgv.png" + filePath = f"resources/{udid}/bgv.png" img.save(filePath) + LogManager.info("保存屏幕图像成功", udid) + print("保存了背景图") + time.sleep(1) except Exception as e: + LogManager.error(e, udid) print(e) - # 判断视频类型。普通视频、广告、直播,只有普通视频停留。其他视频全部划走 - addX, addY = AiUtils.findImageInScreen("add") - # 如果找到普通视频 - if addX > 0: - needLike = random.randint(0, 10) + try: + # 判断视频类型 + addX, addY = AiUtils.findImageInScreen("add", udid) + isSame = False + for i in range(2): + tx, ty = AiUtils.findImageInScreen("add", udid) + if addX == tx and addY == ty: + isSame = True + time.sleep(1) + else: + isSame = False + break - # 随机时间间隔 - videoTime = random.randint(2,5) - time.sleep(videoTime) + # 如果找到普通视频 + if addX > 0 and isSame: + needLike = random.randint(0, 10) + # 查找首页按钮 + homeButton = AiUtils.findHomeButton(udid) + if homeButton: + print("查看视频") + videoTime = random.randint(5, 15) + time.sleep(videoTime) - # 百分之三的概率点赞 - if needLike < 3: - ControlUtils.clickLike(session) + # 百分之三的概率点赞 + if needLike < 3: + print("点赞") + ControlUtils.clickLike(session, udid) - # 随机时间间隔 - # videoTime = random.randint(10,30) - # time.sleep(videoTime) - session.swipe_up() - else: - # 随机时间 - nextTime = random.randint(1,5) - time.sleep(nextTime) - session.swipe_up() - continue + print("继续观看视频") + videoTime = random.randint(10, 30) + time.sleep(videoTime) + print("准备划到下一个视频") + client.swipe_up() + else: + print("目前没有首页按钮。需要另外处理") + else: + nextTime = random.randint(1, 5) + time.sleep(nextTime) + client.swipe_up() + except Exception as e: + print(f"发生异常:{e}") + client.swipe_up() # manager = ScriptManager() # manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")