Files
iOSAI/Utils/LogManager.py
2025-11-19 20:47:28 +08:00

278 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import datetime
import io
import logging
import os
import re
import sys
import shutil
import zipfile
from pathlib import Path
import requests
# ========= Global: force UTF-8 (also effective for packaged EXE / no-console builds) =========
def _force_utf8_everywhere():
    """
    Switch the process-wide text I/O defaults to UTF-8.

    - Sets ``PYTHONUTF8`` and ``PYTHONIOENCODING`` in ``os.environ`` unless
      they are already present.
    - Re-encodes ``sys.stdout`` and ``sys.stderr`` as UTF-8 with
      ``errors="replace"``.

    Streams that are ``None`` or expose neither ``reconfigure`` nor ``buffer``
    (as can happen in windowed / no-console builds) are left untouched.
    Any failure is swallowed on purpose: this is best-effort setup that must
    never prevent the module from being imported.
    """
    os.environ.setdefault("PYTHONUTF8", "1")
    os.environ.setdefault("PYTHONIOENCODING", "utf-8")

    for stream_name in ("stdout", "stderr"):
        stream = getattr(sys, stream_name, None)
        try:
            reconfigure = getattr(stream, "reconfigure", None)
            if callable(reconfigure):
                # Change the encoding in place: keeps the same stream object
                # and its buffering mode instead of stacking a second
                # wrapper on top of the same underlying buffer.
                reconfigure(encoding="utf-8", errors="replace")
                continue

            # Fallback for stream objects without reconfigure(): wrap the
            # raw buffer, when there is one.
            raw_buffer = getattr(stream, "buffer", None)
            if raw_buffer is not None:
                wrapped = io.TextIOWrapper(
                    raw_buffer,
                    encoding="utf-8",
                    errors="replace",
                    line_buffering=bool(getattr(stream, "line_buffering", False)),
                )
                setattr(sys, stream_name, wrapped)
        except Exception:
            # Deliberate best-effort: keep whatever stream was there.
            pass
