Files
iOSAI/Module/FlaskSubprocessManager.py
2025-11-05 17:07:51 +08:00

275 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import subprocess
import sys
import threading
import atexit
import json
import os
import socket
import time
from pathlib import Path
from typing import Optional, Union, Dict, List
import psutil
from Utils.LogManager import LogManager
class FlaskSubprocessManager:
    """Singleton supervisor for the Flask child process.

    Responsibilities visible in this class:

    * spawn the child (``start``) and kill it together with its descendants
      (``stop``);
    * forward the child's stdout into the application log;
    * run a watchdog thread that probes the communication port and restarts
      the child after repeated failures, rate-limited by a cooldown and a
      maximum number of restarts per time window;
    * push data to the child over a local TCP connection (``send``).
    """

    _instance: Optional['FlaskSubprocessManager'] = None
    # Re-entrant on purpose: the watchdog and start() both need to tear the
    # child down while already holding this lock. A plain Lock would make
    # those nested acquisitions block forever.
    _lock = threading.RLock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # Publish the singleton only after initialisation succeeded,
                # so a failure (e.g. a non-numeric FLASK_WD_* variable) does
                # not leave a half-initialised instance behind.
                instance._init_manager()
                cls._instance = instance
        return cls._instance

    def _init_manager(self):
        """Set up instance state; runs once, for the singleton instance."""
        self.process: Optional[subprocess.Popen] = None
        self.comm_port = 34566
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None

        # Watchdog parameters (overridable through environment variables).
        # Consecutive failed probes before a restart is attempted.
        self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "3"))
        # Minimum number of seconds between two restarts.
        self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "8.0"))
        # Maximum number of restarts allowed inside one restart window.
        self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5"))
        self._RESTART_WINDOW = 600  # seconds (10 minutes)
        self._restart_times: List[float] = []
        self._fail_count = 0
        self._last_restart_time = 0.0

        # On Windows, start child processes with their window hidden.
        self._si = None
        if os.name == "nt":
            si = subprocess.STARTUPINFO()
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            si.wShowWindow = 0
            self._si = si

        self._kill_orphan_flask()
        atexit.register(self.stop)
        self._log("info", "FlaskSubprocessManager 初始化完成")

    # ========= Logging =========
    def _log(self, level: str, msg: str, udid="system"):
        """Write ``msg`` to LogManager and echo it to the console.

        ``level`` is "info", "warn"/"warning" or "error"; any other value is
        logged as info. Failures of either sink are ignored so that logging
        can never break the supervisor.
        """
        try:
            if level in ("warn", "warning"):
                LogManager.warning(msg, udid=udid)
            elif level == "error":
                LogManager.error(msg, udid=udid)
            else:
                LogManager.info(msg, udid=udid)
        except Exception:
            pass
        try:
            print(msg)
        except Exception:
            # e.g. a console whose encoding cannot represent the message.
            pass

    # ========= Kill leftover Flask processes =========
    def _kill_orphan_flask(self):
        """Best-effort: kill any other process listening on ``comm_port``.

        Uses ``netstat``/``taskkill`` on Windows and ``lsof``/``os.kill``
        elsewhere. The current process is never killed. Failures are never
        raised to the caller.
        """
        try:
            if os.name == "nt":
                out = subprocess.check_output(
                    ["netstat", "-ano"], text=True, startupinfo=self._si
                )
                needle = f"127.0.0.1:{self.comm_port}"
                for line in out.splitlines():
                    if needle in line and "LISTENING" in line:
                        # The PID is the last column of the netstat line.
                        pid = int(line.strip().split()[-1])
                        if pid != os.getpid():
                            subprocess.run(
                                ["taskkill", "/F", "/PID", str(pid)],
                                startupinfo=self._si,
                                capture_output=True,
                            )
                            self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}")
            else:
                out = subprocess.check_output(
                    ["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"],
                    text=True,
                )
                for pid in map(int, out.split()):
                    if pid != os.getpid():
                        os.kill(pid, 9)
                        self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}")
        except (subprocess.CalledProcessError, FileNotFoundError):
            # check_output raises CalledProcessError on a non-zero exit status
            # (lsof does this when no process matches); FileNotFoundError
            # means the tool is not installed. Neither is worth reporting.
            pass
        except Exception as e:
            self._log("warn", f"[FlaskMgr] 清理残留进程失败: {e}")

    # ========= Start =========
    def _build_command(self):
        """Return ``(cmd, cwd)`` used to launch the Flask child.

        If the resolved executable (``sys.argv[0]`` when the interpreter is
        python.exe/pythonw.exe) is an existing ``.exe`` file, that file is
        re-launched with ``--role=flask``. Otherwise the current interpreter
        runs ``Main.py`` from the project root, falling back to
        ``-m Module.Main`` when no candidate file exists.
        """
        exe_path = Path(sys.executable).resolve()
        if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
            exe_path = Path(sys.argv[0]).resolve()
        is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
        if is_frozen:
            return [str(exe_path), "--role=flask"], str(exe_path.parent)

        project_root = Path(__file__).resolve().parents[1]
        candidates = [
            project_root / "Module" / "Main.py",
            project_root / "Main.py",
        ]
        main_path = next((p for p in candidates if p.is_file()), None)
        if main_path:
            cmd = [sys.executable, "-u", str(main_path), "--role=flask"]
        else:
            cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
        return cmd, str(project_root)

    def start(self):
        """Start the Flask child and make sure the watchdog is running.

        Does nothing when the child is already running and its port answers.

        Raises:
            RuntimeError: the child did not open ``comm_port`` within 10 s
                (the child is killed before raising).
        """
        with self._lock:
            if self._is_alive():
                self._log("warn", "[FlaskMgr] 子进程已在运行,无需重复启动")
                return

            # A child that is still tracked but whose port does not answer
            # would otherwise be overwritten below and leak.
            if self.process is not None:
                self._terminate_process()

            # A previous stop() leaves the event set; clear it so that the
            # watchdog loop can run again.
            self._stop_event.clear()

            env = os.environ.copy()
            env["FLASK_COMM_PORT"] = str(self.comm_port)
            cmd, cwd = self._build_command()
            self._log("info", f"[FlaskMgr] 启动命令: {cmd}, cwd={cwd}")

            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding="utf-8",
                errors="replace",
                bufsize=1,
                env=env,
                cwd=cwd,
                start_new_session=True,
                startupinfo=self._si,
            )
            # Hand the Popen object to the reader thread explicitly, so it
            # keeps working even if self.process is replaced or cleared.
            threading.Thread(
                target=self._flush_stdout, args=(self.process,), daemon=True
            ).start()
            self._log("info", f"[FlaskMgr] Flask 子进程已启动PID={self.process.pid}")

            if not self._wait_port_open(timeout=10):
                self._log("error", "[FlaskMgr] 启动失败,端口未监听")
                # Kill the child without signalling the watchdog to stop, so
                # a watchdog-driven restart can retry on a later cycle.
                self._terminate_process()
                raise RuntimeError("Flask 启动后 10s 内未监听端口")

            if not self._monitor_thread or not self._monitor_thread.is_alive():
                self._monitor_thread = threading.Thread(target=self._monitor, daemon=True)
                self._monitor_thread.start()
                self._log("info", "[FlaskWD] 守护线程已启动")

    # ========= stdout capture =========
    def _flush_stdout(self, proc: Optional[subprocess.Popen] = None):
        """Copy the child's stdout into the log, line by line, until EOF.

        Args:
            proc: the child to read from; defaults to ``self.process``.
        """
        if proc is None:
            proc = self.process
        if not proc or not proc.stdout:
            return
        stream = proc.stdout
        try:
            for line in iter(stream.readline, ""):
                self._log("info", line.rstrip())
        except (ValueError, OSError):
            # The pipe was closed underneath us while the child was stopped.
            pass
        finally:
            try:
                stream.close()
            except Exception:
                pass

    # ========= Send =========
    def send(self, data: Union[str, Dict, List]) -> bool:
        """Send ``data`` plus a trailing newline to the child over TCP.

        Dicts and lists are JSON-encoded first. Returns True on success and
        False on any failure (serialisation, connection or transmission);
        failures are logged, never raised.
        """
        try:
            if isinstance(data, (dict, list)):
                data = json.dumps(data, ensure_ascii=False)
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s:
                s.sendall((data + "\n").encode("utf-8"))
            self._log("info", f"[FlaskMgr] 数据已发送到端口 {self.comm_port}")
            return True
        except Exception as e:
            self._log("error", f"[FlaskMgr] 发送失败: {e}")
            return False

    # ========= Stop =========
    def _terminate_process(self):
        """Kill the tracked child and all of its descendants.

        Callers are expected to hold ``_lock``. Does not touch
        ``_stop_event``, so it is safe to use from the watchdog's restart
        path. ``self.process`` is always reset to None afterwards.
        """
        proc = self.process
        if proc is None:
            return
        pid = proc.pid
        self._log("info", f"[FlaskMgr] 正在停止子进程 PID={pid}")
        try:
            parent = psutil.Process(pid)
            for child in parent.children(recursive=True):
                try:
                    child.kill()
                except Exception:
                    # Best effort: the descendant may already be gone.
                    pass
            parent.kill()
            parent.wait(timeout=3)
        except psutil.NoSuchProcess:
            pass
        except Exception as e:
            self._log("error", f"[FlaskMgr] 停止子进程异常: {e}")
        finally:
            self.process = None

    def stop(self):
        """Stop the watchdog and kill the child process tree.

        The stop event is set even when no child is currently tracked, so a
        watchdog that is between restart attempts is stopped as well.
        """
        with self._lock:
            self._stop_event.set()
            self._terminate_process()

    # ========= Watchdog =========
    def _resend_device_models(self):
        """Re-send every model held by DeviceInfo to the restarted child."""
        # Imported here, as in the original code, rather than at module level.
        from Module.DeviceInfo import DeviceInfo
        info = DeviceInfo()
        with info._lock:
            for m in info._models.values():
                try:
                    self.send(m.toDict())
                except Exception as e:
                    # One bad model must not prevent the others being sent.
                    self._log("warn", f"[FlaskWD] 重发设备信息失败: {e}")

    def _monitor(self):
        """Watchdog loop: probe the port every 2 s and restart on failure.

        A restart is attempted after ``_FAIL_THRESHOLD`` consecutive failed
        probes, unless the previous restart was less than ``_COOLDOWN_SEC``
        ago. When ``_MAX_RESTARTS`` restarts have already happened inside
        ``_RESTART_WINDOW`` seconds the watchdog gives up and exits. The loop
        also exits once ``_stop_event`` is set.
        """
        self._log("info", "[FlaskWD] 看门狗线程启动")
        verbose = os.getenv("FLASK_WD_VERBOSE", "0") == "1"
        last_ok = 0.0
        while not self._stop_event.wait(2.0):
            if self._port_alive():
                self._fail_count = 0
                # In verbose mode, report a healthy port at most once a minute.
                if verbose and (time.time() - last_ok) >= 60:
                    self._log("info", f"[FlaskWD] OK {self.comm_port} alive")
                    last_ok = time.time()
                continue

            self._fail_count += 1
            self._log("warn", f"[FlaskWD] 探测失败 {self._fail_count}/{self._FAIL_THRESHOLD}")
            if self._fail_count < self._FAIL_THRESHOLD:
                continue

            now = time.time()
            if now - self._last_restart_time < self._COOLDOWN_SEC:
                self._log("warn", "[FlaskWD] 冷却中,跳过重启")
                continue

            # Rate limit: keep only the restarts inside the window and give
            # up once there are too many of them.
            self._restart_times = [t for t in self._restart_times if now - t < self._RESTART_WINDOW]
            if len(self._restart_times) >= self._MAX_RESTARTS:
                self._log("error", f"[FlaskWD] 10分钟内重启次数过多({len(self._restart_times)}次),暂停看门狗")
                break
            self._restart_times.append(now)

            self._log("warn", "[FlaskWD] 端口不通,准备重启 Flask")
            try:
                with self._lock:
                    # stop() may have been called while we waited for the
                    # lock; do not resurrect a child the caller just stopped.
                    if self._stop_event.is_set():
                        return
                    self._terminate_process()
                    time.sleep(1)
                    self.start()
                self._fail_count = 0
                self._last_restart_time = now
                self._log("info", "[FlaskWD] Flask 已成功重启")
                # Done after releasing the manager lock, so this thread never
                # holds it while acquiring DeviceInfo's lock.
                self._resend_device_models()
            except Exception as e:
                self._log("error", f"[FlaskWD] 自动重启失败: {e}")
                time.sleep(3)

    # ========= Helpers =========
    def _port_alive(self) -> bool:
        """Return True if a TCP connection to ``comm_port`` succeeds."""
        try:
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.6):
                return True
        except Exception:
            return False

    def _wait_port_open(self, timeout: float) -> bool:
        """Poll the port every 0.2 s; True as soon as it answers, False after ``timeout`` seconds."""
        # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._port_alive():
                return True
            time.sleep(0.2)
        return False

    def _is_alive(self) -> bool:
        """Return True only if the child is tracked, still running, and its port answers."""
        return bool(
            self.process is not None
            and self.process.poll() is None
            and self._port_alive()
        )

    @classmethod
    def get_instance(cls) -> 'FlaskSubprocessManager':
        """Return the process-wide singleton, creating it on first use."""
        return cls()