Compare commits

...

2 Commits

Author SHA1 Message Date
71e9bf9045 Merge remote-tracking branch 'origin/main'
# Conflicts:
#	.idea/workspace.xml
#	Module/FlaskService.py
#	tidevice_entry.py
2025-09-08 21:44:17 +08:00
4b3247d0bf 20250904-初步功能已完成 2025-09-08 21:42:09 +08:00
7 changed files with 585 additions and 123 deletions

View File

@@ -2,6 +2,7 @@ import json
import os
import socket
import threading
from pathlib import Path
from queue import Queue
from typing import Any, Dict
@@ -232,6 +233,10 @@ def stopScript():
code, msg = ThreadManager.stop(udid)
return ResultData(code=code, data="", msg=msg).toJson()
# 关注打招呼
@app.route('/passAnchorData', methods=['POST'])
def passAnchorData():
@@ -241,6 +246,8 @@ def passAnchorData():
idList = data.get("deviceList", [])
# 主播列表
acList = data.get("anchorList", [])
AiUtils.save_aclist_flat_append(acList)
# 是否需要回复
needReply = data.get("needReply", True)
# 获取打招呼数据
@@ -268,11 +275,20 @@ def getPrologueList():
return ResultData(data=Variables.prologueList).toJson()
# 添加临时数据
# 批量追加主播到 JSON 文件
@app.route("/addTempAnchorData", methods=['POST'])
def addTempAnchorData():
"""
请求体支持:
- 单个对象:{"anchorId": "xxx", "country": "CN"}
- 对象数组:[{"anchorId": "xxx", "country": "CN"}, {"anchorId": "yyy", "country": "US"}]
"""
data = request.get_json()
addModelToAnchorList(data)
return ResultData(data="").toJson()
if not data:
return ResultData(code=400, msg="请求数据为空").toJson()
# 追加到 JSON 文件
AiUtils.save_aclist_flat_append(data, "log/acList.json")
return ResultData(data="ok").toJson()
# 获取当前屏幕上的聊天信息
@@ -332,20 +348,27 @@ def upLoadLogLogs():
# 获取当前的主播列表数据
@app.route("/anchorList", methods=['POST'])
def queryAnchorList():
file_path = "log/acList.json"
data = []
for model in anchorList:
data.append(AnchorModel.modelToDict(model))
if Path(file_path).exists():
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
LogManager.error(f"[anchorList] 读取失败: {e}")
data = []
return ResultData(data=data).toJson()
# 删除主播
@app.route("/deleteAnchorWithIds", methods=['POST'])
def deleteAnchorWithIds():
ls: list[dict] = request.get_json()
for dic in ls:
for model in anchorList:
if dic.get("anchorId") == model.anchorId:
removeModelFromAnchorList(model)
return ResultData(data="").toJson()
ls: list[dict] = request.get_json() # [{"anchorId": "xxx"}, ...]
ids = [d.get("anchorId") for d in ls if d.get("anchorId")]
deleted = AiUtils.delete_anchors_by_ids(ids)
return ResultData(data={"deleted": deleted}).toJson()
if __name__ == '__main__':
app.run("0.0.0.0", port=5000, debug=True, use_reloader=False)

View File

