# -*- coding: utf-8 -*-
import datetime
import io
import logging
import os
import re
import sys
import shutil
import zipfile
from pathlib import Path

try:
    import requests
except ImportError:
    # Only upload_all_logs() needs requests; keep local file logging usable
    # when the package is missing (upload_all_logs() then returns False).
    requests = None


# ========= Global: force UTF-8 (also effective for frozen EXE / no console) =========
def _force_utf8_everywhere():
    """Switch the process' standard text streams to UTF-8 with errors="replace".

    Also sets default values for ``PYTHONUTF8`` and ``PYTHONIOENCODING``.
    Those variables are read by the interpreter at startup, so within the
    running process they only matter for child processes that inherit the
    environment.

    The function is safe to call more than once and never raises.
    """
    os.environ.setdefault("PYTHONUTF8", "1")
    os.environ.setdefault("PYTHONIOENCODING", "utf-8")

    for stream_name in ("stdout", "stderr"):
        # In windowed mode sys.stdout / sys.stderr may be None or lack a
        # ``buffer`` attribute, hence the defensive lookups.
        stream = getattr(sys, stream_name, None)
        if stream is None:
            continue
        try:
            if hasattr(stream, "reconfigure"):
                # Reconfiguring in place keeps the stream object and its line
                # buffering, and does not stack wrappers on repeated calls.
                stream.reconfigure(encoding="utf-8", errors="replace")
            elif getattr(stream, "buffer", None):
                wrapped = io.TextIOWrapper(
                    stream.buffer, encoding="utf-8", errors="replace"
                )
                setattr(sys, stream_name, wrapped)
        except Exception:
            # Best effort: a stream that cannot be switched is left untouched.
            pass


# Not invoked at import time (the call is left disabled):
# _force_utf8_everywhere()


