import threading from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, Tuple, List from Utils.LogManager import LogManager class ThreadManager: _tasks: Dict[str, Dict] = {} _lock = threading.Lock() @classmethod def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]: """ 添加一个线程到线程管理器。 :param udid: 设备的唯一标识符 :param thread: 线程对象 :param event: 用于控制线程退出的 Event 对象 :return: 状态码和信息 """ with cls._lock: if udid in cls._tasks and cls._tasks[udid].get("running", False): LogManager.method_info(f"任务添加失败:设备 {udid} 已存在运行中的任务", method="task") return 400, f"该设备中已存在任务 {udid}" # 如果任务已经存在但已停止,清理旧任务记录 if udid in cls._tasks and not cls._tasks[udid].get("running", False): LogManager.method_info(f"清理设备 {udid} 的旧任务记录", method="task") del cls._tasks[udid] # 添加新任务记录 cls._tasks[udid] = { "thread": thread, "event": event, "running": True } LogManager.method_info(f"设备 {udid} 开始任务成功", method="task") return 200, f"创建任务成功 {udid}" @classmethod def stop(cls, udid: str) -> Tuple[int, str]: """ 停止指定设备的线程。 :param udid: 设备的唯一标识符 :return: 状态码和信息 """ with cls._lock: if udid not in cls._tasks or not cls._tasks[udid].get("running", False): LogManager.method_info(f"任务停止失败:设备 {udid} 没有执行相关任务", method="task") return 400, f"当前设备没有执行相关任务 {udid}" task = cls._tasks[udid] event = task["event"] thread = task["thread"] LogManager.method_info(f"设备 {udid} 的任务正在停止", method="task") # 设置停止标志位 event.set() # 等待线程结束 thread.join(timeout=5) # 可设置超时时间,避免阻塞 # 清理任务记录 del cls._tasks[udid] # 删除任务记录 LogManager.method_info(f"设备 {udid} 的任务停止成功", method="task") return 200, f"当前任务停止成功 {udid}" @classmethod def batch_stop(cls, udids: List[str]) -> Tuple[int, List[str], str]: """ 批量停止任务——瞬间下发停止信号,仍统计失败结果。 返回格式与原接口 100% 兼容: (code, fail_list, msg) code=200 全部成功 code=1001 部分/全部失败 """ if not udids: return 200, [], "无设备需要停止" fail_list: List[str] = [] # ---------- 1. 瞬间置位 event ---------- with cls._lock: for udid in udids: task = cls._tasks.get(udid) if not task or not task.get("running"): fail_list.append(f"设备{udid}停止失败:当前设备没有执行相关任务") continue task["event"].set() # 下发停止信号 # ---------- 2. 并发等 0.2 s 收尾 ---------- def _wait_and_clean(udid: str) -> Tuple[int, str]: with cls._lock: task = cls._tasks.get(udid) if not task: return 400, "任务记录已丢失" thread = task["thread"] # 第一次等 3 秒,让“分片睡眠”有机会退出 thread.join(timeout=3) # 如果还活,再补 2 秒 if thread.is_alive(): thread.join(timeout=2) # 最终仍活,记录日志但不硬杀,避免僵尸 with cls._lock: cls._tasks.pop(udid, None) if thread.is_alive(): LogManager.warning(f"[batch_stop] 线程 5s 未退出,已清理记录但线程仍跑 {udid}") return 201, "已下发停止,线程超长任务未立即结束" return 200, "已停止" with ThreadPoolExecutor(max_workers=min(32, len(udids))) as executor: future_map = {executor.submit(_wait_and_clean, udid): udid for udid in udids} for future in as_completed(future_map): udid = future_map[future] try: code, reason = future.result() if code != 200: fail_list.append(f"设备{udid}停止失败:{reason}") except Exception as exc: fail_list.append(f"设备{udid}停止异常:{exc}") # ---------- 3. 返回兼容格式 ---------- if fail_list: return 1001, fail_list, "部分设备停止失败" return 200, [], "全部设备停止成功" @classmethod def is_task_running(cls, udid: str) -> bool: """ 检查任务是否正在运行。 :param udid: 设备的唯一标识符 :return: True 表示任务正在运行,False 表示没有任务运行 """ with cls._lock: is_running = cls._tasks.get(udid, {}).get("running", False) LogManager.method_info(f"检查设备 {udid} 的任务状态:{'运行中' if is_running else '未运行'}", method="task") return is_running