# import logging # import os # import sys # import shutil # from pathlib import Path # # # class LogManager: # # 运行根目录:打包后取 exe 目录;源码运行取项目目录 # if getattr(sys, "frozen", False): # projectRoot = os.path.dirname(sys.executable) # else: # projectRoot = os.path.dirname(os.path.dirname(__file__)) # # logDir = os.path.join(projectRoot, "log") # _loggers = {} # # @classmethod # def _setupLogger(cls, udid, name, logName, level=logging.INFO): # """创建或获取 logger,并绑定到文件""" # deviceLogDir = os.path.join(cls.logDir, udid) # os.makedirs(deviceLogDir, exist_ok=True) # logFile = os.path.join(deviceLogDir, logName) # # logger_name = f"{udid}_{name}" # logger = logging.getLogger(logger_name) # logger.setLevel(level) # # # 避免重复添加 handler # if not any( # isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile) # for h in logger.handlers # ): # fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8") # formatter = logging.Formatter( # "%(asctime)s - %(name)s - %(levelname)s - %(message)s", # datefmt="%Y-%m-%d %H:%M:%S" # ) # fileHandler.setFormatter(formatter) # logger.addHandler(fileHandler) # # return logger # # @classmethod # def info(cls, text, udid="system"): # cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}") # # @classmethod # def warning(cls, text, udid="system"): # cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}") # # @classmethod # def error(cls, text, udid="system"): # cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}") # # # 清空日志 # @classmethod # def clearLogs(cls): # """启动时清空 log 目录下所有文件""" # print("开始清空日志...") # # # 关闭所有 handler # for name, logger in logging.Logger.manager.loggerDict.items(): # if isinstance(logger, logging.Logger): # for handler in logger.handlers[:]: # try: # handler.close() # except Exception: # pass # logger.removeHandler(handler) # # # 仅删除目录里的所有文件和子目录 # log_path = 
# Path(cls.logDir) # if log_path.exists(): # for item in log_path.iterdir(): # if item.is_file(): # item.unlink() # elif item.is_dir(): # import shutil # shutil.rmtree(item) # # print("日志清空完成")
import logging
import os
import re
import shutil
import sys
from pathlib import Path
from typing import Optional

import requests

from Entity.Variables import token