class LogManager:
    """Device-level and "device + method"-level file logging.

    Layout below ``logDir``:
      - log/<udid>/info.log | warning.log | error.log
      - log/<udid>/<method>.log

    All files are written as UTF-8 so that characters outside GBK/CP936
    (e.g. emoji) do not raise encoding errors.
    ``clearLogs()`` and ``upload_all_logs()`` manage the whole directory.
    """

    # Runtime root: directory of the executable when frozen, otherwise the
    # parent of the directory that contains this source file.
    if getattr(sys, "frozen", False):
        projectRoot = os.path.dirname(sys.executable)
    else:
        projectRoot = os.path.dirname(os.path.dirname(__file__))

    logDir = os.path.join(projectRoot, "log")

    _method_loggers = {}  # cache of "device + method" loggers, keyed by (udid_key, method_key)

    # Record layouts of the two logger families (they intentionally differ in
    # the order of %(name)s and %(levelname)s, as in the original code).
    _DEVICE_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    _METHOD_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
    _DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    # Characters that are illegal in Windows file names.
    _ILLEGAL_CHARS_RE = re.compile(r'[\\/:*?"<>|\r\n\t]+')
    # Everything that is not explicitly allowed in a generated file name.
    _DISALLOWED_RE = re.compile(
        r'[^a-zA-Z0-9_.\-'
        r'\u4e00-\u9fff'   # Chinese
        r'\u3040-\u30ff'   # Japanese
        r'\uac00-\ud7a3'   # Korean
        r']+'
    )
    _UNDERSCORES_RE = re.compile(r'_+')
    # Reserved Windows device names (matched case-insensitively).
    _RESERVED_RE = re.compile(
        r'(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])', re.IGNORECASE
    )

    # ---------- Helpers: safe text / file names ----------
    @staticmethod
    def _safe_text(obj) -> str:
        """Convert any object into a string that can be written as UTF-8.

        Bytes are decoded as UTF-8 with replacement. Other objects go through
        ``str()`` and a UTF-8 round trip, so characters that cannot be encoded
        (e.g. lone surrogates) become ``?`` instead of failing later inside
        the file handler. Falls back to ``repr(obj)`` and finally to ``""``.
        """
        try:
            if isinstance(obj, bytes):
                return obj.decode("utf-8", "replace")
            # Return the round-tripped text: the original computed the encoded
            # form but returned the unsanitised string.
            return str(obj).encode("utf-8", "replace").decode("utf-8")
        except Exception:
            try:
                return repr(obj)
            except Exception:
                return ""

    @classmethod
    def _safe_filename(cls, name: str, max_len: int = 80) -> str:
        """Turn a method name / udid / etc. into a safe file name.

        - Letters, digits, dot, underscore and hyphen are kept.
        - Common CJK characters are kept.
        - Everything else becomes an underscore; runs of underscores are
          merged; leading/trailing spaces, underscores and dots are removed.
        - Reserved Windows device names, with or without an extension
          (``CON``, ``con.txt``), are prefixed with an underscore.
        - The result is limited to ``max_len`` characters.

        Returns ``"unknown"`` when nothing usable remains.
        """
        if not name:
            return "unknown"

        name = str(name).strip()
        name = cls._ILLEGAL_CHARS_RE.sub('_', name)
        name = cls._DISALLOWED_RE.sub('_', name)
        name = cls._UNDERSCORES_RE.sub('_', name).strip(' _.')
        name = name or "unknown"

        # Truncate first, then strip again: cutting in the middle of the name
        # may expose a trailing dot/underscore, which Windows handles badly.
        name = name[:max_len].rstrip(' _.') or "unknown"

        # Windows reserves the device name even when an extension follows,
        # so test the part before the first dot.
        stem = name.split('.', 1)[0]
        if cls._RESERVED_RE.fullmatch(stem):
            name = f"_{name}"[:max_len].rstrip(' .')
        return name

    @classmethod
    def _attach_file_handler(cls, logger, log_file: str, fmt: str) -> None:
        """Attach a UTF-8 append-mode FileHandler for ``log_file`` to ``logger``.

        Does nothing when the logger already has a FileHandler for the same
        absolute path, so repeated calls never duplicate output.
        """
        target = os.path.abspath(log_file)
        for existing in logger.handlers:
            if (isinstance(existing, logging.FileHandler)
                    and getattr(existing, "baseFilename", "") == target):
                return

        handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
        handler.setFormatter(logging.Formatter(fmt, datefmt=cls._DATE_FORMAT))
        logger.addHandler(handler)

    # ---------- Fixed per-device files: info / warning / error ----------
    @classmethod
    def _setupLogger(cls, udid, name, logName, level=logging.INFO):
        """Create or fetch the logger bound to ``log/<udid>/<logName>``.

        ``name`` is combined with the sanitised udid to form the logger name.
        The device directory is created on demand.
        """
        udid_key = cls._safe_filename(udid or "system")
        device_dir = os.path.join(cls.logDir, udid_key)
        os.makedirs(device_dir, exist_ok=True)

        logger = logging.getLogger(f"{udid_key}_{name}")
        logger.setLevel(level)
        logger.propagate = False  # do not propagate to the root logger (avoids duplicates)

        cls._attach_file_handler(
            logger, os.path.join(device_dir, logName), cls._DEVICE_FORMAT
        )
        return logger

    @classmethod
    def info(cls, text, udid="system"):
        """Write ``text`` at INFO level to ``log/<udid>/info.log``."""
        msg = cls._safe_text(f"[{udid}] {text}")
        cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(msg)

    @classmethod
    def warning(cls, text, udid="system"):
        """Write ``text`` at WARNING level to ``log/<udid>/warning.log``."""
        msg = cls._safe_text(f"[{udid}] {text}")
        cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(msg)

    @classmethod
    def error(cls, text, udid="system"):
        """Write ``text`` at ERROR level to ``log/<udid>/error.log``."""
        msg = cls._safe_text(f"[{udid}] {text}")
        cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(msg)

    # ---------- Per "device + method" file: <udid>/<method>.log ----------
    @classmethod
    def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
        """Create or fetch the logger bound to ``log/<udid>/<method>.log``.

        One logger is shared by all severities of a given device/method pair,
        so its threshold is kept at the most permissive level requested so
        far. Otherwise a first call at ERROR would make the cached logger
        drop every later INFO/WARNING record.
        """
        udid_key = cls._safe_filename(udid or "system")
        method_key = cls._safe_filename(method or "general")
        cache_key = (udid_key, method_key)

        logger = cls._method_loggers.get(cache_key)
        if logger is None:
            device_dir = os.path.join(cls.logDir, udid_key)
            os.makedirs(device_dir, exist_ok=True)

            logger = logging.getLogger(f"{udid_key}.{method_key}")
            logger.propagate = False
            cls._attach_file_handler(
                logger,
                os.path.join(device_dir, f"{method_key}.log"),
                cls._METHOD_FORMAT,
            )
            cls._method_loggers[cache_key] = logger

        # Only ever lower the threshold (or set it when still unset).
        if logger.level == logging.NOTSET or level < logger.level:
            logger.setLevel(level)
        return logger

    @classmethod
    def method_info(cls, text, method, udid="system"):
        """Write ``text`` at INFO level to ``log/<udid>/<method>.log``."""
        msg = cls._safe_text(f"[{udid}][{method}] {text}")
        cls._setupMethodLogger(udid, method, level=logging.INFO).info(msg)

    @classmethod
    def method_warning(cls, text, method, udid="system"):
        """Write ``text`` at WARNING level to ``log/<udid>/<method>.log``."""
        msg = cls._safe_text(f"[{udid}][{method}] {text}")
        cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(msg)

    @classmethod
    def method_error(cls, text, method, udid="system"):
        """Write ``text`` at ERROR level to ``log/<udid>/<method>.log``."""
        msg = cls._safe_text(f"[{udid}][{method}] {text}")
        cls._setupMethodLogger(udid, method, level=logging.ERROR).error(msg)

    # ---------- Clear logs ----------
    @classmethod
    def clearLogs(cls):
        """Delete everything inside ``logDir`` (intended to run at startup).

        File handlers that write below ``logDir`` are closed and detached
        first so the files can be removed; handlers of unrelated loggers are
        left alone. Deletion is best effort: entries that cannot be removed
        are skipped.
        """
        log_root = os.path.normcase(os.path.abspath(cls.logDir))
        prefix = log_root + os.sep

        for logger in list(logging.Logger.manager.loggerDict.values()):
            if not isinstance(logger, logging.Logger):
                continue  # loggerDict also holds non-Logger placeholder entries
            for handler in list(logger.handlers):
                if not isinstance(handler, logging.FileHandler):
                    continue
                target = os.path.normcase(getattr(handler, "baseFilename", ""))
                if not target.startswith(prefix):
                    continue
                try:
                    handler.close()
                except Exception:
                    pass
                logger.removeHandler(handler)

        log_path = Path(cls.logDir)
        if log_path.exists():
            for item in log_path.iterdir():
                try:
                    if item.is_file():
                        item.unlink(missing_ok=True)  # py>=3.8
                    elif item.is_dir():
                        shutil.rmtree(item, ignore_errors=True)
                except OSError:
                    # Do not let one stubborn entry block the cleanup.
                    pass

        cls._method_loggers.clear()

    # ---------- Upload all logs (zip built in memory) ----------
    @staticmethod
    def _zip_log_dir(log_path: Path) -> bytes:
        """Return a deflate-compressed zip of every file below ``log_path``.

        Archive member names are relative to ``log_path``.
        """
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            for entry in log_path.rglob("*"):
                if entry.is_file():
                    archive.write(entry, arcname=entry.relative_to(log_path).as_posix())
        return buffer.getvalue()

    @classmethod
    def upload_all_logs(cls, server_url, token, userId, tenantId):
        """Zip the whole ``logDir`` in memory and POST it to ``server_url``.

        Request layout:
          - headers: {"vvtoken": <token>} (omitted when ``token`` is falsy)
          - form:    {"tenantId": <tenantId>, "userId": <userId>, "file": <zip>}

        Returns:
            True when the JSON response has a truthy ``"data"`` field, or,
            if the response cannot be read that way, when the HTTP status is
            OK. False otherwise, including a missing log directory, a missing
            ``requests`` package and any exception (exceptions are logged,
            never raised).
        """
        try:
            log_path = Path(cls.logDir)
            if not log_path.exists():
                logging.info("[upload_all_logs] 日志目录不存在:%s", log_path)
                return False

            if requests is None:
                logging.error("[upload_all_logs] 缺少 requests 依赖,无法上传")
                return False

            # The file name is only used in the form field, never on disk.
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{timestamp}_logs.zip"
            logging.info("[upload_all_logs] 打包文件名:%s", filename)

            # 1) Build the zip in memory.
            zip_bytes = cls._zip_log_dir(log_path)

            # 2) Assemble the request.
            headers = {"vvtoken": token} if token else {}
            data = {"tenantId": tenantId, "userId": userId}
            files = {"file": (filename, io.BytesIO(zip_bytes), "application/zip")}

            # 3) Upload.
            resp = requests.post(server_url, headers=headers, data=data, files=files, timeout=120)

            try:
                ok = bool(resp.json().get("data"))
            except Exception:
                # Response is not usable JSON (maybe plain text): fall back
                # to the HTTP status.
                ok = resp.ok

            if ok:
                logging.info("[upload_all_logs] 上传成功:%s", server_url)
                return True

            logging.error("[upload_all_logs] 上传失败,status=%s, text=%s",
                          resp.status_code, cls._safe_text(resp.text))
            return False

        except Exception as e:
            logging.error("[upload_all_logs] 异常:%s", cls._safe_text(e))
            return False