Merge remote-tracking branch 'origin/main'

# Conflicts:
#	.idea/workspace.xml
#	Module/DeviceInfo.py
#	Utils/ControlUtils.py
#	script/ScriptManager.py
This commit is contained in:
2025-09-18 13:11:50 +08:00
84 changed files with 414 additions and 412 deletions

View File

@@ -1,217 +1,3 @@
#
# import datetime
# import io
# import logging
# import os
# import re
# import sys
# import shutil
# import zipfile
# from pathlib import Path
# import requests
#
#
# class LogManager:
# # 运行根目录:打包后取 exe 目录;源码运行取项目目录
# if getattr(sys, "frozen", False):
# projectRoot = os.path.dirname(sys.executable)
# else:
# projectRoot = os.path.dirname(os.path.dirname(__file__))
#
# logDir = os.path.join(projectRoot, "log")
# _loggers = {}
# _method_loggers = {} # 新增:缓存“设备+方法”的 logger
#
# # ---------- 工具函数 ----------
# @classmethod
# def _safe_filename(cls, name: str, max_len: int = 80) -> str:
# """
# 将方法名/udid等转成安全文件名
# - 允许字母数字、点、下划线、连字符
# - 允许常见 CJK 字符(中日韩)
# - 其他非法字符替换为下划线
# - 合并多余下划线,裁剪长度
# """
# if not name:
# return "unknown"
# name = str(name).strip()
#
# # 替换 Windows 非法字符和控制符
# name = re.sub(r'[\\/:*?"<>|\r\n\t]+', '_', name)
#
# # 只保留 ① 英数._- ② CJK 统一表意文字、日文平/片假名、韩文音节
# name = re.sub(rf'[^a-zA-Z0-9_.\-'
# r'\u4e00-\u9fff' # 中
# r'\u3040-\u30ff' # 日
# r'\uac00-\ud7a3' # 韩
# r']+', '_', name)
# # 合并多余下划线,去两端空白与下划线
# name = re.sub(r'_+', '_', name).strip(' _.')
# # 避免空
# name = name or "unknown"
# # Windows 预留名避免CON/PRN/AUX/NUL/COM1…
# if re.fullmatch(r'(?i)(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])', name):
# name = f"_{name}"
# # 限长
# return name[:max_len] or "unknown"
#
# # ---------- 旧的:按级别写固定文件 ----------
# @classmethod
# def _setupLogger(cls, udid, name, logName, level=logging.INFO):
# """创建或获取 logger并绑定到设备目录下的固定文件info.log / warning.log / error.log"""
# deviceLogDir = os.path.join(cls.logDir, cls._safe_filename(udid))
# os.makedirs(deviceLogDir, exist_ok=True)
# logFile = os.path.join(deviceLogDir, logName)
#
# logger_name = f"{udid}_{name}"
# logger = logging.getLogger(logger_name)
# logger.setLevel(level)
#
# # 避免重复添加 handler
# if not any(
# isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
# for h in logger.handlers
# ):
# fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
# formatter = logging.Formatter(
# "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S"
# )
# fileHandler.setFormatter(formatter)
# logger.addHandler(fileHandler)
#
# return logger
#
# @classmethod
# def info(cls, text, udid="system"):
# cls._setupLogger(udid, "infoLogger", "info.log", level=logging.INFO).info(f"[{udid}] {text}")
#
# @classmethod
# def warning(cls, text, udid="system"):
# cls._setupLogger(udid, "warningLogger", "warning.log", level=logging.WARNING).warning(f"[{udid}] {text}")
#
# @classmethod
# def error(cls, text, udid="system"):
# cls._setupLogger(udid, "errorLogger", "error.log", level=logging.ERROR).error(f"[{udid}] {text}")
#
# # ---------- 新增:按“设备+方法”分别写独立日志文件 ----------
# @classmethod
# def _setupMethodLogger(cls, udid: str, method: str, level=logging.INFO):
# """
# 为某设备的某个方法单独创建 logger
# log/<udid>/<method>.log
# """
# udid_key = cls._safe_filename(udid or "system")
# method_key = cls._safe_filename(method or "general")
# cache_key = (udid_key, method_key)
#
# # 命中缓存
# if cache_key in cls._method_loggers:
# return cls._method_loggers[cache_key]
#
# deviceLogDir = os.path.join(cls.logDir, udid_key)
# os.makedirs(deviceLogDir, exist_ok=True)
# logFile = os.path.join(deviceLogDir, f"{method_key}.log")
#
# logger_name = f"{udid_key}.{method_key}"
# logger = logging.getLogger(logger_name)
# logger.setLevel(level)
# logger.propagate = False # 避免向根 logger 传播导致控制台重复打印
#
# # 避免重复添加 handler
# if not any(
# isinstance(h, logging.FileHandler) and h.baseFilename == os.path.abspath(logFile)
# for h in logger.handlers
# ):
# fileHandler = logging.FileHandler(logFile, mode="a", encoding="utf-8")
# formatter = logging.Formatter(
# "%(asctime)s - %(levelname)s - %(name)s - %(message)s",
# datefmt="%Y-%m-%d %H:%M:%S"
# )
# fileHandler.setFormatter(formatter)
# logger.addHandler(fileHandler)
#
# cls._method_loggers[cache_key] = logger
# return logger
#
# @classmethod
# def method_info(cls, text, method, udid="system"):
# """按设备+方法写 INFO 到 log/<udid>/<method>.log"""
# cls._setupMethodLogger(udid, method, level=logging.INFO).info(f"[{udid}][{method}] {text}")
#
# @classmethod
# def method_warning(cls, text, method, udid="system"):
# cls._setupMethodLogger(udid, method, level=logging.WARNING).warning(f"[{udid}][{method}] {text}")
#
# @classmethod
# def method_error(cls, text, method, udid="system"):
# cls._setupMethodLogger(udid, method, level=logging.ERROR).error(f"[{udid}][{method}] {text}")
#
# # 清空日志
# @classmethod
# def clearLogs(cls):
# """启动时清空 log 目录下所有文件"""
#
# # 关闭所有 handler
# for name, logger in logging.Logger.manager.loggerDict.items():
# if isinstance(logger, logging.Logger):
# for handler in logger.handlers[:]:
# try:
# handler.close()
# except Exception:
# pass
# logger.removeHandler(handler)
#
# # 删除 log 目录
# log_path = Path(cls.logDir)
# if log_path.exists():
# for item in log_path.iterdir():
# if item.is_file():
# item.unlink()
# elif item.is_dir():
# shutil.rmtree(item)
#
# # 清缓存
# cls._method_loggers.clear()
#
# @classmethod
# def upload_all_logs(cls, server_url, token, userId, tenantId):
# log_path = Path(cls.logDir)
# if not log_path.exists():
# return False
#
# timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# filename = f"{timestamp}_logs.zip"
# print(filename)
# zip_buf = io.BytesIO()
# with zipfile.ZipFile(zip_buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
# for p in log_path.rglob("*"):
# if p.is_file():
# arcname = str(p.relative_to(log_path))
# zf.write(p, arcname=arcname)
#
# zip_bytes = zip_buf.getvalue()
#
# headers = {"vvtoken": token}
# data = {"tenantId": tenantId, "userId": userId}
#
#
# files = {
# "file": (filename, io.BytesIO(zip_bytes), "application/zip")
# }
#
# # 3) 上传
# resp = requests.post(server_url, headers=headers, data=data, files=files)
# if resp.json()['data']:
# return True
# return False
# -*- coding: utf-8 -*-
import datetime
import io

