Files
iOSAI/Utils/ThreadManager.py
2025-09-20 21:57:37 +08:00

137 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, List
from Utils.LogManager import LogManager
class ThreadManager:
_tasks: Dict[str, Dict] = {}
_lock = threading.Lock()
@classmethod
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
"""
添加一个线程到线程管理器。
:param udid: 设备的唯一标识符
:param thread: 线程对象
:param event: 用于控制线程退出的 Event 对象
:return: 状态码和信息
"""
with cls._lock:
if udid in cls._tasks and cls._tasks[udid].get("running", False):
LogManager.method_info(f"任务添加失败:设备 {udid} 已存在运行中的任务", method="task")
return 400, f"该设备中已存在任务 {udid}"
# 如果任务已经存在但已停止,清理旧任务记录
if udid in cls._tasks and not cls._tasks[udid].get("running", False):
LogManager.method_info(f"清理设备 {udid} 的旧任务记录", method="task")
del cls._tasks[udid]
# 添加新任务记录
cls._tasks[udid] = {
"thread": thread,
"event": event,
"running": True
}
LogManager.method_info(f"设备 {udid} 开始任务成功", method="task")
return 200, f"创建任务成功 {udid}"
@classmethod
def stop(cls, udid: str) -> Tuple[int, str]:
"""
停止指定设备的线程。
:param udid: 设备的唯一标识符
:return: 状态码和信息
"""
with cls._lock:
if udid not in cls._tasks or not cls._tasks[udid].get("running", False):
LogManager.method_info(f"任务停止失败:设备 {udid} 没有执行相关任务", method="task")
return 400, f"当前设备没有执行相关任务 {udid}"
task = cls._tasks[udid]
event = task["event"]
thread = task["thread"]
LogManager.method_info(f"设备 {udid} 的任务正在停止", method="task")
# 设置停止标志位
event.set()
# 等待线程结束
thread.join(timeout=5) # 可设置超时时间,避免阻塞
# 清理任务记录
del cls._tasks[udid] # 删除任务记录
LogManager.method_info(f"设备 {udid} 的任务停止成功", method="task")
return 200, f"当前任务停止成功 {udid}"
@classmethod
def batch_stop(cls, udids: List[str]) -> Tuple[int, List[str], str]:
if not udids:
return 200, [], "无设备需要停止"
fail_list: List[str] = []
# ---------- 1. 瞬间置位 event ----------
with cls._lock:
for udid in udids:
task = cls._tasks.get(udid)
if task and task.get("running"):
task["event"].set()
# ---------- 2. 单设备重试 5 次 ----------
def _wait_and_clean_with_retry(udid: str) -> Tuple[int, List[str], str]:
"""
内部重试 5 次,最终返回格式与 batch_stop 完全一致:
(code, fail_list, msg) fail_list 空表示成功
"""
for attempt in range(1, 6):
with cls._lock:
task = cls._tasks.get(udid)
if not task:
return 200, [], "任务已不存在" # 成功
thread = task["thread"]
thread.join(timeout=5)
still_alive = thread.is_alive()
# 最后一次重试才清理记录
if attempt == 5:
with cls._lock:
cls._tasks.pop(udid, None)
if not still_alive:
return 200, [], "已停止"
LogManager.warning(f"[batch_stop] {udid}{attempt}/5 次停止未立即结束,重试", udid)
# 5 次都失败
LogManager.error(f"[batch_stop] {udid} 停止失败5 次重试后线程仍活)", udid)
return 1001, [udid], "部分设备停止失败"
# ---------- 3. 并发执行 ----------
with ThreadPoolExecutor(max_workers=min(32, len(udids))) as executor:
future_map = {executor.submit(_wait_and_clean_with_retry, udid): udid for udid in udids}
for future in as_completed(future_map):
udid = future_map[future]
code, sub_fail_list, sub_msg = future.result() # ← 现在解包正确
if code != 200:
fail_list.extend(sub_fail_list) # 收集失败 udid
# ---------- 4. 返回兼容格式 ----------
if fail_list:
return 1001, fail_list, "部分设备停止失败"
return 200, [], "全部设备停止成功"
@classmethod
def is_task_running(cls, udid: str) -> bool:
"""
检查任务是否正在运行。
:param udid: 设备的唯一标识符
:return: True 表示任务正在运行False 表示没有任务运行
"""
with cls._lock:
is_running = cls._tasks.get(udid, {}).get("running", False)
LogManager.method_info(f"检查设备 {udid} 的任务状态:{'运行中' if is_running else '未运行'}", method="task")
return is_running