Files
iOSAI/Utils/ThreadManager.py
2025-10-25 01:18:41 +08:00

145 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ctypes
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, List
from Utils.LogManager import LogManager
def _async_raise(tid: int, exc_type=KeyboardInterrupt) -> bool:
"""向指定线程抛异常"""
if not tid:
LogManager.method_error(f"强杀失败: 线程ID为空", "task")
return False
try:
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
if res == 0:
LogManager.method_info(f"线程 {tid} 不存在", "task")
return False
elif res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0)
LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
return False
return True
except Exception as e:
LogManager.method_error(f"强杀线程失败: {e}", "task")
return False
class ThreadManager:
_tasks: Dict[str, Dict] = {}
_lock = threading.RLock()
@classmethod
def _cleanup_if_dead(cls, udid: str):
obj = cls._tasks.get(udid)
if obj and not obj["thread"].is_alive():
cls._tasks.pop(udid, None)
LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
@classmethod
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
with cls._lock:
cls._cleanup_if_dead(udid)
old = cls._tasks.get(udid)
if old and old["thread"].is_alive():
if not force:
return 1001, "当前设备已存在任务"
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
cls._force_stop_locked(udid)
try:
thread.start()
# 等待 ident 初始化
for _ in range(10):
if thread.ident:
break
time.sleep(0.05)
cls._tasks[udid] = {
"id": thread.ident,
"thread": thread,
"event": event,
"start_time": time.time(),
}
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={thread.ident}", "task")
return 200, "创建成功"
except Exception as e:
LogManager.method_error(f"线程启动失败: {e}", "task")
return 1002, f"线程启动失败: {e}"
@classmethod
def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
with cls._lock:
obj = cls._tasks.get(udid)
if not obj:
return 200, "任务不存在"
thread = obj["thread"]
event = obj["event"]
tid = obj["id"]
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
if not thread.is_alive():
cls._tasks.pop(udid, None)
return 200, "已结束"
# 协作式停止
try:
event.set()
except Exception as e:
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
# 🔹 不阻塞主线程
def _wait_stop():
thread.join(timeout=stop_timeout)
if thread.is_alive():
LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀", "task")
_async_raise(tid)
thread.join(timeout=kill_timeout)
if not thread.is_alive():
LogManager.method_info(f"[{udid}] 停止成功", "task")
else:
LogManager.method_error(f"[{udid}] 停止失败(线程卡死),已清理占位", "task")
with cls._lock:
cls._tasks.pop(udid, None)
threading.Thread(target=_wait_stop, daemon=True).start()
return 200, "停止请求已提交"
@classmethod
def _force_stop_locked(cls, udid: str):
obj = cls._tasks.get(udid)
if not obj:
return
try:
event = obj["event"]
event.set()
obj["thread"].join(timeout=2)
if obj["thread"].is_alive():
_async_raise(obj["id"])
obj["thread"].join(timeout=1)
except Exception as e:
LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
finally:
cls._tasks.pop(udid, None)
@classmethod
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
failed = []
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(cls.stop, udid): udid for udid in ids}
for future in as_completed(futures):
code, msg = future.result()
if code != 200:
failed.append(futures[future])
if failed:
return 207, f"部分任务停止失败: {failed}"
return 200, "全部停止请求已提交"