diff --git a/Module/__pycache__/FlaskService.cpython-312.pyc b/Module/__pycache__/FlaskService.cpython-312.pyc index 734e202..ac77a51 100644 Binary files a/Module/__pycache__/FlaskService.cpython-312.pyc and b/Module/__pycache__/FlaskService.cpython-312.pyc differ diff --git a/Utils/ThreadManager.py b/Utils/ThreadManager.py index ab4e561..ddd6804 100644 --- a/Utils/ThreadManager.py +++ b/Utils/ThreadManager.py @@ -1,635 +1,207 @@ -# import ctypes -# import threading -# import time -# import os -# import signal -# from concurrent.futures import ThreadPoolExecutor, as_completed -# from typing import Dict, Tuple, List, Optional -# -# from Utils.LogManager import LogManager -# -# try: -# import psutil # 可选:用来级联杀子进程 -# except Exception: -# psutil = None -# -# -# def _async_raise(tid: int, exc_type=SystemExit) -> bool: -# """向指定线程异步注入异常(仅对 Python 解释器栈可靠)""" -# if not tid: -# LogManager.method_error("强杀失败: 线程ID为空", "task") -# return False -# try: -# res = ctypes.pythonapi.PyThreadState_SetAsyncExc( -# ctypes.c_long(tid), ctypes.py_object(exc_type) -# ) -# if res == 0: -# LogManager.method_info(f"线程 {tid} 不存在", "task") -# return False -# elif res > 1: -# ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0) -# LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task") -# return False -# return True -# except Exception as e: -# LogManager.method_error(f"强杀线程失败: {e}", "task") -# return False -# -# -# class ThreadManager: -# """ -# 维持你的 add(udid, thread, event) 调用方式不变。 -# - 线程统一设为 daemon -# - 停止:协作 -> 多次强杀注入 -> zombie 放弃占位 -# - 可选:注册并级联杀掉业务里创建的外部子进程 -# """ -# _tasks: Dict[str, Dict] = {} -# _lock = threading.RLock() -# -# @classmethod -# def _cleanup_if_dead(cls, udid: str): -# obj = cls._tasks.get(udid) -# if obj: -# th = obj.get("thread") -# if th and not th.is_alive(): -# cls._tasks.pop(udid, None) -# LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task") -# -# @classmethod -# def register_child_pid(cls, udid: str, pid: int): -# """业务里如果起了 adb/scrcpy/ffmpeg 等外部进程,请在启动后调用这个登记,便于 stop 时一起杀掉。""" -# with cls._lock: -# obj = cls._tasks.get(udid) -# if not obj: -# return -# pids: set = obj.setdefault("child_pids", set()) -# pids.add(int(pid)) -# LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task") -# -# @classmethod -# def _kill_child_pids(cls, udid: str, child_pids: Optional[set]): -# if not child_pids: -# return -# for pid in list(child_pids): -# try: -# if psutil: -# if psutil.pid_exists(pid): -# proc = psutil.Process(pid) -# # 先温柔 terminate,再等 0.5 秒,仍活则 kill;并级联子进程 -# for c in proc.children(recursive=True): -# try: -# c.terminate() -# except Exception: -# pass -# proc.terminate() -# gone, alive = psutil.wait_procs([proc], timeout=0.5) -# for a in alive: -# try: -# a.kill() -# except Exception: -# pass -# else: -# if os.name == "nt": -# os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1") -# else: -# try: -# os.kill(pid, signal.SIGTERM) -# time.sleep(0.2) -# os.kill(pid, signal.SIGKILL) -# except Exception: -# pass -# LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task") -# except Exception as e: -# LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task") -# -# @classmethod -# def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]: -# with cls._lock: -# cls._cleanup_if_dead(udid) -# old = cls._tasks.get(udid) -# if old and old["thread"].is_alive(): -# if not force: -# return 1001, "当前设备已存在任务" -# LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task") -# cls._force_stop_locked(udid) -# -# # 强制守护线程,防止进程被挂死 -# try: -# thread.daemon = True -# except Exception: -# pass -# -# try: -# thread.start() -# # 等 ident 初始化 -# for _ in range(20): -# if thread.ident: -# break -# time.sleep(0.02) -# -# cls._tasks[udid] = { -# "id": thread.ident, -# "thread": thread, -# "event": event, -# "start_time": time.time(), -# "state": "running", -# "child_pids": set(), -# } -# LogManager.method_info(f"创建任务成功 [{udid}],线程ID={thread.ident}", "task") -# return 200, "创建成功" -# except Exception as e: -# LogManager.method_error(f"线程启动失败: {e}", "task") -# return 1002, f"线程启动失败: {e}" -# -# @classmethod -# def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]: -# with cls._lock: -# obj = cls._tasks.get(udid) -# if not obj: -# return 200, "任务不存在" -# -# thread = obj["thread"] -# event = obj["event"] -# tid = obj["id"] -# child_pids = set(obj.get("child_pids") or []) -# -# LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task") -# -# if not thread.is_alive(): -# cls._tasks.pop(udid, None) -# return 200, "已结束" -# -# obj["state"] = "stopping" -# -# # 先把 event 打开,给协作退出的机会 -# try: -# event.set() -# except Exception as e: -# LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task") -# -# def _wait_stop(): -# # 高频窗口 1s(很多 I/O 点会在这个窗口立刻感知到) -# t0 = time.time() -# while time.time() - t0 < 1.0 and thread.is_alive(): -# time.sleep(0.05) -# -# # 子进程先收拾(避免后台外部程序继续卡死) -# cls._kill_child_pids(udid, child_pids) -# -# # 正常 join 窗口 -# if thread.is_alive(): -# thread.join(timeout=stop_timeout) -# -# # 仍活着 → 多次注入 SystemExit -# if thread.is_alive(): -# LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task") -# for i in range(6): -# ok = _async_raise(tid, SystemExit) -# # 给解释器一些调度时间 -# time.sleep(0.06) -# if not thread.is_alive(): -# break -# -# # 最后等待 kill_timeout -# if thread.is_alive(): -# thread.join(timeout=kill_timeout) -# -# with cls._lock: -# if not thread.is_alive(): -# LogManager.method_info(f"[{udid}] 停止成功", "task") -# cls._tasks.pop(udid, None) -# else: -# # 彻底杀不掉:标记 zombie、释放占位 -# LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie,释放占位", "task") -# obj = cls._tasks.get(udid) -# if obj: -# obj["state"] = "zombie" -# cls._tasks.pop(udid, None) -# -# threading.Thread(target=_wait_stop, daemon=True).start() -# return 200, "停止请求已提交" -# -# @classmethod -# def _force_stop_locked(cls, udid: str): -# """持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)""" -# obj = cls._tasks.get(udid) -# if not obj: -# return -# th = obj["thread"] -# tid = obj["id"] -# event = obj["event"] -# child_pids = set(obj.get("child_pids") or []) -# try: -# try: -# event.set() -# except Exception: -# pass -# cls._kill_child_pids(udid, child_pids) -# th.join(timeout=1.5) -# if th.is_alive(): -# for _ in range(6): -# _async_raise(tid, SystemExit) -# time.sleep(0.05) -# if not th.is_alive(): -# break -# th.join(timeout=0.8) -# except Exception as e: -# LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task") -# finally: -# cls._tasks.pop(udid, None) -# -# @classmethod -# def status(cls, udid: str) -> Dict: -# with cls._lock: -# obj = cls._tasks.get(udid) -# if not obj: -# return {"exists": False} -# o = { -# "exists": True, -# "state": obj.get("state"), -# "start_time": obj.get("start_time"), -# "thread_id": obj.get("id"), -# "alive": obj["thread"].is_alive(), -# "child_pids": list(obj.get("child_pids") or []), -# } -# return o -# -# # @classmethod -# # def batch_stop(cls, ids: List[str]) -> Tuple[int, str]: -# # failed = [] -# # with ThreadPoolExecutor(max_workers=4) as executor: -# # futures = {executor.submit(cls.stop, udid): udid for udid in ids} -# # for future in as_completed(futures): -# # udid = futures[future] -# # try: -# # code, msg = future.result() -# # except Exception as e: -# # LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task") -# # failed.append(udid) -# # continue -# # if code != 200: -# # failed.append(udid) -# # if failed: -# # return 207, f"部分任务停止失败: {failed}" -# # return 200, "全部停止请求已提交" -# -# @classmethod -# def batch_stop(cls, ids: List[str]) -> Tuple[int, str]: -# failed = [] -# results = [] -# -# with ThreadPoolExecutor(max_workers=4) as executor: -# futures = {executor.submit(cls.stop, udid): udid for udid in ids} -# for future in as_completed(futures): -# udid = futures[future] -# try: -# code, msg = future.result() -# results.append((udid, code, msg)) -# except Exception as e: -# LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task") -# failed.append(udid) -# continue -# if code != 200: -# failed.append(udid) -# -# # 等待所有线程完全停止 -# for udid, code, msg in results: -# if code == 200: -# obj = cls._tasks.get(udid) -# if obj: -# thread = obj["thread"] -# while thread.is_alive(): -# time.sleep(0.1) -# -# if failed: -# return 207, f"部分任务停止失败: {failed}" -# return 200, "全部任务已成功停止" +# -*- coding: utf-8 -*- - -import ctypes import threading +import ctypes +import inspect import time -import os -import signal -from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Dict, Tuple, List, Optional +from typing import Dict, Optional, List, Tuple, Any + +from Utils.LogManager import LogManager -# 假设 LogManager 存在 -class MockLogManager: - @staticmethod - def method_error(msg, category): - print(f"[ERROR:{category}] {msg}") - - @staticmethod - def method_info(msg, category): - print(f"[INFO:{category}] {msg}") +def _raise_async_exception(tid: int, exc_type) -> int: + if not inspect.isclass(exc_type): + raise TypeError("exc_type must be a class") + return ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(tid), ctypes.py_object(exc_type) + ) -LogManager = MockLogManager -# from Utils.LogManager import LogManager # 恢复实际导入 - -try: - import psutil # 可选:用来级联杀子进程 -except Exception: - psutil = None - - -def _async_raise(tid: int, exc_type=SystemExit) -> bool: - """ - 向指定线程异步注入异常。 - 注意:此方法在线程阻塞于C/OS调用(如I/O等待)时可能无效或延迟。 - """ - if not tid: - LogManager.method_error("强杀失败: 线程ID为空", "task") +def _kill_thread_by_tid(tid: Optional[int]) -> bool: + if tid is None: return False - try: - res = ctypes.pythonapi.PyThreadState_SetAsyncExc( - ctypes.c_long(tid), ctypes.py_object(exc_type) - ) - if res == 0: - # 线程可能已经退出 - return False - elif res > 1: - # 命中多个线程,非常罕见,回滚以防误杀 - ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), 0) - LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task") - return False - return True - except Exception as e: - LogManager.method_error(f"强杀线程失败: {e}", "task") + res = _raise_async_exception(tid, SystemExit) + if res == 0: return False + if res > 1: + _raise_async_exception(tid, None) + raise SystemError("PyThreadState_SetAsyncExc affected multiple threads; reverted.") + return True class ThreadManager: """ - 线程管理类:支持协作停止、强制注入SystemExit和级联杀死外部子进程。 - 注意:stop 方法已改为同步阻塞,直到线程真正停止或被标记为zombie。 + - add(udid, thread_or_target, *args, **kwargs) -> (code, msg) + - stop(udid, join_timeout=2.0, retries=5, wait_step=0.2) -> (code, msg) # 强杀 + - batch_stop(udids, join_timeout_each=2.0, retries_each=5, wait_step_each=0.2) -> (code, msg) + - get_thread / get_tid / is_running / list_udids """ - _tasks: Dict[str, Dict] = {} + _threads: Dict[str, threading.Thread] = {} _lock = threading.RLock() + # ========== 基础 ========== @classmethod - def _cleanup_if_dead(cls, udid: str): - obj = cls._tasks.get(udid) - if obj: - th = obj.get("thread") - if th and not th.is_alive(): - cls._tasks.pop(udid, None) - LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task") - - @classmethod - def register_child_pid(cls, udid: str, pid: int): - """登记外部子进程 PID,便于 stop 时一起杀掉。""" - with cls._lock: - obj = cls._tasks.get(udid) - if not obj: - return - pids: set = obj.setdefault("child_pids", set()) - # 确保 pid 是 int 类型 - pids.add(int(pid)) - LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task") - - @classmethod - def _kill_child_pids(cls, udid: str, child_pids: Optional[set]): - """终止所有已登记的外部子进程及其子进程(重要:用于解决 I/O 阻塞)。""" - if not child_pids: - return - # 创建一个副本,防止迭代过程中集合被修改 - for pid in list(child_pids): - try: - if psutil: - if psutil.pid_exists(pid): - proc = psutil.Process(pid) - # 级联终止所有后代进程 - for c in proc.children(recursive=True): - try: - c.terminate() - except Exception: - pass - # 先温柔 terminate - proc.terminate() - gone, alive = psutil.wait_procs([proc], timeout=0.5) - # 仍活则 kill - for a in alive: - try: - a.kill() - except Exception: - pass - else: - # 无 psutil 时的系统命令兜底 - if os.name == "nt": - # /T 级联杀死子进程 /F 强制 /NUL 隐藏输出 - os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1") - else: - try: - os.kill(pid, signal.SIGTERM) - time.sleep(0.2) - os.kill(pid, signal.SIGKILL) - except Exception: - pass - LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task") - except Exception as e: - LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task") - - # 在同步停止模式下,这里不主动清理 set。 - - @classmethod - def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]: - with cls._lock: - cls._cleanup_if_dead(udid) - old = cls._tasks.get(udid) - if old and old["thread"].is_alive(): - if not force: - return 1001, "当前设备已存在任务" - LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task") - cls._force_stop_locked(udid) - - # 强制守护线程,防止进程被挂死 - try: - thread.daemon = True - except Exception: - pass - - try: - thread.start() - # 等 ident 初始化 - for _ in range(20): - if thread.ident: - break - time.sleep(0.02) - - # 获取线程 ID - tid = thread.ident - - cls._tasks[udid] = { - "id": tid, - "thread": thread, - "event": event, - "start_time": time.time(), - "state": "running", - "child_pids": set(), - } - LogManager.method_info(f"创建任务成功 [{udid}],线程ID={tid}", "task") - return 200, "创建成功" - except Exception as e: - LogManager.method_error(f"线程启动失败: {e}", "task") - return 1002, f"线程启动失败: {e}" - - @classmethod - def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]: - """同步阻塞请求停止任务,直到线程真正停止或被标记为zombie。""" - # 1. 初始检查、状态设置和事件触发 (需要锁) - with cls._lock: - obj = cls._tasks.get(udid) - if not obj: - return 200, "任务不存在" - - thread = obj["thread"] - event = obj["event"] - tid = obj["id"] - # 拷贝 child_pids,以便在释放锁后使用 - child_pids = set(obj.get("child_pids") or []) - - LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task") - - if not thread.is_alive(): - cls._tasks.pop(udid, None) - return 200, "已结束" - - obj["state"] = "stopping" - - # 先把 event 打开,给协作退出的机会 - try: - event.set() - except Exception as e: - LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task") - - # 锁已释放。以下执行耗时的阻塞操作。 - # ----------------- 阻塞停止逻辑开始 ----------------- - - # 2. 预等待窗口 1s - t0 = time.time() - while time.time() - t0 < 1.0 and thread.is_alive(): - time.sleep(0.05) - - # 3. 子进程先收拾 (优先解决 I/O 阻塞) - cls._kill_child_pids(udid, child_pids) - - # 4. 正常 join 窗口 - if thread.is_alive(): - thread.join(timeout=stop_timeout) - - # 5. 仍活着 → 多次注入 SystemExit - if thread.is_alive(): - LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task") - for i in range(6): - # 确保 tid 存在 - if tid: - _async_raise(tid, SystemExit) - time.sleep(0.06) - if not thread.is_alive(): - break - - # 6. 最后等待 kill_timeout - if thread.is_alive(): - thread.join(timeout=kill_timeout) - - # ----------------- 阻塞停止逻辑结束 ----------------- - # 7. 清理和返回结果 (需要重新加锁) - final_result_code: int = 500 - final_result_msg: str = "停止失败(线程卡死)" - - with cls._lock: - if not thread.is_alive(): - LogManager.method_info(f"[{udid}] 停止成功", "task") - cls._tasks.pop(udid, None) - final_result_code = 200 - final_result_msg = "停止成功" - else: - # 彻底杀不掉:标记 zombie、释放占位 - LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie,释放占位", "task") - obj = cls._tasks.get(udid) - if obj: - obj["state"] = "zombie" - # 即使卡死,也移除任务记录,防止后续操作被阻塞 - cls._tasks.pop(udid, None) - final_result_code = 500 - final_result_msg = "停止失败(线程卡死)" - - return final_result_code, final_result_msg - - @classmethod - def _force_stop_locked(cls, udid: str): - """持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)""" - obj = cls._tasks.get(udid) - if not obj: - return - th = obj["thread"] - tid = obj["id"] - event = obj["event"] - child_pids = set(obj.get("child_pids") or []) - try: - try: - event.set() - except Exception: - pass - cls._kill_child_pids(udid, child_pids) - th.join(timeout=1.5) - if th.is_alive(): - for _ in range(6): - if tid: - _async_raise(tid, SystemExit) - time.sleep(0.05) - if not th.is_alive(): - break - th.join(timeout=0.8) - except Exception as e: - LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task") - finally: - cls._tasks.pop(udid, None) - - @classmethod - def status(cls, udid: str) -> Dict: - with cls._lock: - obj = cls._tasks.get(udid) - if not obj: - return {"exists": False} - o = { - "exists": True, - "state": obj.get("state"), - "start_time": obj.get("start_time"), - "thread_id": obj.get("id"), - "alive": obj["thread"].is_alive(), - "child_pids": list(obj.get("child_pids") or []), - } - return o - - @classmethod - def batch_stop(cls, ids: List[str]) -> Tuple[int, str]: + def add(cls, udid: str, thread_or_target: Any, *args, **kwargs) -> Tuple[int, str]: """ - 批量停止任务。由于 stop 方法现在是同步阻塞的,此方法将等待所有线程完全停止后返回。 + 兼容两种用法: + 1) add(udid, t) # t 是 threading.Thread 实例 + 2) add(udid, target, *args, **kwargs) # target 是可调用 + 返回:(200, "创建任务成功") / (1001, "任务已存在") / (1001, "创建任务失败") """ - failed = [] + with cls._lock: + exist = cls._threads.get(udid) + if exist and exist.is_alive(): + return 1001, "任务已存在" - # 1. 并发发出所有停止请求 (现在是并发执行阻塞停止) - with ThreadPoolExecutor(max_workers=4) as executor: - futures = {executor.submit(cls.stop, udid): udid for udid in ids} - for future in as_completed(futures): - udid = futures[future] + if isinstance(thread_or_target, threading.Thread): + t = thread_or_target try: - code, msg = future.result() - # 检查是否成功停止(状态码 200) - if code != 200: - failed.append(f"{udid} ({msg})") - except Exception as e: - LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task") - failed.append(f"{udid} (异常)") + t.daemon = True + except Exception: + pass + if not t.name: + t.name = f"task-{udid}" - # 2. 返回结果 - if failed: - # 返回 207 表示部分失败或全部失败 - return 207, f"部分任务停止失败: {', '.join(failed)}" + # 包装 run,退出时从表移除 + orig_run = t.run + def run_wrapper(): + try: + orig_run() + finally: + with cls._lock: + if cls._threads.get(udid) is t: + cls._threads.pop(udid, None) + t.run = run_wrapper # type: ignore - # 返回 200 表示所有任务都已成功停止(因为 stop 方法是同步阻塞的) - return 200, "全部任务已成功停止" \ No newline at end of file + else: + target = thread_or_target + def _wrapper(): + try: + target(*args, **kwargs) + finally: + with cls._lock: + cur = cls._threads.get(udid) + if cur is threading.current_thread(): + cls._threads.pop(udid, None) + t = threading.Thread(target=_wrapper, daemon=True, name=f"task-{udid}") + + try: + t.start() + except Exception: + return 1001, "创建任务失败" + + cls._threads[udid] = t + # 保留你原有的创建成功日志 + try: + LogManager.method_info(f"创建任务成功 [{udid}],线程ID={t.ident}", "task") + except Exception: + pass + return 200, "创建任务成功" + + @classmethod + def get_thread(cls, udid: str) -> Optional[threading.Thread]: + with cls._lock: + return cls._threads.get(udid) + + @classmethod + def get_tid(cls, udid: str) -> Optional[int]: + t = cls.get_thread(udid) + return t.ident if t else None + + @classmethod + def is_running(cls, udid: str) -> bool: + t = cls.get_thread(udid) + return bool(t and t.is_alive()) + + @classmethod + def list_udids(cls) -> List[str]: + with cls._lock: + return list(cls._threads.keys()) + + # ========== 内部:强杀一次 ========== + + @classmethod + def _stop_once(cls, udid: str, join_timeout: float, retries: int, wait_step: float) -> bool: + """ + 对指定 udid 执行一次强杀流程;返回 True=已停止/不存在,False=仍存活或被拒。 + """ + with cls._lock: + t = cls._threads.get(udid) + + if not t: + return True # 视为已停止 + + main_tid = threading.main_thread().ident + cur_tid = threading.get_ident() + if t.ident in (main_tid, cur_tid): + return False + + try: + _kill_thread_by_tid(t.ident) + except Exception: + pass + + if join_timeout < 0: + join_timeout = 0.0 + t.join(join_timeout) + + while t.is_alive() and retries > 0: + evt = threading.Event() + evt.wait(wait_step) + retries -= 1 + + dead = not t.is_alive() + if dead: + with cls._lock: + if cls._threads.get(udid) is t: + cls._threads.pop(udid, None) + return dead + + # ========== 对外:stop / batch_stop(均返回二元组) ========== + + @classmethod + def stop(cls, udid: str, join_timeout: float = 2.0, + retries: int = 5, wait_step: float = 0.2) -> Tuple[int, str]: + """ + 强杀单个:返回 (200, "stopped") 或 (1001, "failed") + """ + ok = cls._stop_once(udid, join_timeout, retries, wait_step) + if ok: + return 200, "stopped" + else: + return 1001, "failed" + + @classmethod + def batch_stop(cls, udids: List[str], join_timeout_each: float = 2.0, + retries_each: int = 5, wait_step_each: float = 0.2) -> Tuple[int, str]: + """ + 先全量执行一遍 -> 记录失败 -> 对失败重试 3 轮(每轮间隔 1 秒) + 全部完成后统一返回: + 全成功 : (200, "停止任务成功") + 仍有失败: (1001, "停止任务失败") + """ + udids = udids or [] + fail: List[str] = [] + + # 第一轮 + for u in udids: + ok = cls._stop_once(u, join_timeout_each, retries_each, wait_step_each) + if not ok: + fail.append(u) + + # 三轮只对失败重试 + for _ in range(3): + if not fail: + break + time.sleep(1.0) + remain: List[str] = [] + for u in fail: + ok = cls._stop_once(u, join_timeout_each, retries_each, wait_step_each) + if not ok: + remain.append(u) + fail = remain + + if not fail: + return 200, "停止任务成功" + else: + return 1001, "停止任务失败" \ No newline at end of file diff --git a/Utils/__pycache__/LogManager.cpython-312.pyc b/Utils/__pycache__/LogManager.cpython-312.pyc index c4851fc..89e42fe 100644 Binary files a/Utils/__pycache__/LogManager.cpython-312.pyc and b/Utils/__pycache__/LogManager.cpython-312.pyc differ diff --git a/Utils/__pycache__/ThreadManager.cpython-312.pyc b/Utils/__pycache__/ThreadManager.cpython-312.pyc index 1bc0c8c..b13febb 100644 Binary files a/Utils/__pycache__/ThreadManager.cpython-312.pyc and b/Utils/__pycache__/ThreadManager.cpython-312.pyc differ diff --git a/script/ScriptManager.py b/script/ScriptManager.py index bf9f605..ef1241d 100644 --- a/script/ScriptManager.py +++ b/script/ScriptManager.py @@ -23,15 +23,15 @@ from Utils.TencentOCRUtils import TencentOCR # 脚本管理类 class ScriptManager(): - # 单利对象 - _instance = None # 类变量,用于存储单例实例 - - def __new__(cls): - # 如果实例不存在,则创建一个新实例 - if cls._instance is None: - cls._instance = super(ScriptManager, cls).__new__(cls) - # 返回已存在的实例 - return cls._instance + # # 单利对象 + # _instance = None # 类变量,用于存储单例实例 + # + # def __new__(cls): + # # 如果实例不存在,则创建一个新实例 + # if cls._instance is None: + # cls._instance = super(ScriptManager, cls).__new__(cls) + # # 返回已存在的实例 + # return cls._instance def __init__(self): super().__init__() diff --git a/script/__pycache__/ScriptManager.cpython-312.pyc b/script/__pycache__/ScriptManager.cpython-312.pyc index 964aef0..04de84f 100644 Binary files a/script/__pycache__/ScriptManager.cpython-312.pyc and b/script/__pycache__/ScriptManager.cpython-312.pyc differ