@@ -1,7 +1,11 @@
import json
import os
import re
from pathlib import Path
import cv2
import numpy as np
import unicodedata
import wda
from Utils.LogManager import LogManager
import xml.etree.ElementTree as ET
@@ -294,7 +298,6 @@ class AiUtils(object):
print(f"btn:{btn}")
return cls.findNumber(btn.label)
@classmethod
def extract_messages_from_xml(cls, xml: str):
"""
@@ -304,7 +307,6 @@ class AiUtils(object):
root = etree.fromstring(xml.encode("utf-8"))
items = []
# 屏幕宽度
app = root.xpath('/XCUIElementTypeApplication')
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
@@ -474,3 +476,137 @@ class AiUtils(object):
# 使用正则表达式匹配中文字符
pattern = re.compile(r'[\u4e00-\u9fff]')
return bool(pattern.search(text))
@classmethod
def is_language(cls, text: str) -> bool:
if not text:
return False
for ch in text:
if unicodedata.category(ch).startswith("L"):
return True
return False
@classmethod
def _read_json_list(cls, file_path: Path) -> list:
"""读取为 list读取失败或不是 list 则返回空数组"""
if not file_path.exists():
return []
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception as e:
LogManager.error(f"[acList] 读取失败,将按空数组处理: {e}")
return []
@classmethod
def _write_json_list(cls, file_path: Path, data: list) -> None:
file_path.parent.mkdir(parents=True, exist_ok=True)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@staticmethod
def _normalize_anchor_items(items):
"""
规范化输入为 [{anchorId, country}] 的列表:
- 允许传入:单个对象、对象列表、字符串(当 anchorId 用)
- 过滤不合规项
"""
result = []
if items is None:
return result
if isinstance(items, dict):
# 单个对象
aid = items.get("anchorId")
if aid:
result.append({"anchorId": str(aid), "country": items.get("country", "")})
return result
if isinstance(items, list):
for it in items:
if isinstance(it, dict):
aid = it.get("anchorId")
if aid:
result.append({"anchorId": str(aid), "country": it.get("country", "")})
elif isinstance(it, str):
result.append({"anchorId": it, "country": ""})
return result
if isinstance(items, str):
result.append({"anchorId": items, "country": ""})
return result
# -------- 追加(对象数组平铺追加) --------
@classmethod
def save_aclist_flat_append(cls, acList, filename="log/acList.json"):
"""
将 anchor 对象数组平铺追加到 JSON 文件(数组)中。
期望 acList 形如:
[
{"anchorId": "ldn327_", "country": ""},
{"anchorId": "tianliang30", "country": ""}
]
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
# 规范化输入,确保都是 {anchorId, country}
to_add = cls._normalize_anchor_items(acList)
if not to_add:
LogManager.info("[acList] 传入为空或不合规,跳过写入")
return
data.extend(to_add)
cls._write_json_list(file_path, data)
LogManager.info(f"[acList] 已追加 {len(to_add)} 条,当前总数={len(data)} -> {file_path}")
# -------- 弹出(取一个删一个) --------
@classmethod
def pop_aclist_first(cls, filename="log/acList.json"):
"""
从 JSON 数组中取出第一个 anchor 对象,并删除它;为空或文件不存在返回 None。
返回形如:{"anchorId": "...", "country": "..."}
"""
file_path = Path(filename)
data = cls._read_json_list(file_path)
if not data:
return None
first = data.pop(0)
# 兜底保证结构
norm = cls._normalize_anchor_items(first)
first = norm[0] if norm else None
cls._write_json_list(file_path, data)
return first
@classmethod
def delete_anchors_by_ids(cls, ids: list[str], filename="log/acList.json") -> int:
"""
根据 anchorId 列表从 JSON 文件中删除匹配的 anchor。
返回删除数量。
"""
file_path = Path(filename)
if not file_path.exists():
return 0
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
return 0
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 读取失败: {e}")
return 0
before = len(data)
# 保留不在 ids 里的对象
data = [d for d in data if isinstance(d, dict) and d.get("anchorId") not in ids]
deleted = before - len(data)
try:
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
LogManager.error(f"[delete_anchors_by_ids] 写入失败: {e}")
return deleted

View File

@@ -172,3 +172,14 @@ class ControlUtils(object):
left_x = max(1, rect.x - 20)
center_y = rect.y + rect.height // 2
session.tap(left_x, center_y)
# 检测五分钟前和当前的状态是否相同
# @classmethod
# def compareCurrentWithPreviousState(cls,xml):

View File

