合并代码。临时上传
This commit is contained in:
443
Utils/AiUtils.py
443
Utils/AiUtils.py
@@ -11,52 +11,101 @@ import unicodedata
|
||||
import wda
|
||||
from lxml import etree
|
||||
from wda import Client
|
||||
|
||||
from Entity.Variables import wdaFunctionPort
|
||||
from Utils.LogManager import LogManager
|
||||
|
||||
|
||||
# 工具类
|
||||
class AiUtils(object):
|
||||
|
||||
# 在屏幕中找到对应的图片
|
||||
# @classmethod
|
||||
# def findImageInScreen(cls, target, udid):
|
||||
# try:
|
||||
# # 加载原始图像和模板图像
|
||||
# image_path = AiUtils.imagePathWithName(udid, "bgv") # 替换为你的图像路径
|
||||
# template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
|
||||
#
|
||||
# # 读取图像和模板,确保它们都是单通道灰度图
|
||||
# image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
|
||||
# template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
|
||||
#
|
||||
# if image is None:
|
||||
# LogManager.error("加载背景图失败")
|
||||
# return -1, -1
|
||||
#
|
||||
# if template is None:
|
||||
# LogManager.error("加载模板图失败")
|
||||
# return -1, -1
|
||||
#
|
||||
# # 获取模板的宽度和高度
|
||||
# w, h = template.shape[::-1]
|
||||
#
|
||||
# # 使用模板匹配方法
|
||||
# res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
|
||||
# threshold = 0.7 # 匹配度阈值,可以根据需要调整
|
||||
# loc = np.where(res >= threshold)
|
||||
#
|
||||
# # 检查是否有匹配结果
|
||||
# if loc[0].size > 0:
|
||||
# # 取第一个匹配位置
|
||||
# pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
|
||||
# center_x = int(pt[0] + w // 2)
|
||||
# center_y = int(pt[1] + h // 2)
|
||||
# # print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
|
||||
# return center_x, center_y
|
||||
# else:
|
||||
# return -1, -1
|
||||
# except Exception as e:
|
||||
# LogManager.error(f"加载素材失败:{e}", udid)
|
||||
# print(e)
|
||||
# return -1, -1
|
||||
|
||||
@classmethod
|
||||
def findImageInScreen(cls, target, udid):
|
||||
try:
|
||||
print("参数", target, udid)
|
||||
|
||||
# 加载原始图像和模板图像
|
||||
image_path = AiUtils.imagePathWithName(udid, "bgv") # 替换为你的图像路径
|
||||
template_path = AiUtils.imagePathWithName("", target) # 替换为你的模板路径
|
||||
image_path = AiUtils.imagePathWithName(udid, "bgv")
|
||||
template_path = AiUtils.imagePathWithName("", target)
|
||||
|
||||
# 读取图像和模板,确保它们都是单通道灰度图
|
||||
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
|
||||
template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
|
||||
|
||||
if image is None:
|
||||
LogManager.error("加载背景图失败")
|
||||
LogManager.error("加载背景图失败", udid)
|
||||
return -1, -1
|
||||
|
||||
if template is None:
|
||||
LogManager.error("加载模板图失败")
|
||||
|
||||
LogManager.error("加载模板图失败", udid)
|
||||
return -1, -1
|
||||
|
||||
# 获取模板的宽度和高度
|
||||
w, h = template.shape[::-1]
|
||||
|
||||
# 使用模板匹配方法
|
||||
# 模板匹配
|
||||
res = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
|
||||
threshold = 0.7 # 匹配度阈值,可以根据需要调整
|
||||
threshold = 0.7
|
||||
loc = np.where(res >= threshold)
|
||||
|
||||
# 检查是否有匹配结果
|
||||
if loc[0].size > 0:
|
||||
# 取第一个匹配位置
|
||||
pt = zip(*loc[::-1]).__next__() # 获取第一个匹配点的坐标
|
||||
center_x = int(pt[0] + w // 2)
|
||||
center_y = int(pt[1] + h // 2)
|
||||
# print(f"第一个匹配到的小心心中心坐标: ({center_x}, {center_y})")
|
||||
return center_x, center_y
|
||||
else:
|
||||
# 放在 cv2.matchTemplate 之前
|
||||
cv2.imwrite(f'/tmp/runtime_bg_{udid}.png', image)
|
||||
cv2.imwrite(f'/tmp/runtime_tpl_{udid}.png', template)
|
||||
print(f'>>> 设备{udid} 模板{target} 最高相似度:', cv2.minMaxLoc(res)[1])
|
||||
# 安全取出第一个匹配点
|
||||
matches = list(zip(*loc[::-1]))
|
||||
if not matches:
|
||||
return -1, -1
|
||||
|
||||
pt = matches[0]
|
||||
center_x = int(pt[0] + w // 2)
|
||||
center_y = int(pt[1] + h // 2)
|
||||
return center_x, center_y
|
||||
|
||||
except Exception as e:
|
||||
LogManager.error(f"加载素材失败:{e}", udid)
|
||||
print(e)
|
||||
return -1, -1
|
||||
|
||||
# 使用正则查找字符串中的数字
|
||||
@@ -71,7 +120,7 @@ class AiUtils(object):
|
||||
# 选择截图
|
||||
@classmethod
|
||||
def screenshot(cls):
|
||||
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
|
||||
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa",wdaFunctionPort)
|
||||
session = client.session()
|
||||
image = session.screenshot()
|
||||
image_path = "screenshot.png"
|
||||
@@ -195,10 +244,10 @@ class AiUtils(object):
|
||||
# click 是否点击该按钮
|
||||
@classmethod
|
||||
def findHomeButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
|
||||
client = wda.USBClient(udid)
|
||||
client = wda.USBClient(udid,wdaFunctionPort)
|
||||
session = client.session()
|
||||
session.appium_settings({"snapshotMaxDepth": 10})
|
||||
homeButton = session.xpath( "//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']")
|
||||
homeButton = session.xpath("//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']")
|
||||
try:
|
||||
if homeButton.exists:
|
||||
print("找到首页了")
|
||||
@@ -213,7 +262,7 @@ class AiUtils(object):
|
||||
# 查找关闭按钮
|
||||
@classmethod
|
||||
def findLiveCloseButton(cls, udid="eca000fcb6f55d7ed9b4c524055214c26a7de7aa"):
|
||||
client = wda.USBClient(udid)
|
||||
client = wda.USBClient(udid,wdaFunctionPort)
|
||||
session = client.session()
|
||||
session.appium_settings({"snapshotMaxDepth": 10})
|
||||
r = session.xpath("//XCUIElementTypeButton[@name='关闭屏幕']")
|
||||
@@ -288,7 +337,7 @@ class AiUtils(object):
|
||||
# 获取当前屏幕上的节点
|
||||
@classmethod
|
||||
def getCurrentScreenSource(cls):
|
||||
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
|
||||
client = wda.USBClient("eca000fcb6f55d7ed9b4c524055214c26a7de7aa",wdaFunctionPort)
|
||||
print(client.source())
|
||||
|
||||
# 查找app主页上的收件箱按钮
|
||||
@@ -308,8 +357,13 @@ class AiUtils(object):
|
||||
print(f"btn:{btn}")
|
||||
return cls.findNumber(btn.label)
|
||||
|
||||
@classmethod
|
||||
def parse_float(cls, el, attr, default=0.0):
|
||||
try:
|
||||
return float(el.get(attr, default))
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
# # 识别当前页面的消息
|
||||
# @classmethod
|
||||
# def extract_messages_from_xml(cls, xml: str):
|
||||
# """
|
||||
@@ -331,17 +385,21 @@ class AiUtils(object):
|
||||
# return html.unescape(s.strip())
|
||||
#
|
||||
# def is_visible(el):
|
||||
# """无 visible 属性按可见处理;有且为 'false' 才视为不可见。"""
|
||||
# v = el.get('visible')
|
||||
# return (v is None) or (v.lower() == 'true')
|
||||
#
|
||||
# def get_ancestor_cell(el):
|
||||
# p = el
|
||||
# while p is not None and p.get('type') != 'XCUIElementTypeCell':
|
||||
# p = p.getparent()
|
||||
# return p
|
||||
#
|
||||
# # ---------- 屏幕尺寸 ----------
|
||||
# app = root.xpath('/XCUIElementTypeApplication')
|
||||
# screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
|
||||
# screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
|
||||
#
|
||||
# # ---------- 主容器探测(评分选择最像聊天区的容器) ----------
|
||||
#
|
||||
# # ---------- 主容器探测 ----------
|
||||
# def pick_container():
|
||||
# cands = []
|
||||
# for xp, ctype in (
|
||||
@@ -353,7 +411,6 @@ class AiUtils(object):
|
||||
# for n in nodes:
|
||||
# y = cls.parse_float(n, 'y', 0.0)
|
||||
# h = cls.parse_float(n, 'height', screen_h)
|
||||
# # Cell 数越多越像聊天列表;越靠中间越像
|
||||
# cells = n.xpath('.//XCUIElementTypeCell')
|
||||
# score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
|
||||
# cands.append((score, n, ctype))
|
||||
@@ -364,13 +421,12 @@ class AiUtils(object):
|
||||
#
|
||||
# container, container_type = pick_container()
|
||||
#
|
||||
# # ---------- 可视区(area_top, area_bot) ----------
|
||||
# # ---------- 可视区 ----------
|
||||
# if container is not None:
|
||||
# area_top = cls.parse_float(container, 'y', 0.0)
|
||||
# area_h = cls.parse_float(container, 'height', screen_h)
|
||||
# area_bot = area_top + area_h
|
||||
# else:
|
||||
# # 顶栏底缘作为上边界(选最靠上的宽>200的块)
|
||||
# blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
|
||||
# is_visible(n)]
|
||||
# area_top = 0.0
|
||||
@@ -378,7 +434,6 @@ class AiUtils(object):
|
||||
# blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
|
||||
# b = blocks[0]
|
||||
# area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
|
||||
# # 输入框 TextView 顶边作为下边界
|
||||
# tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
|
||||
# if tvs:
|
||||
# tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
|
||||
@@ -394,10 +449,10 @@ class AiUtils(object):
|
||||
# y = cls.parse_float(el, 'y', -1e9)
|
||||
# h = cls.parse_float(el, 'height', 0.0)
|
||||
# by = y + h
|
||||
# tol = 8.0 # 容差,避免边缘误判
|
||||
# tol = 8.0
|
||||
# return not (by <= area_top + tol or y >= area_bot - tol)
|
||||
#
|
||||
# # ---------- 时间分隔(Header) ----------
|
||||
# # ---------- 时间分隔 ----------
|
||||
# items = []
|
||||
# for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
|
||||
# if not in_view(t):
|
||||
@@ -410,10 +465,12 @@ class AiUtils(object):
|
||||
# EXCLUDES_LITERAL = {
|
||||
# 'Heart', 'Lol', 'ThumbsUp',
|
||||
# '分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
|
||||
# '关注',
|
||||
# }
|
||||
# SYSTEM_PATTERNS = [
|
||||
# r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*"
|
||||
# r"回复时接收通知", r"开启(私信)?通知", r"开启通知",
|
||||
# r"消息请求已被接受。你们可以开始聊天了。",
|
||||
# r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*",
|
||||
# r"开启(私信)?通知", r"开启通知",
|
||||
# r"你打开了这个与 .* 的聊天。.*隐私",
|
||||
# r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
|
||||
# r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
|
||||
@@ -423,10 +480,43 @@ class AiUtils(object):
|
||||
# r"Get notified when .* replies",
|
||||
# r"You opened this chat .* privacy",
|
||||
# r"Only \d+ message can be sent .* accepts .* request",
|
||||
# r"此消息可能违反.*",
|
||||
# r"无法发送",
|
||||
# r"请告知我们"
|
||||
# ]
|
||||
# SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
|
||||
#
|
||||
# # 排除底部贴纸/GIF/分享栏(通常是位于底部、较矮的一排 CollectionView)
|
||||
# # ---------- 资料卡片(个人信息)剔除 ----------
|
||||
# PROFILE_RE = re.compile(
|
||||
# r"@[\w\.\-]+|粉丝|followers?|following|关注账号",
|
||||
# re.IGNORECASE
|
||||
# )
|
||||
#
|
||||
# def is_profile_cell(cell) -> bool:
|
||||
# if cell is None:
|
||||
# return False
|
||||
# if cell.xpath(
|
||||
# './/XCUIElementTypeButton[@name="关注" or @label="关注" or '
|
||||
# 'contains(translate(@name,"FOLW","folw"),"follow") or '
|
||||
# 'contains(translate(@label,"FOLW","folw"),"follow")]'
|
||||
# ):
|
||||
# return True
|
||||
# texts = []
|
||||
# for t in cell.xpath('.//*[@name or @label or @value]'):
|
||||
# s = get_text(t)
|
||||
# if s:
|
||||
# texts.append(s)
|
||||
# if len(texts) > 40:
|
||||
# break
|
||||
# joined = " ".join(texts)
|
||||
# if PROFILE_RE.search(joined):
|
||||
# return True
|
||||
# cy = cls.parse_float(cell, 'y', 0.0)
|
||||
# ch = cls.parse_float(cell, 'height', 0.0)
|
||||
# if cy < area_top + 140 and ch >= 150:
|
||||
# return True
|
||||
# return False
|
||||
#
|
||||
# def is_toolbar_like(o) -> bool:
|
||||
# txt = get_text(o)
|
||||
# if txt in EXCLUDES_LITERAL:
|
||||
@@ -440,7 +530,6 @@ class AiUtils(object):
|
||||
# # ---------- 收集消息候选 ----------
|
||||
# msg_nodes = []
|
||||
# if container is not None:
|
||||
# # 容器内优先找 Cell 下的文本节点(Other/StaticText/TextView)
|
||||
# cand = container.xpath(
|
||||
# './/XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
|
||||
# '[@y and (@name or @label or @value)]'
|
||||
@@ -450,12 +539,14 @@ class AiUtils(object):
|
||||
# continue
|
||||
# if is_toolbar_like(o):
|
||||
# continue
|
||||
# cell = get_ancestor_cell(o)
|
||||
# if is_profile_cell(cell):
|
||||
# continue
|
||||
# txt = get_text(o)
|
||||
# if not txt or SYSTEM_RE.search(txt):
|
||||
# if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||||
# continue
|
||||
# msg_nodes.append(o)
|
||||
# else:
|
||||
# # 全局兜底:排除直接挂在 CollectionView(底部工具栏)下的节点
|
||||
# cand = root.xpath(
|
||||
# '//XCUIElementTypeOther[@y and (@name or @label or @value)]'
|
||||
# ' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
|
||||
@@ -467,37 +558,37 @@ class AiUtils(object):
|
||||
# continue
|
||||
# if not in_view(o) or is_toolbar_like(o):
|
||||
# continue
|
||||
# cell = get_ancestor_cell(o)
|
||||
# if is_profile_cell(cell):
|
||||
# continue
|
||||
# txt = get_text(o)
|
||||
# if not txt or SYSTEM_RE.search(txt):
|
||||
# if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||||
# continue
|
||||
# msg_nodes.append(o)
|
||||
#
|
||||
# # ---------- 方向判定 & 组装 ----------
|
||||
# # ---------- 方向判定 & 组装(中心点法) ----------
|
||||
# CENTER_MARGIN = max(12.0, screen_w * 0.02) # 中线容差
|
||||
#
|
||||
# for o in msg_nodes:
|
||||
# txt = get_text(o)
|
||||
# if not txt or txt in EXCLUDES_LITERAL:
|
||||
# if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
|
||||
# continue
|
||||
#
|
||||
# # 找所在 Cell(用于查头像)
|
||||
# cell = o.getparent()
|
||||
# while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
|
||||
# cell = cell.getparent()
|
||||
#
|
||||
# x = cls.parse_float(o, 'x', 0.0)
|
||||
# y = cls.parse_float(o, 'y', 0.0)
|
||||
# w = cls.parse_float(o, 'width', 0.0)
|
||||
# right_edge = x + w
|
||||
#
|
||||
# direction = None
|
||||
# if cell is not None:
|
||||
# avatars = [a for a in cell.xpath(
|
||||
# './/XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]'
|
||||
# ) if is_visible(a)]
|
||||
# if avatars:
|
||||
# ax = cls.parse_float(avatars[0], 'x', 0.0)
|
||||
# direction = 'in' if ax < (screen_w / 2) else 'out'
|
||||
# if direction is None:
|
||||
# direction = 'out' if right_edge > (screen_w * 0.75) else 'in'
|
||||
# center_x = x + w / 2.0
|
||||
# screen_center = screen_w / 2.0
|
||||
#
|
||||
# if center_x < screen_center - CENTER_MARGIN:
|
||||
# direction = 'in' # 左侧:对方
|
||||
# elif center_x > screen_center + CENTER_MARGIN:
|
||||
# direction = 'out' # 右侧:自己
|
||||
# else:
|
||||
# # 处在中线附近,用右缘兜底
|
||||
# right_edge = x + w
|
||||
# direction = 'out' if right_edge >= screen_center else 'in'
|
||||
#
|
||||
# items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
|
||||
#
|
||||
@@ -507,31 +598,32 @@ class AiUtils(object):
|
||||
# for it in items:
|
||||
# it.pop('y', None)
|
||||
# return items
|
||||
#
|
||||
#
|
||||
# @classmethod
|
||||
# def parse_float(cls, el, attr, default=0.0):
|
||||
# try:
|
||||
# v = el.get(attr)
|
||||
# if v is None:
|
||||
# return default
|
||||
# return float(v)
|
||||
# except Exception:
|
||||
# return default
|
||||
|
||||
@classmethod
|
||||
def parse_float(cls, el, attr, default=0.0):
|
||||
try:
|
||||
return float(el.get(attr, default))
|
||||
except Exception:
|
||||
@staticmethod
|
||||
def parse_float(el, key: str, default: float = 0.0) -> float:
|
||||
"""稳健读取浮点属性"""
|
||||
if el is None:
|
||||
return default
|
||||
v = el.get(key)
|
||||
if v is None or v == "":
|
||||
return default
|
||||
try:
|
||||
return float(v)
|
||||
except Exception:
|
||||
try:
|
||||
# 某些抓取会出现 '20.0px' / '20,' 等
|
||||
v2 = re.sub(r"[^\d\.\-]+", "", v)
|
||||
return float(v2) if v2 else default
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
@classmethod
|
||||
def extract_messages_from_xml(cls, xml: str):
|
||||
"""
|
||||
解析 TikTok 聊天 XML,返回当前屏幕可见的消息与时间分隔:
|
||||
[{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
|
||||
兼容 Table / CollectionView / ScrollView;过滤系统提示/底部工具栏;可见性使用“重叠可视+容差”。
|
||||
兼容 Table / CollectionView / ScrollView;过滤系统提示/底部工具栏;
|
||||
资料卡只过滤“资料区块”而非整 Cell;可见性使用“重叠可视+容差”。
|
||||
"""
|
||||
if not isinstance(xml, str) or not xml.strip():
|
||||
return []
|
||||
@@ -550,6 +642,20 @@ class AiUtils(object):
|
||||
v = el.get('visible')
|
||||
return (v is None) or (v.lower() == 'true')
|
||||
|
||||
def get_ancestor_cell(el):
|
||||
p = el
|
||||
while p is not None and p.get('type') != 'XCUIElementTypeCell':
|
||||
p = p.getparent()
|
||||
return p
|
||||
|
||||
def _bbox(el):
|
||||
return (
|
||||
cls.parse_float(el, 'x', 0.0),
|
||||
cls.parse_float(el, 'y', 0.0),
|
||||
cls.parse_float(el, 'width', 0.0),
|
||||
cls.parse_float(el, 'height', 0.0),
|
||||
)
|
||||
|
||||
# ---------- 屏幕尺寸 ----------
|
||||
app = root.xpath('/XCUIElementTypeApplication')
|
||||
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
|
||||
@@ -621,6 +727,7 @@ class AiUtils(object):
|
||||
EXCLUDES_LITERAL = {
|
||||
'Heart', 'Lol', 'ThumbsUp',
|
||||
'分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
|
||||
'关注', # 注意:仅用于按钮/工具条等短元素,后续还会叠加区域过滤,避免误杀消息
|
||||
}
|
||||
SYSTEM_PATTERNS = [
|
||||
r"消息请求已被接受。你们可以开始聊天了。",
|
||||
@@ -635,13 +742,105 @@ class AiUtils(object):
|
||||
r"Get notified when .* replies",
|
||||
r"You opened this chat .* privacy",
|
||||
r"Only \d+ message can be sent .* accepts .* request",
|
||||
|
||||
r"此消息可能违反.*",
|
||||
r"无法发送",
|
||||
r"请告知我们"
|
||||
]
|
||||
SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
|
||||
|
||||
# ---------- 资料卡片(个人信息)剔除:仅过滤“资料区块” ----------
|
||||
PROFILE_RE = re.compile(
|
||||
r"@[\w\.\-]+|粉丝|followers?|following|关注账号",
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
def is_profile_cell(cell) -> bool:
|
||||
"""更严格:至少同时命中 >=2 个信号才认定为资料卡片 Cell。"""
|
||||
if cell is None:
|
||||
return False
|
||||
|
||||
has_follow_btn = bool(cell.xpath(
|
||||
'.//XCUIElementTypeButton['
|
||||
'@name="关注" or @label="关注" or '
|
||||
'contains(translate(@name,"FOLW","folw"),"follow") or '
|
||||
'contains(translate(@label,"FOLW","folw"),"follow")]'
|
||||
))
|
||||
|
||||
has_view_profile = bool(cell.xpath(
|
||||
'.//XCUIElementTypeButton['
|
||||
'@name="查看主页" or @label="查看主页" or '
|
||||
'contains(translate(@name,"VIEW PROFILE","view profile"),"view profile") or '
|
||||
'contains(translate(@label,"VIEW PROFILE","view profile"),"view profile")]'
|
||||
))
|
||||
|
||||
has_live_ended = bool(cell.xpath(
|
||||
'.//XCUIElementTypeStaticText['
|
||||
'@name="直播已结束" or @label="直播已结束" or '
|
||||
'contains(translate(@name,"LIVE ENDED","live ended"),"live ended") or '
|
||||
'contains(translate(@label,"LIVE ENDED","live ended"),"live ended")]'
|
||||
))
|
||||
|
||||
cy = cls.parse_float(cell, 'y', 0.0)
|
||||
ch = cls.parse_float(cell, 'height', 0.0)
|
||||
looks_large_card = ch >= 180 # 大卡片外观
|
||||
|
||||
# 再做一次文本特征检查(防止仅一个“关注”误杀)
|
||||
texts = []
|
||||
for t in cell.xpath('.//*[@name or @label or @value]'):
|
||||
s = get_text(t)
|
||||
if s:
|
||||
texts.append(s)
|
||||
if len(texts) > 40:
|
||||
break
|
||||
joined = " ".join(texts)
|
||||
has_profile_terms = bool(PROFILE_RE.search(joined))
|
||||
|
||||
# 命中信号计数(至少2个)
|
||||
signals = sum([has_follow_btn, has_view_profile, has_live_ended, looks_large_card, has_profile_terms])
|
||||
return signals >= 2
|
||||
|
||||
def profile_region_y_range(cell):
|
||||
"""
|
||||
在资料卡 Cell 内,估算“资料区块”的 y 范围(min_y, max_y)。
|
||||
用关键元素(关注按钮 / 查看主页 / 直播已结束 / 短用户名)来圈定范围。
|
||||
"""
|
||||
if cell is None:
|
||||
return None
|
||||
|
||||
key_nodes = []
|
||||
key_nodes += cell.xpath('.//XCUIElementTypeButton[@name="关注" or @label="关注"]')
|
||||
key_nodes += cell.xpath('.//XCUIElementTypeButton[@name="查看主页" or @label="查看主页"]')
|
||||
key_nodes += cell.xpath('.//XCUIElementTypeStaticText[@name="直播已结束" or @label="直播已结束"]')
|
||||
|
||||
# 用户名/昵称:长度较短更像资料区标签
|
||||
for t in cell.xpath('.//XCUIElementTypeStaticText[@name or @label]'):
|
||||
s = (t.get('label') or t.get('name') or '') or ''
|
||||
st = s.strip()
|
||||
if st and len(st) <= 30:
|
||||
key_nodes.append(t)
|
||||
|
||||
ys = []
|
||||
for n in key_nodes:
|
||||
_, y, _, h = _bbox(n)
|
||||
ys += [y, y + h]
|
||||
|
||||
if not ys:
|
||||
return None # 没有关键元素则不定义资料区
|
||||
|
||||
min_y, max_y = min(ys), max(ys)
|
||||
pad = 12.0
|
||||
return (min_y - pad, max_y + pad)
|
||||
|
||||
def belongs_to_profile_region(node, cell) -> bool:
|
||||
"""判断候选 node 是否落在资料区块的 y 范围内"""
|
||||
rng = profile_region_y_range(cell)
|
||||
if not rng:
|
||||
return False
|
||||
_, y, _, h = _bbox(node)
|
||||
ny1, ny2 = y, y + h
|
||||
ry1, ry2 = rng
|
||||
return not (ny2 < ry1 or ny1 > ry2) # 任意重叠即算属于资料区
|
||||
|
||||
def is_toolbar_like(o) -> bool:
|
||||
txt = get_text(o)
|
||||
if txt in EXCLUDES_LITERAL:
|
||||
@@ -664,8 +863,12 @@ class AiUtils(object):
|
||||
continue
|
||||
if is_toolbar_like(o):
|
||||
continue
|
||||
cell = get_ancestor_cell(o)
|
||||
# 仅在“资料卡 Cell 且节点位于资料区块范围内”时过滤
|
||||
if is_profile_cell(cell) and belongs_to_profile_region(o, cell):
|
||||
continue
|
||||
txt = get_text(o)
|
||||
if not txt or SYSTEM_RE.search(txt):
|
||||
if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||||
continue
|
||||
msg_nodes.append(o)
|
||||
else:
|
||||
@@ -680,41 +883,37 @@ class AiUtils(object):
|
||||
continue
|
||||
if not in_view(o) or is_toolbar_like(o):
|
||||
continue
|
||||
cell = get_ancestor_cell(o)
|
||||
if is_profile_cell(cell) and belongs_to_profile_region(o, cell):
|
||||
continue
|
||||
txt = get_text(o)
|
||||
if not txt or SYSTEM_RE.search(txt):
|
||||
if not txt or SYSTEM_RE.search(txt) or txt in EXCLUDES_LITERAL:
|
||||
continue
|
||||
msg_nodes.append(o)
|
||||
|
||||
# ---------- 方向判定 & 组装 ----------
|
||||
# ---------- 方向判定 & 组装(中心点法) ----------
|
||||
CENTER_MARGIN = max(12.0, screen_w * 0.02) # 中线容差
|
||||
|
||||
for o in msg_nodes:
|
||||
txt = get_text(o)
|
||||
if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
|
||||
continue
|
||||
|
||||
cell = o.getparent()
|
||||
while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
|
||||
cell = cell.getparent()
|
||||
|
||||
x = cls.parse_float(o, 'x', 0.0)
|
||||
y = cls.parse_float(o, 'y', 0.0)
|
||||
w = cls.parse_float(o, 'width', 0.0)
|
||||
right_edge = x + w
|
||||
|
||||
direction = None
|
||||
if cell is not None:
|
||||
avatars = [a for a in cell.xpath(
|
||||
'.//XCUIElementTypeButton[@visible="true" and (@name="Profile photo" or @label="Profile photo")]'
|
||||
) if is_visible(a)]
|
||||
if not avatars and SYSTEM_RE.search(txt):
|
||||
continue # 没头像且系统消息,直接跳过
|
||||
if avatars:
|
||||
ax = cls.parse_float(avatars[0], 'x', 0.0)
|
||||
direction = 'in' if ax < (screen_w / 2) else 'out'
|
||||
center_x = x + w / 2.0
|
||||
screen_center = screen_w / 2.0
|
||||
|
||||
if direction is None:
|
||||
if w > screen_w * 0.8 and SYSTEM_RE.search(txt):
|
||||
continue
|
||||
direction = 'out' if right_edge > (screen_w * 0.75) else 'in'
|
||||
if center_x < screen_center - CENTER_MARGIN:
|
||||
direction = 'in' # 左侧:对方
|
||||
elif center_x > screen_center + CENTER_MARGIN:
|
||||
direction = 'out' # 右侧:自己
|
||||
else:
|
||||
# 处在中线附近,用右缘兜底
|
||||
right_edge = x + w
|
||||
direction = 'out' if right_edge >= screen_center else 'in'
|
||||
|
||||
items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
|
||||
|
||||
@@ -725,11 +924,6 @@ class AiUtils(object):
|
||||
it.pop('y', None)
|
||||
return items
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def get_navbar_anchor_name(cls, session, timeout: float = 5) -> str:
|
||||
"""从聊天页导航栏读取主播名称;找不到返回空字符串。"""
|
||||
@@ -818,8 +1012,6 @@ class AiUtils(object):
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
|
||||
# 检查字符串中是否包含中文
|
||||
@classmethod
|
||||
def contains_chinese(cls, text):
|
||||
@@ -863,38 +1055,6 @@ class AiUtils(object):
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
# @staticmethod
|
||||
# def _normalize_anchor_items(items):
|
||||
# """
|
||||
# 规范化输入为 [{anchorId, country}] 的列表:
|
||||
# - 允许传入:单个对象、对象列表、字符串(当 anchorId 用)
|
||||
# - 过滤不合规项
|
||||
# """
|
||||
# result = []
|
||||
# if items is None:
|
||||
# return result
|
||||
#
|
||||
# if isinstance(items, dict):
|
||||
# # 单个对象
|
||||
# aid = items.get("anchorId")
|
||||
# if aid:
|
||||
# result.append({"anchorId": str(aid), "country": items.get("country", "")})
|
||||
# return result
|
||||
#
|
||||
# if isinstance(items, list):
|
||||
# for it in items:
|
||||
# if isinstance(it, dict):
|
||||
# aid = it.get("anchorId")
|
||||
# if aid:
|
||||
# result.append({"anchorId": str(aid), "country": it.get("country", "")})
|
||||
# elif isinstance(it, str):
|
||||
# result.append({"anchorId": it, "country": ""})
|
||||
# return result
|
||||
#
|
||||
# if isinstance(items, str):
|
||||
# result.append({"anchorId": items, "country": ""})
|
||||
# return result
|
||||
|
||||
@staticmethod
|
||||
def _normalize_anchor_items(items):
|
||||
"""
|
||||
@@ -929,7 +1089,6 @@ class AiUtils(object):
|
||||
result.append({"anchorId": items})
|
||||
return result
|
||||
|
||||
|
||||
# -------- 追加(对象数组平铺追加) --------
|
||||
@classmethod
|
||||
def save_aclist_flat_append(cls, acList, filename="log/acList.json"):
|
||||
@@ -958,7 +1117,6 @@ class AiUtils(object):
|
||||
# LogManager.method_info(f"写入的路径是:{file_path}", "写入数据")
|
||||
LogManager.info(f"[acList] 已追加 {len(to_add)} 条,当前总数={len(data)} -> {file_path}")
|
||||
|
||||
|
||||
@classmethod
|
||||
def pop_aclist_first(cls, filename="log/acList.json", mode="pop"):
|
||||
"""
|
||||
@@ -1166,8 +1324,6 @@ class AiUtils(object):
|
||||
print(f"[peek] 读取失败: {e}")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
@staticmethod
|
||||
def run_tidevice_command(udid, action, bundle_id, timeout=30):
|
||||
"""
|
||||
@@ -1204,7 +1360,8 @@ class AiUtils(object):
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
# 处理tidevice命令未找到的情况(通常意味着tidevice未安装或不在PATH中)
|
||||
LogManager.error("The 'tidevice' command was not found. Please ensure it is installed and in your system PATH.")
|
||||
LogManager.error(
|
||||
"The 'tidevice' command was not found. Please ensure it is installed and in your system PATH.")
|
||||
return False
|
||||
except Exception as e:
|
||||
# 捕获其他可能异常
|
||||
@@ -1235,3 +1392,5 @@ class AiUtils(object):
|
||||
return cls.run_tidevice_command(udid, "launch", bundle_id, timeout)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -58,13 +58,6 @@ class ControlUtils(object):
|
||||
@classmethod
|
||||
def clickBack(cls, session: Client):
|
||||
try:
|
||||
# back = session.xpath(
|
||||
# "//*[@label='返回']"
|
||||
# " | "
|
||||
# "//*[@label='返回上一屏幕']"
|
||||
# " | "
|
||||
# "//XCUIElementTypeButton[@visible='true' and @name='TTKProfileNavBarBaseItemComponent' and @label='IconChevronLeftOffsetLTR']"
|
||||
# )
|
||||
|
||||
back = session.xpath(
|
||||
# ① 常见中文文案
|
||||
@@ -80,7 +73,6 @@ class ControlUtils(object):
|
||||
")]"
|
||||
)
|
||||
|
||||
|
||||
if back.exists:
|
||||
back.click()
|
||||
return True
|
||||
@@ -96,6 +88,13 @@ class ControlUtils(object):
|
||||
if back.exists:
|
||||
back.click()
|
||||
return True
|
||||
elif session.xpath(
|
||||
"(//XCUIElementTypeOther[@y='20' and @height='44']//XCUIElementTypeButton[@visible='true'])[1]").exists:
|
||||
back = session.xpath(
|
||||
"(//XCUIElementTypeOther[@y='20' and @height='44']//XCUIElementTypeButton[@visible='true'])[1]")
|
||||
if back.exists:
|
||||
back.click()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except Exception as e:
|
||||
@@ -148,10 +147,9 @@ class ControlUtils(object):
|
||||
videoCell = session.xpath(
|
||||
'(//XCUIElementTypeCollectionView//XCUIElementTypeCell[.//XCUIElementTypeImage[@name="profile_video"]])[1]')
|
||||
|
||||
|
||||
tab = session.xpath(
|
||||
'//XCUIElementTypeButton[@name="TTKProfileTabVideoButton_0" or contains(@label,"作品") or contains(@name,"作品")]'
|
||||
).get(timeout=5) # 某些版本 tab.value 可能就是数量;或者 tab.label 类似 “作品 7”
|
||||
).get(timeout=5) # 某些版本 tab.value 可能就是数量;或者 tab.label 类似 “作品 7”
|
||||
m = re.search(r"\d+", tab.label)
|
||||
|
||||
num = 0
|
||||
@@ -170,8 +168,6 @@ class ControlUtils(object):
|
||||
print("没有找到主页的第一个视频")
|
||||
return False, num
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def clickFollow(cls, session, aid):
|
||||
# 1) 含“关注/已关注/Follow/Following”的首个 cell
|
||||
@@ -199,6 +195,7 @@ class ControlUtils(object):
|
||||
left_x = max(1, rect.x - 20)
|
||||
center_y = rect.y + rect.height // 2
|
||||
session.tap(left_x, center_y)
|
||||
|
||||
@classmethod
|
||||
def userClickProfile(cls, session, aid):
|
||||
try:
|
||||
@@ -283,7 +280,3 @@ class ControlUtils(object):
|
||||
print("开始微滑动")
|
||||
session.swipe(center_x, center_y, end_x, end_y, duration_ms / 1000)
|
||||
print("随机微滑动:", trajectory)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -13,19 +13,6 @@ class IOSAIStorage:
|
||||
iosai_dir.mkdir(parents=True, exist_ok=True)
|
||||
return iosai_dir
|
||||
|
||||
# @classmethod
|
||||
# def save(cls, data: dict | list, filename: str = "data.json") -> Path:
|
||||
# """
|
||||
# 存储数据到 C:/Users/<用户名>/IOSAI/filename
|
||||
# """
|
||||
# file_path = cls._get_iosai_dir() / filename
|
||||
# try:
|
||||
# with open(file_path, "w", encoding="utf-8") as f:
|
||||
# json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
# print(f"[IOSAIStorage] 已保存到: {file_path}")
|
||||
# except Exception as e:
|
||||
# print(f"[IOSAIStorage] 写入失败: {e}")
|
||||
# return file_path
|
||||
|
||||
@classmethod
|
||||
def save(cls, data: dict | list, filename: str = "data.json", mode: str = "overwrite") -> Path:
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any
|
||||
|
||||
import portalocker as locker # ① 引入跨平台锁
|
||||
|
||||
|
||||
@@ -118,11 +120,33 @@ class JsonUtils:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
|
||||
# --- 新增:通用追加(不做字段校验) ---
|
||||
# @classmethod
|
||||
# def append_json_items(cls, items, filename="log/last_message.json"):
|
||||
# """
|
||||
# 将 dict 或 [dict, ...] 追加到 JSON 文件(数组)中;不校验字段。
|
||||
# """
|
||||
# file_path = Path(filename)
|
||||
# data = cls._read_json_list(file_path)
|
||||
#
|
||||
# # 统一成 list
|
||||
# if isinstance(items, dict):
|
||||
# items = [items]
|
||||
# elif not isinstance(items, list):
|
||||
# # 既不是 dict 也不是 list,直接忽略
|
||||
# return
|
||||
#
|
||||
# # 只接受字典项
|
||||
# items = [it for it in items if isinstance(it, dict)]
|
||||
# if not items:
|
||||
# return
|
||||
#
|
||||
# data.extend(items)
|
||||
#
|
||||
# # LogManager.method_info(filename,"路径")
|
||||
# cls._write_json_list(file_path, data)
|
||||
|
||||
@classmethod
|
||||
def append_json_items(cls, items, filename="log/last_message.json"):
|
||||
"""
|
||||
将 dict 或 [dict, ...] 追加到 JSON 文件(数组)中;不校验字段。
|
||||
"""
|
||||
file_path = Path(filename)
|
||||
data = cls._read_json_list(file_path)
|
||||
|
||||
@@ -130,20 +154,19 @@ class JsonUtils:
|
||||
if isinstance(items, dict):
|
||||
items = [items]
|
||||
elif not isinstance(items, list):
|
||||
# 既不是 dict 也不是 list,直接忽略
|
||||
return
|
||||
|
||||
# 只接受字典项
|
||||
items = [it for it in items if isinstance(it, dict)]
|
||||
# 只保留 sender 非空的字典
|
||||
items = [
|
||||
it for it in items
|
||||
if isinstance(it, dict) and it.get("sender") != ""
|
||||
]
|
||||
if not items:
|
||||
return
|
||||
|
||||
data.extend(items)
|
||||
|
||||
# LogManager.method_info(filename,"路径")
|
||||
cls._write_json_list(file_path, data)
|
||||
|
||||
|
||||
@classmethod
|
||||
def update_json_items(cls, match: dict, patch: dict, filename="log/last_message.json", multi: bool = True) -> int:
|
||||
"""
|
||||
@@ -177,17 +200,8 @@ class JsonUtils:
|
||||
|
||||
return updated
|
||||
|
||||
# @classmethod
|
||||
# def query_all_json_items(cls, filename="log/last_message.json") -> list:
|
||||
# """
|
||||
# 查询 JSON 文件(数组)中的所有项
|
||||
# :param filename: JSON 文件路径
|
||||
# :return: list,可能为空
|
||||
# """
|
||||
# file_path = Path(filename)
|
||||
# print(file_path)
|
||||
# data = cls._read_json_list(file_path)
|
||||
# return data if isinstance(data, list) else []
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def query_all_json_items(cls, filename="log/last_message.json") -> list:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import requests
|
||||
from Entity.Variables import prologueList
|
||||
from Entity.Variables import prologueList, API_KEY
|
||||
from Utils.IOSAIStorage import IOSAIStorage
|
||||
from Utils.JsonUtils import JsonUtils
|
||||
from Utils.LogManager import LogManager
|
||||
@@ -84,31 +84,63 @@ class Requester():
|
||||
# ai聊天
|
||||
@classmethod
|
||||
def chatToAi(cls, param):
|
||||
aiConfig = JsonUtils.read_json("aiConfig")
|
||||
|
||||
|
||||
|
||||
|
||||
# aiConfig = JsonUtils.read_json("aiConfig")
|
||||
aiConfig = IOSAIStorage.load("aiConfig.json")
|
||||
|
||||
|
||||
|
||||
|
||||
agentName = aiConfig.get("agentName")
|
||||
guildName = aiConfig.get("guildName")
|
||||
contactTool = aiConfig.get("contactTool", "")
|
||||
contact = aiConfig.get("contact", "")
|
||||
|
||||
age = aiConfig.get("age", 20)
|
||||
sex = aiConfig.get("sex", "女")
|
||||
height = aiConfig.get("height", 160)
|
||||
weight = aiConfig.get("weight", 55)
|
||||
body_features = aiConfig.get("body_features", "")
|
||||
nationality = aiConfig.get("nationality", "中国")
|
||||
personality = aiConfig.get("personality", "")
|
||||
strengths = aiConfig.get("strengths", "")
|
||||
|
||||
|
||||
|
||||
|
||||
inputs = {
|
||||
"name": agentName,
|
||||
"Trade_union": guildName,
|
||||
"contcat_method": contactTool,
|
||||
"contcat_info": contact
|
||||
"contcat_info": contact,
|
||||
"age": age,
|
||||
"sex": sex,
|
||||
"height": height,
|
||||
"weight": weight,
|
||||
"body_features": body_features,
|
||||
"nationality": nationality,
|
||||
"personality": personality,
|
||||
"strengths": strengths,
|
||||
}
|
||||
|
||||
param["inputs"] = inputs
|
||||
|
||||
try:
|
||||
url = "https://ai.yolozs.com/chat"
|
||||
result = requests.post(url=url, json=param, verify=False)
|
||||
|
||||
# url = "https://ai.yolozs.com/chat"
|
||||
url = "https://ai.yolozs.com/customchat"
|
||||
|
||||
result = requests.post(url=url, json=param, verify=False)
|
||||
|
||||
LogManager.method_info(f"ai聊天的参数:{param}", "ai聊天")
|
||||
print(f"ai聊天的参数:{param}")
|
||||
|
||||
json = result.json()
|
||||
data = json.get("answer", {})
|
||||
session_id = json.get("conversation_id", {})
|
||||
data = json.get("answer", "")
|
||||
session_id = json.get("conversation_id", "")
|
||||
LogManager.method_info(f"ai聊天返回的内容:{result.json()}", "ai聊天")
|
||||
|
||||
return data, session_id
|
||||
|
||||
327
Utils/TencentOCRUtils.py
Normal file
327
Utils/TencentOCRUtils.py
Normal file
@@ -0,0 +1,327 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from http.client import HTTPSConnection
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
Point = Dict[str, int]
|
||||
ItemPolygon = Dict[str, int]
|
||||
|
||||
|
||||
class TencentOCR:
|
||||
"""腾讯云 OCR 封装,自动从环境变量或配置文件加载密钥"""
|
||||
|
||||
@staticmethod
|
||||
def _load_secret() -> Dict[str, str]:
|
||||
# 优先从环境变量读取
|
||||
sid = "AKIDXw86q6D8pJYZOEvOm25wZy96oIZcQ1OX"
|
||||
skey = "ye7MNAj4ub5PVO2TmriLkwtc8QTItGPO"
|
||||
|
||||
# 如果没有,就尝试从 ~/.tencent_ocr.json 加载
|
||||
if not sid or not skey:
|
||||
cfg_path = os.path.expanduser("~/.tencent_ocr.json")
|
||||
if os.path.exists(cfg_path):
|
||||
with open(cfg_path, "r", encoding="utf-8") as f:
|
||||
cfg = json.load(f)
|
||||
sid = sid or cfg.get("secret_id")
|
||||
skey = skey or cfg.get("secret_key")
|
||||
|
||||
if not sid or not skey:
|
||||
raise RuntimeError(
|
||||
"❌ 未找到腾讯云 OCR 密钥,请设置环境变量 TENCENT_SECRET_ID / TENCENT_SECRET_KEY,"
|
||||
"或在用户目录下创建 ~/.tencent_ocr.json(格式:{\"secret_id\":\"...\",\"secret_key\":\"...\"})"
|
||||
)
|
||||
|
||||
return {"secret_id": sid, "secret_key": skey}
|
||||
|
||||
@staticmethod
|
||||
def _hmac_sha256(key: bytes, msg: str) -> bytes:
|
||||
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
|
||||
|
||||
@staticmethod
|
||||
def _strip_data_uri_prefix(b64: str) -> str:
|
||||
if "," in b64 and b64.strip().lower().startswith("data:"):
|
||||
return b64.split(",", 1)[1]
|
||||
return b64
|
||||
|
||||
@staticmethod
|
||||
def _now_ts_and_date():
|
||||
ts = int(time.time())
|
||||
date = datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")
|
||||
return ts, date
|
||||
|
||||
@staticmethod
|
||||
def recognize(
|
||||
*,
|
||||
image_path: Optional[str] = None,
|
||||
image_bytes: Optional[bytes] = None,
|
||||
image_url: Optional[str] = None,
|
||||
region: Optional[str] = None,
|
||||
token: Optional[str] = None,
|
||||
action: str = "GeneralBasicOCR",
|
||||
version: str = "2018-11-19",
|
||||
service: str = "ocr",
|
||||
host: str = "ocr.tencentcloudapi.com",
|
||||
timeout: int = 15,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
调用腾讯云 OCR,三选一:image_path / image_bytes / image_url
|
||||
自动加载密钥(优先环境变量 -> ~/.tencent_ocr.json)
|
||||
"""
|
||||
# 读取密钥
|
||||
sec = TencentOCR._load_secret()
|
||||
secret_id = sec["secret_id"]
|
||||
secret_key = sec["secret_key"]
|
||||
|
||||
assert sum(v is not None for v in (image_path, image_bytes, image_url)) == 1, \
|
||||
"必须且只能提供 image_path / image_bytes / image_url 之一"
|
||||
|
||||
# 1. payload
|
||||
payload: Dict[str, Any] = {}
|
||||
if image_url:
|
||||
payload["ImageUrl"] = image_url
|
||||
else:
|
||||
if image_bytes is None:
|
||||
with open(image_path, "rb") as f:
|
||||
image_bytes = f.read()
|
||||
img_b64 = base64.b64encode(image_bytes).decode("utf-8")
|
||||
img_b64 = TencentOCR._strip_data_uri_prefix(img_b64)
|
||||
payload["ImageBase64"] = img_b64
|
||||
|
||||
payload_str = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
|
||||
|
||||
# 2. 参数准备
|
||||
algorithm = "TC3-HMAC-SHA256"
|
||||
http_method = "POST"
|
||||
canonical_uri = "/"
|
||||
canonical_querystring = ""
|
||||
content_type = "application/json; charset=utf-8"
|
||||
signed_headers = "content-type;host;x-tc-action"
|
||||
|
||||
timestamp, date = TencentOCR._now_ts_and_date()
|
||||
credential_scope = f"{date}/{service}/tc3_request"
|
||||
|
||||
# 3. 规范请求串
|
||||
canonical_headers = (
|
||||
f"content-type:{content_type}\n"
|
||||
f"host:{host}\n"
|
||||
f"x-tc-action:{action.lower()}\n"
|
||||
)
|
||||
hashed_request_payload = hashlib.sha256(payload_str.encode("utf-8")).hexdigest()
|
||||
canonical_request = (
|
||||
f"{http_method}\n{canonical_uri}\n{canonical_querystring}\n"
|
||||
f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}"
|
||||
)
|
||||
|
||||
# 4. 签名
|
||||
hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
|
||||
string_to_sign = (
|
||||
f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"
|
||||
)
|
||||
secret_date = TencentOCR._hmac_sha256(("TC3" + secret_key).encode("utf-8"), date)
|
||||
secret_service = hmac.new(secret_date, service.encode("utf-8"), hashlib.sha256).digest()
|
||||
secret_signing = hmac.new(secret_service, b"tc3_request", hashlib.sha256).digest()
|
||||
signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
|
||||
|
||||
authorization = (
|
||||
f"{algorithm} "
|
||||
f"Credential={secret_id}/{credential_scope}, "
|
||||
f"SignedHeaders={signed_headers}, "
|
||||
f"Signature={signature}"
|
||||
)
|
||||
|
||||
# 5. headers
|
||||
headers = {
|
||||
"Authorization": authorization,
|
||||
"Content-Type": content_type,
|
||||
"Host": host,
|
||||
"X-TC-Action": action,
|
||||
"X-TC-Timestamp": str(timestamp),
|
||||
"X-TC-Version": version,
|
||||
}
|
||||
if region:
|
||||
headers["X-TC-Region"] = region
|
||||
if token:
|
||||
headers["X-TC-Token"] = token
|
||||
|
||||
# 6. 发请求
|
||||
try:
|
||||
conn = HTTPSConnection(host, timeout=timeout)
|
||||
conn.request("POST", "/", body=payload_str.encode("utf-8"), headers=headers)
|
||||
resp = conn.getresponse()
|
||||
raw = resp.read().decode("utf-8", errors="replace")
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except Exception:
|
||||
data = {"NonJSONBody": raw}
|
||||
return {
|
||||
"http_status": resp.status,
|
||||
"http_reason": resp.reason,
|
||||
"headers": dict(resp.getheaders()),
|
||||
"body": data,
|
||||
}
|
||||
except socket.gaierror as e:
|
||||
return {"error": "DNS_RESOLUTION_FAILED", "detail": str(e)}
|
||||
except socket.timeout:
|
||||
return {"error": "NETWORK_TIMEOUT", "detail": f"Timeout after {timeout}s"}
|
||||
except Exception as e:
|
||||
return {"error": "REQUEST_FAILED", "detail": str(e)}
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _norm(s: str) -> str:
|
||||
return (s or "").strip().lstrip("@").lower()
|
||||
|
||||
@staticmethod
|
||||
def _rect_from_polygon(poly: List[Point]) -> Optional[ItemPolygon]:
|
||||
if not poly:
|
||||
return None
|
||||
xs = [p["X"] for p in poly]
|
||||
ys = [p["Y"] for p in poly]
|
||||
return {"X": min(xs), "Y": min(ys), "Width": max(xs) - min(xs), "Height": max(ys) - min(ys)}
|
||||
|
||||
@classmethod
|
||||
def find_last_name_bbox(cls, ocr: Dict[str, Any], name: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
从 OCR JSON 中找到指定名字的“最后一次”出现并返回坐标信息。
|
||||
:param ocr: 完整 OCR JSON(含 Response.TextDetections)
|
||||
:param name: 前端传入的名字,比如 'lee39160'
|
||||
:return: dict 或 None,例如:
|
||||
{
|
||||
"index": 21,
|
||||
"text": "lee39160",
|
||||
"item": {"X": 248, "Y": 1701, "Width": 214, "Height": 49},
|
||||
"polygon": [...],
|
||||
"center": {"x": 355.0, "y": 1725.5}
|
||||
}
|
||||
"""
|
||||
dets = (ocr.get("body") or ocr).get("Response", {}).get("TextDetections", [])
|
||||
if not dets or not name:
|
||||
return None
|
||||
|
||||
target = cls._norm(name)
|
||||
found = -1
|
||||
|
||||
# 从后往前找最后一个严格匹配
|
||||
for i in range(len(dets) - 1, -1, -1):
|
||||
txt = cls._norm(dets[i].get("DetectedText", ""))
|
||||
if txt == target:
|
||||
found = i
|
||||
break
|
||||
|
||||
# 兜底:再匹配原始文本(可能带 @)
|
||||
if found == -1:
|
||||
for i in range(len(dets) - 1, -1, -1):
|
||||
raw = (dets[i].get("DetectedText") or "").strip().lower()
|
||||
if raw.lstrip("@") == target:
|
||||
found = i
|
||||
break
|
||||
|
||||
if found == -1:
|
||||
return None
|
||||
|
||||
det = dets[found]
|
||||
item: Optional[ItemPolygon] = det.get("ItemPolygon")
|
||||
poly: List[Point] = det.get("Polygon") or []
|
||||
|
||||
# 没有 ItemPolygon 就从 Polygon 算
|
||||
if not item:
|
||||
item = cls._rect_from_polygon(poly)
|
||||
if not item:
|
||||
return None
|
||||
|
||||
center = {"x": item["X"] + item["Width"] / 2.0, "y": item["Y"] + item["Height"] / 2.0}
|
||||
|
||||
return {
|
||||
"index": found,
|
||||
"text": det.get("DetectedText", ""),
|
||||
"item": item,
|
||||
"polygon": poly,
|
||||
"center": center,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _get_detections(ocr: Dict[str, Any]) -> List[Dict[str, Any]]:
|
||||
"""兼容含 body 层的 OCR 结构,提取 TextDetections 列表"""
|
||||
return (ocr.get("body") or ocr).get("Response", {}).get("TextDetections", []) or []
|
||||
|
||||
@staticmethod
|
||||
def _norm_txt(s: str) -> str:
|
||||
"""清洗文本:去空格"""
|
||||
return (s or "").strip()
|
||||
|
||||
@classmethod
|
||||
def slice_texts_between(
|
||||
cls,
|
||||
ocr: Dict[str, Any],
|
||||
start_keyword: str = "切换账号",
|
||||
end_keyword: str = "添加账号",
|
||||
*,
|
||||
username_like: bool = False, # True 时只保留像用户名的文本
|
||||
min_conf: int = 0 # 置信度下限
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
返回位于 start_keyword 与 end_keyword 之间的所有文本项(不含两端),
|
||||
每项保留原始 DetectedText、Confidence、ItemPolygon 等信息。
|
||||
"""
|
||||
dets = cls._get_detections(ocr)
|
||||
if not dets:
|
||||
return []
|
||||
|
||||
# 找“切换账号”最后一次出现的下标
|
||||
start_idx = -1
|
||||
for i, d in enumerate(dets):
|
||||
txt = cls._norm_txt(d.get("DetectedText", ""))
|
||||
if txt == start_keyword:
|
||||
start_idx = i
|
||||
|
||||
# 找“添加账号”第一次出现的下标
|
||||
end_idx = -1
|
||||
for i, d in enumerate(dets):
|
||||
txt = cls._norm_txt(d.get("DetectedText", ""))
|
||||
if txt == end_keyword:
|
||||
end_idx = i
|
||||
break
|
||||
|
||||
if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
|
||||
return []
|
||||
|
||||
# 提取两者之间的内容
|
||||
mid = []
|
||||
for d in dets[start_idx + 1:end_idx]:
|
||||
if int(d.get("Confidence", 0)) < min_conf:
|
||||
continue
|
||||
txt = cls._norm_txt(d.get("DetectedText", ""))
|
||||
if not txt:
|
||||
continue
|
||||
mid.append(d)
|
||||
|
||||
if not username_like:
|
||||
return mid
|
||||
|
||||
# 只保留像用户名的文本
|
||||
pat = re.compile(r"^[A-Za-z0-9_.-]{3,}$")
|
||||
filtered = [d for d in mid if pat.match(cls._norm_txt(d.get("DetectedText", "")))]
|
||||
return filtered
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
result = TencentOCR.recognize(
|
||||
image_path=r"C:\Users\zhangkai\Desktop\last-item\iosai\test.png",
|
||||
action="GeneralAccurateOCR",
|
||||
)
|
||||
print(json.dumps(result, ensure_ascii=False, indent=2))
|
||||
|
||||
@@ -20,7 +20,7 @@ class ThreadManager:
|
||||
@classmethod
|
||||
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
|
||||
LogManager.method_info(f"准备创建任务:{udid}", "task")
|
||||
LogManager.method_info("创建线程成功","监控消息")
|
||||
LogManager.method_info("创建线程成功", "监控消息")
|
||||
with cls._lock:
|
||||
# 判断当前设备是否有任务
|
||||
if cls._tasks.get(udid, None) is not None:
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user