# -*- coding: utf-8 -*-
import ctypes
import inspect
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple

from Utils.LogManager import LogManager


def _raise_async_exception(tid: int, exc_type) -> int:
    """Schedule ``exc_type`` to be raised asynchronously in thread ``tid``.

    Thin wrapper around CPython's ``PyThreadState_SetAsyncExc``.

    Args:
        tid: Thread identifier as reported by ``threading.Thread.ident``.
        exc_type: Exception *class* to raise in the target thread.

    Returns:
        The number of thread states that were modified (0 means no thread
        with that id was found).

    Raises:
        TypeError: If ``exc_type`` is not a class.
    """
    if not inspect.isclass(exc_type):
        raise TypeError("exc_type must be a class")
    # The C API takes the thread id as ``unsigned long``.
    return ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(tid), ctypes.py_object(exc_type)
    )


def _kill_thread_by_tid(tid: Optional[int]) -> bool:
    """Ask the thread ``tid`` to terminate by injecting ``SystemExit``.

    Args:
        tid: Thread identifier, or ``None`` if the thread never started.

    Returns:
        ``True`` if exactly one thread was signalled, ``False`` if ``tid`` is
        ``None`` or no thread with that id exists.

    Raises:
        SystemError: If more than one thread state was affected; the pending
            exception is cleared again before raising.
    """
    if tid is None:
        return False

    res = _raise_async_exception(tid, SystemExit)
    if res == 0:
        return False
    if res > 1:
        # Clear the pending exception by passing NULL (ctypes maps None to a
        # NULL pointer). This must bypass _raise_async_exception, which
        # rejects anything that is not a class.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc affected multiple threads; reverted.")
    return True


