合并代码

This commit is contained in:
2025-10-23 19:55:58 +08:00
parent 4966a659aa
commit 26057d4afa
24 changed files with 342 additions and 4489 deletions

4352
.idea/workspace.xml generated

File diff suppressed because it is too large Load Diff

View File

@@ -246,7 +246,7 @@ def longPressAction():
def growAccount():
body = request.get_json()
udid = body.get("udid")
# Variables.commentList = body.get("comment")
Variables.commentList = body.get("comment")
manager = ScriptManager()
event = threading.Event()
@@ -292,7 +292,7 @@ def passAnchorData():
# 主播列表
acList = data.get("anchorList", [])
# Variables.commentList = data.get("comment")
Variables.commentList = data.get("comment")
LogManager.info(f"[INFO] 获取数据: {idList} {acList}")
@@ -662,17 +662,61 @@ def changeAccount():
@app.route('/test', methods=['POST'])
def test():
body = request.get_json()
import wda
import cv2
import numpy as np
manager = ScriptManager()
threading.Event()
# 设备的UDID
udid = "00008110-000120603C13801E"
# 启动脚本
manager.test()
# thread = threading.Thread(target=, args=(udid,))
# # 添加到线程管理
# thread.start()
return ResultData(data="", code=200, message="成功").toJson()
# 连接到设备
client = wda.USBClient(udid)
session = client.session()
# 设置Appium的截图深度
session.appium_settings({"snapshotMaxDepth": 15})
# 获取当前屏幕截图
screenshot = session.screenshot()
screenshot = cv2.imdecode(np.frombuffer(screenshot, np.uint8), cv2.IMREAD_COLOR)
# 读取大图和小图
large_image = screenshot # 这里使用截图作为大图
template = cv2.imread(r'E:\python\Scrcpy_test\open-cv-tk\insert_comment.png', 0) # 0 表示以灰度模式读取
# 检查图像是否成功加载
if template is None:
print("小图加载失败,请检查路径")
exit()
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 使用模板匹配方法
result = cv2.matchTemplate(large_image, template, cv2.TM_CCOEFF_NORMED)
# 设定阈值
threshold = 0.8
loc = np.where(result >= threshold)
# 遍历所有匹配点
if loc[0].size > 0: # 检查是否有匹配点
for pt in zip(*loc[::-1]): # 将坐标转换为 (x, y) 格式
cv2.rectangle(large_image, pt, (pt[0] + w, pt[1] + h), (0, 255, 0), 2)
print(f"找到匹配区域,坐标:{pt},尺寸:{(w, h)}")
else:
print("未找到匹配区域,请检查模板和大图的内容,或调整阈值")
# 保存结果
cv2.imwrite('matched_result.png', large_image)
# 显示结果
cv2.imshow('Matched Result', large_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 关闭会话
session.close()
if __name__ == '__main__':

View File

@@ -26,7 +26,7 @@ def _force_utf8_everywhere():
except Exception:
pass
# _force_utf8_everywhere()
_force_utf8_everywhere()
class LogManager:
"""

233
Utils/OCRUtils.py Normal file
View File

@@ -0,0 +1,233 @@
import cv2
import numpy as np
from typing import List, Tuple, Union, Optional
from PIL import Image
ArrayLikeImage = Union[np.ndarray, str, Image.Image]
class OCRUtils:
@classmethod
def _to_gray(cls, img: ArrayLikeImage) -> np.ndarray:
"""
接受路径/np.ndarray/PIL.Image统一转为灰度 np.ndarray。
"""
# 路径
if isinstance(img, str):
arr = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
if arr is None:
raise FileNotFoundError(f"图像加载失败,请检查路径: {img}")
return arr
# PIL.Image
if isinstance(img, Image.Image):
return cv2.cvtColor(np.array(img.convert("RGB")), cv2.COLOR_RGB2GRAY)
# numpy 数组
if isinstance(img, np.ndarray):
if img.ndim == 2:
return img # 已是灰度
if img.ndim == 3:
return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
raise ValueError("不支持的图像维度(期望 2D 灰度或 3D BGR/RGB")
raise TypeError("large_image 类型必须是 str / np.ndarray / PIL.Image.Image")
@classmethod
def non_max_suppression(
cls,
boxes: List[List[float]],
scores: Optional[np.ndarray] = None,
overlapThresh: float = 0.5
) -> np.ndarray:
"""
boxes: [ [x1,y1,x2,y2], ... ]
scores: 每个框的置信度(用于“按分数做 NMS”。若为 None则退化为按 y2 排序的经典近似。
返回: 经过 NMS 保留的 boxes(int) ndarray形状 (N,4)
"""
if len(boxes) == 0:
return np.empty((0, 4), dtype=int)
boxes = np.asarray(boxes, dtype=np.float32)
x1, y1, x2, y2 = boxes.T
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
if scores is None:
order = np.argsort(y2) # 经典写法
else:
scores = np.asarray(scores, dtype=np.float32)
order = np.argsort(scores)[::-1] # 分数从高到低
keep = []
while order.size > 0:
i = order[0] if scores is not None else order[-1]
keep.append(i)
rest = order[1:] if scores is not None else order[:-1]
xx1 = np.maximum(x1[i], x1[rest])
yy1 = np.maximum(y1[i], y1[rest])
xx2 = np.minimum(x2[i], x2[rest])
yy2 = np.minimum(y2[i], y2[rest])
w = np.maximum(0, xx2 - xx1 + 1)
h = np.maximum(0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / areas[rest]
inds = np.where(ovr <= overlapThresh)[0]
order = rest[inds]
return boxes[keep].astype(int)
# @classmethod
# def find_template(
# cls,
# template_path: str,
# large_image: ArrayLikeImage,
# threshold: float = 0.8,
# overlapThresh: float = 0.5,
# return_boxes: bool = False
# ) -> Union[List[Tuple[int, int]], Tuple[List[Tuple[int, int]], np.ndarray]]:
# """
# 在 large_image 中查找 template_path 模板的位置。
# - large_image 可为文件路径、np.ndarray 或 PIL.Image
# - threshold: 模板匹配阈值TM_CCOEFF_NORMED
# - overlapThresh: NMS 重叠阈值
# - return_boxes: True 时同时返回保留的框数组 (N,4)
#
# 返回:
# centers 或 (centers, boxes)
# centers: [(cx, cy), ...]
# boxes: [[x1,y1,x2,y2], ...] (np.ndarray, int)
# """
# # 模板(灰度)
# template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
# if template is None:
# raise FileNotFoundError(f"模板图像加载失败,请检查路径: {template_path}")
#
# # 大图(灰度)
# gray = cls._to_gray(large_image)
#
# # 模板尺寸
# tw, th = template.shape[::-1]
#
# # 模板匹配(相关系数归一化)
# result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
#
# # 阈值筛选
# ys, xs = np.where(result >= threshold)
# if len(xs) == 0:
# return ([], np.empty((0, 4), dtype=int)) if return_boxes else []
#
# # 收集候选框与分数
# boxes = []
# scores = []
# for (x, y) in zip(xs, ys):
# boxes.append([x, y, x + tw, y + th])
# scores.append(result[y, x])
#
# # 按分数做 NMS
# boxes_nms = cls.non_max_suppression(boxes, scores=np.array(scores), overlapThresh=overlapThresh)
#
# # 计算中心点
# centers = [((x1 + x2) // 2, (y1 + y2) // 2) for (x1, y1, x2, y2) in boxes_nms]
#
#
#
# if return_boxes:
# return centers, boxes_nms
#
#
# return centers
@classmethod
def find_template(
cls,
template_path: str,
large_image: ArrayLikeImage,
threshold: float = 0.8,
overlapThresh: float = 0.5,
return_boxes: bool = False
) -> Union[List[Tuple[int, int]], Tuple[List[Tuple[int, int]], np.ndarray]]:
"""
在 large_image 中查找 template_path 模板的位置。
- large_image 可为文件路径、np.ndarray 或 PIL.Image
- threshold: 模板匹配阈值TM_CCOEFF_NORMED
- overlapThresh: NMS 重叠阈值
- return_boxes: True 时同时返回保留的框数组 (N,4)
若检测结果为空,则在相同阈值下最多重试三次(共 3 次尝试)。
返回:
centers 或 (centers, boxes)
centers: [(cx, cy), ...]
boxes: [[x1,y1,x2,y2], ...] (np.ndarray, int)
"""
# 模板(灰度)
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
if template is None:
raise FileNotFoundError(f"模板图像加载失败,请检查路径: {template_path}")
# 大图(灰度)
gray = cls._to_gray(large_image)
# 模板尺寸
tw, th = template.shape[::-1]
# 内部:执行一次匹配并返回 (centers, boxes_nms)
def _match_once(cur_threshold: float):
# 模板匹配(相关系数归一化)
result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
# 阈值筛选
ys, xs = np.where(result >= cur_threshold)
if len(xs) == 0:
return [], np.empty((0, 4), dtype=int)
# 收集候选框与分数
boxes = []
scores = []
for (x, y) in zip(xs, ys):
boxes.append([int(x), int(y), int(x + tw), int(y + th)])
scores.append(float(result[y, x]))
# 按分数做 NMS
boxes_nms = cls.non_max_suppression(
boxes,
scores=np.asarray(scores, dtype=np.float32),
overlapThresh=overlapThresh
)
# 计算中心点(转为 Python int
centers = [(int((x1 + x2) // 2), int((y1 + y2) // 2))
for (x1, y1, x2, y2) in boxes_nms]
# 统一为 np.ndarray[int]
boxes_nms = np.asarray(boxes_nms, dtype=int)
return centers, boxes_nms
# ===== 重试控制(最多 3 次)=====
MAX_RETRIES = 3
THRESHOLD_DECAY = 0.0 # 如需越试越宽松,可改为 0.02~0.05;不需要则保持 0
MIN_THRESHOLD = 0.6
cur_threshold = float(threshold)
last_centers, last_boxes = [], np.empty((0, 4), dtype=int)
for attempt in range(MAX_RETRIES):
centers, boxes_nms = _match_once(cur_threshold)
if centers:
if return_boxes:
return centers, boxes_nms
return centers
# 记录最后一次(若最终失败按规范返回空)
last_centers, last_boxes = centers, boxes_nms
# 为下一次尝试准备(这里默认不衰减阈值;如需可打开 THRESHOLD_DECAY
if attempt < MAX_RETRIES - 1 and THRESHOLD_DECAY > 0.0:
cur_threshold = max(MIN_THRESHOLD, cur_threshold - THRESHOLD_DECAY)
# 全部尝试失败
if return_boxes:
return last_centers, last_boxes
return last_centers

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

View File

@@ -1,26 +1,20 @@
import atexit
import random
import re
import subprocess
import threading
import time
from datetime import datetime
from enum import Enum
from pathlib import Path
import wda
import os
# from ultralytics import YOLO
from datetime import datetime
from Entity import Variables
from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils
from Utils.IOSAIStorage import IOSAIStorage
from Utils.JsonUtils import JsonUtils
from Utils.LogManager import LogManager
from Entity.Variables import anchorList, removeModelFromAnchorList, anchorWithSession
# from Utils.OCRUtils import OCRUtils
from Entity.Variables import anchorList
from Utils.OCRUtils import OCRUtils
from Utils.Requester import Requester
import Entity.Variables as ev
from Utils.TencentOCRUtils import TencentOCR
@@ -40,38 +34,38 @@ class ScriptManager():
def __init__(self):
super().__init__()
# weight_path = Path(__file__).resolve().parent.parent / "best.pt"
# self.model = YOLO(weight_path) # 只传一个参数
# 初始化获取模版所在的地址
current_dir = Path(__file__).resolve().parent
# 项目根目录(假设你的类文件在项目的子目录里,比如 Module/OCR/OCRUtils.py
project_root = current_dir.parent # 如果你确定这个文件就在项目根目录下,可省略这行
# resources 文件夹路径
# 获取相应的模板的地址
self.resources_dir = project_root / "resources"
self.comment_dir = self.resources_dir / "comment.png"
self.comment_add_dir = self.resources_dir / "insert_comment.png"
self.initialized = True # 标记已初始化
# 检测yolo识别的坐标
def get_comment_coords(self, file_path, imgsz=960, conf=0.01):
"""
用 YOLOv8 检测评论区域,返回第一个框的中心坐标 (cx, cy)
未检测到返回 None
"""
results = self.model.predict(source=file_path, imgsz=imgsz, conf=conf)
for r in results:
if len(r.boxes) == 0:
continue
for box in r.boxes:
xy = box.xyxy[0][:4].cpu().numpy()
cx, cy = (xy[0] + xy[2]) / 2, (xy[1] + xy[3]) / 2
return int(cx), int(cy)
return None
# ========= 评论逻辑 =========
def comment_flow(self, filePath, session, udid, recomend_cx, recomend_cy):
"""评论一条龙:点评论框->输入->发送->返回"""
coord = self.get_comment_coords(filePath)
coord = OCRUtils.find_template(str(self.comment_dir), filePath)
if not coord:
return # 没检测到评论按钮就拉倒
print(11111111111)
cx, cy = coord
session.tap(int(cx / 3), int(cy / 3))
cx, cy = coord[0] # ✅ 注意这里取第一个点
session.click(int(cx / 3), int(cy / 3))
print(f"点击评论的坐标:{int(cx / 3)}, {int(cy / 3)}")
time.sleep(2)
print(f"评论的坐标:{cx}, {cy}")
# 截图二判(防止键盘弹出后坐标变化)
img = session.screenshot()
time.sleep(2)
@@ -81,13 +75,16 @@ class ScriptManager():
# 从评论列表中随机取出一条数据,进行评论
single_comment = random.choice(Variables.commentList)
coord2 = self.get_comment_coords(filePath)
print(single_comment)
coord2 = OCRUtils.find_template(str(self.comment_add_dir), filePath)
# print(single_comment)
if coord2: # 二判命中
cx2, cy2 = coord2
cx2, cy2 = coord2[0]
print(f"添加评论:{cx2, cy2}")
session.tap(int(cx2 / 3), int(cy2 / 3))
print(f"点击添加评论的坐标:{int(cx2 / 3)}, {int(cy2 / 3)}")
session.send_keys(f"{single_comment}\n")
# session.send_keys(f"hello\n")
time.sleep(2)
LogManager.method_info("评论成功", "养号", udid)
@@ -101,12 +98,10 @@ class ScriptManager():
print("调用刷视频")
while not event.is_set():
print(11111111111111111)
try:
# ========= 初始化 =========
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
print(2222222222222222)
# 关闭并重新打开 TikTok
if not is_monitoring:
ControlUtils.closeTikTok(session, udid)
@@ -114,8 +109,6 @@ class ScriptManager():
ControlUtils.openTikTok(session, udid)
event.wait(timeout=3)
LogManager.method_info("养号重启tiktok", "养号", udid)
AiUtils.makeUdidDir(udid)
@@ -136,7 +129,6 @@ class ScriptManager():
recomend_cx = bounds[0] + bounds[2] // 2
recomend_cy = bounds[1] + bounds[3] // 2
if not el.exists:
# 记录日志
LogManager.method_error("找不到推荐按钮,养号出现问题,重启养号功能", "养号", udid=udid)
@@ -206,7 +198,7 @@ class ScriptManager():
LogManager.method_info("继续观看视频", "养号", udid)
LogManager.method_info("准备划到下一个视频", "养号", udid)
ControlUtils.swipe_up(udid)
# ControlUtils.swipe_up(udid)
else:
LogManager.method_error("找不到首页按钮。出错了", "养号", udid)
# else:
@@ -217,11 +209,9 @@ class ScriptManager():
# event.wait(timeout=1)
# client.swipe_up()
# 评论
# 使用训练好的best.pt(yolo v8模型)进行识别评论区域
# if random.random() > 0.70:
# self.comment_flow(filePath, session, udid, recomend_cx, recomend_cy)
if random.random() > 0.70:
self.comment_flow(filePath, session, udid, recomend_cx, recomend_cy)
videoTime = random.randint(15, 30)
for _ in range(videoTime):
@@ -272,7 +262,7 @@ class ScriptManager():
while not event.is_set():
try:
# —— 每次重启都新建 client/session ——
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 15})
@@ -390,7 +380,7 @@ class ScriptManager():
LogManager.method_error(f"watchLiveForGrowth 异常(第{retry_count}次):{repr(e)}", "直播养号", udid)
# 尝试轻量恢复一次,避免一些短暂性 session 失效
try:
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
_ = client.session()
except Exception:
pass
@@ -426,7 +416,7 @@ class ScriptManager():
# 关注打招呼以及回复主播消息
def greetNewFollowers(self, udid, needReply, needTranslate, event):
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
print(f"是否要自动回复消息:{needReply}")
@@ -594,9 +584,8 @@ class ScriptManager():
event.wait(timeout=1)
LogManager.method_info("停止脚本成功", method="task")
# 使用yolo v8模型进行评论
# self.comment_flow(filePath, session, udid, 100, 100)
self.comment_flow(filePath, session, udid, 100, 100)
if count != 0:
ControlUtils.swipe_up(udid)
@@ -735,7 +724,7 @@ class ScriptManager():
event.wait(timeout=3)
print("重新创建wda会话 防止wda会话失效")
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
@@ -764,7 +753,7 @@ class ScriptManager():
# 关注打招呼以及回复主播消息(联盟号)
def followAndGreetUnion(self, udid, needReply, needTranslate, event):
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
print(f"是否要自动回复消息:{needReply}")
@@ -994,7 +983,7 @@ class ScriptManager():
event.wait(timeout=3)
print("重新创建wda会话 防止wda会话失效")
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
@@ -1007,7 +996,7 @@ class ScriptManager():
def replyMessages(self, udid, event):
try:
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
except Exception as e:
LogManager.method_error(f"创建wda会话异常: {e}", "检测消息", udid)
@@ -1032,7 +1021,7 @@ class ScriptManager():
LogManager.method_info(f"出现异常时,稍等再重启 TikTok 并重试 异常是: {e}", "监控消息", udid)
LogManager.method_info(f"出现异常重新创建wda", "监控消息", udid)
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
LogManager.method_info(f"重启 TikTok", "监控消息", udid)
@@ -1077,7 +1066,6 @@ class ScriptManager():
while True:
print("循环开始")
info_count = 0
# 创建新的会话
@@ -1248,9 +1236,6 @@ class ScriptManager():
break
time.sleep(1)
LogManager.method_info(f"获取主播的名称:{anchor_name}", "检测消息", udid)
LogManager.method_info(f"获取主播最后发送的消息 即将翻译:{last_in}", "检测消息", udid)
chinese_last_msg_text = Requester.translationToChinese(last_in)
@@ -1272,8 +1257,6 @@ class ScriptManager():
LogManager.method_info(f"主播最后发送的数据,传递给前端进行记录:{chinese_last_msg_text}",
"检测消息", udid)
# 把主播的名称存储到c盘
JsonUtils.append_json_items(last_data, "log/last_message.json")
@@ -1470,7 +1453,7 @@ class ScriptManager():
try:
LogManager.method_info("开始进行切换账号", "切换账号", udid)
client = wda.USBClient(udid,ev.wdaFunctionPort)
client = wda.USBClient(udid, ev.wdaFunctionPort)
session = client.session()
# 重启打开

View File

@@ -1,38 +0,0 @@
# windows_run.py替换你起 iproxy 的那几行)
import os, subprocess, time, requests, wda
from pathlib import Path
UDID = "00008110-00067D0014D3B01E"
MAC = "http://192.168.1.219:8765"
# 让 Mac 起 WDA不转发
requests.post(f"{MAC}/startWDA", json={"udid": UDID}, timeout=600).raise_for_status()
# 计算 iproxy 绝对路径(项目根/resources/iproxy/iproxy.exe
BASE = Path(__file__).resolve().parents[1] # iOSAI/
IPROXY = BASE / "resources" / "iproxy" / "iproxy.exe"
if not IPROXY.exists():
raise FileNotFoundError(f"iproxy 不在这里: {IPROXY}")
# 可选:把 iproxy 目录加入 PATH避免 DLL 依赖找不到
env = os.environ.copy()
env["PATH"] = str(IPROXY.parent) + os.pathsep + env.get("PATH", "")
try:
os.add_dll_directory(str(IPROXY.parent)) # 仅 Windows 有效
except Exception:
pass
# 起 iproxy本地 9111 -> 设备 8100
p = subprocess.Popen([str(IPROXY), "-u", UDID, "9111", "8100"],
cwd=str(IPROXY.parent),
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True, creationflags=0x08000000)
# 探活 WDA
c = wda.Client("http://127.0.0.1:9111")
for _ in range(120):
try:
print(c.status()); break
except:
time.sleep(1)

View File

@@ -1,35 +0,0 @@
from pathlib import Path
import os, subprocess, time, requests, wda
UDID = "00008110-00067D0014D3B01E"
MAC = "http://192.168.1.90:8765"
# 让 Mac 起 WDA
requests.post(f"{MAC}/startWDA", json={"udid": UDID}, timeout=600).raise_for_status()
# 计算 iproxy 绝对路径:项目根/resources/iproxy/iproxy.exe
BASE = Path(__file__).resolve().parents[1] # iOSAI/
IPROXY = BASE / "resources" / "iproxy" / "iproxy.exe"
if not IPROXY.exists():
raise FileNotFoundError(f"iproxy 不在这里:{IPROXY}")
# 避免 DLL 找不到:把目录加入 PATH以及 Windows 的 DLL 搜索路径)
env = os.environ.copy()
env["PATH"] = str(IPROXY.parent) + os.pathsep + env.get("PATH", "")
try:
os.add_dll_directory(str(IPROXY.parent))
except Exception:
pass
# 起 iproxy本地 9111 -> 设备 8100
p = subprocess.Popen([str(IPROXY), "-u", UDID, "9111", "8100"],
cwd=str(IPROXY.parent),
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
# 探活 WDA
c = wda.Client("http://127.0.0.1:9111")
for _ in range(120):
try:
print(c.status()); break
except Exception:
time.sleep(1)