import atexit
import json
import os
import socket
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union


class FlaskSubprocessManager:
    """Process-wide singleton that launches and talks to a Flask child process.

    The child is spawned with ``--role=flask`` and receives the communication
    port through the ``FLASK_COMM_PORT`` environment variable. Messages are
    delivered to it as newline-terminated UTF-8 text over a short-lived TCP
    connection to ``127.0.0.1:<comm_port>``.

    Calling ``FlaskSubprocessManager()`` or ``get_instance()`` always returns
    the same object. ``stop`` is registered with ``atexit`` when the singleton
    is first created.
    """

    _instance: Optional['FlaskSubprocessManager'] = None
    # Guards singleton creation and every start/stop transition.
    _lock: threading.Lock = threading.Lock()

    # Seconds to wait after terminate() before escalating to kill().
    _STOP_TIMEOUT = 5.0
    # Pause between connection attempts while the child is still starting up.
    _CONNECT_RETRY_INTERVAL = 0.05

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                # Publish the instance only after it is fully initialised so a
                # failing _init_manager() cannot leave a half-built singleton.
                instance = super().__new__(cls)
                instance._init_manager()
                cls._instance = instance
        return cls._instance

    def _init_manager(self):
        """Set the initial state of the singleton and register exit cleanup."""
        self.process: Optional[subprocess.Popen] = None
        self.comm_port = 34567
        self._stop_event = threading.Event()
        atexit.register(self.stop)

    def _is_port_busy(self, port: int) -> bool:
        """Return True if something accepts TCP connections on 127.0.0.1:port."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.2)
            return s.connect_ex(("127.0.0.1", port)) == 0

    @staticmethod
    def _resolve_launch_command() -> Tuple[List[str], str]:
        """Return ``(argv, cwd)`` used to spawn the Flask child.

        A packaged build re-launches its own executable; a source checkout
        runs ``python -m Module.Main``.
        """
        exe_path = Path(sys.executable).resolve()
        if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
            # Under Nuitka, sys.executable can point at dist\python.exe, which
            # does not exist; fall back to the path the program was started as.
            exe_path = Path(sys.argv[0]).resolve()

        is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
        if is_frozen:
            # Packaged executable: bootstrap the child from the same binary.
            return [str(exe_path), "--role=flask"], str(exe_path.parent)

        # Source run: launching as a module is the more reliable form.
        # NOTE(review): cwd is the directory containing this file; confirm that
        # ``Module.Main`` is importable from there.
        cwd = str(Path(__file__).resolve().parent)
        return [sys.executable, "-m", "Module.Main", "--role=flask"], cwd

    @staticmethod
    def _pump_output(stream, stream_name: str) -> None:
        """Echo every line of a child pipe to stdout until EOF, then close it."""
        with stream:
            for line in iter(stream.readline, ""):
                print(f"{stream_name}: {line.strip()}")

    def _discard_process(self) -> None:
        """Forget the current child handle and close its stdin pipe.

        stdout/stderr are closed by their reader threads once they reach EOF.
        """
        proc, self.process = self.process, None
        if proc is not None and proc.stdin is not None:
            try:
                proc.stdin.close()
            except OSError:
                # Best effort: the child is gone, so a broken pipe is expected.
                pass

    def start(self):
        """Start the Flask child process (packaged exe or source checkout).

        Raises:
            RuntimeError: if a child started by this manager is still running.
        """
        with self._lock:
            if self.process is not None:
                if self.process.poll() is None:
                    raise RuntimeError("子进程已在运行中!")
                # The previous child has exited; drop the stale handle so the
                # manager can be started again.
                self._discard_process()

            if self._is_port_busy(self.comm_port):
                # Warn instead of raising so existing callers are not broken.
                print(f"[WARN] Port {self.comm_port} is already in use; "
                      "the Flask child may fail to bind it.")

            env = os.environ.copy()
            env["FLASK_COMM_PORT"] = str(self.comm_port)

            cmd, cwd = self._resolve_launch_command()
            print(f"[DEBUG] spawn: {cmd} (cwd={cwd}) exists(exe)={os.path.exists(cmd[0])}")

            self._stop_event.clear()
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                # Replace undecodable bytes instead of crashing the readers.
                errors="replace",
                bufsize=1,
                env=env,
                cwd=cwd,
            )
            print(f"Flask子进程启动 (PID: {self.process.pid}, 端口: {self.comm_port})")

            # Drain both pipes so the child never blocks on a full pipe buffer.
            for stream, label in ((self.process.stdout, "STDOUT"),
                                  (self.process.stderr, "STDERR")):
                threading.Thread(
                    target=self._pump_output,
                    args=(stream, label),
                    daemon=True,
                ).start()

    def _connect(self, connect_timeout: float) -> socket.socket:
        """Return a socket connected to the child, retrying while refused.

        Raises:
            ConnectionRefusedError: if the port still refuses connections once
                ``connect_timeout`` seconds have elapsed.
        """
        deadline = time.monotonic() + max(connect_timeout, 0.0)
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(('127.0.0.1', self.comm_port))
                return sock
            except ConnectionRefusedError:
                sock.close()
                if time.monotonic() >= deadline:
                    raise
                time.sleep(self._CONNECT_RETRY_INTERVAL)
            except BaseException:
                sock.close()
                raise

    def send(self, data: Union[str, Dict, List], *, connect_timeout: float = 1.0) -> bool:
        """Send one newline-terminated message to the child over TCP.

        Args:
            data: A string sent as is, or a dict/list serialised with
                ``json.dumps``.
            connect_timeout: How long, in seconds, to keep retrying while the
                connection is refused (the child may still be starting).
                Zero or less means a single attempt.

        Returns:
            True if the message was handed to the socket, False on any failure
            (the failure is printed, never raised).
        """
        try:
            payload = data if isinstance(data, str) else json.dumps(data)
            # Retry only while refused instead of sleeping before every send.
            with self._connect(connect_timeout) as s:
                s.sendall((payload + "\n").encode('utf-8'))
            return True
        except ConnectionRefusedError:
            print(f"连接被拒绝,确保子进程在端口 {self.comm_port} 上监听")
            return False
        except Exception as e:
            print(f"发送失败: {e}")
            return False

    def stop(self):
        """Stop the child if it is running and forget its handle.

        The child is asked to terminate first and is killed if it has not
        exited within ``_STOP_TIMEOUT`` seconds, so this cannot hang at
        interpreter exit.
        """
        with self._lock:
            proc = self.process
            if proc is None or proc.poll() is not None:
                # Nothing is running; still drop any stale handle.
                self._discard_process()
                print("[INFO] No Flask child process to stop.")
                return

            print(f"[INFO] Stopping Flask child process (PID: {proc.pid})...")
            proc.terminate()
            try:
                proc.wait(timeout=self._STOP_TIMEOUT)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
            self._discard_process()
            print("[INFO] Flask child process stopped.")
            self._stop_event.set()

    @classmethod
    def get_instance(cls) -> 'FlaskSubprocessManager':
        """Return the process-wide singleton, creating it on first use."""
        return cls()