Files
iOSAI/Module/FlaskSubprocessManager.py
2025-09-15 16:01:27 +08:00

177 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import sys
import threading
import atexit
import json
import os
import socket
import time
from pathlib import Path
from typing import Optional, Union, Dict, List
from Utils.LogManager import LogManager
class FlaskSubprocessManager:
_instance: Optional['FlaskSubprocessManager'] = None
_lock: threading.Lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_manager()
return cls._instance
def _init_manager(self):
self.process: Optional[subprocess.Popen] = None
self.comm_port = 34566
self._stop_event = threading.Event()
self._monitor_thread: Optional[threading.Thread] = None
atexit.register(self.stop)
LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system")
# ---------- 启动 ----------
def start(self):
with self._lock:
if self._is_alive():
LogManager.warning("子进程已在运行,无需重复启动", udid="system")
return
env = os.environ.copy()
env["FLASK_COMM_PORT"] = str(self.comm_port)
exe_path = Path(sys.executable).resolve()
if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
exe_path = Path(sys.argv[0]).resolve()
is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
if is_frozen:
cmd = [str(exe_path), "--role=flask"]
cwd = str(exe_path.parent)
else:
cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
cwd = str(Path(__file__).resolve().parent)
LogManager.info(f"准备启动 Flask 子进程: {cmd} cwd={cwd}", udid="system")
# 关键:不再自己 open 文件,直接走 LogManager
# 用 PIPE 捕获,再转存到 system 级日志
self.process = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
bufsize=1,
env=env,
cwd=cwd,
start_new_session=True
)
# 守护线程:把子进程 stdout → LogManager.info/system
threading.Thread(target=self._flush_stdout, daemon=True).start()
LogManager.info(f"Flask 子进程已启动PID={self.process.pid},端口={self.comm_port}", udid="system")
if not self._wait_port_open(timeout=10):
LogManager.error("等待端口监听超时,启动失败", udid="system")
self.stop()
raise RuntimeError("Flask 启动后 10 s 内未监听端口")
self._monitor_thread = threading.Thread(target=self._monitor, daemon=True)
self._monitor_thread.start()
LogManager.info("端口守护线程已启动", udid="system")
# ---------- 实时把子进程 stdout 刷到 system 日志 ----------
def _flush_stdout(self):
for line in iter(self.process.stdout.readline, ""):
if line:
LogManager.info(line.rstrip(), udid="system")
self.process.stdout.close()
# ---------- 发送 ----------
def send(self, data: Union[str, Dict, List]) -> bool:
if isinstance(data, (dict, list)):
data = json.dumps(data, ensure_ascii=False)
try:
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s:
s.sendall((data + "\n").encode("utf-8"))
LogManager.info(f"数据已成功发送到 Flask 端口:{self.comm_port}", udid="system")
return True
except Exception as e:
LogManager.error(f"发送失败:{e}", udid="system")
return False
# ---------- 停止 ----------
def stop(self):
with self._lock:
if getattr(self, 'process', None) is None:
LogManager.info("无子进程需要停止", udid="system")
return
pid = self.process.pid
LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system")
try:
self.process.terminate()
try:
self.process.wait(timeout=3)
except subprocess.TimeoutExpired:
LogManager.warning("软杀超时,强制杀进程树", udid="system")
import psutil
parent = psutil.Process(pid)
for child in parent.children(recursive=True):
child.kill()
parent.kill()
self.process.wait()
LogManager.info("Flask 子进程已停止", udid="system")
except Exception as e:
LogManager.error(f"停止子进程时异常:{e}", udid="system")
finally:
self.process = None
self._stop_event.set()
# ---------- 端口守护 ----------
def _monitor(self):
LogManager.info("守护线程开始运行,周期性检查端口存活", udid="system")
while not self._stop_event.wait(1.0):
if not self._port_alive():
LogManager.error("检测到端口不通,准备重启 Flask", udid="system")
with self._lock:
if self.process and self.process.poll() is None:
self.stop()
try:
self.start()
except Exception as e:
LogManager.error(f"自动重启失败:{e}", udid="system")
time.sleep(2)
# ---------- 辅助 ----------
def _is_port_busy(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.2)
return s.connect_ex(("127.0.0.1", port)) == 0
def _port_alive(self) -> bool:
try:
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.5):
return True
except Exception:
return False
def _wait_port_open(self, timeout: float) -> bool:
t0 = time.time()
while time.time() - t0 < timeout:
if self._port_alive():
return True
time.sleep(0.2)
return False
def _is_alive(self) -> bool:
return self.process is not None and self.process.poll() is None and self._port_alive()
@classmethod
def get_instance(cls) -> 'FlaskSubprocessManager':
return cls()