24
Utils/SubprocessKit.py Normal file
View File

@@ -0,0 +1,24 @@
import os
import subprocess
__all__ = ['check_output', 'popen', 'PIPE']
# 模块级单例,导入时只创建一次
if os.name == "nt":
_si = subprocess.STARTUPINFO()
_si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
_si.wShowWindow = subprocess.SW_HIDE
else:
_si = None
PIPE = subprocess.PIPE
def check_output(cmd, **kw):
if os.name == "nt":
kw.setdefault('startupinfo', _si)
return subprocess.check_output(cmd, **kw)
def popen(*args, **kw):
if os.name == "nt":
kw.setdefault('startupinfo', _si)
return subprocess.Popen(*args, **kw)

View File

@@ -1,33 +1,126 @@
from threading import Thread, Event
import os
import signal
import sys
import threading
import time
import psutil
import subprocess
from pathlib import Path
from threading import Event, Thread
from typing import Dict, Optional
from Utils.LogManager import LogManager
from script.ScriptManager import ScriptManager
class ThreadManager():
threads = {}
class ThreadManager:
"""
对调用方完全透明:
add(udid, thread_obj, stop_event) 保持原签名
stop(udid) 保持原签名
但内部把 thread_obj 当成“壳”,真正拉起的是子进程。
"""
_pool: Dict[str, psutil.Process] = {}
_lock = threading.Lock()
@classmethod
def add(cls, udid, t: Thread, stopEvent: Event):
if udid in cls.threads:
print("▲ 线程已存在")
return
cls.threads[udid] = {"thread": t, "stopEvent": stopEvent}
def add(cls, udid: str, dummy_thread, dummy_event: Event) -> None:
LogManager.method_info(f"【1】入口 udid={udid} 长度={len(udid)}", method="task")
if udid in cls._pool:
LogManager.method_warning(f"{udid} 仍在运行,先强制清理旧任务", method="task")
cls.stop(udid)
LogManager.method_info(f"【2】判断旧任务后 udid={udid} 长度={len(udid)}", method="task")
port = cls._find_free_port()
LogManager.method_info(f"【3】找端口后 udid={udid} 长度={len(udid)}", method="task")
proc = cls._start_worker_process(udid, port)
LogManager.method_info(f"【4】子进程启动后 udid={udid} 长度={len(udid)}", method="task")
cls._pool[udid] = proc
LogManager.method_info(f"【5】已写入字典udid={udid} 长度={len(udid)}", method="task")
@classmethod
def stop(cls, udid):
try:
info = cls.threads[udid]
if info:
info["stopEvent"].set() # 停止线程
info["thread"].join(timeout=3) # 等待线程退出
del cls.threads[udid]
LogManager.info("停止线程成功", udid)
return 200, "停止线程成功 " + udid
else:
LogManager.info("无此线程,无需关闭", udid)
return 1001, "无此线程,无需关闭 " + udid
except KeyError as e:
LogManager.info("无此线程,无需关闭", udid)
return 1001, "停止脚本失败 " + udid
def stop(cls, udid: str) -> tuple[int, str]:
with cls._lock: # 类级锁
proc = cls._pool.get(udid) # 1. 只读,不删
if proc is None:
return 1001, f"无此任务 {udid}"
try:
proc.terminate()
gone, alive = psutil.wait_procs([proc], timeout=3)
if alive:
for p in alive:
for child in p.children(recursive=True):
child.kill()
p.kill()
psutil.wait_procs(alive, timeout=2)
# 正常退出
cls._pool.pop(udid)
LogManager.method_info("任务停止成功", method="task")
return 200, f"停止线程成功 {udid}"
except psutil.NoSuchProcess: # 精准捕获
cls._pool.pop(udid)
LogManager.method_info("进程已自然退出", method="task")
return 200, f"进程已退出 {udid}"
except Exception as e: # 真正的异常
LogManager.method_error(f"停止异常: {e}", method="task")
return 1002, f"停止异常 {udid}"
# ------------------------------------------------------
# 以下全是内部工具,外部无需调用
# ------------------------------------------------------
@staticmethod
def _find_free_port(start: int = 50000) -> int:
"""找个随机空闲端口,给子进程当通信口(可选)"""
import socket
for p in range(start, start + 1000):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
if s.connect_ex(("127.0.0.1", p)) != 0:
return p
raise RuntimeError("无可用端口")
@staticmethod
def _start_worker_process(udid: str, port: int) -> psutil.Process:
"""
真正拉起子进程:
打包环境exe --udid=xxx
源码环境python -m Module.Worker --udid=xxx
"""
exe_path = Path(sys.executable).resolve()
is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
if is_frozen:
# 打包后
cmd = [str(exe_path), "--role=worker", f"--udid={udid}", f"--port={port}"]
cwd = str(exe_path.parent)
else:
# 源码运行
cmd = [sys.executable, "-u", "-m", "Module.Worker", f"--udid={udid}", f"--port={port}"]
cwd = str(Path(__file__).resolve().parent.parent)
# 核心CREATE_NO_WINDOW + 独立会话,父进程死也不影响
creation_flags = 0x08000000 if os.name == "nt" else 0
proc = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
bufsize=1,
cwd=cwd,
start_new_session=True, # 独立进程组
creationflags=creation_flags
)
# 守护线程:把子进程 stdout 实时打到日志
Thread(target=lambda: ThreadManager._log_stdout(proc, udid), daemon=True).start()
return psutil.Process(proc.pid)
@staticmethod
def _log_stdout(proc: subprocess.Popen, udid: str):
for line in iter(proc.stdout.readline, ""):
if line:
LogManager.info(line.rstrip(), udid)
proc.stdout.close()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.