import atexit
import json
import os
import socket
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Optional, Union, Dict, List

import psutil

from Utils.LogManager import LogManager


class FlaskSubprocessManager:
    """Singleton that launches, feeds and supervises the Flask child process.

    The manager spawns the child with ``--role=flask``, forwards the child's
    stdout/stderr to ``LogManager``, pushes newline-terminated payloads to the
    child over a local TCP port, and runs a background thread that relaunches
    the child whenever that port stops accepting connections.
    """

    _instance: Optional['FlaskSubprocessManager'] = None
    # Re-entrant on purpose: start() and the monitor thread tear the child
    # down while already holding this lock, which would self-deadlock with a
    # plain threading.Lock.
    _lock = threading.RLock()

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._init_manager()
                # Publish only after initialisation succeeded so a failed
                # init does not leave a half-built singleton behind.
                cls._instance = instance
            return cls._instance

    def _init_manager(self):
        """Initialise instance state, clean up stale listeners, hook atexit."""
        self.process: Optional[subprocess.Popen] = None
        self.comm_port = 34566
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None
        # Must exist before _kill_orphan_flask(), which passes it to the
        # netstat/taskkill calls.
        self._si = self._build_startupinfo()

        # Kill any Flask instance left over from a previous run first.
        self._kill_orphan_flask()

        atexit.register(self.stop)
        LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system")

    @staticmethod
    def _build_startupinfo():
        """Return a STARTUPINFO that hides helper console windows, or None.

        Only Windows has ``subprocess.STARTUPINFO``; everywhere else ``None``
        is returned, which ``subprocess`` accepts for ``startupinfo=``.
        """
        if os.name != "nt":
            return None
        startup = subprocess.STARTUPINFO()
        startup.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startup.wShowWindow = subprocess.SW_HIDE
        return startup

    def _kill_orphan_flask(self):
        """Best-effort kill of every process still listening on ``comm_port``.

        Never raises: expected "nothing to do" conditions are ignored and any
        other failure is logged as a warning.
        """
        own_pid = os.getpid()
        try:
            if os.name == "nt":  # Windows
                out = subprocess.check_output(
                    ["netstat", "-ano"],
                    text=True,
                    errors="replace",
                    startupinfo=self._si,
                )
                needle = f"127.0.0.1:{self.comm_port}"
                for line in out.splitlines():
                    if needle in line and "LISTENING" in line:
                        # The owning PID is the last column of `netstat -ano`.
                        pid = int(line.strip().split()[-1])
                        if pid != own_pid:
                            subprocess.run(
                                ["taskkill", "/F", "/PID", str(pid)],
                                startupinfo=self._si,
                                capture_output=True,
                            )
            else:  # macOS / Linux
                out = subprocess.check_output(
                    ["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"],
                    text=True,
                )
                for pid in map(int, out.split()):
                    if pid == own_pid:
                        continue
                    try:
                        os.kill(pid, 9)
                    except OSError:
                        # Already gone or not ours to kill; keep going so one
                        # failure does not spare the remaining listeners.
                        pass
        except (subprocess.CalledProcessError, FileNotFoundError):
            # lsof exits non-zero when nothing is listening, and the tool may
            # simply be missing; neither is worth reporting.
            pass
        except Exception as e:
            LogManager.warning(f"清理残留 Flask 进程失败:{e}", udid="system")

    # ---------- start ----------
    def start(self):
        """Start the Flask child and its port monitor; no-op if already healthy.

        Raises:
            RuntimeError: the child did not open ``comm_port`` within 10 s.
                The child has been terminated by the time this propagates.
        """
        with self._lock:
            if self._is_alive():
                LogManager.warning("子进程已在运行,无需重复启动", udid="system")
                return

            # Retire any monitor from a previous run; a fresh event is handed
            # to the new monitor below, so a stale "stopped" flag can never
            # make it exit immediately.
            self._stop_event.set()
            self._launch()

            self._stop_event = threading.Event()
            self._monitor_thread = threading.Thread(
                target=self._monitor, args=(self._stop_event,), daemon=True
            )
            self._monitor_thread.start()
            LogManager.info("端口守护线程已启动", udid="system")

    def _launch(self):
        """Spawn the child and wait for its port. Caller must hold ``_lock``.

        Raises:
            RuntimeError: the port did not open within 10 s.
        """
        # A child that is running but not serving must not be leaked when
        # self.process is overwritten below.
        self._terminate_process()

        env = os.environ.copy()
        env["FLASK_COMM_PORT"] = str(self.comm_port)

        exe_path = Path(sys.executable).resolve()
        if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
            exe_path = Path(sys.argv[0]).resolve()

        is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
        if is_frozen:
            cmd = [str(exe_path), "--role=flask"]
            cwd = str(exe_path.parent)
        else:
            cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
            cwd = str(Path(__file__).resolve().parent)

        LogManager.info(f"准备启动 Flask 子进程: {cmd}  cwd={cwd}", udid="system")

        # Output is not written to a file here: it is captured through a pipe
        # and forwarded to the system-level log by _flush_stdout().
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
            env=env,
            cwd=cwd,
            start_new_session=True,
        )
        self.process = proc

        # Daemon thread: child stdout -> LogManager.info. It is bound to this
        # Popen object so a later stop() cannot pull it out from under it.
        threading.Thread(target=self._flush_stdout, args=(proc,), daemon=True).start()

        LogManager.info(f"Flask 子进程已启动,PID={self.process.pid},端口={self.comm_port}", udid="system")

        if not self._wait_port_open(timeout=10):
            LogManager.error("等待端口监听超时,启动失败", udid="system")
            self._terminate_process()
            raise RuntimeError("Flask 启动后 10 s 内未监听端口")

    # ---------- stream child stdout into the system log ----------
    def _flush_stdout(self, proc=None):
        """Forward every output line of ``proc`` to the log and the console.

        Args:
            proc: the ``Popen`` to read from; defaults to ``self.process`` for
                callers that use the old zero-argument form.
        """
        if proc is None:
            proc = self.process
        if proc is None or proc.stdout is None:
            return
        stream = proc.stdout
        try:
            for line in iter(stream.readline, ""):
                text = line.rstrip()
                LogManager.info(text, udid="system")
                try:
                    print(text)  # mirror to the parent's console
                except (UnicodeError, OSError):
                    # An unencodable character or a missing console must not
                    # stop the pump, otherwise the child blocks on a full pipe.
                    pass
        except ValueError:
            # The stream was closed underneath us.
            pass
        finally:
            stream.close()

    # ---------- send ----------
    def send(self, data: Union[str, Dict, List]) -> bool:
        """Send one newline-terminated UTF-8 message to the Flask comm port.

        Args:
            data: a ready-made string, or a dict/list that is JSON-encoded
                first.

        Returns:
            True when the payload was handed to the socket, False on failure
            (the error is logged).
        """
        if isinstance(data, (dict, list)):
            data = json.dumps(data, ensure_ascii=False)
        try:
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s:
                s.sendall((data + "\n").encode("utf-8"))
            LogManager.info(f"数据已成功发送到 Flask 端口:{self.comm_port}", udid="system")
            return True
        except Exception as e:
            LogManager.error(f"发送失败:{e}", udid="system")
            return False

    # ---------- stop ----------
    def stop(self):
        """Stop the port monitor and kill the child together with its descendants."""
        with self._lock:
            self._stop_event.set()
            self._terminate_process()

    def _terminate_process(self):
        """Kill the current child's process tree and reap it. Caller holds ``_lock``.

        Does nothing when there is no child. Always leaves ``self.process``
        set to ``None``.
        """
        proc = self.process
        if proc is None:
            return

        pid = proc.pid
        LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system")

        try:
            # Only touch the PID while the child is still ours; once it has
            # exited the number may already belong to an unrelated process.
            if proc.poll() is None:
                parent = psutil.Process(pid)
                # Snapshot the tree BEFORE killing anything: children() on a
                # dead parent raises NoSuchProcess.
                family = parent.children(recursive=True)
                family.append(parent)
                for member in family:
                    try:
                        member.kill()
                    except psutil.NoSuchProcess:
                        pass
                _, alive = psutil.wait_procs(family, timeout=3)
                for member in alive:
                    try:
                        member.kill()  # second attempt for stragglers
                    except psutil.NoSuchProcess:
                        pass
        except psutil.NoSuchProcess:
            pass
        except Exception as e:
            LogManager.error(f"停止子进程异常:{e}", udid="system")
        finally:
            try:
                # Reap the child; bounded so an unkillable process cannot
                # hang the caller forever.
                proc.wait(timeout=3)
            except Exception as e:
                LogManager.error(f"停止子进程异常:{e}", udid="system")
            self.process = None

    # ---------- port monitor ----------
    def _monitor(self, stop_event=None):
        """Poll the comm port every second and relaunch Flask when it is down.

        Runs until ``stop_event`` is set. A failed relaunch is logged and
        retried on a later cycle.

        Args:
            stop_event: the event that ends this monitor; defaults to the
                manager's current ``_stop_event``.
        """
        if stop_event is None:
            stop_event = self._stop_event

        LogManager.info("守护线程开始运行,周期性检查端口存活", udid="system")
        while not stop_event.wait(1.0):
            if self._port_alive():
                continue

            LogManager.error("检测到端口不通,准备重启 Flask", udid="system")
            restarted = False
            with self._lock:
                # stop() or a manual start() may have run while we waited for
                # the lock; never bring the child back after that.
                if stop_event.is_set():
                    break
                try:
                    self._launch()
                    restarted = True
                except Exception as e:
                    LogManager.error(f"自动重启失败:{e}", udid="system")

            if restarted:
                try:
                    self._resend_device_info()
                except Exception as e:
                    LogManager.error(f"自动重启失败:{e}", udid="system")

            # Back off before the next probe, but wake immediately on stop.
            stop_event.wait(2.0)

    def _resend_device_info(self):
        """Re-send device data to a freshly restarted Flask child."""
        # Imported here rather than at module level, as in the original code.
        from Module.DeviceInfo import DeviceInfo

        info = DeviceInfo()
        # NOTE(review): this sends each *key* of DeviceInfo()._models, as the
        # original code did — confirm the keys are the intended payload.
        for model in info._models.keys():
            self.send(model)

    # ---------- helpers ----------
    def _is_port_busy(self, port: int) -> bool:
        """Return True if something accepts TCP connections on 127.0.0.1:``port``."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.2)
            return s.connect_ex(("127.0.0.1", port)) == 0

    def _port_alive(self) -> bool:
        """Return True if a TCP connection to the comm port succeeds within 0.5 s."""
        try:
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.5):
                return True
        except Exception:
            return False

    def _wait_port_open(self, timeout: float) -> bool:
        """Poll the comm port until it accepts a connection or ``timeout`` seconds pass."""
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._port_alive():
                return True
            time.sleep(0.2)
        return False

    def _is_alive(self) -> bool:
        """Return True when the child is running and its comm port is reachable."""
        return (
            self.process is not None
            and self.process.poll() is None
            and self._port_alive()
        )

    @classmethod
    def get_instance(cls) -> 'FlaskSubprocessManager':
        """Return the shared manager instance."""
        return cls()