优化停止任务逻辑
This commit is contained in:
@@ -28,43 +28,6 @@ def _force_utf8_everywhere():
|
||||
|
||||
# _force_utf8_everywhere()
|
||||
|
||||
|
||||
# ========= 全局:强制 UTF-8 + 关闭缓冲(运行期立刻生效) =========
|
||||
def _force_utf8_everywhere():
|
||||
os.environ.setdefault("PYTHONUTF8", "1")
|
||||
# 等价于 -u:让 stdout/stderr 无缓冲
|
||||
os.environ.setdefault("PYTHONUNBUFFERED", "1")
|
||||
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
|
||||
|
||||
# 若是 3.7+,优先用 reconfigure 实时改流
|
||||
try:
|
||||
if hasattr(sys.stdout, "reconfigure"):
|
||||
sys.stdout.reconfigure(encoding="utf-8", errors="replace",
|
||||
line_buffering=True, write_through=True)
|
||||
elif getattr(sys.stdout, "buffer", None):
|
||||
# 退路:重新包一层,启用行缓冲 + 直写
|
||||
sys.stdout = io.TextIOWrapper(
|
||||
sys.stdout.buffer, encoding="utf-8",
|
||||
errors="replace", line_buffering=True
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
if hasattr(sys.stderr, "reconfigure"):
|
||||
sys.stderr.reconfigure(encoding="utf-8", errors="replace",
|
||||
line_buffering=True, write_through=True)
|
||||
elif getattr(sys.stderr, "buffer", None):
|
||||
sys.stderr = io.TextIOWrapper(
|
||||
sys.stderr.buffer, encoding="utf-8",
|
||||
errors="replace", line_buffering=True
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ===========================================================
|
||||
|
||||
|
||||
class LogManager:
|
||||
"""
|
||||
设备级与“设备+方法”级日志管理:
|
||||
|
||||
@@ -1,126 +1,77 @@
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import psutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from threading import Event, Thread
|
||||
from typing import Dict, Optional
|
||||
|
||||
from typing import Dict, Tuple
|
||||
from Utils.LogManager import LogManager
|
||||
|
||||
|
||||
class ThreadManager:
|
||||
"""
|
||||
对调用方完全透明:
|
||||
add(udid, thread_obj, stop_event) 保持原签名
|
||||
stop(udid) 保持原签名
|
||||
但内部把 thread_obj 当成“壳”,真正拉起的是子进程。
|
||||
"""
|
||||
_pool: Dict[str, psutil.Process] = {}
|
||||
_tasks: Dict[str, Dict] = {}
|
||||
_lock = threading.Lock()
|
||||
|
||||
@classmethod
|
||||
def add(cls, udid: str, dummy_thread, dummy_event: Event) -> None:
|
||||
LogManager.method_info(f"【1】入口 udid={udid} 长度={len(udid)}", method="task")
|
||||
if udid in cls._pool:
|
||||
LogManager.method_warning(f"{udid} 仍在运行,先强制清理旧任务", method="task")
|
||||
cls.stop(udid)
|
||||
LogManager.method_info(f"【2】判断旧任务后 udid={udid} 长度={len(udid)}", method="task")
|
||||
port = cls._find_free_port()
|
||||
LogManager.method_info(f"【3】找端口后 udid={udid} 长度={len(udid)}", method="task")
|
||||
proc = cls._start_worker_process(udid, port)
|
||||
LogManager.method_info(f"【4】子进程启动后 udid={udid} 长度={len(udid)}", method="task")
|
||||
cls._pool[udid] = proc
|
||||
LogManager.method_info(f"【5】已写入字典,udid={udid} 长度={len(udid)}", method="task")
|
||||
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
|
||||
"""
|
||||
添加一个线程到线程管理器。
|
||||
:param udid: 设备的唯一标识符
|
||||
:param thread: 线程对象
|
||||
:param event: 用于控制线程退出的 Event 对象
|
||||
:return: 状态码和信息
|
||||
"""
|
||||
with cls._lock:
|
||||
if udid in cls._tasks and cls._tasks[udid].get("running", False):
|
||||
LogManager.method_info(f"任务添加失败:设备 {udid} 已存在运行中的任务", method="task")
|
||||
return 400, f"该设备中已存在任务 {udid}"
|
||||
|
||||
# 如果任务已经存在但已停止,清理旧任务记录
|
||||
if udid in cls._tasks and not cls._tasks[udid].get("running", False):
|
||||
LogManager.method_info(f"清理设备 {udid} 的旧任务记录", method="task")
|
||||
del cls._tasks[udid]
|
||||
|
||||
# 添加新任务记录
|
||||
cls._tasks[udid] = {
|
||||
"thread": thread,
|
||||
"event": event,
|
||||
"running": True
|
||||
}
|
||||
LogManager.method_info(f"设备 {udid} 开始任务成功", method="task")
|
||||
return 200, f"创建任务成功 {udid}"
|
||||
|
||||
@classmethod
|
||||
def stop(cls, udid: str) -> tuple[int, str]:
|
||||
with cls._lock: # 类级锁
|
||||
proc = cls._pool.get(udid) # 1. 只读,不删
|
||||
if proc is None:
|
||||
return 1001, f"无此任务 {udid}"
|
||||
|
||||
try:
|
||||
proc.terminate()
|
||||
gone, alive = psutil.wait_procs([proc], timeout=3)
|
||||
if alive:
|
||||
for p in alive:
|
||||
for child in p.children(recursive=True):
|
||||
child.kill()
|
||||
p.kill()
|
||||
psutil.wait_procs(alive, timeout=2)
|
||||
|
||||
# 正常退出
|
||||
cls._pool.pop(udid)
|
||||
LogManager.method_info("任务停止成功", method="task")
|
||||
return 200, f"停止线程成功 {udid}"
|
||||
|
||||
except psutil.NoSuchProcess: # 精准捕获
|
||||
cls._pool.pop(udid)
|
||||
LogManager.method_info("进程已自然退出", method="task")
|
||||
return 200, f"进程已退出 {udid}"
|
||||
|
||||
except Exception as e: # 真正的异常
|
||||
LogManager.method_error(f"停止异常: {e}", method="task")
|
||||
return 1002, f"停止异常 {udid}"
|
||||
|
||||
# ------------------------------------------------------
|
||||
# 以下全是内部工具,外部无需调用
|
||||
# ------------------------------------------------------
|
||||
@staticmethod
|
||||
def _find_free_port(start: int = 50000) -> int:
|
||||
"""找个随机空闲端口,给子进程当通信口(可选)"""
|
||||
import socket
|
||||
for p in range(start, start + 1000):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
if s.connect_ex(("127.0.0.1", p)) != 0:
|
||||
return p
|
||||
raise RuntimeError("无可用端口")
|
||||
|
||||
@staticmethod
|
||||
def _start_worker_process(udid: str, port: int) -> psutil.Process:
|
||||
def stop(cls, udid: str) -> Tuple[int, str]:
|
||||
"""
|
||||
真正拉起子进程:
|
||||
打包环境:exe --udid=xxx
|
||||
源码环境:python -m Module.Worker --udid=xxx
|
||||
停止指定设备的线程。
|
||||
:param udid: 设备的唯一标识符
|
||||
:return: 状态码和信息
|
||||
"""
|
||||
exe_path = Path(sys.executable).resolve()
|
||||
is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
|
||||
with cls._lock:
|
||||
if udid not in cls._tasks or not cls._tasks[udid].get("running", False):
|
||||
LogManager.method_info(f"任务停止失败:设备 {udid} 没有执行相关任务", method="task")
|
||||
return 400, f"当前设备没有执行相关任务 {udid}"
|
||||
|
||||
if is_frozen:
|
||||
# 打包后
|
||||
cmd = [str(exe_path), "--role=worker", f"--udid={udid}", f"--port={port}"]
|
||||
cwd = str(exe_path.parent)
|
||||
else:
|
||||
# 源码运行
|
||||
cmd = [sys.executable, "-u", "-m", "Module.Worker", f"--udid={udid}", f"--port={port}"]
|
||||
cwd = str(Path(__file__).resolve().parent.parent)
|
||||
task = cls._tasks[udid]
|
||||
event = task["event"]
|
||||
thread = task["thread"]
|
||||
|
||||
# 核心:CREATE_NO_WINDOW + 独立会话,父进程死也不影响
|
||||
creation_flags = 0x08000000 if os.name == "nt" else 0
|
||||
proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdin=subprocess.DEVNULL,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
errors="replace",
|
||||
bufsize=1,
|
||||
cwd=cwd,
|
||||
start_new_session=True, # 独立进程组
|
||||
creationflags=creation_flags
|
||||
)
|
||||
# 守护线程:把子进程 stdout 实时打到日志
|
||||
Thread(target=lambda: ThreadManager._log_stdout(proc, udid), daemon=True).start()
|
||||
return psutil.Process(proc.pid)
|
||||
LogManager.method_info(f"设备 {udid} 的任务正在停止", method="task")
|
||||
|
||||
@staticmethod
|
||||
def _log_stdout(proc: subprocess.Popen, udid: str):
|
||||
for line in iter(proc.stdout.readline, ""):
|
||||
if line:
|
||||
LogManager.info(line.rstrip(), udid)
|
||||
proc.stdout.close()
|
||||
# 设置停止标志位
|
||||
event.set()
|
||||
|
||||
# 等待线程结束
|
||||
thread.join(timeout=5) # 可设置超时时间,避免阻塞
|
||||
|
||||
# 清理任务记录
|
||||
del cls._tasks[udid] # 删除任务记录
|
||||
LogManager.method_info(f"设备 {udid} 的任务停止成功", method="task")
|
||||
return 200, f"当前任务停止成功 {udid}"
|
||||
|
||||
@classmethod
|
||||
def is_task_running(cls, udid: str) -> bool:
|
||||
"""
|
||||
检查任务是否正在运行。
|
||||
:param udid: 设备的唯一标识符
|
||||
:return: True 表示任务正在运行,False 表示没有任务运行
|
||||
"""
|
||||
with cls._lock:
|
||||
is_running = cls._tasks.get(udid, {}).get("running", False)
|
||||
LogManager.method_info(f"检查设备 {udid} 的任务状态:{'运行中' if is_running else '未运行'}", method="task")
|
||||
return is_running
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user