@@ -1,4 +1,212 @@
#
# import datetime
# import io
# import logging
# import os
# import re
# import sys
# import shutil
# import zipfile
# from pathlib import Path
# import requests
#
#
# class LogManager:
# # 运行根目录:打包后取 exe 目录;源码运行取项目目录
# if getattr(sys, "frozen", False):
# projectRoot = os.path.dirname(sys.executable)
# else:
# projectRoot = os.path.dirname(os.path.dirname(__file__))
#
# logDir = os.path.join(projectRoot, "log")
# _loggers = {}
# _method_loggers = {} # 新增:缓存“设备+方法”的 logger
#
# # ---------- 工具函数 ----------
# @classmethod
# def _safe_filename(cls, name: str, max_len: int = 80) -> str:
# """
# 将方法名/udid等转成安全文件名
# - 允许字母数字、点、下划线、连字符
# - 允许常见 CJK 字符(中日韩)
# - 其他非法字符替换为下划线
# - 合并多余下划线,裁剪长度
# """
# if not name:
# return "unknown"
# name = str(name).strip()
#
# # 替换 Windows 非法字符和控制符
# name = re.sub(r'[\\/:*?"<>|\r\n\t]+', '_', name)
#
# # 只保留 ① 英数._- ② CJK 统一表意文字、日文平/片假名、韩文音节
# name = re.sub(rf'[^a-zA-Z0-9_.\-'
# r'\u4e00-\u9fff' # 中
# r'\u3040-\u30ff' # 日
# r'\uac00-\ud7a3' # 韩
# r']+', '_', name)
# # 合并多余下划线,去两端空白与下划线
# name = re.sub(r'_+', '_', name).strip(' _.')
# # 避免空
# name = name or "unknown"
# # Windows 预留名避免CON/PRN/AUX/NUL/COM1…
# if re.fullmatch(r'(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])', name):
# name = f"_{name}"
# # 限长
# return name[:max_len] or "unknown"
#
# # ---------- 旧的:按级别写固定文件 ----------
# @classmethod
# def _setupLogger(cls, udid, name, logName, level=logging.INFO):
# """创建或获取 logger并绑定到设备目录下的固定文件info.log / warning.log / error.log"""
# deviceLogDir = os.path.join(cls.logDir, cls._safe_filename(udid))
# os.makedirs(deviceLogDir, exist_ok=True)
# logFile = os.path.join(deviceLogDir, logName)
#
# logger_name = f"{udid}_{name}"
# logger = logging.getLogger(logger_name)
# logger.setLevel(level)
#
# # 避免重复添加 handler
# if not any(
# isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
# for h in logger.handlers
# ):
# fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
# formatter = logging.Formatter(
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S"
# )
# fileHandler.setFormatter(formatter)
# logger.addHandler(fileHandler)
#
# return logger
#
# @classmethod
# def info(cls, text, udid="system"):
# cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}")
#
# @classmethod
# def warning(cls, text, udid="system"):
# cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}")
#
# @classmethod
# def error(cls, text, udid="system"):
# cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}")
#
# # ---------- 新增:按“设备+方法”分别写独立日志文件 ----------
# @classmethod
# def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
# """
# 为某设备的某个方法单独创建 logger
# log/<udid>/<method>.log
# """
# udid_key = cls._safe_filename(udid or "system")
# method_key = cls._safe_filename(method or "general")
# cache_key = (udid_key, method_key)
#
# # 命中缓存
# if cache_key in cls._method_loggers:
# return cls._method_loggers[cache_key]
#
# deviceLogDir = os.path.join(cls.logDir, udid_key)
# os.makedirs(deviceLogDir, exist_ok=True)
# logFile = os.path.join(deviceLogDir, f"{method_key}.log")
#
# logger_name = f"{udid_key}.{method_key}"
# logger = logging.getLogger(logger_name)
# logger.setLevel(level)
# logger.propagate = False # 避免向根 logger 传播导致控制台重复打印
#
# # 避免重复添加 handler
# if not any(
# isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
# for h in logger.handlers
# ):
# fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
# formatter = logging.Formatter(
# "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S"
# )
# fileHandler.setFormatter(formatter)
# logger.addHandler(fileHandler)
#
# cls._method_loggers[cache_key] = logger
# return logger
#
# @classmethod
# def method_info(cls, text, method, udid="system"):
# """按设备+方法写 INFO 到 log/<udid>/<method>.log"""
# cls._setupMethodLogger(udid, method, level=logging.INFO).info(f"[{udid}][{method}] {text}")
#
# @classmethod
# def method_warning(cls, text, method, udid="system"):
# cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(f"[{udid}][{method}] {text}")
#
# @classmethod
# def method_error(cls, text, method, udid="system"):
# cls._setupMethodLogger(udid, method, level=logging.ERROR).error(f"[{udid}][{method}] {text}")
#
# # 清空日志
# @classmethod
# def clearLogs(cls):
# """启动时清空 log 目录下所有文件"""
#
# # 关闭所有 handler
# for name, logger in logging.Logger.manager.loggerDict.items():
# if isinstance(logger, logging.Logger):
# for handler in logger.handlers[:]:
# try:
# handler.close()
# except Exception:
# pass
# logger.removeHandler(handler)
#
# # 删除 log 目录
# log_path = Path(cls.logDir)
# if log_path.exists():
# for item in log_path.iterdir():
# if item.is_file():
# item.unlink()
# elif item.is_dir():
# shutil.rmtree(item)
#
# # 清缓存
# cls._method_loggers.clear()
#
# @classmethod
# def upload_all_logs(cls, server_url, token, userId, tenantId):
# log_path = Path(cls.logDir)
# if not log_path.exists():
# return False
#
# timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# filename = f"{timestamp}_logs.zip"
# print(filename)
# zip_buf = io.BytesIO()
# with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
# for p in log_path.rglob("*"):
# if p.is_file():
# arcname = str(p.relative_to(log_path))
# zf.write(p, arcname=arcname)
#
# zip_bytes = zip_buf.getvalue()
#
# headers = {"vvtoken": token}
# data = {"tenantId": tenantId, "userId": userId}
#
#
# files = {
# "file": (filename, io.BytesIO(zip_bytes), "application/zip")
# }
#
# # 3) 上传
# resp = requests.post(server_url, headers=headers, data=data, files=files)
# if resp.json()['data']:
# return True
# return False
# -*- coding: utf-8 -*-
import datetime
import io
import logging
@@ -10,8 +218,34 @@ import zipfile
from pathlib import Path
import requests
# ========= 全局:强制 UTF-8打包 EXE / 无控制台也生效) =========
def _force_utf8_everywhere():
os.environ.setdefault("PYTHONUTF8", "1")
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
# windowed 模式下 stdout/stderr 可能没有 buffer这里做保护包装
try:
if getattr(sys.stdout, "buffer", None):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
except Exception:
pass
try:
if getattr(sys.stderr, "buffer", None):
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
except Exception:
pass
_force_utf8_everywhere()
# ===========================================================
class LogManager:
"""
设备级与“设备+方法”级日志管理:
- log/<udid>/info.log | warning.log | error.log
- log/<udid>/<method>.log
- 文件统一 UTF-8 编码,避免 GBK/CP936 导致的 emoji 报错
- 提供 clearLogs() 与 upload_all_logs()
"""
# 运行根目录:打包后取 exe 目录;源码运行取项目目录
if getattr(sys, "frozen", False):
projectRoot = os.path.dirname(sys.executable)
@@ -19,95 +253,111 @@ class LogManager:
projectRoot = os.path.dirname(os.path.dirname(__file__))
logDir = os.path.join(projectRoot, "log")
_loggers = {}
_method_loggers = {} # 新增:缓存“设备+方法”的 logger
_method_loggers = {} # 缓存“设备+方法”的 logger
# ---------- 工具:安全文本/文件名 ----------
@staticmethod
def _safe_text(obj) -> str:
"""把任意对象安全转为可写字符串(避免因编码问题再次抛异常)"""
try:
if isinstance(obj, bytes):
return obj.decode("utf-8", "replace")
s = str(obj)
# 确保解码上屏不再出错
_ = s.encode("utf-8", "replace")
return s
except Exception:
try:
return repr(obj)
except Exception:
return "<unprintable>"
# ---------- 工具函数 ----------
@classmethod
def _safe_filename(cls, name: str, max_len: int = 80) -> str:
"""
将方法名/udid等转成安全文件名
- 允许字母数字、点、下划线、连字符
- 允许常见 CJK 字符(中日韩)
- 其他非法字符替换为下划线
- 合并多余下划线,裁剪长度
- 保留常见 CJK 字符(中日韩)
- 其替换为下划线;合并下划线;避免保留名;限长
"""
if not name:
return "unknown"
name = str(name).strip()
# 替换 Windows 非法字符和控制符
name = re.sub(r'[\\/:*?"<>|\r\n\t]+', '_', name)
# 只保留 ① 英数._- ② CJK 统一表意文字、日文平/片假名、韩文音节
name = re.sub(rf'[^a-zA-Z0-9_.\-'
r'\u4e00-\u9fff' # 中
r'\u3040-\u30ff' # 日
r'\uac00-\ud7a3' # 韩
r']+', '_', name)
# 合并多余下划线,去两端空白与下划线
name = re.sub(r'[\\/:*?"<>|\r\n\t]+', '_', name) # Windows 非法字符
name = re.sub(
r'[^a-zA-Z0-9_.\-'
r'\u4e00-\u9fff' # 中
r'\u3040-\u30ff' # 日
r'\uac00-\ud7a3' # 韩
r']+', '_', name
)
name = re.sub(r'_+', '_', name).strip(' _.')
# 避免空
name = name or "unknown"
# Windows 预留名避免CON/PRN/AUX/NUL/COM1…
if re.fullmatch(r'(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])', name):
name = f"_{name}"
# 限长
return name[:max_len] or "unknown"
# ---------- 旧的:按级别写固定文件 ----------
# ---------- 设备级固定文件info/warning/error ----------
@classmethod
def _setupLogger(cls, udid, name, logName, level=logging.INFO):
"""创建或获取 logger并绑定到设备目录下的固定文件info.log / warning.log / error.log"""
deviceLogDir = os.path.join(cls.logDir, cls._safe_filename(udid))
udid_key = cls._safe_filename(udid or "system")
deviceLogDir = os.path.join(cls.logDir, udid_key)
os.makedirs(deviceLogDir, exist_ok=True)
logFile = os.path.join(deviceLogDir, logName)
logger_name = f"{udid}_{name}"
logger_name = f"{udid_key}_{name}"
logger = logging.getLogger(logger_name)
logger.setLevel(level)
logger.propagate = False # 不向根 logger 传播,避免重复
# 避免重复添加 handler
if not any(
isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
for h in logger.handlers
):
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
# 避免重复添加同一路径的 file handler
abs_target = os.path.abspath(logFile)
for h in logger.handlers:
if isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", "") == abs_target:
return logger
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
return logger
@classmethod
def info(cls, text, udid="system"):
cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}")
msg = cls._safe_text(f"[{udid}] {text}")
cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(msg)
@classmethod
def warning(cls, text, udid="system"):
cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}")
msg = cls._safe_text(f"[{udid}] {text}")
cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(msg)
@classmethod
def error(cls, text, udid="system"):
cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}")
msg = cls._safe_text(f"[{udid}] {text}")
cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(msg)
# ---------- “设备+方法”独立文件:<udid>/<method>.log ----------
# ---------- 新增:按“设备+方法”分别写独立日志文件 ----------
@classmethod
def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
"""
为某设备的某个方法单独创建 logger
log/<udid>/<method>.log
为某设备的某个方法单独创建 loggerlog/<udid>/<method>.log
"""
udid_key = cls._safe_filename(udid or "system")
method_key = cls._safe_filename(method or "general")
cache_key = (udid_key, method_key)
# 命中缓存
if cache_key in cls._method_loggers:
return cls._method_loggers[cache_key]
logger = cls._method_loggers.get(cache_key)
if logger:
return logger
deviceLogDir = os.path.join(cls.logDir, udid_key)
os.makedirs(deviceLogDir, exist_ok=True)
@@ -116,92 +366,121 @@ class LogManager:
logger_name = f"{udid_key}.{method_key}"
logger = logging.getLogger(logger_name)
logger.setLevel(level)
logger.propagate = False # 避免向根 logger 传播导致控制台重复打印
logger.propagate = False
# 避免重复添加 handler
if not any(
isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
for h in logger.handlers
):
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
abs_target = os.path.abspath(logFile)
for h in logger.handlers:
if isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", "") == abs_target:
cls._method_loggers[cache_key] = logger
return logger
fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
cls._method_loggers[cache_key] = logger
return logger
@classmethod
def method_info(cls, text, method, udid="system"):
"""按设备+方法写 INFO 到 log/<udid>/<method>.log"""
cls._setupMethodLogger(udid, method, level=logging.INFO).info(f"[{udid}][{method}] {text}")
msg = cls._safe_text(f"[{udid}][{method}] {text}")
cls._setupMethodLogger(udid, method, level=logging.INFO).info(msg)
@classmethod
def method_warning(cls, text, method, udid="system"):
cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(f"[{udid}][{method}] {text}")
msg = cls._safe_text(f"[{udid}][{method}] {text}")
cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(msg)
@classmethod
def method_error(cls, text, method, udid="system"):
cls._setupMethodLogger(udid, method, level=logging.ERROR).error(f"[{udid}][{method}] {text}")
msg = cls._safe_text(f"[{udid}][{method}] {text}")
cls._setupMethodLogger(udid, method, level=logging.ERROR).error(msg)
# ---------- 清空日志 ----------
# 清空日志
@classmethod
def clearLogs(cls):
"""启动时清空 log 目录下所有文件"""
# 关闭所有 handler
for name, logger in logging.Logger.manager.loggerDict.items():
# 先关闭所有 logger 的文件句柄
for _, logger in logging.Logger.manager.loggerDict.items():
if isinstance(logger, logging.Logger):
for handler in logger.handlers[:]:
for handler in list(logger.handlers):
try:
handler.close()
except Exception:
pass
logger.removeHandler(handler)
# 删除 log 目录
log_path = Path(cls.logDir)
if log_path.exists():
for item in log_path.iterdir():
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
try:
if item.is_file():
item.unlink(missing_ok=True) # py>=3.8
elif item.is_dir():
shutil.rmtree(item, ignore_errors=True)
except Exception:
# 不阻塞清理
pass
# 清缓存
cls._method_loggers.clear()
# ---------- 上传所有日志(内存打包 zip ----------
@classmethod
def upload_all_logs(cls, server_url, token, userId, tenantId):
log_path = Path(cls.logDir)
if not log_path.exists():
"""
将 log/ 目录下所有日志打包为 zip内存上传至服务器
- headers: {"vvtoken": <token>}
- form: {"tenantId": <tenantId>, "userId": <userId>, "file": <zip>}
返回 True/False
"""
try:
log_path = Path(cls.logDir)
if not log_path.exists():
logging.info("[upload_all_logs] 日志目录不存在:%s", log_path)
return False
# 文件名仅用于表单,不落盘,可包含安全字符
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_logs.zip"
logging.info("[upload_all_logs] 打包文件名:%s", filename)
# 1) 内存中打包 zip
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for p in log_path.rglob("*"):
if p.is_file():
arcname = str(p.relative_to(log_path))
zf.write(p, arcname=arcname)
zip_bytes = zip_buf.getvalue()
# 2) 组织请求
headers = {"vvtoken": token} if token else {}
data = {"tenantId": tenantId, "userId": userId}
files = {"file": (filename, io.BytesIO(zip_bytes), "application/zip")}
# 3) 上传
resp = requests.post(server_url, headers=headers, data=data, files=files, timeout=120)
ok = False
try:
js = resp.json()
ok = bool(js.get("data"))
except Exception:
# 响应不是 JSON也许是纯文本降级按状态码判断
ok = resp.ok
if ok:
logging.info("[upload_all_logs] 上传成功:%s", server_url)
return True
else:
logging.error("[upload_all_logs] 上传失败status=%s, text=%s", resp.status_code, LogManager._safe_text(resp.text))
return False
except Exception as e:
logging.error("[upload_all_logs] 异常:%s", LogManager._safe_text(e))
return False
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
filename = f"{timestamp}_logs.zip"
print(filename)
zip_buf = io.BytesIO()
with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for p in log_path.rglob("*"):
if p.is_file():
arcname = str(p.relative_to(log_path))
zf.write(p, arcname=arcname)
zip_bytes = zip_buf.getvalue()
headers = {"vvtoken": token}
data = {"tenantId": tenantId, "userId": userId}
files = {
"file": (filename, io.BytesIO(zip_bytes), "application/zip")
}
# 3) 上传
resp = requests.post(server_url, headers=headers, data=data, files=files)
if resp.json()['data']:
return True
return False

