临时提交
This commit is contained in:
@@ -1,16 +1,23 @@
|
||||
import ctypes
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import signal
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from typing import Dict, Tuple, List
|
||||
from typing import Dict, Tuple, List, Optional
|
||||
|
||||
from Utils.LogManager import LogManager
|
||||
|
||||
try:
|
||||
import psutil # 可选:用来级联杀子进程
|
||||
except Exception:
|
||||
psutil = None
|
||||
|
||||
def _async_raise(tid: int, exc_type=KeyboardInterrupt) -> bool:
|
||||
"""向指定线程抛异常"""
|
||||
|
||||
def _async_raise(tid: int, exc_type=SystemExit) -> bool:
|
||||
"""向指定线程异步注入异常(仅对 Python 解释器栈可靠)"""
|
||||
if not tid:
|
||||
LogManager.method_error(f"强杀失败: 线程ID为空", "task")
|
||||
LogManager.method_error("强杀失败: 线程ID为空", "task")
|
||||
return False
|
||||
try:
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
|
||||
@@ -30,15 +37,70 @@ def _async_raise(tid: int, exc_type=KeyboardInterrupt) -> bool:
|
||||
|
||||
|
||||
class ThreadManager:
|
||||
"""
|
||||
维持你的 add(udid, thread, event) 调用方式不变。
|
||||
- 线程统一设为 daemon
|
||||
- 停止:协作 -> 多次强杀注入 -> zombie 放弃占位
|
||||
- 可选:注册并级联杀掉业务里创建的外部子进程
|
||||
"""
|
||||
_tasks: Dict[str, Dict] = {}
|
||||
_lock = threading.RLock()
|
||||
|
||||
@classmethod
|
||||
def _cleanup_if_dead(cls, udid: str):
|
||||
obj = cls._tasks.get(udid)
|
||||
if obj and not obj["thread"].is_alive():
|
||||
cls._tasks.pop(udid, None)
|
||||
LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
|
||||
if obj:
|
||||
th = obj.get("thread")
|
||||
if th and not th.is_alive():
|
||||
cls._tasks.pop(udid, None)
|
||||
LogManager.method_info(f"检测到 [{udid}] 线程已结束,自动清理。", "task")
|
||||
|
||||
@classmethod
|
||||
def register_child_pid(cls, udid: str, pid: int):
|
||||
"""业务里如果起了 adb/scrcpy/ffmpeg 等外部进程,请在启动后调用这个登记,便于 stop 时一起杀掉。"""
|
||||
with cls._lock:
|
||||
obj = cls._tasks.get(udid)
|
||||
if not obj:
|
||||
return
|
||||
pids: set = obj.setdefault("child_pids", set())
|
||||
pids.add(int(pid))
|
||||
LogManager.method_info(f"[{udid}] 记录子进程 PID={pid}", "task")
|
||||
|
||||
@classmethod
|
||||
def _kill_child_pids(cls, udid: str, child_pids: Optional[set]):
|
||||
if not child_pids:
|
||||
return
|
||||
for pid in list(child_pids):
|
||||
try:
|
||||
if psutil:
|
||||
if psutil.pid_exists(pid):
|
||||
proc = psutil.Process(pid)
|
||||
# 先温柔 terminate,再等 0.5 秒,仍活则 kill;并级联子进程
|
||||
for c in proc.children(recursive=True):
|
||||
try:
|
||||
c.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
proc.terminate()
|
||||
gone, alive = psutil.wait_procs([proc], timeout=0.5)
|
||||
for a in alive:
|
||||
try:
|
||||
a.kill()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
if os.name == "nt":
|
||||
os.system(f"taskkill /PID {pid} /T /F >NUL 2>&1")
|
||||
else:
|
||||
try:
|
||||
os.kill(pid, signal.SIGTERM)
|
||||
time.sleep(0.2)
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
except Exception:
|
||||
pass
|
||||
LogManager.method_info(f"[{udid}] 已尝试结束子进程 PID={pid}", "task")
|
||||
except Exception as e:
|
||||
LogManager.method_error(f"[{udid}] 结束子进程 {pid} 异常: {e}", "task")
|
||||
|
||||
@classmethod
|
||||
def add(cls, udid: str, thread: threading.Thread, event: threading.Event, force: bool = False) -> Tuple[int, str]:
|
||||
@@ -51,19 +113,27 @@ class ThreadManager:
|
||||
LogManager.method_info(f"[{udid}] 检测到旧任务,尝试强制停止", "task")
|
||||
cls._force_stop_locked(udid)
|
||||
|
||||
# 强制守护线程,防止进程被挂死
|
||||
try:
|
||||
thread.daemon = True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
thread.start()
|
||||
# 等待 ident 初始化
|
||||
for _ in range(10):
|
||||
# 等 ident 初始化
|
||||
for _ in range(20):
|
||||
if thread.ident:
|
||||
break
|
||||
time.sleep(0.05)
|
||||
time.sleep(0.02)
|
||||
|
||||
cls._tasks[udid] = {
|
||||
"id": thread.ident,
|
||||
"thread": thread,
|
||||
"event": event,
|
||||
"start_time": time.time(),
|
||||
"state": "running",
|
||||
"child_pids": set(),
|
||||
}
|
||||
LogManager.method_info(f"创建任务成功 [{udid}],线程ID={thread.ident}", "task")
|
||||
return 200, "创建成功"
|
||||
@@ -81,6 +151,7 @@ class ThreadManager:
|
||||
thread = obj["thread"]
|
||||
event = obj["event"]
|
||||
tid = obj["id"]
|
||||
child_pids = set(obj.get("child_pids") or [])
|
||||
|
||||
LogManager.method_info(f"请求停止 [{udid}] 线程ID={tid}", "task")
|
||||
|
||||
@@ -88,56 +159,101 @@ class ThreadManager:
|
||||
cls._tasks.pop(udid, None)
|
||||
return 200, "已结束"
|
||||
|
||||
# 协作式停止
|
||||
obj["state"] = "stopping"
|
||||
|
||||
# 先把 event 打开,给协作退出的机会
|
||||
try:
|
||||
event.set()
|
||||
except Exception as e:
|
||||
LogManager.method_error(f"[{udid}] 设置停止事件失败: {e}", "task")
|
||||
|
||||
def _wait_stop():
|
||||
# 先给 1 秒高频检查机会(很多 I/O 点会在这个窗口立刻感知到)
|
||||
# 高频窗口 1s(很多 I/O 点会在这个窗口立刻感知到)
|
||||
t0 = time.time()
|
||||
while time.time() - t0 < 1.0 and thread.is_alive():
|
||||
time.sleep(0.05)
|
||||
|
||||
# 再进入原有的 join 窗口
|
||||
thread.join(timeout=stop_timeout)
|
||||
if thread.is_alive():
|
||||
LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀", "task")
|
||||
try:
|
||||
_async_raise(tid) # 兜底:依然保留你的策略
|
||||
except Exception as e:
|
||||
LogManager.method_error(f"[{udid}] 强杀触发失败: {e}", "task")
|
||||
thread.join(timeout=kill_timeout)
|
||||
# 子进程先收拾(避免后台外部程序继续卡死)
|
||||
cls._kill_child_pids(udid, child_pids)
|
||||
|
||||
if not thread.is_alive():
|
||||
LogManager.method_info(f"[{udid}] 停止成功", "task")
|
||||
else:
|
||||
LogManager.method_error(f"[{udid}] 停止失败(线程卡死),已清理占位", "task")
|
||||
# 正常 join 窗口
|
||||
if thread.is_alive():
|
||||
thread.join(timeout=stop_timeout)
|
||||
|
||||
# 仍活着 → 多次注入 SystemExit
|
||||
if thread.is_alive():
|
||||
LogManager.method_info(f"[{udid}] 协作超时 -> 尝试强杀注入", "task")
|
||||
for i in range(6):
|
||||
ok = _async_raise(tid, SystemExit)
|
||||
# 给解释器一些调度时间
|
||||
time.sleep(0.06)
|
||||
if not thread.is_alive():
|
||||
break
|
||||
|
||||
# 最后等待 kill_timeout
|
||||
if thread.is_alive():
|
||||
thread.join(timeout=kill_timeout)
|
||||
|
||||
with cls._lock:
|
||||
cls._tasks.pop(udid, None)
|
||||
if not thread.is_alive():
|
||||
LogManager.method_info(f"[{udid}] 停止成功", "task")
|
||||
cls._tasks.pop(udid, None)
|
||||
else:
|
||||
# 彻底杀不掉:标记 zombie、释放占位
|
||||
LogManager.method_error(f"[{udid}] 停止失败(线程卡死),标记为 zombie,释放占位", "task")
|
||||
obj = cls._tasks.get(udid)
|
||||
if obj:
|
||||
obj["state"] = "zombie"
|
||||
cls._tasks.pop(udid, None)
|
||||
|
||||
threading.Thread(target=_wait_stop, daemon=True).start()
|
||||
return 200, "停止请求已提交"
|
||||
|
||||
@classmethod
|
||||
def _force_stop_locked(cls, udid: str):
|
||||
"""持锁情况下的暴力停止(用于 add(force=True) 覆盖旧任务)"""
|
||||
obj = cls._tasks.get(udid)
|
||||
if not obj:
|
||||
return
|
||||
th = obj["thread"]
|
||||
tid = obj["id"]
|
||||
event = obj["event"]
|
||||
child_pids = set(obj.get("child_pids") or [])
|
||||
try:
|
||||
event = obj["event"]
|
||||
event.set()
|
||||
obj["thread"].join(timeout=2)
|
||||
if obj["thread"].is_alive():
|
||||
_async_raise(obj["id"])
|
||||
obj["thread"].join(timeout=1)
|
||||
try:
|
||||
event.set()
|
||||
except Exception:
|
||||
pass
|
||||
cls._kill_child_pids(udid, child_pids)
|
||||
th.join(timeout=1.5)
|
||||
if th.is_alive():
|
||||
for _ in range(6):
|
||||
_async_raise(tid, SystemExit)
|
||||
time.sleep(0.05)
|
||||
if not th.is_alive():
|
||||
break
|
||||
th.join(timeout=0.8)
|
||||
except Exception as e:
|
||||
LogManager.method_error(f"[{udid}] 强制停止失败: {e}", "task")
|
||||
finally:
|
||||
cls._tasks.pop(udid, None)
|
||||
|
||||
@classmethod
|
||||
def status(cls, udid: str) -> Dict:
|
||||
with cls._lock:
|
||||
obj = cls._tasks.get(udid)
|
||||
if not obj:
|
||||
return {"exists": False}
|
||||
o = {
|
||||
"exists": True,
|
||||
"state": obj.get("state"),
|
||||
"start_time": obj.get("start_time"),
|
||||
"thread_id": obj.get("id"),
|
||||
"alive": obj["thread"].is_alive(),
|
||||
"child_pids": list(obj.get("child_pids") or []),
|
||||
}
|
||||
return o
|
||||
|
||||
@classmethod
|
||||
def batch_stop(cls, ids: List[str]) -> Tuple[int, str]:
|
||||
failed = []
|
||||
|
||||
Reference in New Issue
Block a user