修复不能滑动的bug

This commit is contained in:
2025-11-03 19:08:25 +08:00
parent 0ccd4ee97c
commit 740f45b88b
12 changed files with 33 additions and 77 deletions

View File

@@ -136,7 +136,6 @@ class DeviceInfo:
# =============== 核心端口连通性检测HTTP 方式) ================= # =============== 核心端口连通性检测HTTP 方式) =================
def _is_local_port_open(self, port: int, udid: str, timeout: float = 5) -> bool: def _is_local_port_open(self, port: int, udid: str, timeout: float = 5) -> bool:
print("开始HTTP方式检测端口")
""" """
使用 HTTP 方式检测:向 http://127.0.0.1:port/ 发送一次 HEAD 请求。 使用 HTTP 方式检测:向 http://127.0.0.1:port/ 发送一次 HEAD 请求。
只要建立连接并收到合法的 HTTP 响应(任意 1xx~5xx 状态码),即认为 HTTP 可达。 只要建立连接并收到合法的 HTTP 响应(任意 1xx~5xx 状态码),即认为 HTTP 可达。
@@ -156,7 +155,6 @@ class DeviceInfo:
conn.close() conn.close()
# 任何合法 HTTP 状态码都说明“HTTP 服务在监听且可交互”,包括 404/401/403/5xx # 任何合法 HTTP 状态码都说明“HTTP 服务在监听且可交互”,包括 404/401/403/5xx
if 100 <= status <= 599: if 100 <= status <= 599:
print(f"HTTP端口正常status={status}")
return True return True
else: else:
LogManager.error(f"HTTP状态码异常: {status}", udid=udid) LogManager.error(f"HTTP状态码异常: {status}", udid=udid)
@@ -170,7 +168,6 @@ class DeviceInfo:
# =============== 一轮检查:发现不通就移除 ================= # =============== 一轮检查:发现不通就移除 =================
def check_iproxy_ports(self, connect_timeout: float = 3) -> None: def check_iproxy_ports(self, connect_timeout: float = 3) -> None:
time.sleep(20) time.sleep(20)
print("开始监听投屏端口")
while True: while True:
snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...] snapshot = list(self._models.items()) # [(deviceId, DeviceModel), ...]
for device_id, model in snapshot: for device_id, model in snapshot:

View File

@@ -48,7 +48,7 @@ def main(arg):
if __name__ == "__main__": if __name__ == "__main__":
# 清空日志 # 清空日志
LogManager.clearLogs() # LogManager.clearLogs()
# main(sys.argv) # main(sys.argv)

View File

@@ -65,8 +65,6 @@ class AiUtils(object):
@classmethod @classmethod
def findImageInScreen(cls, target, udid): def findImageInScreen(cls, target, udid):
try: try:
print("参数", target, udid)
# 加载原始图像和模板图像 # 加载原始图像和模板图像
image_path = AiUtils.imagePathWithName(udid, "bgv") image_path = AiUtils.imagePathWithName(udid, "bgv")
template_path = AiUtils.imagePathWithName("", target) template_path = AiUtils.imagePathWithName("", target)
@@ -119,9 +117,7 @@ class AiUtils(object):
# 选择截图 # 选择截图
@classmethod @classmethod
def screenshot(cls): def screenshot(cls, session):
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa",wdaFunctionPort)
session = client.session()
image = session.screenshot() image = session.screenshot()
image_path = "screenshot.png" image_path = "screenshot.png"
image.save(image_path) image.save(image_path)
@@ -243,9 +239,7 @@ class AiUtils(object):
# uuid 设备id # uuid 设备id
# click 是否点击该按钮 # click 是否点击该按钮
@classmethod @classmethod
def findHomeButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"): def findHomeButton(cls, session):
client = wda.USBClient(udid,wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10}) session.appium_settings({"snapshotMaxDepth": 10})
homeButton = session.xpath("//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']") homeButton = session.xpath("//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']")
try: try:

