import subprocess import threading import atexit import json import os import socket import time from typing import Optional, Union, Dict, List class FlaskSubprocessManager: _instance: Optional['FlaskSubprocessManager'] = None _lock: threading.Lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._init_manager() return cls._instance def _init_manager(self): self.process: Optional[subprocess.Popen] = None self.comm_port = self._find_available_port() self._stop_event = threading.Event() atexit.register(self.stop) def _find_available_port(self): """动态获取可用端口""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(('0.0.0.0', 0)) return s.getsockname()[1] def start(self): """启动子进程(Windows兼容方案)""" with self._lock: if self.process is not None: raise RuntimeError("子进程已在运行中!") # 通过环境变量传递通信端口 env = os.environ.copy() env['FLASK_COMM_PORT'] = str(self.comm_port) self.process = subprocess.Popen( ['python', 'Flask/FlaskService.py'], # 启动一个子进程 FlaskService.py stdin=subprocess.PIPE, # 标准输入流,用于向子进程发送数据 stdout=subprocess.PIPE, # 标准输出流,用于接收子进程的输出 stderr=subprocess.PIPE, # 标准错误流,用于接收子进程的错误信息 text=True, # 以文本模式打开流,否则以二进制模式打开 bufsize=1, # 缓冲区大小设置为 1,表示行缓冲 encoding='utf-8', # 指定编码为 UTF-8,确保控制台输出不会报错 env=env # 指定子进程的环境变量 ) print(f"Flask子进程启动 (PID: {self.process.pid}, 通信端口: {self.comm_port})") # 将日志通过主进程输出 def print_output(stream, stream_name): while True: line = stream.readline() if not line: break print(f"{stream_name}: {line.strip()}") # 启动两个线程分别处理 stdout 和 stderr threading.Thread(target=print_output, args=(self.process.stdout, "STDOUT"), daemon=True).start() threading.Thread(target=print_output, args=(self.process.stderr, "STDERR"), daemon=True).start() def send(self, data: Union[str, Dict, List]) -> bool: """通过Socket发送数据""" try: if not isinstance(data, str): data = json.dumps(data) # 等待子进程启动并准备好 time.sleep(1) # 延时1秒,根据实际情况调整 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect(('127.0.0.1', self.comm_port)) s.sendall((data + "\n").encode('utf-8')) return True except ConnectionRefusedError: print(f"连接被拒绝,确保子进程在端口 {self.comm_port} 上监听") return False except Exception as e: print(f"发送失败: {e}") return False def stop(self): with self._lock: if self.process and self.process.poll() is None: print(f"[INFO] Stopping Flask child process (PID: {self.process.pid})...") self.process.terminate() self.process.wait() print("[INFO] Flask child process stopped.") self._stop_event.set() else: print("[INFO] No Flask child process to stop.") @classmethod def get_instance(cls) -> 'FlaskSubprocessManager': return cls()