class LogManager:
    """File-based logging helper organised per device (udid).

    Two layouts are supported, both rooted at ``logDir``:

    * per level:  ``log/<udid>/info.log``, ``warning.log``, ``error.log``
      (``info`` / ``warning`` / ``error``)
    * per method: ``log/<udid>/<method>.log``
      (``method_info`` / ``method_warning`` / ``method_error``)

    All methods are classmethods; the class is used as a namespace and is
    never instantiated by the code in this file.
    """

    # Runtime root: the executable's directory when running frozen
    # (packaged), otherwise the parent of this file's directory.
    if getattr(sys, "frozen", False):
        projectRoot = os.path.dirname(sys.executable)
    else:
        projectRoot = os.path.dirname(os.path.dirname(__file__))

    logDir = os.path.join(projectRoot, "log")
    _loggers = {}
    _method_loggers = {}  # cache: (udid_key, method_key) -> logging.Logger

    _DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

    # Everything except ASCII alphanumerics, "_", ".", "-", CJK unified
    # ideographs, Japanese kana and Hangul syllables is considered unsafe.
    _UNSAFE_CHARS_RE = re.compile(
        r"[^a-zA-Z0-9_.\-"
        r"\u4e00-\u9fff"  # Chinese
        r"\u3040-\u30ff"  # Japanese
        r"\uac00-\ud7a3"  # Korean
        r"]+"
    )
    # Windows reserved device names, with or without an extension.
    _RESERVED_NAME_RE = re.compile(
        r"(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])(\..*)?"
    )

    # ---------- helpers ----------
    @classmethod
    def _safe_filename(cls, name: str, max_len: int = 80) -> str:
        """Turn a method name / udid into a string safe to use as a file name.

        * keeps ASCII letters and digits, ``.``, ``_``, ``-`` and common CJK
          characters; every other run of characters becomes one underscore
        * collapses repeated underscores and strips leading/trailing
          spaces, underscores and dots
        * prefixes Windows reserved device names (``CON``, ``NUL.txt``, ...)
          with an underscore
        * truncates to ``max_len`` characters

        Returns ``"unknown"`` when the input is falsy or nothing is left
        after sanitising.
        """
        if not name:
            return "unknown"

        name = cls._UNSAFE_CHARS_RE.sub("_", str(name).strip())
        name = re.sub(r"_+", "_", name).strip(" _.")
        name = name or "unknown"

        if cls._RESERVED_NAME_RE.fullmatch(name):
            name = f"_{name}"

        # Truncation can expose a trailing dot/underscore again; Windows
        # does not accept names ending in a dot, so strip once more.
        return name[:max_len].rstrip(" _.") or "unknown"

    @staticmethod
    def _ensure_file_handler(logger, log_file, fmt):
        """Attach a UTF-8 append-mode FileHandler for ``log_file`` once.

        Does nothing when ``logger`` already has a FileHandler whose
        ``baseFilename`` equals the absolute path of ``log_file``.
        """
        target = os.path.abspath(log_file)
        for existing in logger.handlers:
            if (
                isinstance(existing, logging.FileHandler)
                and existing.baseFilename == target
            ):
                return

        handler = logging.FileHandler(log_file, mode="a", encoding="utf-8")
        handler.setFormatter(
            logging.Formatter(fmt, datefmt=LogManager._DATE_FORMAT)
        )
        logger.addHandler(handler)

    # ---------- per-level fixed files ----------
    @classmethod
    def _setupLogger(cls, udid, name, logName, level=logging.INFO):
        """Create or fetch the logger writing to ``log/<udid>/<logName>``.

        The device directory is created if missing. The logger is named
        ``"<udid>_<name>"`` and its level is set to ``level`` on every call.
        """
        deviceLogDir = os.path.join(cls.logDir, cls._safe_filename(udid))
        os.makedirs(deviceLogDir, exist_ok=True)
        logFile = os.path.join(deviceLogDir, logName)

        logger = logging.getLogger(f"{udid}_{name}")
        logger.setLevel(level)
        cls._ensure_file_handler(
            logger,
            logFile,
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        )
        return logger

    @classmethod
    def info(cls, text, udid="system"):
        """Write an INFO record to ``log/<udid>/info.log``."""
        cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}")

    @classmethod
    def warning(cls, text, udid="system"):
        """Write a WARNING record to ``log/<udid>/warning.log``."""
        cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}")

    @classmethod
    def error(cls, text, udid="system"):
        """Write an ERROR record to ``log/<udid>/error.log``."""
        cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}")

    # ---------- per "device + method" files ----------
    @classmethod
    def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
        """Create or fetch the logger writing to ``log/<udid>/<method>.log``.

        Loggers are cached per (sanitised udid, sanitised method). One
        logger serves all severities for that pair, so on a cache hit the
        threshold is lowered when ``level`` is below the current one;
        otherwise a logger first created by ``method_error`` would discard
        later INFO/WARNING records.
        """
        udid_key = cls._safe_filename(udid or "system")
        method_key = cls._safe_filename(method or "general")
        cache_key = (udid_key, method_key)

        cached = cls._method_loggers.get(cache_key)
        if cached is not None:
            if isinstance(level, int) and level < cached.level:
                cached.setLevel(level)
            return cached

        deviceLogDir = os.path.join(cls.logDir, udid_key)
        os.makedirs(deviceLogDir, exist_ok=True)
        logFile = os.path.join(deviceLogDir, f"{method_key}.log")

        logger = logging.getLogger(f"{udid_key}.{method_key}")
        logger.setLevel(level)
        # Do not propagate to ancestor loggers, to avoid duplicate output.
        logger.propagate = False
        cls._ensure_file_handler(
            logger,
            logFile,
            "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        )

        cls._method_loggers[cache_key] = logger
        return logger

    @classmethod
    def method_info(cls, text, method, udid="system"):
        """Write an INFO record to ``log/<udid>/<method>.log``."""
        cls._setupMethodLogger(udid, method, level=logging.INFO).info(f"[{udid}][{method}] {text}")

    @classmethod
    def method_warning(cls, text, method, udid="system"):
        """Write a WARNING record to ``log/<udid>/<method>.log``."""
        cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(f"[{udid}][{method}] {text}")

    @classmethod
    def method_error(cls, text, method, udid="system"):
        """Write an ERROR record to ``log/<udid>/<method>.log``."""
        cls._setupMethodLogger(udid, method, level=logging.ERROR).error(f"[{udid}][{method}] {text}")

    # ---------- clearing ----------
    @classmethod
    def clearLogs(cls):
        """Remove every file and sub-directory inside ``logDir``.

        Handlers of all loggers registered with the logging manager are
        closed and detached first so open log files can be deleted. The
        method-logger cache is emptied afterwards.
        """
        print("开始清空日志...")

        for logger in logging.Logger.manager.loggerDict.values():
            # loggerDict can also hold placeholder objects; skip those.
            if not isinstance(logger, logging.Logger):
                continue
            for handler in logger.handlers[:]:
                try:
                    handler.close()
                except Exception:
                    # Best effort: a handler that fails to close is still
                    # detached below.
                    pass
                logger.removeHandler(handler)

        log_path = Path(cls.logDir)
        if log_path.exists():
            for item in log_path.iterdir():
                if item.is_file():
                    item.unlink()
                elif item.is_dir():
                    shutil.rmtree(item)

        cls._method_loggers.clear()

        print("日志清空完成")

    # ---------- upload ----------
    @classmethod
    def upload_all_logs(cls, server_url: str, extra_data: Optional[dict] = None) -> bool:
        """POST every ``*.log`` file under ``logDir`` (recursively) to a server.

        Each file is sent as multipart field ``file`` with a bearer token
        header and the form fields ``tenantId`` / ``userId``; ``extra_data``
        entries, if given, are merged over those fields. A response status
        other than 200, or any exception, marks that file as failed and the
        loop continues with the next file.

        Returns ``True`` when no file failed, ``False`` when at least one
        failed or the log directory does not exist.
        """
        log_path = Path(cls.logDir)
        if not log_path.exists():
            print("[upload_all_logs] 日志目录不存在")
            return False

        headers = {
            "Authorization": f"Bearer {token}",
        }
        data = {
            # Disabled field kept for reference:
            # "relative_path": str(file.relative_to(log_path))
            "tenantId": 1,
            "userId": 1,
        }
        if extra_data:
            data.update(extra_data)

        success_files, failed_files = [], []

        for file in log_path.rglob("*.log"):
            try:
                # The context manager closes the handle even when the
                # request raises.
                with open(file, "rb") as fh:
                    resp = requests.post(
                        server_url,
                        files={"file": fh},
                        data=data,
                        headers=headers,
                        timeout=15,
                    )
                print(resp.text)
                print(resp.status_code)
                if resp.status_code == 200:
                    success_files.append(file.name)
                else:
                    failed_files.append((file.name, resp.status_code))
            except Exception as e:
                # Per-file best effort: record the failure and carry on.
                failed_files.append((file.name, str(e)))

        print(f"[upload_all_logs] 成功上传: {success_files}")
        if failed_files:
            print(f"[upload_all_logs] 失败文件: {failed_files}")

        return len(failed_files) == 0