Files
iOSAI/Utils/LogManager.py

306 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import logging
# import os
# import sys
# import shutil
# from pathlib import Path
#
#
# class LogManager:
# # 运行根目录:打包后取 exe 目录;源码运行取项目目录
# if getattr(sys, "frozen", False):
# projectRoot = os.path.dirname(sys.executable)
# else:
# projectRoot = os.path.dirname(os.path.dirname(__file__))
#
# logDir = os.path.join(projectRoot, "log")
# _loggers = {}
#
# @classmethod
# def _setupLogger(cls, udid, name, logName, level=logging.INFO):
# """创建或获取 logger并绑定到文件"""
# deviceLogDir = os.path.join(cls.logDir, udid)
# os.makedirs(deviceLogDir, exist_ok=True)
# logFile = os.path.join(deviceLogDir, logName)
#
# logger_name = f"{udid}_{name}"
# logger = logging.getLogger(logger_name)
# logger.setLevel(level)
#
# # 避免重复添加 handler
# if not any(
# isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
# for h in logger.handlers
# ):
# fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
# formatter = logging.Formatter(
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S"
# )
# fileHandler.setFormatter(formatter)
# logger.addHandler(fileHandler)
#
# return logger
#
# @classmethod
# def info(cls, text, udid="system"):
# cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}")
#
# @classmethod
# def warning(cls, text, udid="system"):
# cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}")
#
# @classmethod
# def error(cls, text, udid="system"):
# cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}")
#
# # 清空日志
# @classmethod
# def clearLogs(cls):
# """启动时清空 log 目录下所有文件"""
# print("开始清空日志...")
#
# # 关闭所有 handler
# for name, logger in logging.Logger.manager.loggerDict.items():
# if isinstance(logger, logging.Logger):
# for handler in logger.handlers[:]:
# try:
# handler.close()
# except Exception:
# pass
# logger.removeHandler(handler)
#
# # 仅删除目录里的所有文件和子目录
# log_path = Path(cls.logDir)
# if log_path.exists():
# for item in log_path.iterdir():
# if item.is_file():
# item.unlink()
# elif item.is_dir():
# import shutil
# shutil.rmtree(item)
#
# print("日志清空完成")
import datetime
import io
import logging
import os
import re
import sys
import shutil
import zipfile
from pathlib import Path
import requests
import Entity.Variables
class LogManager:
    """Class-level file logging helper that groups log files per device.

    All entry points are classmethods, so no instance is needed. Files are
    laid out as::

        <logDir>/<udid>/info.log | warning.log | error.log   (level loggers)
        <logDir>/<udid>/<method>.log                         (method loggers)

    ``udid`` and ``method`` are passed through ``_safe_filename`` before
    being used as path components.
    """

    # Runtime root: the executable's directory when running frozen
    # (packaged), otherwise the parent of the directory holding this file.
    if getattr(sys, "frozen", False):
        projectRoot = os.path.dirname(sys.executable)
    else:
        projectRoot = os.path.dirname(os.path.dirname(__file__))
    logDir = os.path.join(projectRoot, "log")

    # Cache of level loggers, keyed by (udid, logger suffix, file name).
    _loggers = {}
    # Cache of "device + method" loggers, keyed by (safe udid, safe method).
    _method_loggers = {}

    # Anything outside ASCII alphanumerics, "._-", CJK unified ideographs,
    # Japanese hiragana/katakana and Hangul syllables is replaced.
    _UNSAFE_CHARS_RE = re.compile(
        r"[^a-zA-Z0-9_.\-"
        r"\u4e00-\u9fff"
        r"\u3040-\u30ff"
        r"\uac00-\ud7a3"
        r"]+"
    )
    # Windows reserved device names (case-insensitive).
    _RESERVED_NAME_RE = re.compile(r"(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])")
    _LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    # ---------- helpers ----------
    @classmethod
    def _safe_filename(cls, name: str, max_len: int = 80) -> str:
        """Turn a udid / method name into a string usable as a file name.

        - Keeps ASCII letters and digits, ``.``, ``_``, ``-`` and common CJK
          characters; every other run of characters becomes one underscore.
        - Collapses repeated underscores and strips spaces, underscores and
          dots from both ends.
        - Truncates to ``max_len`` characters and drops any trailing dot the
          truncation exposed (Windows rejects names ending in ``.``).
        - Prefixes an underscore when the part before the first dot is a
          Windows reserved device name (``CON``, ``NUL.txt``, ...).

        Returns ``"unknown"`` when the input is falsy or nothing survives.
        """
        if not name:
            return "unknown"
        name = cls._UNSAFE_CHARS_RE.sub("_", str(name).strip())
        name = re.sub(r"_+", "_", name).strip(" _.")
        name = name[:max_len].rstrip(".")
        # Checked after truncation so that shortening cannot produce a
        # reserved name; only the stem matters ("NUL.txt" is reserved too).
        if cls._RESERVED_NAME_RE.fullmatch(name.split(".", 1)[0]):
            name = f"_{name}"[:max_len].rstrip(".")
        return name or "unknown"

    @classmethod
    def _attach_file_handler(cls, logger, logFile, fmt):
        """Add a UTF-8 append-mode FileHandler for ``logFile`` unless the
        logger already has one writing to that same file."""
        target = os.path.abspath(logFile)
        for existing in logger.handlers:
            if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
                return
        handler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
        handler.setFormatter(logging.Formatter(fmt, datefmt=cls._LOG_DATE_FORMAT))
        logger.addHandler(handler)

    @staticmethod
    def _handler_writes_under(handler, directory) -> bool:
        """Return True when ``handler`` is a FileHandler whose file lies
        inside ``directory`` (an absolute path)."""
        if not isinstance(handler, logging.FileHandler):
            return False
        try:
            Path(handler.baseFilename).relative_to(directory)
        except ValueError:
            return False
        return True

    @staticmethod
    def _zip_directory(root: Path) -> io.BytesIO:
        """Zip every regular file below ``root`` into an in-memory buffer.

        Archive member names are relative to ``root``. The returned buffer is
        rewound to position 0 so it can be handed straight to an uploader.
        """
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for entry in root.rglob("*"):
                if entry.is_file():
                    zf.write(entry, arcname=entry.relative_to(root).as_posix())
        buf.seek(0)
        return buf

    # ---------- level loggers: fixed file per level ----------
    @classmethod
    def _setupLogger(cls, udid, name, logName, level=logging.INFO):
        """Create or fetch the logger ``"<udid>_<name>"`` bound to
        ``<logDir>/<safe udid>/<logName>``.

        The directory is created and the file handler attached only on the
        first call for a given (udid, name, logName); later calls reuse the
        cached logger. The level is (re)applied on every call.
        """
        cache_key = (str(udid), name, logName)
        logger = cls._loggers.get(cache_key)
        if logger is None:
            deviceLogDir = os.path.join(cls.logDir, cls._safe_filename(udid))
            os.makedirs(deviceLogDir, exist_ok=True)
            logger = logging.getLogger(f"{udid}_{name}")
            cls._attach_file_handler(
                logger,
                os.path.join(deviceLogDir, logName),
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            )
            cls._loggers[cache_key] = logger
        logger.setLevel(level)
        return logger

    @classmethod
    def info(cls, text, udid="system"):
        """Write an INFO record to ``<logDir>/<udid>/info.log``."""
        cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}")

    @classmethod
    def warning(cls, text, udid="system"):
        """Write a WARNING record to ``<logDir>/<udid>/warning.log``."""
        cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}")

    @classmethod
    def error(cls, text, udid="system"):
        """Write an ERROR record to ``<logDir>/<udid>/error.log``."""
        cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}")

    # ---------- method loggers: one file per device + method ----------
    @classmethod
    def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
        """Create or fetch the logger writing to
        ``<logDir>/<safe udid>/<safe method>.log``.

        A falsy ``udid`` falls back to ``"system"`` and a falsy ``method`` to
        ``"general"``. The same logger serves every severity for that
        device/method pair, so on a cache hit its level is lowered whenever a
        less severe ``level`` is requested; otherwise a logger first created
        by ``method_error`` would silently drop later INFO/WARNING records.
        """
        udid_key = cls._safe_filename(udid or "system")
        method_key = cls._safe_filename(method or "general")
        cache_key = (udid_key, method_key)

        logger = cls._method_loggers.get(cache_key)
        if logger is not None:
            # Only ever lower the threshold: never raise it on a shared logger.
            if logger.level > level:
                logger.setLevel(level)
            return logger

        deviceLogDir = os.path.join(cls.logDir, udid_key)
        os.makedirs(deviceLogDir, exist_ok=True)
        logger = logging.getLogger(f"{udid_key}.{method_key}")
        logger.setLevel(level)
        # Do not propagate to ancestor loggers, to avoid duplicate console output.
        logger.propagate = False
        cls._attach_file_handler(
            logger,
            os.path.join(deviceLogDir, f"{method_key}.log"),
            "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        )
        cls._method_loggers[cache_key] = logger
        return logger

    @classmethod
    def method_info(cls, text, method, udid="system"):
        """Write an INFO record to ``<logDir>/<udid>/<method>.log``."""
        cls._setupMethodLogger(udid, method, level=logging.INFO).info(f"[{udid}][{method}] {text}")

    @classmethod
    def method_warning(cls, text, method, udid="system"):
        """Write a WARNING record to ``<logDir>/<udid>/<method>.log``."""
        cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(f"[{udid}][{method}] {text}")

    @classmethod
    def method_error(cls, text, method, udid="system"):
        """Write an ERROR record to ``<logDir>/<udid>/<method>.log``."""
        cls._setupMethodLogger(udid, method, level=logging.ERROR).error(f"[{udid}][{method}] {text}")

    # ---------- maintenance ----------
    @classmethod
    def clearLogs(cls):
        """Delete everything inside ``logDir`` (the directory itself stays).

        File handlers that write below ``logDir`` are closed and detached
        first so their files can be removed; handlers that write elsewhere
        (console handlers, other libraries' handlers) are left alone.

        Raises:
            OSError: if a file or sub-directory cannot be deleted.
        """
        print("开始清空日志...")
        log_root = os.path.abspath(cls.logDir)

        # Snapshot the registry: iterating the live dict would fail if
        # another thread registers a logger meanwhile.
        for logger in list(logging.Logger.manager.loggerDict.values()):
            if not isinstance(logger, logging.Logger):
                continue  # skip logging.PlaceHolder entries
            for handler in logger.handlers[:]:
                if not cls._handler_writes_under(handler, log_root):
                    continue
                try:
                    handler.close()
                except Exception:
                    # Best effort: a handler that fails to close must not
                    # prevent the remaining ones from being released.
                    pass
                logger.removeHandler(handler)

        # Drop cached loggers right away: their handlers are gone, so they
        # must be rebuilt on next use even if the deletion below fails.
        cls._loggers.clear()
        cls._method_loggers.clear()

        log_path = Path(cls.logDir)
        if log_path.exists():
            for item in log_path.iterdir():
                if item.is_file():
                    item.unlink()
                elif item.is_dir():
                    shutil.rmtree(item)
        print("日志清空完成")

    @classmethod
    def upload_all_logs(cls, server_url, token, userId, tenantId, timeout=(10, 300)):
        """Zip the whole log directory in memory and POST it to ``server_url``.

        The archive is sent as multipart field ``file`` together with form
        fields ``tenantId`` and ``userId``; ``token`` goes into the
        ``vvtoken`` request header.

        Args:
            server_url: Upload endpoint.
            token: Value for the ``vvtoken`` header (masked in console output).
            userId: Sent as form field ``userId``.
            tenantId: Sent as form field ``tenantId``.
            timeout: Passed to ``requests.post``; a ``(connect, read)`` tuple
                or a single number of seconds. Prevents an unresponsive
                server from blocking the caller forever.

        Returns:
            True when the JSON response body has a truthy ``data`` entry;
            False when the log directory is missing, the body is not JSON, or
            ``data`` is absent or falsy.

        Raises:
            requests.RequestException: on connection failures or timeouts.
        """
        log_path = Path(cls.logDir)
        if not log_path.exists():
            print("[upload_all_logs] 日志目录不存在")
            return False

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{timestamp}_logs.zip"
        zip_buf = cls._zip_directory(log_path)

        headers = {"vvtoken": token}
        data = {"tenantId": tenantId, "userId": userId}
        files = {
            "file": (filename, zip_buf, "application/zip")
        }

        # Never echo the credential itself; a short prefix is enough to
        # tell tokens apart while debugging.
        token_text = str(token)
        masked_token = f"{token_text[:4]}***" if len(token_text) > 8 else "***"
        print(
            f"[upload_all_logs] "
            f"server={server_url} (type={type(server_url)}) "
            f"tenantId={tenantId} (type={type(tenantId)}) "
            f"userId={userId} (type={type(userId)}) "
            f"token={masked_token} (type={type(token)})"
        )
        print("开始上传")
        resp = requests.post(server_url, headers=headers, data=data, files=files, timeout=timeout)
        print(resp.text)
        print("上传结束")

        try:
            payload = resp.json()
        except ValueError:
            # Non-JSON body (e.g. an HTML error page): treat as a failed upload.
            print("[upload_all_logs] 响应不是合法的 JSON")
            return False
        return bool(isinstance(payload, dict) and payload.get("data"))