Compare commits

...

2 Commits

7 changed files with 148 additions and 68 deletions

View File

@@ -86,7 +86,7 @@ class AiUtils(object):
# 模板匹配
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
threshold = 0.7
threshold = 0.85
loc = np.where(res >= threshold)
# 放在 cv2.matchTemplate 之前
cv2.imwrite(f'/tmp/runtime_bg_{udid}.png', image)
@@ -1394,4 +1394,14 @@ class AiUtils(object):
@classmethod
def _screen_info(cls, udid: str):
try:
# 避免 c.home() 可能触发的阻塞,直接取 window_size
c = wda.USBClient(udid, wdaFunctionPort)
size = c.window_size()
print(f"[Screen] 成功获取屏幕 {int(size.width)}x{int(size.height)} {udid}")
return int(size.width), int(size.height), float(c.scale)
except Exception as e:
print(f"[Screen] 获取屏幕信息异常: {e} {udid}")
return 0, 0, 0.0

View File

@@ -151,16 +151,31 @@ class ControlUtils(object):
@classmethod
def clickLike(cls, session: Client, udid):
try:
scale = session.scale
x, y = AiUtils.findImageInScreen("add", udid)
print(x, y)
if x > -1:
LogManager.method_info("点赞了", "关注打招呼", udid)
session.click(x // scale, y // scale + 50)
return True
from script.ScriptManager import ScriptManager
width, height, scale = ScriptManager.get_screen_info(udid)
if scale == 3.0:
x, y = AiUtils.findImageInScreen("add", udid)
if x > -1:
LogManager.method_info(f"点赞了,点赞的坐标是:{x // scale, y // scale + 50}", "关注打招呼", udid)
session.click(int(x // scale), int(y // scale + 50))
return True
else:
LogManager.method_info("没有找到目标", "关注打招呼", udid)
return False
else:
LogManager.method_info("没有找到目标", "关注打招呼", udid)
return False
x, y = AiUtils.findImageInScreen("like1", udid)
if x > -1:
LogManager.method_info(f"点赞了,点赞的坐标是:{x // scale, y // scale}", "关注打招呼", udid)
session.click(int(x // scale), int(y // scale))
return True
else:
LogManager.method_info("没有找到目标", "关注打招呼", udid)
return False
except Exception as e:
LogManager.method_info(f"点赞出现异常,异常的原因:{e}", "关注打招呼", udid)
raise False
@@ -191,8 +206,6 @@ class ControlUtils(object):
# 获取主播详情页的第一个视频
@classmethod
def clickFirstVideoFromDetailPage(cls, session: Client):
# videoCell = session.xpath(
# '//Window/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[1]/Other[2]/Other[1]/ScrollView[1]/Other[1]/CollectionView[1]/Cell[2]')
videoCell = session.xpath(
'(//XCUIElementTypeCollectionView//XCUIElementTypeCell[.//XCUIElementTypeImage[@name="profile_video"]])[1]')

View File

@@ -1,3 +1,5 @@
import os
import cv2
import numpy as np
from typing import List, Tuple, Union, Optional
@@ -162,6 +164,15 @@ class OCRUtils:
centers: [(cx, cy), ...]
boxes: [[x1,y1,x2,y2], ...] (np.ndarray, int)
"""
if not os.path.isfile(template_path):
print(f"模板文件不存在 → {template_path}")
raise FileNotFoundError(f"模板文件不存在 → {template_path}")
size = os.path.getsize(template_path)
if size == 0:
print(f"模板文件大小为 0 → {template_path} ")
raise ValueError(f"模板文件大小为 0 → {template_path}")
# 模板(灰度)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if template is None:
@@ -230,4 +241,4 @@ class OCRUtils:
# 全部尝试失败
if return_boxes:
return last_centers, last_boxes
return last_centers
return last_centers

BIN
resources/comment2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.5 KiB

BIN
resources/like1.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

View File

@@ -32,6 +32,17 @@ class ScriptManager():
# cls._instance = super(ScriptManager, cls).__new__(cls)
# # 返回已存在的实例
# return cls._instance
_device_cache = {}
_cache_lock = threading.Lock() # 线程安全锁(可选,如果你有多线程)
@classmethod
def get_screen_info(cls, udid: str):
# 如果缓存中没有该设备的信息,则获取并缓存
if udid not in cls._device_cache:
with cls._cache_lock: # 防止并发写入
if udid not in cls._device_cache: # 双重检查
cls._device_cache[udid] = AiUtils._screen_info(udid)
return cls._device_cache[udid]
def __init__(self):
super().__init__()
@@ -40,87 +51,100 @@ class ScriptManager():
current_dir = Path(__file__).resolve().parent
# 项目根目录(假设你的类文件在项目的子目录里,比如 Module/OCR/OCRUtils.py
self.lock = threading.Lock()
project_root = current_dir.parent # 如果你确定这个文件就在项目根目录下,可省略这行
# resources 文件夹路径
# 获取相应的模板的地址
self.resources_dir = project_root / "resources"
# === 2. @2x 素材 (scale=2) ===
self.comment_dir2 = self.resources_dir / "comment2.png"
self.comment_add_dir_2x = self.resources_dir / "insert_comment2x.png"
self.comment_add_dir2_2x = self.resources_dir / "insert_comment2x.png"
# === 3. @3x 素材 (scale=3) ===
self.comment_dir = self.resources_dir / "comment.png"
self.comment_add_dir = self.resources_dir / "insert_comment.png"
self.comment_add_dir2 = self.resources_dir / "insert_comment2.png"
self.initialized = True # 标记已初始化
def _pick_template(self, scale: float):
"""
scale≈3 -> 返回 @3x 素材,除 3
其余默认 @2x 素材,除 2
"""
if abs(scale - 3.0) < 0.3: # 3x
return (self.comment_dir,
self.comment_add_dir,
self.comment_add_dir2, # 3x 兜底
3) # 除数
else: # 2x默认
return (self.comment_dir2,
self.comment_add_dir_2x,
self.comment_add_dir2_2x, # 2x 兜底
2) # 除数
def comment_flow(self, filePath, session, udid, recomend_cx, recomend_cy):
"""评论一条龙:点评论框->输入->发送->返回"""
width, height, scale = self.get_screen_info(udid)
# 取当前分辨率的三张图 + 除数
comment_tpl, add_tpl, add_fb_tpl, div = self._pick_template(scale)
# ① 点评论按钮
coord = OCRUtils.find_template(str(comment_tpl), filePath)
LogManager.method_info(f"使用的模板路径是:{str(comment_tpl)}", "养号", udid)
coord = OCRUtils.find_template(str(self.comment_dir), filePath)
if not coord:
return # 没检测到评论按钮就拉倒
print("无法检测到评论按钮")
LogManager.method_info("无法检测到评论按钮", "养号", udid)
return
cx, cy = coord[0] # ✅ 注意这里取第一个点
session.click(int(cx / 3), int(cy / 3))
LogManager.method_info(f"点击评论的坐标:{int(cx / 3)}, {int(cy / 3)}", "养号", udid)
cx, cy = coord[0]
session.click(int(cx / div), int(cy / div))
LogManager.method_info(f"点击评论坐标:{int(cx / div)}, {int(cy / div)}", "养号", udid)
time.sleep(2)
# 截图二判(防止键盘弹出后坐标变化
# ② 重新截图(防止键盘弹起
img = session.screenshot()
time.sleep(2)
filePath = os.path.join(os.path.dirname(filePath), "bgv_comment.png")
img.save(filePath)
# 从评论列表中随机取出一条数据,进行评论
if Variables.commentList:
single_comment = random.choice(Variables.commentList)
else:
single_comment = "评论没有导入数据"
coord2 = OCRUtils.find_template(str(self.comment_add_dir), filePath)
# ③ 随机评论
single_comment = random.choice(Variables.commentList) if Variables.commentList else "评论没有导入数据"
# ④ 找「添加评论」按钮
coord2 = OCRUtils.find_template(str(add_tpl), filePath)
click_count = False
if coord2: # 二判命中
LogManager.method_info(f"方案1", "养号", udid)
if coord2: # 方案 1 命中
cx2, cy2 = coord2[0]
session.tap(int(cx2 / 3), int(cy2 / 3))
LogManager.method_info(f"点击添加评论的坐标:{int(cx2 / 3)}, {int(cy2 / 3)}", "养号", udid)
session.tap(int(cx2 / div), int(cy2 / div))
session.send_keys(f"{single_comment}\n")
time.sleep(2)
LogManager.method_info("评论成功", "养号", udid)
click_count = True
else:
LogManager.method_info("评论成功方案1", "养号", udid)
else: # 方案 2 兜底
time.sleep(1)
LogManager.method_info(f"方案2", "养号", udid)
img = session.screenshot()
filePath = os.path.join(os.path.dirname(filePath), "bgv_comment.png")
img.save(filePath)
coord3 = OCRUtils.find_template(str(self.comment_add_dir2), filePath)
if coord3: # 二判命中
coord3 = OCRUtils.find_template(str(add_fb_tpl), filePath)
if coord3:
cx3, cy3 = coord3[0]
session.tap(int(cx3 / 3), int(cy3 / 3))
session.tap(int(cx3 / div), int(cy3 / div))
session.send_keys(f"{single_comment}\n")
time.sleep(2)
LogManager.method_info("评论成功", "养号", udid)
click_count = True
LogManager.method_info("评论成功方案2", "养号", udid)
# 点返回/取消按钮:优先用推荐按钮坐标,没有就兜底 100,100
tap_x = int(recomend_cx) if recomend_cx else 100
tap_y = int(recomend_cy) if recomend_cy else 100
if click_count:
# ⑤ 返回 / 取消
# tap_x = int(recomend_cx) if recomend_cx else 100
# tap_y = int(recomend_cy) if recomend_cy else 100
print("点击一次")
LogManager.method_info("点击一次", "养号", udid)
time.sleep(1)
session.tap(tap_x, tap_y)
else:
print("点击两次")
LogManager.method_info("点击两次", "养号", udid)
session.tap(100, 100)
session.tap(tap_x, tap_y)
if not click_count: # 兜底多点一次
time.sleep(1)
session.tap(tap_x, tap_y)
session.tap(100, 100)
# 养号
def growAccount(self, udid, isComment, event, is_monitoring=False):
@@ -186,10 +210,19 @@ class ScriptManager():
# ---- 视频逻辑 ----
try:
addX, addY = AiUtils.findImageInScreen("add", udid)
width, height, scale = self.get_screen_info(udid)
if scale == 3.0:
addX, addY = AiUtils.findImageInScreen("add", udid)
else:
addX, addY = AiUtils.findImageInScreen("like1", udid)
isSame = False
for i in range(2):
tx, ty = AiUtils.findImageInScreen("add", udid)
if scale == 3.0:
tx, ty = AiUtils.findImageInScreen("add", udid)
else:
tx, ty = AiUtils.findImageInScreen("like1", udid)
if addX == tx and addY == ty:
isSame = True
event.wait(timeout=1)
@@ -214,7 +247,7 @@ class ScriptManager():
# 重置 session
session.appium_settings({"snapshotMaxDepth": 0})
if needLike < 23:
if needLike < 25:
LogManager.method_info("进行点赞", "养号", udid)
ControlUtils.clickLike(session, udid)
LogManager.method_info("继续观看视频", "养号", udid)
@@ -631,6 +664,7 @@ class ScriptManager():
LogManager.method_info(f"是否进行评论:{isComment}", "关注打招呼", udid)
# 使用OCR进行评论
if isComment:
LogManager.method_info("调用方法进行评论", "关注打招呼", udid)
self.comment_flow(filePath, session, udid, 100, 100)
event.wait(timeout=2)
@@ -1590,9 +1624,16 @@ class ScriptManager():
)
# 活动
# xp_activity_badge = (
# "//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
# "//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
# )
xp_activity_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
"//XCUIElementTypeLink[@name='活动']/ancestor::XCUIElementTypeCell[1]"
"//XCUIElementTypeStaticText["
" @value and "
" (translate(@value,'0123456789','')='' or @value='99+')"
"]"
)
# 系统通知
@@ -1931,11 +1972,12 @@ class ScriptManager():
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
width, height, scale = self.get_screen_info(udid)
count = 0
while count <= 5:
try:
# 重启打开
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
@@ -2079,8 +2121,12 @@ class ScriptManager():
# 随机偏移(增强拟人)
num = random.randint(-10, 10)
# 分辨率/坐标映射(按你设备比例;你原来是 /3
tap_x = int((center_x + num) / 3)
tap_y = int((center_y + num) / 3)
if scale == 3.0:
tap_x = int((center_x + num) / 3)
tap_y = int((center_y + num) / 3)
else:
tap_x = int((center_x + num) / 2)
tap_y = int((center_y + num) / 2)
LogManager.method_info(f"点击坐标: ({tap_x}, {tap_y}),账号: {target_account}", "切换账号", udid)
session.tap(tap_x, tap_y)