"""Per-device worker-thread registry with cooperative and forced shutdown.

Each device (``udid``) owns at most one managed thread. Stopping a task goes
through three escalating stages: signal the task's stop event, kill any
registered external child processes, and finally inject ``SystemExit`` into
the thread. A thread that survives all stages is treated as a zombie and its
registry slot is released anyway.
"""
import ctypes
import os
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Tuple


class MockLogManager:
    """Console stand-in used only when the project's LogManager is unavailable."""

    @staticmethod
    def method_error(msg, category):
        print(f"[ERROR:{category}] {msg}")

    @staticmethod
    def method_info(msg, category):
        print(f"[INFO:{category}] {msg}")


try:
    from Utils.LogManager import LogManager
except ImportError:
    # Keep the module importable outside the project tree (tests, scripts).
    LogManager = MockLogManager

try:
    import psutil  # optional: enables cascading termination of process trees
except Exception:
    # Deliberately broad: any failure to load psutil just disables the feature.
    psutil = None


def _async_raise(tid: int, exc_type=SystemExit) -> bool:
    """Asynchronously raise ``exc_type`` inside the thread with id ``tid``.

    The exception is only delivered while the target executes Python
    bytecode; a thread blocked inside a C/OS call will not see it until that
    call returns.

    Returns:
        True when exactly one thread state was modified, False otherwise.
    """
    if not tid:
        LogManager.method_error("强杀失败: 线程ID为空", "task")
        return False
    try:
        # The C API takes an ``unsigned long`` thread id since Python 3.7.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(tid), ctypes.py_object(exc_type)
        )
        if res == 0:
            # No such thread id; the thread has most likely already exited.
            return False
        if res > 1:
            # More than one thread state was hit: cancel the pending
            # exception (NULL exc) so no unrelated thread is killed.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)
            LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
            return False
        return True
    except Exception as e:
        LogManager.method_error(f"强杀线程失败: {e}", "task")
        return False