View File

@@ -286,9 +286,8 @@ class ControlUtils(object):
# 向上滑动 脚本内部使用 # 向上滑动 脚本内部使用
@classmethod @classmethod
def swipe_up(cls, udid): def swipe_up(cls, client):
dev = wda.USBClient(udid, wdaFunctionPort) client.swipe(200, 350, 200, 250, 0.05)
dev.swipe(200, 350, 200, 250, 0.05)
# 向下滑动,脚本内使用 # 向下滑动,脚本内使用
@classmethod @classmethod

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

View File

@@ -20,9 +20,6 @@ from Utils.Requester import Requester
import Entity.Variables as ev import Entity.Variables as ev
from Utils.TencentOCRUtils import TencentOCR from Utils.TencentOCRUtils import TencentOCR
print("测试sourceTree使用")
# 脚本管理类 # 脚本管理类
class ScriptManager(): class ScriptManager():
# 单利对象 # 单利对象
@@ -49,7 +46,6 @@ class ScriptManager():
self.comment_dir = self.resources_dir / "comment.png" self.comment_dir = self.resources_dir / "comment.png"
self.comment_add_dir = self.resources_dir / "insert_comment.png" self.comment_add_dir = self.resources_dir / "insert_comment.png"
self.comment_add_dir2 = self.resources_dir / "insert_comment2.png" self.comment_add_dir2 = self.resources_dir / "insert_comment2.png"
self.initialized = True # 标记已初始化 self.initialized = True # 标记已初始化
def comment_flow(self, filePath, session, udid, recomend_cx, recomend_cy): def comment_flow(self, filePath, session, udid, recomend_cx, recomend_cy):
@@ -121,35 +117,28 @@ class ScriptManager():
# 养号 # 养号
def growAccount(self, udid, isComment, event, is_monitoring=False): def growAccount(self, udid, isComment, event, is_monitoring=False):
LogManager.method_info(f"调用刷视频", "养号", udid) LogManager.method_info(f"调用刷视频", "养号", udid)
while not event.is_set(): # ========= 初始化 =========
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
AiUtils.makeUdidDir(udid)
while not event.is_set():
try: try:
# ========= 初始化 =========
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 关闭并重新打开 TikTok
if not is_monitoring: if not is_monitoring:
ControlUtils.closeTikTok(session, udid) ControlUtils.closeTikTok(session, udid)
event.wait(timeout=1) event.wait(timeout=1)
ControlUtils.openTikTok(session, udid) ControlUtils.openTikTok(session, udid)
event.wait(timeout=3) event.wait(timeout=3)
LogManager.method_info("养号重启tiktok", "养号", udid)
AiUtils.makeUdidDir(udid)
recomend_cx = 0 recomend_cx = 0
recomend_cy = 0 recomend_cy = 0
# ========= 主循环 ========= # ========= 主循环 =========
while not event.is_set(): while not event.is_set():
# 设置手机的节点深度为15,判断该页面是否正确 # 设置手机的节点深度为15,判断该页面是否正确
session.appium_settings({"snapshotMaxDepth": 15}) session.appium_settings({"snapshotMaxDepth": 15})
el = session.xpath( el = session.xpath(
'//XCUIElementTypeButton[@name="top_tabs_recomend" or @name="推荐" or @label="推荐"]' '//XCUIElementTypeButton[@name="top_tabs_recomend" or @name="推荐" or @label="推荐"]'
) )
# 获取推荐按钮所在的坐标 # 获取推荐按钮所在的坐标
if el.exists: if el.exists:
bounds = el.bounds # 返回 [x, y, width, height] bounds = el.bounds # 返回 [x, y, width, height]
@@ -188,7 +177,6 @@ class ScriptManager():
# ---- 视频逻辑 ---- # ---- 视频逻辑 ----
try: try:
addX, addY = AiUtils.findImageInScreen("add", udid) addX, addY = AiUtils.findImageInScreen("add", udid)
isSame = False isSame = False
for i in range(2): for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid) tx, ty = AiUtils.findImageInScreen("add", udid)
@@ -201,7 +189,7 @@ class ScriptManager():
if addX > 0 and isSame: if addX > 0 and isSame:
needLike = random.randint(0, 100) needLike = random.randint(0, 100)
homeButton = AiUtils.findHomeButton(udid) homeButton = AiUtils.findHomeButton(session)
if homeButton: if homeButton:
LogManager.method_info("有首页按钮,查看视频", "养号", udid) LogManager.method_info("有首页按钮,查看视频", "养号", udid)
videoTime = random.randint(5, 15) videoTime = random.randint(5, 15)
@@ -214,22 +202,16 @@ class ScriptManager():
LogManager.method_info("停止脚本成功", method="task") LogManager.method_info("停止脚本成功", method="task")
# 重置 session # 重置 session
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0}) session.appium_settings({"snapshotMaxDepth": 0})
if needLike < 23: if needLike < 23:
LogManager.method_info("进行点赞", "养号", udid) LogManager.method_info("进行点赞", "养号", udid)
ControlUtils.clickLike(session, udid) ControlUtils.clickLike(session, udid)
LogManager.method_info("继续观看视频", "养号", udid) LogManager.method_info("继续观看视频", "养号", udid)
LogManager.method_info("准备划到下一个视频", "养号", udid) LogManager.method_info("准备划到下一个视频", "养号", udid)
# ControlUtils.swipe_up(udid)
else: else:
LogManager.method_error("找不到首页按钮。出错了", "养号", udid) LogManager.method_error("找不到首页按钮。出错了", "养号", udid)
if isComment and random.random() > 0.70: if isComment and random.random() > 0.70:
self.comment_flow(filePath, session, udid, recomend_cx, recomend_cy) self.comment_flow(filePath, session, udid, recomend_cx, recomend_cy)
event.wait(timeout=2) event.wait(timeout=2)
@@ -240,7 +222,7 @@ class ScriptManager():
break break
event.wait(timeout=1) event.wait(timeout=1)
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
# 如果is_monitoring 为False 则说明和监控消息没有联动,反正 则证明和监控消息进行联动 # 如果is_monitoring 为False 则说明和监控消息没有联动,反正 则证明和监控消息进行联动
if is_monitoring: if is_monitoring:
@@ -266,26 +248,27 @@ class ScriptManager():
continue continue
except Exception as e: except Exception as e:
print("刷视频脚本有错误:错误内容:",e)
LogManager.method_error(f"刷视频过程出现错误,重试", "养号", udid) LogManager.method_error(f"刷视频过程出现错误,重试", "养号", udid)
raise e # 抛出给上层,触发重生机制 raise e # 抛出给上层,触发重生机制
except Exception as e: except Exception as e:
print("刷视频遇到错误了。错误内容:",e)
LogManager.method_error(f"[{udid}] 养号出现异常,将重启流程: {e}", "养号", udid) LogManager.method_error(f"[{udid}] 养号出现异常,将重启流程: {e}", "养号", udid)
event.wait(timeout=3) event.wait(timeout=3)
# 观看直播 # 观看直播
def watchLiveForGrowth(self, udid, event, max_retries=None): def watchLiveForGrowth(self, udid, event, max_retries=None):
import time, random, wda import random, wda
retry_count = 0 retry_count = 0
backoff_sec = 10 # 异常后冷却,避免频繁重启 backoff_sec = 10
# —— 每次重启都新建 client/session ——
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
while not event.is_set(): while not event.is_set():
try: try:
# —— 每次重启都新建 client/session ——
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 15})
# 1) 先关再开 # 1) 先关再开
ControlUtils.closeTikTok(session, udid) ControlUtils.closeTikTok(session, udid)
@@ -293,6 +276,7 @@ class ScriptManager():
ControlUtils.openTikTok(session, udid) ControlUtils.openTikTok(session, udid)
event.wait(timeout=3) event.wait(timeout=3)
session.appium_settings({"snapshotMaxDepth": 15})
# 2) 进入直播 (使用英文) # 2) 进入直播 (使用英文)
live_button = session( live_button = session(
xpath='//XCUIElementTypeButton[@name="LIVE" or @label="LIVE" or @name="直播" or @label="直播"] ' xpath='//XCUIElementTypeButton[@name="LIVE" or @label="LIVE" or @name="直播" or @label="直播"] '
@@ -321,12 +305,11 @@ class ScriptManager():
continue continue
# 下滑一下 # 下滑一下
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
# 3) 取分辨率;可选重建 session 规避句柄陈旧 # 3) 取分辨率;可选重建 session 规避句柄陈旧
size = session.window_size() size = session.window_size()
width, height = size.width, size.height width, height = size.width, size.height
session = client.session()
# 4) 主循环:刷直播 # 4) 主循环:刷直播
while not event.is_set(): while not event.is_set():
@@ -349,7 +332,7 @@ class ScriptManager():
if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists: if session(xpath='//XCUIElementTypeOther[@name="kGBLInteractionViewMatchScoreBar"]').exists:
print("✅ 当前是 PK跳过") print("✅ 当前是 PK跳过")
LogManager.method_info("✅ 当前是 PK跳过", "直播养号", udid=udid) LogManager.method_info("✅ 当前是 PK跳过", "直播养号", udid=udid)
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
continue continue
# 计算直播显示窗口数量(主画面+连麦小窗) # 计算直播显示窗口数量(主画面+连麦小窗)
@@ -359,7 +342,7 @@ class ScriptManager():
if count > 1: if count > 1:
print("❌ 多窗口(有人连麦/分屏),划走") print("❌ 多窗口(有人连麦/分屏),划走")
LogManager.method_info("❌ 多窗口(有人连麦/分屏),划走", "直播养号", udid=udid) LogManager.method_info("❌ 多窗口(有人连麦/分屏),划走", "直播养号", udid=udid)
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
continue continue
else: else:
print("✅ 单窗口,(10%概率)开始点赞") print("✅ 单窗口,(10%概率)开始点赞")
@@ -375,7 +358,7 @@ class ScriptManager():
or AiUtils.count_add_by_xml(session) > 1: or AiUtils.count_add_by_xml(session) > 1:
print("❗ 中途发生 PK/连麦,跳过") print("❗ 中途发生 PK/连麦,跳过")
LogManager.method_info("❗ 中途发生 PK/连麦,跳过", "直播养号", udid=udid) LogManager.method_info("❗ 中途发生 PK/连麦,跳过", "直播养号", udid=udid)
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
break break
x = width // 3 + random.randint(-10, 10) x = width // 3 + random.randint(-10, 10)
y = height // 3 + random.randint(10, 20) y = height // 3 + random.randint(10, 20)
@@ -391,7 +374,7 @@ class ScriptManager():
break break
event.wait(timeout=1) event.wait(timeout=1)
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
# 正常退出(外部 event 触发) # 正常退出(外部 event 触发)
break break
@@ -614,13 +597,11 @@ class ScriptManager():
event.wait(timeout=2) event.wait(timeout=2)
if count != 0: if count != 0:
ControlUtils.swipe_up(udid) ControlUtils.swipe_up(client)
# 右滑返回 # 右滑返回
# client.swipe_right() # client.swipe_right()
session.appium_settings({"snapshotMaxDepth": 12}) session.appium_settings({"snapshotMaxDepth": 12})
source = session.source()
print(f"fff“{source}")
back_btn = ControlUtils.clickBack(session) back_btn = ControlUtils.clickBack(session)
@@ -790,9 +771,6 @@ class ScriptManager():
ControlUtils.openTikTok(session, udid) ControlUtils.openTikTok(session, udid)
event.wait(timeout=3) event.wait(timeout=3)
print("重新创建wda会话 防止wda会话失效")
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 执行完成之后。继续点击搜索 # 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15}) session.appium_settings({"snapshotMaxDepth": 15})
else: else:
@@ -1068,8 +1046,6 @@ class ScriptManager():
event.wait(timeout=3) event.wait(timeout=3)
print("重新创建wda会话 防止wda会话失效") print("重新创建wda会话 防止wda会话失效")
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 执行完成之后。继续点击搜索 # 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15}) session.appium_settings({"snapshotMaxDepth": 15})
else: else:
@@ -1102,13 +1078,8 @@ class ScriptManager():
self.monitorMessages(session, udid, event) self.monitorMessages(session, udid, event)
except Exception as e: except Exception as e:
LogManager.method_error(f"监控消息 出现异常: {e},重新启动监控直播", "检测消息", udid) LogManager.method_error(f"监控消息 出现异常: {e},重新启动监控直播", "检测消息", udid)
LogManager.method_info(f"出现异常时,稍等再重启 TikTok 并重试 异常是: {e}", "监控消息", udid) LogManager.method_info(f"出现异常时,稍等再重启 TikTok 并重试 异常是: {e}", "监控消息", udid)
LogManager.method_info(f"出现异常重新创建wda", "监控消息", udid) LogManager.method_info(f"出现异常重新创建wda", "监控消息", udid)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
LogManager.method_info(f"重启 TikTok", "监控消息", udid) LogManager.method_info(f"重启 TikTok", "监控消息", udid)
# 出现异常时,稍等再重启 TikTok 并重试 # 出现异常时,稍等再重启 TikTok 并重试
ControlUtils.closeTikTok(session, udid) ControlUtils.closeTikTok(session, udid)
@@ -1150,9 +1121,7 @@ class ScriptManager():
event.wait(timeout=3) event.wait(timeout=3)
while True: while True:
print("循环开始") print("循环开始")
info_count = 0 info_count = 0
# 创建新的会话 # 创建新的会话
el = session.xpath( el = session.xpath(
'//XCUIElementTypeButton[@name="a11y_vo_inbox"]' '//XCUIElementTypeButton[@name="a11y_vo_inbox"]'
@@ -1165,7 +1134,6 @@ class ScriptManager():
print("el", el) print("el", el)
if not el.exists: if not el.exists:
LogManager.method_error(f"检测不到收件箱", "检测消息", udid) LogManager.method_error(f"检测不到收件箱", "检测消息", udid)
raise Exception("当前页面找不到收件箱,重启") raise Exception("当前页面找不到收件箱,重启")
# break # break
@@ -1518,9 +1486,6 @@ class ScriptManager():
cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2)) cy = int((r.y + r.height / 2) if hasattr(r, "y") else (r[1] + r[3] / 2))
session.double_tap(cx, cy) # 可能抛异常:方法不存在 session.double_tap(cx, cy) # 可能抛异常:方法不存在
LogManager.method_info(f"双击收件箱 定位到信息", "检测消息", udid) LogManager.method_info(f"双击收件箱 定位到信息", "检测消息", udid)
else: else:
return return
@@ -1554,19 +1519,20 @@ class ScriptManager():
# 切换账号 # 切换账号
def changeAccount(self, udid): def changeAccount(self, udid):
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
count = 0 count = 0
while count <= 5: while count <= 5:
try: try:
LogManager.method_info("开始进行切换账号", "切换账号", udid)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 重启打开 # 重启打开
ControlUtils.closeTikTok(session, udid) ControlUtils.closeTikTok(session, udid)
time.sleep(1) time.sleep(1)
ControlUtils.openTikTok(session, udid) ControlUtils.openTikTok(session, udid)
LogManager.method_info("开始进行切换账号", "切换账号", udid)
LogManager.method_info("重启进行切换账号", "切换账号", udid) LogManager.method_info("重启进行切换账号", "切换账号", udid)
# 打开个人主页 # 打开个人主页