# Run once at import time, before the LogManager class below is defined or used.
_force_utf8_everywhere()
class LogManager:
    """
    Per-device and per-"device + method" log management.

    - log/<udid>/info.log | warning.log | error.log
    - log/<udid>/<method>.log
    - Every file is written as UTF-8, which avoids GBK/CP936 encoding errors
      on emoji and similar characters.
    - Provides clearLogs() and upload_all_logs().
    """

    # Runtime root: the executable's directory when frozen (packaged),
    # otherwise the parent of the directory that contains this file.
    if getattr(sys, "frozen", False):
        projectRoot = os.path.dirname(sys.executable)
    else:
        # abspath() keeps the result stable even if __file__ is relative.
        projectRoot = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    logDir = os.path.join(projectRoot, "log")

    # Cache of "device + method" loggers, keyed by (udid_key, method_key).
    _method_loggers = {}

    # Characters that are illegal in Windows file names (plus CR/LF/TAB).
    _WINDOWS_ILLEGAL_RE = re.compile(r'[\\/:*?"<>|\r\n\t]+')
    # Anything that is not an allowed file-name character.
    _DISALLOWED_RE = re.compile(
        r'[^a-zA-Z0-9_.\-'
        r'\u4e00-\u9fff'  # Chinese (CJK unified ideographs)
        r'\u3040-\u30ff'  # Japanese (hiragana / katakana)
        r'\uac00-\ud7a3'  # Korean (hangul syllables)
        r']+'
    )
    _UNDERSCORE_RUN_RE = re.compile(r'_+')
    # Windows reserved device names, bare or followed by an extension
    # (e.g. "NUL" and "NUL.txt").
    _RESERVED_NAME_RE = re.compile(r'(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])(\..*)?')

    # ---------- helpers: safe text / safe file names ----------
    @staticmethod
    def _safe_text(obj) -> str:
        """
        Convert any object to a string that can be written as UTF-8.

        - bytes / bytearray are decoded as UTF-8 with undecodable bytes
          replaced.
        - Everything else goes through ``str()``; characters that cannot be
          encoded as UTF-8 (lone surrogates) are replaced with ``?``.
        - If ``str()`` itself fails, ``repr()`` is tried, and finally the
          literal ``"<unprintable>"`` is returned.
        """
        try:
            if isinstance(obj, (bytes, bytearray)):
                return bytes(obj).decode("utf-8", "replace")
            # Round-trip through UTF-8 so the returned value is guaranteed
            # to be encodable when a UTF-8 file handler writes it.
            return str(obj).encode("utf-8", "replace").decode("utf-8")
        except Exception:
            try:
                return repr(obj)
            except Exception:
                return "<unprintable>"

    @classmethod
    def _safe_filename(cls, name: str, max_len: int = 80) -> str:
        """
        Turn a method name / udid / similar value into a safe file name.

        - Letters, digits, dot, underscore and hyphen are kept.
        - Common CJK characters (Chinese, Japanese, Korean) are kept.
        - Everything else becomes an underscore; runs of underscores are
          collapsed; leading/trailing spaces, underscores and dots are
          removed.
        - Windows reserved device names (with or without an extension) get a
          leading underscore.
        - The result is truncated to ``max_len`` characters and never ends
          with a dot.

        Returns ``"unknown"`` when nothing usable remains.
        """
        if not name:
            return "unknown"
        cleaned = str(name).strip()
        cleaned = cls._WINDOWS_ILLEGAL_RE.sub('_', cleaned)
        cleaned = cls._DISALLOWED_RE.sub('_', cleaned)
        cleaned = cls._UNDERSCORE_RUN_RE.sub('_', cleaned).strip(' _.')
        cleaned = cleaned or "unknown"
        if cls._RESERVED_NAME_RE.fullmatch(cleaned):
            cleaned = f"_{cleaned}"
        # Truncation may expose a trailing dot, which Windows does not keep.
        return cleaned[:max_len].rstrip('.') or "unknown"

    @staticmethod
    def _ensure_file_handler(logger, logFile, fmt):
        """
        Attach a UTF-8, append-mode FileHandler for ``logFile`` to ``logger``.

        Does nothing when the logger already has a FileHandler whose
        ``baseFilename`` equals the absolute path of ``logFile``.
        """
        abs_target = os.path.abspath(logFile)
        for existing in logger.handlers:
            if (
                isinstance(existing, logging.FileHandler)
                and getattr(existing, "baseFilename", "") == abs_target
            ):
                return
        handler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
        handler.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S"))
        logger.addHandler(handler)

    # ---------- device-level fixed files: info / warning / error ----------
    @classmethod
    def _setupLogger(cls, udid, name, logName, level=logging.INFO):
        """
        Create or fetch the logger bound to a fixed file in the device
        directory (info.log / warning.log / error.log).

        :param udid: device identifier; falsy values map to ``"system"``.
        :param name: suffix used to build the logger name.
        :param logName: file name inside ``log/<udid>/``.
        :param level: level set on the logger.
        :return: the configured ``logging.Logger``.
        """
        udid_key = cls._safe_filename(udid or "system")
        deviceLogDir = os.path.join(cls.logDir, udid_key)
        os.makedirs(deviceLogDir, exist_ok=True)
        logFile = os.path.join(deviceLogDir, logName)

        logger = logging.getLogger(f"{udid_key}_{name}")
        logger.setLevel(level)
        logger.propagate = False  # do not propagate to the root logger (avoids duplicates)
        cls._ensure_file_handler(
            logger,
            logFile,
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )
        return logger

    @classmethod
    def info(cls, text, udid="system"):
        """Append ``text`` at INFO level to ``log/<udid>/info.log``."""
        msg = f"[{cls._safe_text(udid)}] {cls._safe_text(text)}"
        cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(msg)

    @classmethod
    def warning(cls, text, udid="system"):
        """Append ``text`` at WARNING level to ``log/<udid>/warning.log``."""
        msg = f"[{cls._safe_text(udid)}] {cls._safe_text(text)}"
        cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(msg)

    @classmethod
    def error(cls, text, udid="system"):
        """Append ``text`` at ERROR level to ``log/<udid>/error.log``."""
        msg = f"[{cls._safe_text(udid)}] {cls._safe_text(text)}"
        cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(msg)

    # ---------- "device + method" files: <udid>/<method>.log ----------
    @classmethod
    def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
        """
        Create or fetch the logger for one method of one device
        (``log/<udid>/<method>.log``).

        The same logger serves method_info / method_warning / method_error,
        so its threshold is only ever lowered: a logger first requested at
        ERROR must still accept later INFO messages.

        :param udid: device identifier; falsy values map to ``"system"``.
        :param method: method name; falsy values map to ``"general"``.
        :param level: the level the caller is about to log at.
        :return: the configured ``logging.Logger``.
        """
        udid_key = cls._safe_filename(udid or "system")
        method_key = cls._safe_filename(method or "general")
        cache_key = (udid_key, method_key)

        logger = cls._method_loggers.get(cache_key)
        if logger is None:
            deviceLogDir = os.path.join(cls.logDir, udid_key)
            os.makedirs(deviceLogDir, exist_ok=True)
            logFile = os.path.join(deviceLogDir, f"{method_key}.log")

            logger = logging.getLogger(f"{udid_key}.{method_key}")
            logger.propagate = False
            cls._ensure_file_handler(
                logger,
                logFile,
                "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
            )
            cls._method_loggers[cache_key] = logger

        # Never raise the threshold of a shared logger, only lower it.
        if logger.level == logging.NOTSET or level < logger.level:
            logger.setLevel(level)
        return logger

    @classmethod
    def method_info(cls, text, method, udid="system"):
        """Append ``text`` at INFO level to ``log/<udid>/<method>.log``."""
        msg = f"[{cls._safe_text(udid)}][{cls._safe_text(method)}] {cls._safe_text(text)}"
        cls._setupMethodLogger(udid, method, level=logging.INFO).info(msg)

    @classmethod
    def method_warning(cls, text, method, udid="system"):
        """Append ``text`` at WARNING level to ``log/<udid>/<method>.log``."""
        msg = f"[{cls._safe_text(udid)}][{cls._safe_text(method)}] {cls._safe_text(text)}"
        cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(msg)

    @classmethod
    def method_error(cls, text, method, udid="system"):
        """Append ``text`` at ERROR level to ``log/<udid>/<method>.log``."""
        msg = f"[{cls._safe_text(udid)}][{cls._safe_text(method)}] {cls._safe_text(text)}"
        cls._setupMethodLogger(udid, method, level=logging.ERROR).error(msg)

    # ---------- clear logs ----------
    @classmethod
    def clearLogs(cls):
        """
        Delete every file and sub-directory under the log directory
        (intended to be called at startup).

        File handlers that write below ``logDir`` are closed and detached
        first so the files are not held open while being removed; handlers
        belonging to other loggers or pointing elsewhere are left alone.
        Deletion errors are ignored so one stubborn entry does not block the
        rest. The method-logger cache is emptied afterwards.
        """
        print("清空日志")

        log_root = os.path.normcase(os.path.abspath(cls.logDir))
        own_prefix = log_root + os.sep

        # 1) Release our own file handles.
        for logger in list(logging.Logger.manager.loggerDict.values()):
            if not isinstance(logger, logging.Logger):
                continue  # loggerDict also holds non-Logger placeholder entries
            for handler in list(logger.handlers):
                if not isinstance(handler, logging.FileHandler):
                    continue
                target = os.path.normcase(getattr(handler, "baseFilename", ""))
                if not target.startswith(own_prefix):
                    continue
                try:
                    handler.close()
                except Exception:
                    pass
                logger.removeHandler(handler)

        # 2) Remove everything inside the log directory.
        log_path = Path(cls.logDir)
        if log_path.exists():
            for item in log_path.iterdir():
                try:
                    if item.is_file():
                        item.unlink(missing_ok=True)  # py>=3.8
                    elif item.is_dir():
                        shutil.rmtree(item, ignore_errors=True)
                except Exception:
                    # Never let one entry block the clean-up.
                    pass

        cls._method_loggers.clear()

    # ---------- upload all logs (zip built in memory) ----------
    @classmethod
    def upload_all_logs(cls, server_url, token, userId, tenantId):
        """
        Zip every file under ``log/`` in memory and POST it to the server.

        - headers: {"vvtoken": <token>} (omitted when ``token`` is falsy)
        - form: {"tenantId": <tenantId>, "userId": <userId>, "file": <zip>}

        Success is decided by the truthiness of the ``"data"`` field of the
        JSON response; if the response cannot be handled as a JSON object the
        HTTP status (``resp.ok``) is used instead.

        :return: True on success; False on failure, when the log directory
                 does not exist, or when any exception occurs.
        """
        try:
            log_path = Path(cls.logDir)
            if not log_path.exists():
                logging.info("[upload_all_logs] 日志目录不存在:%s", log_path)
                return False

            # The file name is only used in the form; nothing is written to disk.
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{timestamp}_logs.zip"
            logging.info("[upload_all_logs] 打包文件名:%s", filename)

            # 1) Build the zip in memory.
            zip_buf = io.BytesIO()
            with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                for p in log_path.rglob("*"):
                    if p.is_file():
                        zf.write(p, arcname=str(p.relative_to(log_path)))
            # Rewind and reuse the buffer instead of copying its bytes again.
            zip_buf.seek(0)

            # 2) Assemble the request.
            headers = {"vvtoken": token} if token else {}
            data = {"tenantId": tenantId, "userId": userId}
            files = {"file": (filename, zip_buf, "application/zip")}

            # 3) Upload.
            resp = requests.post(server_url, headers=headers, data=data, files=files, timeout=120)
            try:
                ok = bool(resp.json().get("data"))
            except Exception:
                # Not JSON (maybe plain text) or not an object: fall back to
                # the HTTP status code.
                ok = resp.ok

            if ok:
                logging.info("[upload_all_logs] 上传成功:%s", server_url)
                return True
            logging.error(
                "[upload_all_logs] 上传失败status=%s, text=%s",
                resp.status_code,
                cls._safe_text(resp.text),
            )
            return False
        except Exception as e:
            logging.error("[upload_all_logs] 异常:%s", cls._safe_text(e))
            return False