Files
iOSAI/Utils/ThreadManager.py

207 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import threading
import ctypes
import inspect
import time
from typing import Dict, Optional, List, Tuple, Any
from Utils.LogManager import LogManager
def _raise_async_exception(tid: int, exc_type) -> int:
if not inspect.isclass(exc_type):
raise TypeError("exc_type must be a class")
return ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(tid), ctypes.py_object(exc_type)
)
def _kill_thread_by_tid(tid: Optional[int]) -> bool:
if tid is None:
return False
res = _raise_async_exception(tid, SystemExit)
if res == 0:
return False
if res > 1:
_raise_async_exception(tid, None)
raise SystemError("PyThreadState_SetAsyncExc affected multiple threads; reverted.")
return True
class ThreadManager:
"""
- add(udid, thread_or_target, *args, **kwargs) -> (code, msg)
- stop(udid, join_timeout=2.0, retries=5, wait_step=0.2) -> (code, msg) # 强杀
- batch_stop(udids, join_timeout_each=2.0, retries_each=5, wait_step_each=0.2) -> (code, msg)
- get_thread / get_tid / is_running / list_udids
"""
_threads: Dict[str, threading.Thread] = {}
_lock = threading.RLock()
# ========== 基础 ==========
@classmethod
def add(cls, udid: str, thread_or_target: Any, *args, **kwargs) -> Tuple[int, str]:
"""
兼容两种用法:
1) add(udid, t) # t 是 threading.Thread 实例
2) add(udid, target, *args, **kwargs) # target 是可调用
返回:(200, "创建任务成功") / (1001, "任务已存在") / (1001, "创建任务失败")
"""
with cls._lock:
exist = cls._threads.get(udid)
if exist and exist.is_alive():
return 1001, "任务已存在"
if isinstance(thread_or_target, threading.Thread):
t = thread_or_target
try:
t.daemon = True
except Exception:
pass
if not t.name:
t.name = f"task-{udid}"
# 包装 run退出时从表移除
orig_run = t.run
def run_wrapper():
try:
orig_run()
finally:
with cls._lock:
if cls._threads.get(udid) is t:
cls._threads.pop(udid, None)
t.run = run_wrapper # type: ignore
else:
target = thread_or_target
def _wrapper():
try:
target(*args, **kwargs)
finally:
with cls._lock:
cur = cls._threads.get(udid)
if cur is threading.current_thread():
cls._threads.pop(udid, None)
t = threading.Thread(target=_wrapper, daemon=True, name=f"task-{udid}")
try:
t.start()
except Exception:
return 1001, "创建任务失败"
cls._threads[udid] = t
# 保留你原有的创建成功日志
try:
LogManager.method_info(f"创建任务成功 [{udid}]线程ID={t.ident}", "task")
except Exception:
pass
return 200, "创建任务成功"
@classmethod
def get_thread(cls, udid: str) -> Optional[threading.Thread]:
with cls._lock:
return cls._threads.get(udid)
@classmethod
def get_tid(cls, udid: str) -> Optional[int]:
t = cls.get_thread(udid)
return t.ident if t else None
@classmethod
def is_running(cls, udid: str) -> bool:
t = cls.get_thread(udid)
return bool(t and t.is_alive())
@classmethod
def list_udids(cls) -> List[str]:
with cls._lock:
return list(cls._threads.keys())
# ========== 内部:强杀一次 ==========
@classmethod
def _stop_once(cls, udid: str, join_timeout: float, retries: int, wait_step: float) -> bool:
"""
对指定 udid 执行一次强杀流程;返回 True=已停止/不存在False=仍存活或被拒。
"""
with cls._lock:
t = cls._threads.get(udid)
if not t:
return True # 视为已停止
main_tid = threading.main_thread().ident
cur_tid = threading.get_ident()
if t.ident in (main_tid, cur_tid):
return False
try:
_kill_thread_by_tid(t.ident)
except Exception:
pass
if join_timeout < 0:
join_timeout = 0.0
t.join(join_timeout)
while t.is_alive() and retries > 0:
evt = threading.Event()
evt.wait(wait_step)
retries -= 1
dead = not t.is_alive()
if dead:
with cls._lock:
if cls._threads.get(udid) is t:
cls._threads.pop(udid, None)
return dead
# ========== 对外stop / batch_stop均返回二元组 ==========
@classmethod
def stop(cls, udid: str, join_timeout: float = 2.0,
retries: int = 5, wait_step: float = 0.2) -> Tuple[int, str]:
"""
强杀单个:返回 (200, "stopped") 或 (1001, "failed")
"""
ok = cls._stop_once(udid, join_timeout, retries, wait_step)
if ok:
return 200, "stopped"
else:
return 1001, "failed"
@classmethod
def batch_stop(cls, udids: List[str], join_timeout_each: float = 2.0,
retries_each: int = 5, wait_step_each: float = 0.2) -> Tuple[int, str]:
"""
先全量执行一遍 -> 记录失败 -> 对失败重试 3 轮(每轮间隔 1 秒)
全部完成后统一返回:
全成功 : (200, "停止任务成功")
仍有失败: (1001, "停止任务失败")
"""
udids = udids or []
fail: List[str] = []
# 第一轮
for u in udids:
ok = cls._stop_once(u, join_timeout_each, retries_each, wait_step_each)
if not ok:
fail.append(u)
# 三轮只对失败重试
for _ in range(3):
if not fail:
break
time.sleep(1.0)
remain: List[str] = []
for u in fail:
ok = cls._stop_once(u, join_timeout_each, retries_each, wait_step_each)
if not ok:
remain.append(u)
fail = remain
if not fail:
return 200, "停止任务成功"
else:
return 1001, "停止任务失败"