Files
iOSAI/Utils/ThreadManager.py

306 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import ctypes
import os
import signal
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, List, Optional

from Utils.LogManager import LogManager

try:
    import psutil  # optional: used to kill child processes together with their descendants
except Exception:
    psutil = None
def _async_raise(tid: int, exc_type=SystemExit) -> bool:
    """Asynchronously raise ``exc_type`` in the Python thread whose ident is ``tid``.

    Uses ``PyThreadState_SetAsyncExc``. Per the CPython C-API docs the exception
    is only delivered when the target thread next executes Python bytecode, so
    this is only reliable for threads running Python code (a thread stuck inside
    a blocking C call is not interrupted until that call returns).

    Args:
        tid: ``threading.Thread.ident`` of the target thread.
        exc_type: exception class to raise in the target (default ``SystemExit``).

    Returns:
        True if exactly one thread state was modified. False otherwise: an empty
        ``tid``, no such thread, more than one match (the call is rolled back),
        or a ctypes failure. Failures are logged, never raised.
    """
    if not tid:
        LogManager.method_error("强杀失败: 线程ID为空", "task")
        return False
    try:
        # Since Python 3.7 the C signature takes `unsigned long id`.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(tid), ctypes.py_object(exc_type)
        )
        if res == 0:
            LogManager.method_info(f"线程 {tid} 不存在", "task")
            return False
        if res > 1:
            # Roll back: passing NULL (ctypes None) clears the pending exception.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)
            LogManager.method_info(f"线程 {tid} 命中多个线程,已回滚", "task")
            return False
        return True
    except Exception as e:
        LogManager.method_error(f"强杀线程失败: {e}", "task")
        return False
