增加切换账号功能

This commit is contained in:
2025-09-28 20:42:01 +08:00
parent d876743d3e
commit d543c6f757
28 changed files with 937 additions and 137 deletions

View File

@@ -198,13 +198,13 @@ class AiUtils(object):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
homeButton = session.xpath("//*[@label='首页']")
homeButton = session.xpath( "//XCUIElementTypeButton[@name='a11y_vo_home' or @label='Home' or @label='首页']")
try:
if homeButton.label == "首页":
print("1.找到了")
if homeButton.exists:
print("找到首页")
return homeButton
else:
print("1.没找到")
print("没找到首页")
return None
except Exception as e:
print(e)
@@ -259,11 +259,20 @@ class AiUtils(object):
@classmethod
def getSendMesageButton(cls, session: Client):
# msgButton = session.xpath(
# '//XCUIElementTypeButton['
# '(@name="发消息" or @label="发消息" or '
# '@name="发送 👋" or @label="发送 👋" or '
# '@name="消息" or @label="消息")'
# ' and @visible="true"]'
# )
msgButton = session.xpath(
'//XCUIElementTypeButton['
'(@name="发消息" or @label="发消息" or '
'@name="发送 👋" or @label="发送 👋" or '
'@name="消息" or @label="消息")'
'@name="消息" or @label="消息" or '
'@name="Message" or @label="Message")'
' and @visible="true"]'
)
@@ -300,6 +309,222 @@ class AiUtils(object):
return cls.findNumber(btn.label)
# # 识别当前页面的消息
# @classmethod
# def extract_messages_from_xml(cls, xml: str):
# """
# 解析 TikTok 聊天 XML返回当前屏幕可见的消息与时间分隔
# [{"type":"time","text":"..."}, {"type":"msg","dir":"in|out","text":"..."}]
# 兼容 Table / CollectionView / ScrollView过滤系统提示/底部工具栏;可见性使用“重叠可视+容差”。
# """
# if not isinstance(xml, str) or not xml.strip():
# return []
#
# try:
# root = etree.fromstring(xml.encode("utf-8"))
# except Exception:
# return []
#
# # ---------- 小工具 ----------
# def get_text(el):
# s = (el.get('label') or el.get('name') or el.get('value') or '') or ''
# return html.unescape(s.strip())
#
# def is_visible(el):
# """无 visible 属性按可见处理;有且为 'false' 才视为不可见。"""
# v = el.get('visible')
# return (v is None) or (v.lower() == 'true')
#
# # ---------- 屏幕尺寸 ----------
# app = root.xpath('/XCUIElementTypeApplication')
# screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
# screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
#
# # ---------- 主容器探测(评分选择最像聊天区的容器) ----------
#
# def pick_container():
# cands = []
# for xp, ctype in (
# ('//XCUIElementTypeTable', 'table'),
# ('//XCUIElementTypeCollectionView', 'collection'),
# ('//XCUIElementTypeScrollView', 'scroll'),
# ):
# nodes = [n for n in root.xpath(xp) if is_visible(n)]
# for n in nodes:
# y = cls.parse_float(n, 'y', 0.0)
# h = cls.parse_float(n, 'height', screen_h)
# # Cell 数越多越像聊天列表;越靠中间越像
# cells = n.xpath('.//XCUIElementTypeCell')
# score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
# cands.append((score, n, ctype))
# if cands:
# cands.sort(key=lambda t: t[0], reverse=True)
# return cands[0][1], cands[0][2]
# return None, None
#
# container, container_type = pick_container()
#
# # ---------- 可视区area_top, area_bot ----------
# if container is not None:
# area_top = cls.parse_float(container, 'y', 0.0)
# area_h = cls.parse_float(container, 'height', screen_h)
# area_bot = area_top + area_h
# else:
# # 顶栏底缘作为上边界(选最靠上的宽>200的块
# blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
# is_visible(n)]
# area_top = 0.0
# if blocks:
# blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
# b = blocks[0]
# area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
# # 输入框 TextView 顶边作为下边界
# tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
# if tvs:
# tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
# area_bot = cls.parse_float(tvs[-1], 'y', screen_h)
# else:
# area_bot = screen_h
# if area_bot - area_top < 100:
# area_top, area_bot = 0.0, screen_h
#
# def in_view(el) -> bool:
# if not is_visible(el):
# return False
# y = cls.parse_float(el, 'y', -1e9)
# h = cls.parse_float(el, 'height', 0.0)
# by = y + h
# tol = 8.0 # 容差,避免边缘误判
# return not (by <= area_top + tol or y >= area_bot - tol)
#
# # ---------- 时间分隔Header ----------
# items = []
# for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
# if not in_view(t):
# continue
# txt = get_text(t)
# if txt:
# items.append({'type': 'time', 'text': txt, 'y': cls.parse_float(t, 'y', 0.0)})
#
# # ---------- 系统提示/横幅过滤 ----------
# EXCLUDES_LITERAL = {
# 'Heart', 'Lol', 'ThumbsUp',
# '分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
# }
# SYSTEM_PATTERNS = [
# r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*"
# r"回复时接收通知", r"开启(私信)?通知", r"开启通知",
# r"你打开了这个与 .* 的聊天。.*隐私",
# r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
# r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
# r"未发送$",
# r"Turn on (DM|message|direct message)?\s*notifications",
# r"Enable notifications",
# r"Get notified when .* replies",
# r"You opened this chat .* privacy",
# r"Only \d+ message can be sent .* accepts .* request",
# ]
# SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
#
# # 排除底部贴纸/GIF/分享栏(通常是位于底部、较矮的一排 CollectionView
# def is_toolbar_like(o) -> bool:
# txt = get_text(o)
# if txt in EXCLUDES_LITERAL:
# return True
# y = cls.parse_float(o, 'y', 0.0)
# h = cls.parse_float(o, 'height', 0.0)
# near_bottom = (area_bot - (y + h)) < 48
# is_short = h <= 40
# return near_bottom and is_short
#
# # ---------- 收集消息候选 ----------
# msg_nodes = []
# if container is not None:
# # 容器内优先找 Cell 下的文本节点Other/StaticText/TextView
# cand = container.xpath(
# './/XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
# '[@y and (@name or @label or @value)]'
# )
# for o in cand:
# if not in_view(o):
# continue
# if is_toolbar_like(o):
# continue
# txt = get_text(o)
# if not txt or SYSTEM_RE.search(txt):
# continue
# msg_nodes.append(o)
# else:
# # 全局兜底:排除直接挂在 CollectionView底部工具栏下的节点
# cand = root.xpath(
# '//XCUIElementTypeOther[@y and (@name or @label or @value)]'
# ' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
# ' | //XCUIElementTypeTextView[@y and (@name or @label or @value)]'
# )
# for o in cand:
# p = o.getparent()
# if p is not None and p.get('type') == 'XCUIElementTypeCollectionView':
# continue
# if not in_view(o) or is_toolbar_like(o):
# continue
# txt = get_text(o)
# if not txt or SYSTEM_RE.search(txt):
# continue
# msg_nodes.append(o)
#
# # ---------- 方向判定 & 组装 ----------
# for o in msg_nodes:
# txt = get_text(o)
# if not txt or txt in EXCLUDES_LITERAL:
# continue
#
# # 找所在 Cell用于查头像
# cell = o.getparent()
# while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
# cell = cell.getparent()
#
# x = cls.parse_float(o, 'x', 0.0)
# y = cls.parse_float(o, 'y', 0.0)
# w = cls.parse_float(o, 'width', 0.0)
# right_edge = x + w
#
# direction = None
# if cell is not None:
# avatars = [a for a in cell.xpath(
# './/XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]'
# ) if is_visible(a)]
# if avatars:
# ax = cls.parse_float(avatars[0], 'x', 0.0)
# direction = 'in' if ax < (screen_w / 2) else 'out'
# if direction is None:
# direction = 'out' if right_edge > (screen_w * 0.75) else 'in'
#
# items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
#
# # ---------- 排序 & 收尾 ----------
# if items:
# items.sort(key=lambda i: i.get('y', 0.0))
# for it in items:
# it.pop('y', None)
# return items
#
#
# @classmethod
# def parse_float(cls, el, attr, default=0.0):
# try:
# v = el.get(attr)
# if v is None:
# return default
# return float(v)
# except Exception:
# return default
@classmethod
def parse_float(cls, el, attr, default=0.0):
try:
return float(el.get(attr, default))
except Exception:
return default
@classmethod
def extract_messages_from_xml(cls, xml: str):
@@ -310,6 +535,7 @@ class AiUtils(object):
"""
if not isinstance(xml, str) or not xml.strip():
return []
try:
root = etree.fromstring(xml.encode("utf-8"))
except Exception:
@@ -321,7 +547,6 @@ class AiUtils(object):
return html.unescape(s.strip())
def is_visible(el):
"""无 visible 属性按可见处理;有且为 'false' 才视为不可见。"""
v = el.get('visible')
return (v is None) or (v.lower() == 'true')
@@ -330,8 +555,7 @@ class AiUtils(object):
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
screen_h = cls.parse_float(app[0], 'height', 736.0) if app else 736.0
# ---------- 主容器探测(评分选择最像聊天区的容器) ----------
# ---------- 主容器探测 ----------
def pick_container():
cands = []
for xp, ctype in (
@@ -343,7 +567,6 @@ class AiUtils(object):
for n in nodes:
y = cls.parse_float(n, 'y', 0.0)
h = cls.parse_float(n, 'height', screen_h)
# Cell 数越多越像聊天列表;越靠中间越像
cells = n.xpath('.//XCUIElementTypeCell')
score = len(cells) * 10 - abs((y + h / 2) - screen_h / 2)
cands.append((score, n, ctype))
@@ -354,13 +577,12 @@ class AiUtils(object):
container, container_type = pick_container()
# ---------- 可视区area_top, area_bot ----------
# ---------- 可视区 ----------
if container is not None:
area_top = cls.parse_float(container, 'y', 0.0)
area_h = cls.parse_float(container, 'height', screen_h)
area_bot = area_top + area_h
else:
# 顶栏底缘作为上边界(选最靠上的宽>200的块
blocks = [n for n in root.xpath('//XCUIElementTypeOther[@y and @height and @width>="200"]') if
is_visible(n)]
area_top = 0.0
@@ -368,7 +590,6 @@ class AiUtils(object):
blocks.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
b = blocks[0]
area_top = cls.parse_float(b, 'y', 0.0) + cls.parse_float(b, 'height', 0.0)
# 输入框 TextView 顶边作为下边界
tvs = [n for n in root.xpath('//XCUIElementTypeTextView') if is_visible(n)]
if tvs:
tvs.sort(key=lambda n: cls.parse_float(n, 'y', 0.0))
@@ -384,10 +605,10 @@ class AiUtils(object):
y = cls.parse_float(el, 'y', -1e9)
h = cls.parse_float(el, 'height', 0.0)
by = y + h
tol = 8.0 # 容差,避免边缘误判
tol = 8.0
return not (by <= area_top + tol or y >= area_bot - tol)
# ---------- 时间分隔Header ----------
# ---------- 时间分隔 ----------
items = []
for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
if not in_view(t):
@@ -402,8 +623,9 @@ class AiUtils(object):
'分享发布内容', '视频贴纸标签页', '双击发送表情', '贴纸',
}
SYSTEM_PATTERNS = [
r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*"
r"回复时接收通知", r"开启(私信)?通知", r"开启通知",
r"消息请求已被接受。你们可以开始聊天了。",
r"(消息请求已被接受|你开始了和.*的聊天|你打开了这个与.*的聊天).*",
r"开启(私信)?通知", r"开启通知",
r"你打开了这个与 .* 的聊天。.*隐私",
r"在此用户接受你的消息请求之前,你最多只能发送 ?\d+ 条消息。?",
r"聊天消息条数已达上限,你将无法向该用户发送消息。?",
@@ -413,10 +635,13 @@ class AiUtils(object):
r"Get notified when .* replies",
r"You opened this chat .* privacy",
r"Only \d+ message can be sent .* accepts .* request",
r"此消息可能违反.*",
r"无法发送",
r"请告知我们"
]
SYSTEM_RE = re.compile("|".join(SYSTEM_PATTERNS), re.IGNORECASE)
# 排除底部贴纸/GIF/分享栏(通常是位于底部、较矮的一排 CollectionView
def is_toolbar_like(o) -> bool:
txt = get_text(o)
if txt in EXCLUDES_LITERAL:
@@ -430,7 +655,6 @@ class AiUtils(object):
# ---------- 收集消息候选 ----------
msg_nodes = []
if container is not None:
# 容器内优先找 Cell 下的文本节点Other/StaticText/TextView
cand = container.xpath(
'.//XCUIElementTypeCell//*[self::XCUIElementTypeOther or self::XCUIElementTypeStaticText or self::XCUIElementTypeTextView]'
'[@y and (@name or @label or @value)]'
@@ -445,7 +669,6 @@ class AiUtils(object):
continue
msg_nodes.append(o)
else:
# 全局兜底:排除直接挂在 CollectionView底部工具栏下的节点
cand = root.xpath(
'//XCUIElementTypeOther[@y and (@name or @label or @value)]'
' | //XCUIElementTypeStaticText[@y and (@name or @label or @value)]'
@@ -465,10 +688,9 @@ class AiUtils(object):
# ---------- 方向判定 & 组装 ----------
for o in msg_nodes:
txt = get_text(o)
if not txt or txt in EXCLUDES_LITERAL:
if not txt or txt in EXCLUDES_LITERAL or SYSTEM_RE.search(txt):
continue
# 找所在 Cell用于查头像
cell = o.getparent()
while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
cell = cell.getparent()
@@ -481,12 +703,17 @@ class AiUtils(object):
direction = None
if cell is not None:
avatars = [a for a in cell.xpath(
'.//XCUIElementTypeButton[@visible="true" and (@name="图片头像" or @label="图片头像")]'
'.//XCUIElementTypeButton[@visible="true" and (@name="Profile photo" or @label="Profile photo")]'
) if is_visible(a)]
if not avatars and SYSTEM_RE.search(txt):
continue # 没头像且系统消息,直接跳过
if avatars:
ax = cls.parse_float(avatars[0], 'x', 0.0)
direction = 'in' if ax < (screen_w / 2) else 'out'
if direction is None:
if w > screen_w * 0.8 and SYSTEM_RE.search(txt):
continue
direction = 'out' if right_edge > (screen_w * 0.75) else 'in'
items.append({'type': 'msg', 'dir': direction, 'text': txt, 'y': y})
@@ -498,21 +725,16 @@ class AiUtils(object):
it.pop('y', None)
return items
@classmethod
def parse_float(cls, el, attr, default=0.0):
try:
v = el.get(attr)
if v is None:
return default
return float(v)
except Exception:
return default
# 从导航栏读取主播名称。找不到时返回空字符串
@classmethod
def get_navbar_anchor_name(cls, session, timeout: float = 2.0) -> str:
"""从导航栏读取主播名称找不到返回空字符串。"""
# 可选:限制快照深度,提升解析速度/稳定性
def get_navbar_anchor_name(cls, session, timeout: float = 5) -> str:
"""聊天页导航栏读取主播名称找不到返回空字符串。"""
# 限制快照深度(可选)
try:
session.appium_settings({"snapshotMaxDepth": 22})
except Exception:
@@ -523,50 +745,81 @@ class AiUtils(object):
return (info.get("label") or info.get("name") or info.get("value") or "").strip()
def _clean_tail(s: str) -> str:
# 去掉末尾中英标点和空白TikTok 昵称右侧常带一个顿号/逗号,比如 “Alina
return re.sub(r"[,、,。.\s]+$", "", s).strip()
# 导航容器:从“返回”按钮向上找最近祖先,且该祖先内包含“更多/举报”(多语言兜底)
NAV_CONTAINER = (
# ---- 关键修复:导航容器 ----
# 1) “返回”可能是 Button 也可能是 Other同时页面右上角有 “更多/举报” 按钮
BACK_ELEM = (
"//*[@type='XCUIElementTypeButton' or @type='XCUIElementTypeOther']"
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' "
" or @name='戻る' or @label='戻る']"
)
RIGHT_MENU_BTN = (
"//XCUIElementTypeButton"
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' or @name='戻る' or @label='戻る']"
"/ancestor::XCUIElementTypeOther"
"[ .//XCUIElementTypeButton"
" [@name='更多' or @label='更多' or @name='More' or @label='More' or "
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
" @name='報告' or @label='報告']"
"][1]"
"[@name='更多' or @label='更多' or @name='More' or @label='More' or "
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
" @name='報告' or @label='報告']"
)
# 从“返回”向上找到最近祖先 Other且该祖先内包含“更多/举报”按钮
NAV_CONTAINER = (
BACK_ELEM +
"/ancestor::XCUIElementTypeOther[ .//XCUIElementTypeOther or .//XCUIElementTypeButton ][ ."
+ RIGHT_MENU_BTN +
"][1]"
)
# ① 优先:可访问的 Other自身有文本且不含子 Button,更贴近 TikTok 的实现
# ① 优先:在导航容器里找“可访问的 Other有文本且不含子 Button
XPATH_TITLE_OTHER = (
NAV_CONTAINER +
"//XCUIElementTypeOther[@accessible='true' and count(.//XCUIElementTypeButton)=0 "
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)"
"][1]"
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)][1]"
)
# ② 退路:第一个 StaticText
# ② 退路:导航容器内第一个有文本的 StaticText
XPATH_TITLE_STATIC = (
NAV_CONTAINER +
"//XCUIElementTypeStaticText[string-length(@value)>0 or string-length(@label)>0 or string-length(@name)>0][1]"
)
# 尝试 ①
q = session.xpath(XPATH_TITLE_OTHER)
if q.wait(timeout):
t = _clean_tail(_text_of(q.get()))
if t:
return t
# ③ 兜底:直接在“返回”与右侧菜单同一层级/附近范围内找可读的 Other适配部分机型结构差异
XPATH_FALLBACK_NEAR_BACK = (
BACK_ELEM +
"/ancestor::XCUIElementTypeOther[1]"
"//XCUIElementTypeOther[@accessible='true' and "
" (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)]"
"[count(.//XCUIElementTypeButton)=0][1]"
)
# 尝试 ②
q2 = session.xpath(XPATH_TITLE_STATIC)
if q2.wait(1.0):
t = _clean_tail(_text_of(q2.get()))
if t:
return t
# ④ 兜底:昵称往往以顿号/逗号结尾(例如 “Alina利用这个规律匹配
XPATH_HINT_COMMA_END = (
NAV_CONTAINER +
"//*[ (contains(@name,'') or contains(@label,'') or contains(@value,'')) "
" and string-length(@name)+string-length(@label)+string-length(@value) < 64 ]"
"[1]"
)
# ---- 查询顺序:① -> ② -> ③ -> ④ ----
for xp, wait_s in [
(XPATH_TITLE_OTHER, timeout),
(XPATH_TITLE_STATIC, 1.0),
(XPATH_FALLBACK_NEAR_BACK, 1.0),
(XPATH_HINT_COMMA_END, 1.0),
]:
try:
q = session.xpath(xp)
if q.wait(wait_s):
txt = _clean_tail(_text_of(q.get()))
if txt:
return txt
except Exception:
pass
return ""
# 检查字符串中是否包含中文
@classmethod
def contains_chinese(cls, text):

View File

@@ -58,14 +58,29 @@ class ControlUtils(object):
@classmethod
def clickBack(cls, session: Client):
try:
# back = session.xpath(
# "//*[@label='返回']"
# " | "
# "//*[@label='返回上一屏幕']"
# " | "
# "//XCUIElementTypeButton[@visible='true' and @name='TTKProfileNavBarBaseItemComponent' and @label='IconChevronLeftOffsetLTR']"
# )
back = session.xpath(
"//*[@label='返回']"
# ① 常见中文文案
"//*[@label='返回' or @label='返回上一屏幕']"
" | "
"//*[@label='返回上一屏幕']"
" | "
"//XCUIElementTypeButton[@visible='true' and @name='TTKProfileNavBarBaseItemComponent' and @label='IconChevronLeftOffsetLTR']"
# ② 英文 / 内部 name / 图标 label 的按钮(仅限 Button且可见
"//XCUIElementTypeButton[@visible='true' and ("
"@name='Back' or @label='Back' or " # 英文
"@name='返回' or @label='返回' or " # 中文
"@label='返回上一屏幕' or " # 中文另一种
"@name='nav_bar_start_back' or " # 内部常见 name
"(@name='TTKProfileNavBarBaseItemComponent' and @label='IconChevronLeftOffsetLTR')" # 你给的特例
")]"
)
if back.exists:
back.click()
return True
@@ -104,7 +119,8 @@ class ControlUtils(object):
# 点击搜索
@classmethod
def clickSearch(cls, session: Client):
obj = session.xpath("//*[@name='搜索']")
# obj = session.xpath("//*[@name='搜索']")
obj = session(xpath='//*[@name="搜索" or @label="搜索" or @name="Search" or @label="Search"]')
try:
if obj.exists:
obj.click()

View File

@@ -1,4 +1,5 @@
import json
import os
from pathlib import Path
@@ -12,18 +13,52 @@ class IOSAIStorage:
iosai_dir.mkdir(parents=True, exist_ok=True)
return iosai_dir
# @classmethod
# def save(cls, data: dict | list, filename: str = "data.json") -> Path:
# """
# 存储数据到 C:/Users/<用户名>/IOSAI/filename
# """
# file_path = cls._get_iosai_dir() / filename
# try:
# with open(file_path, "w", encoding="utf-8") as f:
# json.dump(data, f, ensure_ascii=False, indent=2)
# print(f"[IOSAIStorage] 已保存到: {file_path}")
# except Exception as e:
# print(f"[IOSAIStorage] 写入失败: {e}")
# return file_path
@classmethod
def save(cls, data: dict | list, filename: str = "data.json") -> Path:
"""
存储数据到 C:/Users/<用户名>/IOSAI/filename
"""
def save(cls, data: dict | list, filename: str = "data.json", mode: str = "overwrite") -> Path:
file_path = cls._get_iosai_dir() / filename
try:
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"[IOSAIStorage] 已保存到: {file_path}")
except Exception as e:
print(f"[IOSAIStorage] 写入失败: {e}")
file_path.parent.mkdir(parents=True, exist_ok=True)
def _load_json():
try:
return json.loads(file_path.read_text("utf-8"))
except Exception:
return {} if isinstance(data, dict) else []
if mode == "merge" and isinstance(data, dict):
old = _load_json()
if not isinstance(old, dict):
old = {}
old.update(data)
to_write = old
elif mode == "append" and isinstance(data, list):
old = _load_json()
if not isinstance(old, list):
old = []
old.extend(data)
to_write = old
else:
to_write = data # 覆盖
# 原子写入
tmp = file_path.with_suffix(file_path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(to_write, f, ensure_ascii=False, indent=2)
os.replace(tmp, file_path)
print(f"[IOSAIStorage] 已写入: {file_path}")
return file_path
@classmethod

View File

@@ -201,7 +201,10 @@ class JsonUtils:
if not isinstance(data, list):
return []
# 过滤 sender 为空字符串的项
return [item for item in data if isinstance(item, dict) and item.get("sender", "").strip()]
# return [item for item in data if isinstance(item, dict) and item.get("sender", "").strip()]
return [item for item in data if isinstance(item, dict)]
@classmethod
def delete_json_items(cls,

View File

@@ -26,7 +26,7 @@ def _force_utf8_everywhere():
except Exception:
pass
# _force_utf8_everywhere()
_force_utf8_everywhere()
class LogManager:
"""

View File

@@ -53,6 +53,34 @@ class Requester():
except Exception as e:
LogManager.method_error(f"翻译失败,报错的原因:{e}", "翻译失败异常")
# 翻译
@classmethod
def translationToChinese(cls, msg):
try:
param = {
"msg": msg,
}
url = "https://ai.yolozs.com/translationToChinese"
result = requests.post(url=url, json=param, verify=False)
LogManager.info(f"翻译 请求的参数:{param}", "翻译")
LogManager.info(f"翻译,状态码:{result.status_code},服务器返回的内容:{result.text}", "翻译")
if result.status_code != 200:
LogManager.error(f"翻译失败,状态码:{result.status_code},服务器返回的内容:{result.text}")
return None
json = result.json()
data = json.get("data")
return data
except Exception as e:
LogManager.method_error(f"翻译失败,报错的原因:{e}", "翻译失败异常")
# ai聊天
@classmethod
def chatToAi(cls, param):
@@ -75,10 +103,13 @@ class Requester():
try:
url = "https://ai.yolozs.com/chat"
result = requests.post(url=url, json=param, verify=False)
LogManager.method_info(f"ai聊天的参数{param}", "ai聊天")
json = result.json()
data = json.get("answer", {})
session_id = json.get("conversation_id", {})
LogManager.method_info(f"ai聊天的参数:{param},ai聊天返回的内容:{result.json()}", "ai聊天")
LogManager.method_info(f"ai聊天返回的内容{result.json()}", "ai聊天")
return data, session_id
except Exception as e:

View File

@@ -20,6 +20,7 @@ class ThreadManager:
@classmethod
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
LogManager.method_info(f"准备创建任务:{udid}", "task")
LogManager.method_info("创建线程成功","监控消息")
with cls._lock:
# 判断当前设备是否有任务
if cls._tasks.get(udid, None) is not None:
@@ -61,13 +62,19 @@ class ThreadManager:
@classmethod
def _kill_thread(cls, tid: int) -> bool:
"""向原生线程 ID 抛 KeyboardInterrupt强制跳出"""
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(KeyboardInterrupt))
if res == 0: # 线程已不存在
print("线程不存在")
return False
if res > 1: # 命中多个线程,重置
print("命中了多个线程")
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
print("杀死线程成功")
return True
try:
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(KeyboardInterrupt))
# LogManager.method_info(f"向原生线程 {tid} 抛 KeyboardInterrupt强制跳出", "task")
if res == 0: # 线程已不存在
print("线程不存在")
return False
if res > 1: # 命中多个线程,重置
print("命中了多个线程")
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
LogManager.method_info("杀死线程创建成功", "监控消息")
print("杀死线程成功")
return True
except Exception as e:
print("杀死线程出现问题 错误的原因:",e)