# -*- coding: utf-8 -*-
import subprocess
import sys
import threading
import atexit
import json
import os
import socket
import time
from pathlib import Path
from typing import Optional, Union, Dict, List

import psutil

from Utils.LogManager import LogManager


class FlaskSubprocessManager:
    """Singleton that spawns, supervises and restarts the Flask child process.

    The manager launches the Flask role of the application as a subprocess,
    forwards the child's stdout to the log, and runs a watchdog thread that
    probes the communication port and restarts the child when the port stops
    answering. Restarts are rate limited (cool-down between restarts and a
    maximum number of restarts per time window).
    """

    _instance: Optional['FlaskSubprocessManager'] = None
    _lock = threading.RLock()

    def __new__(cls):
        """Return the process-wide instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                # Publish the instance only after initialisation succeeded so
                # a failing _init_manager() cannot leave a half-built singleton.
                instance = super().__new__(cls)
                instance._init_manager()
                cls._instance = instance
            return cls._instance

    def _init_manager(self):
        """Initialise instance state; called exactly once from __new__."""
        self.process: Optional[subprocess.Popen] = None
        self.comm_port = 34566
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None

        # Watchdog tuning, overridable through environment variables.
        # Consecutive failed probes before a restart is attempted.
        self._FAIL_THRESHOLD = int(os.getenv("FLASK_WD_FAIL_THRESHOLD", "5"))
        # Minimum number of seconds between two restarts.
        self._COOLDOWN_SEC = float(os.getenv("FLASK_WD_COOLDOWN", "10"))
        # Maximum number of restarts allowed inside one _RESTART_WINDOW.
        self._MAX_RESTARTS = int(os.getenv("FLASK_WD_MAX_RESTARTS", "5"))
        self._RESTART_WINDOW = 600  # seconds (10 minutes)
        self._restart_times: List[float] = []
        self._fail_count = 0
        self._last_restart_time = 0.0
        # True while the watchdog loop is executing.
        self._running = False

        # On Windows, start child processes with a hidden console window.
        self._si = None
        if os.name == "nt":
            si = subprocess.STARTUPINFO()
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            si.wShowWindow = 0
            self._si = si

        self._kill_orphan_flask()
        atexit.register(self.stop)
        self._log("info", "FlaskSubprocessManager 初始化完成")

    # ========= Logging =========
    def _log(self, level: str, msg: str, udid="system"):
        """Write *msg* to LogManager at *level* and echo it to the console.

        Unknown levels are logged as "info". Failures inside LogManager are
        ignored on purpose so logging can never break the manager.
        """
        try:
            if level in ("warn", "warning"):
                LogManager.warning(msg, udid=udid)
            elif level == "error":
                LogManager.error(msg, udid=udid)
            else:
                LogManager.info(msg, udid=udid)
        except Exception:
            pass
        print(msg)

    # ========= Kill leftover Flask processes =========
    def _kill_orphan_flask(self):
        """Kill any other process that is still listening on the comm port.

        Uses ``netstat``/``taskkill`` on Windows and ``lsof``/``kill -9``
        elsewhere. This is best effort: any failure is silently ignored
        (``lsof`` for example exits non-zero when nothing is listening).
        """
        try:
            if os.name == "nt":
                out = subprocess.check_output(
                    ["netstat", "-ano"], text=True, startupinfo=self._si
                )
                for line in out.splitlines():
                    if f"127.0.0.1:{self.comm_port}" in line and "LISTENING" in line:
                        # The owning PID is the last column of the netstat row.
                        pid = int(line.strip().split()[-1])
                        if pid != os.getpid():
                            subprocess.run(
                                ["taskkill", "/F", "/PID", str(pid)],
                                startupinfo=self._si,
                                capture_output=True,
                            )
                            self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}")
            else:
                out = subprocess.check_output(
                    ["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"],
                    text=True,
                )
                for pid in map(int, out.split()):
                    if pid != os.getpid():
                        os.kill(pid, 9)
                        self._log("warn", f"[FlaskMgr] 杀死残留进程 PID={pid}")
        except Exception:
            pass

    # ========= Start =========
    def start(self):
        """Launch the Flask child process and make sure the watchdog runs.

        Does nothing when the child is already running and its port answers.

        Raises:
            RuntimeError: the child did not open the port within 10 seconds.
        """
        with self._lock:
            if self._is_alive():
                self._log("warn", "[FlaskMgr] 子进程已在运行,无需重复启动")
                return

            # A child that is still running but no longer answers on the port
            # must be terminated first, otherwise it would be leaked.
            self._terminate_process()

            env = os.environ.copy()
            env["FLASK_COMM_PORT"] = str(self.comm_port)

            # Under a plain interpreter sys.executable is python(w).exe; in
            # that case look at the launched script/binary instead.
            exe_path = Path(sys.executable).resolve()
            if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
                exe_path = Path(sys.argv[0]).resolve()
            is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()

            if is_frozen:
                cmd = [str(exe_path), "--role=flask"]
                cwd = str(exe_path.parent)
            else:
                project_root = Path(__file__).resolve().parents[1]
                candidates = [
                    project_root / "Module" / "Main.py",
                    project_root / "Main.py",
                ]
                main_path = next((p for p in candidates if p.is_file()), None)
                if main_path:
                    cmd = [sys.executable, "-u", str(main_path), "--role=flask"]
                else:
                    cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
                cwd = str(project_root)

            self._log("info", f"[FlaskMgr] 启动命令: {cmd}, cwd={cwd}")
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding="utf-8",
                errors="replace",
                bufsize=1,
                env=env,
                cwd=cwd,
                start_new_session=True,
                startupinfo=self._si,
            )
            # Hand the Popen object to the reader so it does not depend on
            # self.process, which stop() resets to None.
            threading.Thread(
                target=self._flush_stdout, args=(self.process,), daemon=True
            ).start()
            self._log("info", f"[FlaskMgr] Flask 子进程已启动,PID={self.process.pid}")

            if not self._wait_port_open(timeout=10):
                self._log("error", "[FlaskMgr] 启动失败,端口未监听")
                # Only the child is cleaned up; a running watchdog is left
                # untouched so it can retry later.
                self._terminate_process()
                raise RuntimeError("Flask 启动后 10s 内未监听端口")

            if threading.current_thread() is not self._monitor_thread:
                # An explicit start re-arms a watchdog that stop() halted.
                self._stop_event.clear()
            if not self._monitor_thread or not self._monitor_thread.is_alive():
                self._monitor_thread = threading.Thread(
                    target=self._monitor, daemon=True
                )
                self._monitor_thread.start()
                self._log("info", "[FlaskWD] 守护线程已启动")

    # ========= stdout capture =========
    def _flush_stdout(self, proc: Optional[subprocess.Popen] = None):
        """Forward every stdout line of *proc* to the log until EOF.

        Args:
            proc: process to read from; defaults to the current ``self.process``.
        """
        if proc is None:
            proc = self.process
        if proc is None or proc.stdout is None:
            return
        stream = proc.stdout
        try:
            for line in iter(stream.readline, ""):
                self._log("info", line.rstrip())
        except (ValueError, OSError):
            # The pipe was closed underneath us while shutting down.
            pass
        finally:
            try:
                stream.close()
            except OSError:
                pass

    # ========= Send =========
    def send(self, data: Union[str, Dict, List]) -> bool:
        """Send *data* as one newline-terminated UTF-8 line to the comm port.

        Dicts and lists are serialised to JSON first.

        Returns:
            True when the data was sent, False on any failure (the error is
            logged, never raised).
        """
        try:
            if isinstance(data, (dict, list)):
                data = json.dumps(data, ensure_ascii=False)
            with socket.create_connection(
                ("127.0.0.1", self.comm_port), timeout=3.0
            ) as s:
                s.sendall((data + "\n").encode("utf-8"))
            self._log("info", f"[FlaskMgr] 数据已发送到端口 {self.comm_port}")
            return True
        except Exception as e:
            self._log("error", f"[FlaskMgr] 发送失败: {e}")
            return False

    # ========= Stop =========
    def stop(self, *, stop_watchdog: bool = True):
        """Terminate the Flask child and, optionally, the watchdog thread.

        Args:
            stop_watchdog: when True (default) the watchdog is told to exit
                and is joined for up to 2 seconds. The request is ignored when
                stop() is called from the watchdog thread itself, because a
                thread cannot join itself and must keep running to finish the
                restart it is performing.
        """
        monitor = self._monitor_thread
        halt_watchdog = stop_watchdog and threading.current_thread() is not monitor

        if halt_watchdog:
            # Signal first so the watchdog cannot start another restart while
            # we wait for the lock.
            self._stop_event.set()
            self._running = False

        with self._lock:
            self._terminate_process()

        if halt_watchdog and monitor is not None and monitor.is_alive():
            monitor.join(timeout=2.0)

    def _terminate_process(self):
        """Terminate (then kill, if needed) and reap the current child.

        The caller is expected to hold ``self._lock``.
        """
        proc = self.process
        self.process = None
        if proc is None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=3)
        except (OSError, subprocess.TimeoutExpired):
            pass
        if proc.poll() is None:
            try:
                proc.kill()
                # Reap the child so it does not linger as a zombie.
                proc.wait(timeout=3)
            except (OSError, subprocess.TimeoutExpired):
                pass

    # ========= Watchdog =========
    def _monitor(self):
        """Watchdog loop: probe the port every 2 s and restart Flask on failure.

        A restart is attempted after ``_FAIL_THRESHOLD`` consecutive failed
        probes, unless the cool-down has not elapsed. The loop ends when the
        stop event is set or when ``_MAX_RESTARTS`` restarts already happened
        within ``_RESTART_WINDOW`` seconds.
        """
        self._log("info", "[FlaskWD] 看门狗线程启动")
        verbose = os.getenv("FLASK_WD_VERBOSE", "0") == "1"
        last_ok = 0.0
        self._running = True
        try:
            while not self._stop_event.wait(2.0):
                if self._port_alive():
                    self._fail_count = 0
                    # In verbose mode report a healthy port at most once a minute.
                    if verbose and (time.time() - last_ok) >= 60:
                        self._log("info", f"[FlaskWD] OK {self.comm_port} alive")
                        last_ok = time.time()
                    continue

                self._fail_count += 1
                self._log(
                    "warn",
                    f"[FlaskWD] 探测失败 {self._fail_count}/{self._FAIL_THRESHOLD}",
                )
                if self._fail_count < self._FAIL_THRESHOLD:
                    continue

                now = time.time()
                if now - self._last_restart_time < self._COOLDOWN_SEC:
                    self._log("warn", "[FlaskWD] 冷却中,跳过重启")
                    continue

                # Rate limit: give up once the window holds too many restarts.
                self._restart_times = [
                    t for t in self._restart_times if now - t < self._RESTART_WINDOW
                ]
                if len(self._restart_times) >= self._MAX_RESTARTS:
                    self._log(
                        "error",
                        f"[FlaskWD] 10分钟内重启次数过多({len(self._restart_times)}次),暂停看门狗",
                    )
                    break
                self._restart_times.append(now)

                self._log("warn", "[FlaskWD] 端口不通,准备重启 Flask")
                with self._lock:
                    # stop() may have been requested while we waited for the
                    # lock; never resurrect a deliberately stopped child.
                    if self._stop_event.is_set():
                        break
                    try:
                        self.stop(stop_watchdog=False)
                        time.sleep(1)
                        self.start()
                    except Exception as e:
                        restarted = False
                        self._log("error", f"[FlaskWD] 自动重启失败: {e}")
                    else:
                        restarted = True
                        self._fail_count = 0
                        self._last_restart_time = now
                        self._log("info", "[FlaskWD] Flask 已成功重启")

                if restarted:
                    self._resync_devices()
                else:
                    # Back off outside the lock; wakes early on shutdown.
                    self._stop_event.wait(3.0)
        finally:
            self._running = False

    def _resync_devices(self):
        """Re-send every known device model to the freshly restarted Flask."""
        try:
            # Imported lazily, as in the original restart path.
            from Module.DeviceInfo import DeviceInfo

            info = DeviceInfo()
            payloads = []
            # Snapshot under the DeviceInfo lock, send after releasing it so
            # socket I/O never happens while the lock is held.
            with info._lock:
                for model in info._models.values():
                    try:
                        payloads.append(model.toDict())
                    except Exception:
                        continue
        except Exception as e:
            self._log("warn", f"[FlaskWD] 重启后同步设备信息失败: {e}")
            return
        for payload in payloads:
            self.send(payload)

    # ========= Helpers =========
    def _port_alive(self) -> bool:
        """Return True if a TCP connect to comm_port or comm_port + 1 succeeds."""
        # NOTE(review): the role of comm_port + 1 is not visible in this file;
        # presumably a secondary listener of the Flask process - confirm.
        def ping(port: int) -> bool:
            try:
                with socket.create_connection(("127.0.0.1", port), timeout=0.6):
                    return True
            except Exception:
                return False

        return ping(self.comm_port) or ping(self.comm_port + 1)

    def _wait_port_open(self, timeout: float) -> bool:
        """Poll the port every 0.2 s; return True once it answers, else False."""
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._port_alive():
                return True
            time.sleep(0.2)
        return False

    def _is_alive(self) -> bool:
        """Return True when the child process is running and its port answers."""
        proc = self.process
        return proc is not None and proc.poll() is None and self._port_alive()

    @classmethod
    def get_instance(cls) -> 'FlaskSubprocessManager':
        """Return the singleton instance (same as calling the class)."""
        return cls()