class ThreadManager:
    """Process-wide registry of one worker thread per device ``udid``.

    Public API (all classmethods):

    - ``add(udid, thread_or_target, *args, **kwargs) -> (code, msg)``
    - ``stop(udid, join_timeout=2.0, retries=5, wait_step=0.2) -> (code, msg)``
      (forced kill)
    - ``batch_stop(udids) -> (code, msg, failed_udids)``
    - ``get_thread`` / ``get_tid`` / ``is_running`` / ``list_udids``
    """

    # Shared registry: udid -> thread. Every access is guarded by _lock.
    _threads: Dict[str, threading.Thread] = {}
    _lock = threading.RLock()

    # ========== Basics ==========
    @classmethod
    def _discard(cls, udid: str, thread: threading.Thread) -> None:
        """Remove ``udid`` from the registry if it still maps to ``thread``."""
        with cls._lock:
            if cls._threads.get(udid) is thread:
                cls._threads.pop(udid, None)

    @classmethod
    def add(cls, udid: str, thread_or_target: Any, *args, **kwargs) -> Tuple[int, str]:
        """Start and register a task thread for ``udid``.

        Two call styles are supported:

        1. ``add(udid, t)`` where ``t`` is a ``threading.Thread`` instance
           (``*args`` / ``**kwargs`` are ignored in this case).
        2. ``add(udid, target, *args, **kwargs)`` where ``target`` is callable.

        Returns:
            ``(200, "创建任务成功")`` on success, ``(1001, "任务已存在")`` if a
            live task is already registered for ``udid``, or
            ``(1001, "创建任务失败")`` if the thread could not be started or
            ``thread_or_target`` is neither a thread nor a callable.
        """
        with cls._lock:
            exist = cls._threads.get(udid)
            if exist is not None and exist.is_alive():
                return 1001, "任务已存在"

            if isinstance(thread_or_target, threading.Thread):
                t = thread_or_target
                try:
                    t.daemon = True
                except RuntimeError:
                    # The daemon flag cannot be changed on a started thread.
                    pass
                if not t.name:
                    t.name = f"task-{udid}"

                # Wrap run() so the thread unregisters itself when it exits.
                orig_run = t.run

                def run_wrapper():
                    try:
                        orig_run()
                    finally:
                        cls._discard(udid, t)

                t.run = run_wrapper  # type: ignore
            elif callable(thread_or_target):
                target = thread_or_target

                def _wrapper():
                    try:
                        target(*args, **kwargs)
                    finally:
                        cls._discard(udid, threading.current_thread())

                t = threading.Thread(target=_wrapper, daemon=True, name=f"task-{udid}")
            else:
                # Starting a thread around a non-callable would crash at once
                # inside the thread while reporting success to the caller.
                return 1001, "创建任务失败"

            try:
                t.start()
            except Exception:
                return 1001, "创建任务失败"

            # Registration happens while the lock is still held, so the
            # thread's own cleanup cannot run before the entry exists.
            cls._threads[udid] = t

        # Best-effort success log, emitted outside the lock.
        try:
            LogManager.method_info(f"创建任务成功 [{udid}],线程ID={t.ident}", "task")
        except Exception:
            pass
        return 200, "创建任务成功"

    @classmethod
    def get_thread(cls, udid: str) -> Optional[threading.Thread]:
        """Return the thread registered for ``udid``, or ``None``."""
        with cls._lock:
            return cls._threads.get(udid)

    @classmethod
    def get_tid(cls, udid: str) -> Optional[int]:
        """Return the ident of the thread registered for ``udid``, or ``None``."""
        t = cls.get_thread(udid)
        return t.ident if t else None

    @classmethod
    def is_running(cls, udid: str) -> bool:
        """Return ``True`` if a thread is registered for ``udid`` and alive."""
        t = cls.get_thread(udid)
        return bool(t and t.is_alive())

    @classmethod
    def list_udids(cls) -> List[str]:
        """Return a snapshot list of all registered udids."""
        with cls._lock:
            return list(cls._threads.keys())

    # ========== Internal: one forced-kill attempt ==========
    @classmethod
    def _stop_once(
        cls,
        udid: str,
        join_timeout: float,
        retries: int,
        wait_step: float,
        caller_tid: Optional[int] = None,
    ) -> bool:
        """Run one forced-kill attempt against the thread of ``udid``.

        Args:
            udid: Key of the task to stop.
            join_timeout: Seconds to wait right after signalling (negative
                values are treated as 0).
            retries: Number of additional waits of ``wait_step`` seconds.
            wait_step: Seconds per additional wait (negative treated as 0).
            caller_tid: Ident of the thread on whose behalf this runs, when it
                differs from the executing thread (e.g. a pool worker). That
                thread is never signalled.

        Returns:
            ``True`` if the task is stopped or was not registered; ``False``
            if it is still alive or stopping it was refused (main thread,
            executing thread, or ``caller_tid``).
        """
        with cls._lock:
            t = cls._threads.get(udid)
        if t is None:
            return True  # nothing registered: treat as already stopped

        if not t.is_alive():
            # Never signal a dead thread: its ident may already belong to an
            # unrelated thread. Just drop the stale entry.
            cls._discard(udid, t)
            return True

        protected = {threading.main_thread().ident, threading.get_ident()}
        if caller_tid is not None:
            protected.add(caller_tid)
        if t.ident in protected:
            return False

        try:
            _kill_thread_by_tid(t.ident)
        except Exception:
            # Best effort: the liveness checks below decide the outcome.
            pass

        t.join(max(join_timeout, 0.0))

        # join() returns as soon as the thread dies, so this waits at most
        # retries * wait_step seconds in total.
        step = max(wait_step, 0.0)
        while t.is_alive() and retries > 0:
            t.join(step)
            retries -= 1

        if t.is_alive():
            return False
        cls._discard(udid, t)
        return True

    # ========== Public: stop / batch_stop ==========
    @classmethod
    def stop(cls, udid: str, join_timeout: float = 2.0, retries: int = 5,
             wait_step: float = 0.2) -> Tuple[int, str]:
        """Force-stop a single task.

        Returns:
            ``(200, "stopped")`` on success, ``(1001, "failed")`` otherwise.
        """
        if cls._stop_once(udid, join_timeout, retries, wait_step):
            return 200, "stopped"
        return 1001, "failed"

    @classmethod
    def _stop_round(cls, udids: List[str], join_timeout: float, retries: int,
                    wait_step: float, caller_tid: int) -> List[str]:
        """Stop ``udids`` in parallel; return those that failed, in input order."""
        outcome: Dict[str, bool] = {}
        with ThreadPoolExecutor(max_workers=len(udids)) as pool:
            futures = {
                pool.submit(
                    cls._stop_once, u, join_timeout, retries, wait_step, caller_tid
                ): u
                for u in udids
            }
            for fut in as_completed(futures):
                try:
                    outcome[futures[fut]] = fut.result()
                except Exception:
                    # A crashed attempt counts as a failure for that udid
                    # instead of aborting the whole batch.
                    outcome[futures[fut]] = False
        return [u for u in udids if not outcome.get(u, False)]

    @classmethod
    def batch_stop(cls, udids: List[str]) -> Tuple[int, str, List[str]]:
        """Force-stop several tasks in parallel.

        Only ``udids`` is accepted; the remaining settings are fixed:
        ``join_timeout=2.0``, ``retries=5``, ``wait_step=0.2``. All devices
        are processed concurrently; failures are retried for up to 3 rounds
        with a 1 second pause before each round. Duplicate udids are
        processed once.

        Returns:
            ``(200, "停止任务成功", [])`` if every task stopped, otherwise
            ``(1001, "停止任务失败", [failed udids...])`` in input order.
        """
        if not udids:
            return 200, "停止任务成功", []

        join_timeout = 2.0
        retries = 5
        wait_step = 0.2
        retry_rounds = 3
        round_interval = 1.0

        # The kill attempts run in pool workers, so the self-kill guard needs
        # the ident of the thread that actually called batch_stop.
        caller_tid = threading.get_ident()

        # De-duplicate while keeping the caller's order.
        targets = list(dict.fromkeys(udids))

        # First round: every device at once.
        fail = cls._stop_round(targets, join_timeout, retries, wait_step, caller_tid)

        # Retry only the failures.
        for _ in range(retry_rounds):
            if not fail:
                break
            time.sleep(round_interval)
            fail = cls._stop_round(fail, join_timeout, retries, wait_step, caller_tid)

        if not fail:
            return 200, "停止任务成功", []
        return 1001, "停止任务失败", fail