View File

@@ -2,7 +2,6 @@ import requests
from Entity.Variables import prologueList
BaseUrl = "https://crawlclient.api.yolozs.com/api/common/"
# BaseUrl = "http://192.168.1.174:8101/api/common/"
class Requester():
@@ -21,13 +20,12 @@ class Requester():
for i in data:
prologueList.append(i)
# 翻译
@classmethod
def translation(cla, msg, country="英国"):
def translation(cls, msg, country="英国"):
parame = {
"msg":msg,
"country":country,
"msg": msg,
"country": country,
}
url = "http://ai.zhukeping.com/translation"
result = requests.request(url=url, json=parame, method="POST")
@@ -43,4 +41,4 @@ class Requester():
result = requests.request(url=url, json=param, method="POST")
json = result.json()
data = json.get("data", {})
return data
return data

View File

@@ -284,6 +284,8 @@ class ScriptManager():
time.sleep(3)
LogManager.method_error("greetNewFollowers 重试次数耗尽,任务终止", "关注打招呼", udid)
# 关注打招呼以及回复主播消息
def greetNewFollowers(self, udid, needReply, event):
@@ -319,12 +321,15 @@ class ScriptManager():
print("总循环条件", not event.is_set() and len(anchorList) > 0)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set() and len(anchorList) > 0:
while not event.is_set():
# 获取一个主播,并删除
anchor = anchorList.pop(0)
aid = anchor.anchorId
anchorCountry = anchor.country
anchor = AiUtils.pop_aclist_first()
if not anchor:
break
aid = anchor["anchorId"]
anchorCountry = anchor.get("country", "")
# 点击搜索按钮
ControlUtils.clickSearch(session)
@@ -462,10 +467,13 @@ class ScriptManager():
LogManager.method_info("找到输入框了, 准备发送一条打招呼消息", "关注打招呼", udid)
print("打招呼的数据", ev.prologueList)
LogManager.method_info(f"打招呼的数据:{ev.prologueList}", "关注打招呼", udid)
# 准备打招呼的文案
text = random.choice(ev.prologueList)
LogManager.method_info(f"打招呼完成", "关注打招呼", udid)
isContainChniese = AiUtils.contains_chinese(text)
if isContainChniese:
@@ -473,6 +481,8 @@ class ScriptManager():
msg = Requester.translation(text, anchorCountry)
else:
msg = text
LogManager.method_info(f"翻译后的私信数据:{msg}", "关注打招呼", udid)
# 准备发送一条信息
chatInput.click()
@@ -660,9 +670,15 @@ class ScriptManager():
'What do you think makes my streams special?',
'Do you think Im one of the most engaging streamers youve seen?']
last_msg_text = next((item['text'] for item in reversed(msgs) if item['type'] == 'msg'),
last_msg = next((item['text'] for item in reversed(msgs) if item['type'] == 'msg'),
random.choice(text_list))
isLanguage = AiUtils.is_language(last_msg)
if isLanguage:
last_msg_text = last_msg
else:
last_msg_text = random.choice(text_list)
# 向ai发送信息
# 获取主播的名称

View File

@@ -1,4 +1,3 @@
# tidevice 打包入口文件
from tidevice.__main__ import main
if __name__ == '__main__':
main()