class ThreadManager:
    """Registry holding at most one worker thread per device ``udid``.

    The calling convention ``add(udid, thread, event)`` is kept unchanged.

    - Every registered thread is forced to ``daemon=True`` so a stuck worker
      cannot keep the process alive at exit.
    - Stopping escalates: set the cooperative ``event`` -> kill registered
      external child processes -> join -> repeated ``SystemExit`` injection
      (``_async_raise``) -> give up, mark the entry "zombie" and free the slot.
    - Optionally, external processes started by a task (adb/scrcpy/ffmpeg, ...)
      can be registered via ``register_child_pid`` and are killed on stop.

    Each ``_tasks`` entry is a dict with keys ``id`` (thread ident), ``thread``,
    ``event``, ``start_time`` (``time.time()``), ``state`` ("running",
    "stopping" or "zombie") and ``child_pids`` (set of int).
    """

    # udid -> task entry (see class docstring); accessed under _lock.
    _tasks: Dict[str, Dict] = {}
    # Re-entrant so private helpers can run while a public method holds it.
    _lock = threading.RLock()

    @classmethod
    def _cleanup_if_dead(cls, udid: str):
        """Drop the entry for ``udid`` if its thread has already finished.

        The caller must hold ``cls._lock``.
        """
        obj = cls._tasks.get(udid)
        if not obj:
            return
        th = obj.get("thread")
        if th and not th.is_alive():
            cls._tasks.pop(udid, None)
            LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")

    @classmethod
    def register_child_pid(cls, udid: str, pid: int):
        """Record an external process started by the task for ``udid``.

        Call this right after launching adb/scrcpy/ffmpeg or similar from a task,
        so that ``stop()`` / ``add(force=True)`` kill it as well. This is a no-op
        when no task is registered for ``udid``.
        """
        with cls._lock:
            obj = cls._tasks.get(udid)
            if not obj:
                return
            obj.setdefault("child_pids", set()).add(int(pid))
            LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")

    @staticmethod
    def _terminate_pid_tree(pid: int, grace: float = 0.5):
        """Terminate ``pid`` and, where possible, all of its descendants.

        With psutil: terminate the whole tree (children first), wait up to
        ``grace`` seconds, then hard-kill every process that is still alive.
        Without psutil: ``taskkill /T /F`` on Windows; elsewhere SIGTERM, a
        short pause, then SIGKILL on ``pid`` only (descendants are not reached).
        Raises on unexpected errors; the caller logs them.
        """
        if psutil:
            if not psutil.pid_exists(pid):
                return
            try:
                root = psutil.Process(pid)
                # Collect descendants before the root dies (they get re-parented).
                procs = root.children(recursive=True) + [root]
            except psutil.NoSuchProcess:
                return
            for p in procs:
                try:
                    p.terminate()
                except psutil.Error:
                    pass
            _gone, alive = psutil.wait_procs(procs, timeout=grace)
            for p in alive:
                try:
                    p.kill()
                except psutil.Error:
                    pass
        elif os.name == "nt":
            # List form: no shell involved; /T also kills the child tree.
            subprocess.run(
                ["taskkill", "/PID", str(pid), "/T", "/F"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        else:
            try:
                os.kill(pid, signal.SIGTERM)
                time.sleep(0.2)
                os.kill(pid, signal.SIGKILL)
            except OSError:
                # Already exited (ProcessLookupError) or not permitted.
                pass

    @classmethod
    def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
        """Best-effort kill of every external process in ``child_pids``.

        A failure for one PID is logged and does not prevent the others from
        being processed.
        """
        if not child_pids:
            return
        for pid in list(child_pids):
            try:
                cls._terminate_pid_tree(pid)
                LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
            except Exception as e:
                LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")

    @classmethod
    def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
        """Start ``thread`` and register it as the task for ``udid``.

        Args:
            udid: device identifier used as the registry key.
            thread: a not-yet-started thread; it is set to daemon and started here.
            event: stop event; ``stop()`` sets it to request a cooperative exit.
            force: if a live task already exists for ``udid``, forcibly stop it
                first (synchronously, while holding the lock) instead of refusing.

        Returns:
            The result as a ``(code, message)`` tuple:

            - ``(200, "创建成功")`` on success.
            - ``(1001, "当前设备已存在任务")`` if a live task exists and ``force``
              is False.
            - ``(1002, "线程启动失败: ...")`` if starting the thread raised.
        """
        with cls._lock:
            cls._cleanup_if_dead(udid)
            old = cls._tasks.get(udid)
            if old and old["thread"].is_alive():
                if not force:
                    return 1001, "当前设备已存在任务"
                LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
                cls._force_stop_locked(udid)
            # Force daemon so a hung worker cannot block interpreter shutdown.
            try:
                thread.daemon = True
            except Exception:
                # Raised for an already-started thread; start() below then fails too.
                pass
            try:
                thread.start()
                # In CPython start() returns after the thread has begun, so ident
                # is normally set already; this short poll is only a safety net.
                for _ in range(20):
                    if thread.ident:
                        break
                    time.sleep(0.02)
                cls._tasks[udid] = {
                    "id": thread.ident,
                    "thread": thread,
                    "event": event,
                    "start_time": time.time(),
                    "state": "running",
                    "child_pids": set(),
                }
                LogManager.method_info(f"创建任务成功 [{udid}]线程ID={thread.ident}", "task")
                return 200, "创建成功"
            except Exception as e:
                LogManager.method_error(f"线程启动失败: {e}", "task")
                return 1002, f"线程启动失败: {e}"

    @classmethod
    def stop(cls, udid: str, stop_timeout: float = 5.0, kill_timeout: float = 2.0) -> Tuple[int, str]:
        """Request that the task for ``udid`` stops; returns without waiting.

        The escalation runs on a background daemon watcher thread:

        1. Wait about 1 s after setting ``event``.
        2. Kill the registered child processes.
        3. ``join(stop_timeout)``.
        4. Inject ``SystemExit`` up to 6 times.
        5. ``join(kill_timeout)``.
        6. Release the registry slot, marking the entry "zombie" if the thread
           is still alive. The slot is only released if it still holds this
           task (``add()`` may have replaced it meanwhile).

        Returns:
            The result as a ``(code, message)`` tuple:

            - ``(200, "任务不存在")`` if nothing is registered.
            - ``(200, "已结束")`` if the thread had already finished.
            - ``(200, "停止请求已提交")`` otherwise.
        """
        with cls._lock:
            obj = cls._tasks.get(udid)
            if not obj:
                return 200, "任务不存在"
            thread = obj["thread"]
            event = obj["event"]
            tid = obj["id"]
            LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
            if not thread.is_alive():
                cls._tasks.pop(udid, None)
                return 200, "已结束"
            obj["state"] = "stopping"

        # Signal first, giving the worker a chance to exit cooperatively.
        try:
            event.set()
        except Exception as e:
            LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")

        def _wait_stop():
            try:
                # Short window: many I/O loops notice the event almost immediately.
                thread.join(timeout=1.0)
                # Kill external processes next so the worker is not stuck on them.
                # Read the PIDs now so ones registered after stop() are included.
                with cls._lock:
                    child_pids = set(obj.get("child_pids") or ())
                cls._kill_child_pids(udid, child_pids)
                # Regular cooperative join window.
                if thread.is_alive():
                    thread.join(timeout=stop_timeout)
                # Still alive -> inject SystemExit a few times.
                if thread.is_alive():
                    LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
                    for _ in range(6):
                        if not _async_raise(tid, SystemExit):
                            break  # no such thread / ambiguous id: retrying won't help
                        # Give the interpreter time to schedule the target thread.
                        time.sleep(0.06)
                        if not thread.is_alive():
                            break
                if thread.is_alive():
                    thread.join(timeout=kill_timeout)
            finally:
                with cls._lock:
                    if thread.is_alive():
                        # Unkillable: mark zombie and release the slot anyway.
                        LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie释放占位", "task")
                        obj["state"] = "zombie"
                    else:
                        LogManager.method_info(f"[{udid}] 停止成功", "task")
                    # Only free the slot if it still holds *this* task; add() may
                    # already have registered a new task under the same udid.
                    if cls._tasks.get(udid) is obj:
                        cls._tasks.pop(udid, None)

        threading.Thread(target=_wait_stop, daemon=True).start()
        return 200, "停止请求已提交"

    @classmethod
    def _force_stop_locked(cls, udid: str):
        """Synchronously and forcibly stop the task for ``udid``.

        The caller must hold ``_lock``. Used by ``add(force=True)`` to replace an
        old task. The call blocks for child-process cleanup plus roughly 1.5 s,
        6 x 0.05 s and 0.8 s of joins and injections. It then removes the entry
        whether or not the thread actually died.
        """
        obj = cls._tasks.get(udid)
        if not obj:
            return
        th = obj["thread"]
        tid = obj["id"]
        event = obj["event"]
        child_pids = set(obj.get("child_pids") or [])
        try:
            try:
                event.set()
            except Exception:
                pass
            cls._kill_child_pids(udid, child_pids)
            th.join(timeout=1.5)
            if th.is_alive():
                for _ in range(6):
                    _async_raise(tid, SystemExit)
                    time.sleep(0.05)
                    if not th.is_alive():
                        break
                th.join(timeout=0.8)
        except Exception as e:
            LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
        finally:
            cls._tasks.pop(udid, None)

    @classmethod
    def status(cls, udid: str) -> Dict:
        """Return a snapshot of the task registered for ``udid``.

        Returns ``{"exists": False}`` when nothing is registered. Otherwise it
        returns a dict with these keys:

        - ``exists``
        - ``state``
        - ``start_time``
        - ``thread_id``
        - ``alive``
        - ``child_pids`` (a list copy)
        """
        with cls._lock:
            obj = cls._tasks.get(udid)
            if not obj:
                return {"exists": False}
            return {
                "exists": True,
                "state": obj.get("state"),
                "start_time": obj.get("start_time"),
                "thread_id": obj.get("id"),
                "alive": obj["thread"].is_alive(),
                "child_pids": list(obj.get("child_pids") or []),
            }

    @classmethod
    def batch_stop(cls, ids: List[str], wait_timeout: float = 10.0) -> Tuple[int, str]:
        """Stop several tasks in parallel and wait, bounded, for their threads to exit.

        Args:
            ids: device udids to stop.
            wait_timeout: total seconds, shared across all tasks, to wait for the
                stopped threads to finish. The default is slightly longer than
                ``stop()``'s own escalation with default timeouts (about
                1 s + 5 s + 0.4 s + 2 s). After that escalation ``stop()`` gives
                up on a thread as a zombie.

        Returns:
            ``(200, "全部任务已成功停止")`` when every stop call returned 200 and
            every targeted thread exited in time. Otherwise
            ``(207, "部分任务停止失败: [...]")``.
        """
        # Snapshot the thread objects *before* stopping: stop()'s watcher removes
        # the entries, and add() may register a new task under the same udid.
        with cls._lock:
            targets = {udid: cls._tasks[udid]["thread"] for udid in ids if udid in cls._tasks}

        failed: List[str] = []
        stopped: List[str] = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {executor.submit(cls.stop, udid): udid for udid in ids}
            for future in as_completed(futures):
                udid = futures[future]
                try:
                    code, _msg = future.result()
                except Exception as e:
                    LogManager.method_error(f"[{udid}] stop 调用异常: {e}", "task")
                    failed.append(udid)
                    continue
                if code == 200:
                    stopped.append(udid)
                else:
                    failed.append(udid)

        # Wait for the threads to actually exit, against one overall deadline,
        # so an unkillable (zombie) thread cannot hang this call forever.
        deadline = time.monotonic() + wait_timeout
        current = threading.current_thread()
        for udid in stopped:
            thread = targets.get(udid)
            if thread is None or thread is current:
                continue  # nothing registered, or we cannot join ourselves
            thread.join(timeout=max(0.0, deadline - time.monotonic()))
            if thread.is_alive():
                LogManager.method_error(f"[{udid}] 等待线程退出超时", "task")
                failed.append(udid)

        if failed:
            return 207, f"部分任务停止失败: {failed}"
        return 200, "全部任务已成功停止"