From f138bbfcc09752cf5a7dde706babbf4b97302604 Mon Sep 17 00:00:00 2001 From: zw <12345678> Date: Wed, 20 Aug 2025 14:38:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B4=E6=97=B6=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/workspace.xml | 7 +-- script/ScriptManager.py | 128 +++++++++------------------------------- 2 files changed, 28 insertions(+), 107 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index c37e3a3..6d7dc9e 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -6,11 +6,6 @@ - - - - - diff --git a/script/ScriptManager.py b/script/ScriptManager.py index 8945239..fa28815 100644 --- a/script/ScriptManager.py +++ b/script/ScriptManager.py @@ -74,13 +74,17 @@ class ScriptManager(): # 如果找到普通视频 if addX > 0 and isSame: - needLike = random.randint(0, 10) + needLike = random.randint(0, 100) # 查找首页按钮 homeButton = AiUtils.findHomeButton(udid) if homeButton: print("有首页按钮,查看视频") videoTime = random.randint(5, 15) time.sleep(videoTime) + # 点赞之前重置我wda会话 防止会话失效 + client = wda.USBClient(udid) + session = client.session() + session.appium_settings({"snapshotMaxDepth": 0}) # 百分之三的概率点赞 if needLike < 3: @@ -107,9 +111,10 @@ class ScriptManager(): import time, random, wda retry_count = 0 - backoff_sec = 5 # 异常后冷却,避免频繁重启 + backoff_sec = 100 # 异常后冷却,避免频繁重启 while not event.is_set(): + if max_retries is not None and retry_count >= max_retries: LogManager.error(f"达到最大重试次数,停止任务。retries={retry_count}", udid) break @@ -126,8 +131,9 @@ class ScriptManager(): ControlUtils.openTikTok(session, udid) time.sleep(3) - # 2) 进入直播 + # 2) 进入直播 (使用英文) live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]') + if live_button.exists: live_button.click() else: @@ -145,6 +151,10 @@ class ScriptManager(): while not event.is_set(): time.sleep(3) + # 找到一个看直播的时候肯定有的元素,当这个元素没有的时候,就代表当前的页面出现了问题 + # 需要抛出异常,重启这个流程 + + # PK 直接划走 if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists: print("✅ 当前是 PK,跳过") @@ -160,10 +170,10 @@ class ScriptManager(): session.swipe_up() continue else: - print("✅ 单窗口,(20%概率)开始点赞") + print("✅ 单窗口,(10%概率)开始点赞") # 随机点赞(仍保留中途保护) - if random.random() >= 0.90: # 你原来是 0.89,可自行调整概率 + if random.random() >= 0.90: print("开始点赞") for _ in range(random.randint(10, 30)): # 中途转PK/连麦立即跳过 @@ -196,84 +206,6 @@ class ScriptManager(): time.sleep(backoff_sec) # 冷却后整段流程重来 continue - # def watchLiveForGrowth(self, udid, event): - # - # client = wda.USBClient(udid) - # session = client.session() - # - # session.appium_settings({"snapshotMaxDepth": 15}) - # # 先关闭Tik Tok - # ControlUtils.closeTikTok(session, udid) - # time.sleep(1) - # - # # 重新打开Tik Tok - # ControlUtils.openTikTok(session, udid) - # time.sleep(3) - # # 进入直播 - # live_button = session(xpath='//XCUIElementTypeButton[@name="直播"]') - # if live_button.exists: - # live_button.click() - # else: - # LogManager.error(f"无法找到直播间按钮", udid) - # time.sleep(20) - # - # size = session.window_size() - # width, height = size.width, size.height - # - # # 可选:重新拉起 session,规避偶发 Stale 会话 - # session = client.session() - # - # while not event.is_set(): - # try: - # time.sleep(3) - # - # # 如果处于 PK(分数条),直接划走 - # if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists: - # print("✅ 当前是 PK,跳过") - # session.swipe_up() - # continue - # - # # 数直播显示区域窗口(主画面 + 连麦小窗) - # count = AiUtils.count_add_by_xml(session) - # print(f"检测到直播显示区域窗口数:{count}") - # - # if count > 1: - # print("❌ 多窗口(有人连麦/分屏),划走") - # session.swipe_up() - # continue - # else: - # print("✅ 单窗口(只有一个主播),(目前是20%概率)开始点赞") - # - # # 点赞(仍保留中途转PK的保护) - # if random.random() >= 0.89: - # print("开始点赞") - # for _ in range(random.randint(10, 30)): - # if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists: - # print("❗ 中途开始 PK,停止点赞并跳过") - # session.swipe_up() - # break - # if AiUtils.count_add_by_xml(session) > 1: - # print("❗ 中途开始 连麦,停止点赞并跳过") - # session.swipe_up() - # break - # x = width // 3 + random.randint(-10, 10) - # y = height // 3 + random.randint(10, 20) - # print("双击坐标:", x, y) - # session.double_tap(x, y) - # - # print("--------------------------------------------") - # time.sleep(random.randint(100, 300)) - # session.swipe_up() - # - # except Exception as e: - # print("循环异常,重试:", repr(e)) - # # 轻量恢复:重新获取 session,避免因为快照或元素句柄失效卡死 - # try: - # session = client.session() - # except Exception: - # time.sleep(2) - # session = client.session() - # 关注打招呼以及回复主播消息 def greetNewFollowers(self, udid, needReply, event): client = wda.USBClient(udid) @@ -443,7 +375,6 @@ class ScriptManager(): time.sleep(2) print("即将要回复消息") - print(f"页面层级:{session.source()}") if needReply: print("如果需要回复主播消息。走此逻辑") if AiUtils.getUnReadMsgCount(session) > 0: @@ -451,7 +382,6 @@ class ScriptManager(): print("监控回复消息") # 执行回复消息逻辑 self.monitorMessages(session, udid) - # 判断是否有首页按钮 homeButton = AiUtils.findHomeButton(udid) if homeButton.exists: homeButton.click() @@ -470,12 +400,12 @@ class ScriptManager(): session.appium_settings({"snapshotMaxDepth": 15}) # 点击搜索按钮 ControlUtils.clickSearch(session) + else: session.appium_settings({"snapshotMaxDepth": 15}) # 点击搜索按钮 ControlUtils.clickSearch(session) - def replyMessages(self, udid, event): client = wda.USBClient(udid) session = client.session() @@ -522,14 +452,6 @@ class ScriptManager(): LogManager.info(f"当前收件箱的总数量{count}", udid) break - # 双击收件箱 定位到消息的位置 - - r = el.bounds # 可能是命名属性,也可能是 tuple - cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2)) - cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2)) - - session.double_tap(cx, cy) # 可能抛异常:方法不存在 - LogManager.info(f"双击收件箱 定位到信息", udid) # 新粉丝 xp_new_fan_badge = ( @@ -652,8 +574,6 @@ class ScriptManager(): anchor_name = AiUtils.get_navbar_anchor_name(session) # 找到输入框 - # sel = session.xpath( - # "//XCUIElementTypeTextView[@name='消息...' or @label='消息...' or @value='消息...']") sel = session.xpath("//TextView") @@ -687,12 +607,18 @@ class ScriptManager(): LogManager.warning("当前屏幕没有找到 用户 未读徽标数字", udid) badge_text = None + # 双击收件箱 定位到消息的位置 + + r = el.bounds # 可能是命名属性,也可能是 tuple + cx = int((r.x + r.width / 2) if hasattr(r, "x") else (r[0] + r[2] / 2)) + cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2)) + + session.double_tap(cx, cy) # 可能抛异常:方法不存在 + + LogManager.info(f"双击收件箱 定位到信息", udid) + def test(self, udid): client = wda.USBClient(udid) session = client.session() session.appium_settings({"snapshotMaxDepth": 10}) - print(client.source()) - -# manager = ScriptManager() -# manager.test("2335890f77fb51322361bd46b85f7fd1311aed53") -# manager.growAccount("2335890f77fb51322361bd46b85f7fd1311aed53") + print(client.source()) \ No newline at end of file