优化批量停止脚本

This commit is contained in:
2025-09-22 19:10:58 +08:00
parent 81e3462f15
commit 4dd3eb59a4
8 changed files with 85 additions and 152 deletions

View File

@@ -1,136 +1,73 @@
import ctypes
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, List
from Utils.LogManager import LogManager
def _async_raise(tid: int, exc_type=KeyboardInterrupt):
"""向指定线程抛异常,强制跳出"""
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exc_type))
if res == 0:
raise ValueError("线程不存在")
elif res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
class ThreadManager:
_tasks: Dict[str, Dict] = {}
_lock = threading.Lock()
@classmethod
def add(cls, udid: str, thread: threading.Thread, event: threading.Event) -> Tuple[int, str]:
"""
添加一个线程到线程管理器。
:param udid: 设备的唯一标识符
:param thread: 线程对象
:param event: 用于控制线程退出的 Event 对象
:return: 状态码和信息
"""
LogManager.method_info(f"准备创建任务:{udid}", "task")
with cls._lock:
if udid in cls._tasks and cls._tasks[udid].get("running", False):
LogManager.method_info(f"任务添加失败:设备 {udid} 已存在运行中的任务", method="task")
return 400, f"设备已存在任务 {udid}"
# 判断当前设备是否有任务
if cls._tasks.get(udid, None) is not None:
return 1001, "当前设备已存在任务"
thread.start()
print(thread.ident)
# 如果任务已经存在但已停止,清理旧任务记录
if udid in cls._tasks and not cls._tasks[udid].get("running", False):
LogManager.method_info(f"清理设备 {udid} 的旧任务记录", method="task")
del cls._tasks[udid]
# 添加新任务记录
cls._tasks[udid] = {
"id": thread.ident,
"thread": thread,
"event": event,
"running": True
"event": event
}
LogManager.method_info(f"设备 {udid} 开始任务成功", method="task")
return 200, f"创建任务成功 {udid}"
return 200, "创建成功"
@classmethod
def stop(cls, udid: str) -> Tuple[int, str]:
"""
停止指定设备的线程。
:param udid: 设备的唯一标识符
:return: 状态码和信息
"""
with cls._lock:
if udid not in cls._tasks or not cls._tasks[udid].get("running", False):
LogManager.method_info(f"任务停止失败:设备 {udid} 没有执行相关任务", method="task")
return 400, f"当前设备没有执行相关任务 {udid}"
task = cls._tasks[udid]
event = task["event"]
thread = task["thread"]
LogManager.method_info(f"设备 {udid} 的任务正在停止", method="task")
# 设置停止标志位
event.set()
# 等待线程结束
thread.join(timeout=5) # 可设置超时时间,避免阻塞
# 清理任务记录
del cls._tasks[udid] # 删除任务记录
LogManager.method_info(f"设备 {udid} 的任务停止成功", method="task")
return 200, f"当前任务停止成功 {udid}"
try:
print(cls._tasks)
obj = cls._tasks.get(udid, None)
obj["event"].set()
r = cls._kill_thread(obj.get("id"))
if r:
cls._tasks.pop(udid, None)
else:
print("好像有问题")
except Exception as e:
print(e)
return 200, "操作成功"
@classmethod
def batch_stop(cls, udids: List[str]) -> Tuple[int, List[str], str]:
if not udids:
return 200, [], "无设备需要停止"
fail_list: List[str] = []
# ---------- 1. 瞬间置位 event ----------
with cls._lock:
for udid in udids:
task = cls._tasks.get(udid)
if task and task.get("running"):
task["event"].set()
# ---------- 2. 单设备重试 5 次 ----------
def _wait_and_clean_with_retry(udid: str) -> Tuple[int, List[str], str]:
"""
内部重试 5 次,最终返回格式与 batch_stop 完全一致:
(code, fail_list, msg) fail_list 空表示成功
"""
for attempt in range(1, 6):
with cls._lock:
task = cls._tasks.get(udid)
if not task:
return 200, [], "任务已不存在" # 成功
thread = task["thread"]
thread.join(timeout=5)
still_alive = thread.is_alive()
# 最后一次重试才清理记录
if attempt == 5:
with cls._lock:
cls._tasks.pop(udid, None)
if not still_alive:
return 200, [], "已停止"
LogManager.warning(f"[batch_stop] {udid}{attempt}/5 次停止未立即结束,重试", udid)
# 5 次都失败
LogManager.error(f"[batch_stop] {udid} 停止失败5 次重试后线程仍活)", udid)
return 1001, [udid], "部分设备停止失败"
# ---------- 3. 并发执行 ----------
with ThreadPoolExecutor(max_workers=min(32, len(udids))) as executor:
future_map = {executor.submit(_wait_and_clean_with_retry, udid): udid for udid in udids}
for future in as_completed(future_map):
udid = future_map[future]
code, sub_fail_list, sub_msg = future.result() # ← 现在解包正确
if code != 200:
fail_list.extend(sub_fail_list) # 收集失败 udid
# ---------- 4. 返回兼容格式 ----------
if fail_list:
return 1001, fail_list, "部分设备停止失败"
return 200, [], "全部设备停止成功"
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
try:
for udid in ids:
cls.stop(udid)
except Exception as e:
print(e)
return 200, "停止成功."
@classmethod
def is_task_running(cls, udid: str) -> bool:
"""
检查任务是否正在运行。
:param udid: 设备的唯一标识符
:return: True 表示任务正在运行False 表示没有任务运行
"""
with cls._lock:
is_running = cls._tasks.get(udid, {}).get("running", False)
LogManager.method_info(f"检查设备 {udid} 的任务状态:{'运行中' if is_running else '未运行'}", method="task")
return is_running
def _kill_thread(cls, tid: int) -> bool:
"""向原生线程 ID 抛 KeyboardInterrupt强制跳出"""
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
ctypes.py_object(KeyboardInterrupt))
if res == 0: # 线程已不存在
print("线程不存在")
return False
if res > 1: # 命中多个线程,重置
print("命中了多个线程")
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
print("杀死线程成功")
return True