优化批量停止任务
This commit is contained in:
@@ -1,13 +1,17 @@
|
|||||||
import ctypes
|
import ctypes
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
from typing import Dict, Tuple, List
|
from typing import Dict, Tuple, List
|
||||||
|
|
||||||
from Utils.LogManager import LogManager
|
from Utils.LogManager import LogManager
|
||||||
|
|
||||||
|
|
||||||
def _async_raise(tid: int, exc_type=KeyboardInterrupt) -> bool:
|
def _async_raise(tid: int, exc_type=KeyboardInterrupt) -> bool:
|
||||||
"""向指定线程抛异常(兜底方案)"""
|
"""向指定线程抛异常"""
|
||||||
|
if not tid:
|
||||||
|
LogManager.method_error(f"强杀失败: 线程ID为空", "task")
|
||||||
|
return False
|
||||||
try:
|
try:
|
||||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
|
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
|
||||||
ctypes.c_long(tid), ctypes.py_object(exc_type)
|
ctypes.c_long(tid), ctypes.py_object(exc_type)
|
||||||
@@ -31,7 +35,6 @@ class ThreadManager:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _cleanup_if_dead(cls, udid: str):
|
def _cleanup_if_dead(cls, udid: str):
|
||||||
"""如果任务线程已结束,清理占位"""
|
|
||||||
obj = cls._tasks.get(udid)
|
obj = cls._tasks.get(udid)
|
||||||
if obj and not obj["thread"].is_alive():
|
if obj and not obj["thread"].is_alive():
|
||||||
cls._tasks.pop(udid, None)
|
cls._tasks.pop(udid, None)
|
||||||
@@ -41,8 +44,6 @@ class ThreadManager:
|
|||||||
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
|
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
|
||||||
with cls._lock:
|
with cls._lock:
|
||||||
cls._cleanup_if_dead(udid)
|
cls._cleanup_if_dead(udid)
|
||||||
|
|
||||||
# 已存在任务还在运行
|
|
||||||
old = cls._tasks.get(udid)
|
old = cls._tasks.get(udid)
|
||||||
if old and old["thread"].is_alive():
|
if old and old["thread"].is_alive():
|
||||||
if not force:
|
if not force:
|
||||||
@@ -50,9 +51,14 @@ class ThreadManager:
|
|||||||
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
|
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
|
||||||
cls._force_stop_locked(udid)
|
cls._force_stop_locked(udid)
|
||||||
|
|
||||||
# 启动新任务
|
|
||||||
try:
|
try:
|
||||||
thread.start()
|
thread.start()
|
||||||
|
# 等待 ident 初始化
|
||||||
|
for _ in range(10):
|
||||||
|
if thread.ident:
|
||||||
|
break
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
cls._tasks[udid] = {
|
cls._tasks[udid] = {
|
||||||
"id": thread.ident,
|
"id": thread.ident,
|
||||||
"thread": thread,
|
"thread": thread,
|
||||||
@@ -67,7 +73,6 @@ class ThreadManager:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
|
def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
|
||||||
"""安全停止单个任务"""
|
|
||||||
with cls._lock:
|
with cls._lock:
|
||||||
obj = cls._tasks.get(udid)
|
obj = cls._tasks.get(udid)
|
||||||
if not obj:
|
if not obj:
|
||||||
@@ -79,41 +84,37 @@ class ThreadManager:
|
|||||||
|
|
||||||
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
|
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
|
||||||
|
|
||||||
# 已经结束
|
|
||||||
if not thread.is_alive():
|
if not thread.is_alive():
|
||||||
cls._tasks.pop(udid, None)
|
cls._tasks.pop(udid, None)
|
||||||
return 200, "已结束"
|
return 200, "已结束"
|
||||||
|
|
||||||
# 1. 协作式停止
|
# 协作式停止
|
||||||
try:
|
try:
|
||||||
event.set()
|
event.set()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
|
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
|
||||||
|
|
||||||
|
# 🔹 不阻塞主线程
|
||||||
|
def _wait_stop():
|
||||||
thread.join(timeout=stop_timeout)
|
thread.join(timeout=stop_timeout)
|
||||||
if not thread.is_alive():
|
if thread.is_alive():
|
||||||
cls._tasks.pop(udid, None)
|
LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀", "task")
|
||||||
LogManager.method_info(f"[{udid}] 协作式停止成功", "task")
|
_async_raise(tid)
|
||||||
return 200, "已停止"
|
|
||||||
|
|
||||||
# 2. 强杀兜底
|
|
||||||
LogManager.method_info(f"[{udid}] 协作式超时,尝试强杀", "task")
|
|
||||||
if _async_raise(tid):
|
|
||||||
thread.join(timeout=kill_timeout)
|
thread.join(timeout=kill_timeout)
|
||||||
|
|
||||||
if not thread.is_alive():
|
if not thread.is_alive():
|
||||||
cls._tasks.pop(udid, None)
|
LogManager.method_info(f"[{udid}] 停止成功", "task")
|
||||||
LogManager.method_info(f"[{udid}] 强杀成功", "task")
|
else:
|
||||||
return 200, "已停止"
|
LogManager.method_error(f"[{udid}] 停止失败(线程卡死),已清理占位", "task")
|
||||||
|
|
||||||
# 3. 最终兜底:标记释放占位
|
with cls._lock:
|
||||||
LogManager.method_error(f"[{udid}] 无法停止(线程可能卡死),已释放占位", "task")
|
|
||||||
cls._tasks.pop(udid, None)
|
cls._tasks.pop(udid, None)
|
||||||
return 206, "停止超时,线程可能仍在后台运行"
|
|
||||||
|
threading.Thread(target=_wait_stop, daemon=True).start()
|
||||||
|
return 200, "停止请求已提交"
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _force_stop_locked(cls, udid: str):
|
def _force_stop_locked(cls, udid: str):
|
||||||
"""内部用,带锁强制停止旧任务"""
|
|
||||||
obj = cls._tasks.get(udid)
|
obj = cls._tasks.get(udid)
|
||||||
if not obj:
|
if not obj:
|
||||||
return
|
return
|
||||||
@@ -132,10 +133,13 @@ class ThreadManager:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
|
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
|
||||||
failed = []
|
failed = []
|
||||||
for udid in ids:
|
with ThreadPoolExecutor(max_workers=4) as executor:
|
||||||
code, msg = cls.stop(udid)
|
futures = {executor.submit(cls.stop, udid): udid for udid in ids}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
code, msg = future.result()
|
||||||
if code != 200:
|
if code != 200:
|
||||||
failed.append(udid)
|
failed.append(futures[future])
|
||||||
|
|
||||||
if failed:
|
if failed:
|
||||||
return 207, f"部分任务未成功停止: {failed}"
|
return 207, f"部分任务停止失败: {failed}"
|
||||||
return 200, "全部停止成功"
|
return 200, "全部停止请求已提交"
|
||||||
Reference in New Issue
Block a user