class ThreadManager:
    """Registry of one managed worker thread per device id.

    ``stop`` is synchronous: it blocks until the thread has really stopped or
    has been given up on as a zombie.
    """

    # udid -> {"id", "thread", "event", "start_time", "state", "child_pids"}
    _tasks: Dict[str, Dict] = {}
    # Re-entrant because add() calls _force_stop_locked() while holding it.
    _lock = threading.RLock()

    @classmethod
    def _cleanup_if_dead(cls, udid: str):
        """Drop the registry entry for ``udid`` if its thread has finished."""
        obj = cls._tasks.get(udid)
        if obj:
            th = obj.get("thread")
            if th and not th.is_alive():
                cls._tasks.pop(udid, None)
                LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")

    @classmethod
    def register_child_pid(cls, udid: str, pid: int):
        """Record an external child process so stop() can kill it as well.

        Silently does nothing when ``udid`` has no registered task.
        """
        with cls._lock:
            obj = cls._tasks.get(udid)
            if not obj:
                return
            pids: set = obj.setdefault("child_pids", set())
            pids.add(int(pid))
            LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")

    @classmethod
    def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
        """Terminate every registered child process and its descendants.

        Best effort: a failure on one pid is logged and the remaining pids
        are still processed. The passed set itself is not modified.
        """
        if not child_pids:
            return
        # Iterate over a snapshot so a concurrent mutation cannot break the loop.
        for pid in list(child_pids):
            try:
                if psutil:
                    if psutil.pid_exists(pid):
                        proc = psutil.Process(pid)
                        # Descendants first, then the process itself.
                        targets = proc.children(recursive=True) + [proc]
                        for target in targets:
                            try:
                                target.terminate()
                            except Exception:
                                # Best effort: it may already be gone.
                                pass
                        # Grace period for the whole tree, then hard-kill
                        # whatever is left (descendants included).
                        _, alive = psutil.wait_procs(targets, timeout=0.5)
                        for survivor in alive:
                            try:
                                survivor.kill()
                            except Exception:
                                pass
                elif os.name == "nt":
                    # /T kills the process tree, /F forces termination.
                    subprocess.run(
                        ["taskkill", "/PID", str(pid), "/T", "/F"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        check=False,
                    )
                else:
                    try:
                        os.kill(pid, signal.SIGTERM)
                        time.sleep(0.2)
                        os.kill(pid, signal.SIGKILL)
                    except OSError:
                        # Process already exited or is not ours to signal.
                        pass
                LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
            except Exception as e:
                LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")

    @classmethod
    def add(cls, udid: str, thread: threading.Thread, event: threading.Event,
            force: bool = False) -> Tuple[int, str]:
        """Start ``thread`` and register it as the task of ``udid``.

        Args:
            udid: Device identifier used as the registry key.
            thread: Not-yet-started thread that runs the task.
            event: Stop event the task is expected to poll.
            force: When a live task already exists, stop it and take over
                instead of refusing.

        Returns:
            ``(200, ...)`` on success, ``(1001, ...)`` when a live task exists
            and ``force`` is false, ``(1002, ...)`` when starting fails.
        """
        with cls._lock:
            cls._cleanup_if_dead(udid)
            old = cls._tasks.get(udid)
            if old and old["thread"].is_alive():
                if not force:
                    return 1001, "当前设备已存在任务"
                LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
                cls._force_stop_locked(udid)

            # Daemon threads cannot keep the interpreter alive if they hang.
            try:
                thread.daemon = True
            except RuntimeError:
                # Thread already started; start() below reports the failure.
                pass

            try:
                thread.start()
                # Give the runtime a moment to publish the thread ident.
                for _ in range(20):
                    if thread.ident:
                        break
                    time.sleep(0.02)

                tid = thread.ident
                cls._tasks[udid] = {
                    "id": tid,
                    "thread": thread,
                    "event": event,
                    "start_time": time.time(),
                    "state": "running",
                    "child_pids": set(),
                }
                LogManager.method_info(f"创建任务成功 [{udid}],线程ID={tid}", "task")
                return 200, "创建成功"
            except Exception as e:
                LogManager.method_error(f"线程启动失败: {e}", "task")
                return 1002, f"线程启动失败: {e}"

    @classmethod
    def stop(cls, udid: str, stop_timeout: float = 5.0,
             kill_timeout: float = 2.0) -> Tuple[int, str]:
        """Synchronously stop the task of ``udid``.

        Args:
            udid: Device identifier of the task.
            stop_timeout: Seconds to wait for a cooperative exit.
            kill_timeout: Seconds to wait after injecting ``SystemExit``.

        Returns:
            ``(200, ...)`` when no task exists, it had already finished, or it
            stopped; ``(500, ...)`` when the thread could not be stopped.
        """
        # Phase 1 (locked): look up the task and signal it to stop.
        with cls._lock:
            obj = cls._tasks.get(udid)
            if not obj:
                return 200, "任务不存在"

            thread = obj["thread"]
            event = obj["event"]
            tid = obj["id"]
            # Snapshot so the set can be used after the lock is released.
            child_pids = set(obj.get("child_pids") or [])

            LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")

            if not thread.is_alive():
                cls._tasks.pop(udid, None)
                return 200, "已结束"

            obj["state"] = "stopping"

            try:
                event.set()
            except Exception as e:
                LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")

        # Lock released: everything below may block.
        if thread is threading.current_thread():
            # A task stopping itself cannot join itself (join() would raise
            # RuntimeError); the event is set, so it exits when it returns.
            cls._kill_child_pids(udid, child_pids)
            return 200, "停止请求已提交"

        # Phase 2: short polling window; most tasks notice the event quickly.
        deadline = time.monotonic() + 1.0
        while time.monotonic() < deadline and thread.is_alive():
            time.sleep(0.05)

        # Phase 3: kill child processes first so blocking I/O on them unblocks.
        cls._kill_child_pids(udid, child_pids)

        # Phase 4: ordinary cooperative join window.
        if thread.is_alive():
            thread.join(timeout=stop_timeout)

        # Phase 5: still alive -> repeatedly inject SystemExit.
        if thread.is_alive():
            LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
            for _ in range(6):
                if tid:
                    _async_raise(tid, SystemExit)
                # Let the interpreter schedule the target thread.
                time.sleep(0.06)
                if not thread.is_alive():
                    break

        # Phase 6: final grace period.
        if thread.is_alive():
            thread.join(timeout=kill_timeout)

        # Phase 7 (locked): release the slot and report.
        with cls._lock:
            current = cls._tasks.get(udid)
            # The slot may have been re-used by add(force=True) while we were
            # waiting without the lock; never evict somebody else's task.
            owns_slot = current is not None and current.get("thread") is thread

            if not thread.is_alive():
                LogManager.method_info(f"[{udid}] 停止成功", "task")
                if owns_slot:
                    cls._tasks.pop(udid, None)
                return 200, "停止成功"

            # Could not be killed: mark as zombie and free the slot so later
            # operations on this device are not blocked.
            LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie,释放占位", "task")
            if owns_slot:
                current["state"] = "zombie"
                cls._tasks.pop(udid, None)
            return 500, "停止失败(线程卡死)"

    @classmethod
    def _force_stop_locked(cls, udid: str):
        """Forcefully stop the task of ``udid``; the caller must hold the lock.

        Used by ``add(force=True)`` to replace an existing task. The registry
        entry is removed even when the thread could not be stopped.
        """
        obj = cls._tasks.get(udid)
        if not obj:
            return
        th = obj["thread"]
        tid = obj["id"]
        event = obj["event"]
        child_pids = set(obj.get("child_pids") or [])
        try:
            try:
                event.set()
            except Exception:
                pass
            cls._kill_child_pids(udid, child_pids)
            th.join(timeout=1.5)
            if th.is_alive():
                for _ in range(6):
                    if tid:
                        _async_raise(tid, SystemExit)
                    time.sleep(0.05)
                    if not th.is_alive():
                        break
                th.join(timeout=0.8)
        except Exception as e:
            LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
        finally:
            cls._tasks.pop(udid, None)

    @classmethod
    def status(cls, udid: str) -> Dict:
        """Return a snapshot of the task of ``udid``.

        Returns ``{"exists": False}`` when no task is registered.
        """
        with cls._lock:
            obj = cls._tasks.get(udid)
            if not obj:
                return {"exists": False}
            return {
                "exists": True,
                "state": obj.get("state"),
                "start_time": obj.get("start_time"),
                "thread_id": obj.get("id"),
                "alive": obj["thread"].is_alive(),
                "child_pids": list(obj.get("child_pids") or []),
            }

    @classmethod
    def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
        """Stop several tasks concurrently and wait for all of them.

        Because ``stop`` blocks until each thread is gone (or given up on),
        this returns only after every requested task has been handled.

        Returns:
            ``(200, ...)`` when every task stopped, ``(207, ...)`` listing the
            failed device ids otherwise.
        """
        # Drop duplicates (keeping order) so a device is not stopped twice.
        unique_ids = list(dict.fromkeys(ids))
        if not unique_ids:
            return 200, "全部任务已成功停止"

        failed = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {executor.submit(cls.stop, udid): udid for udid in unique_ids}
            for future in as_completed(futures):
                udid = futures[future]
                try:
                    code, msg = future.result()
                except Exception as e:
                    LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
                    failed.append(f"{udid} (异常)")
                    continue
                if code != 200:
                    failed.append(f"{udid} ({msg})")

        if failed:
            return 207, f"部分任务停止失败: {', '.join(failed)}"
        return 200, "全部任务已成功停止"