# File: iOSAI/Module/FlaskSubprocessManager.py
# Snapshot: 2025-09-17 22:23:57 +08:00 (206 lines, 7.7 KiB, Python)
#
# Note from the repository viewer: this file contains ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can safely be ignored.

import subprocess
import sys
import threading
import atexit
import json
import os
import socket
import time
from pathlib import Path
from typing import Optional, Union, Dict, List
import psutil
from Utils.LogManager import LogManager
class FlaskSubprocessManager:
    """Process-wide singleton that launches, watches and stops the Flask child process.

    The child is started with ``--role=flask`` and is told which TCP port to
    listen on through the ``FLASK_COMM_PORT`` environment variable.  A daemon
    thread polls that port once per second and respawns the child when the
    port stops accepting connections.  Everything the child writes to
    stdout/stderr is forwarded line by line to ``LogManager`` under
    ``udid="system"``.

    All state changes (spawn / terminate) are serialised with ``_lock``.
    """

    _instance: Optional['FlaskSubprocessManager'] = None
    # Re-entrant on purpose: stop() may be reached from code paths that already
    # hold the lock, and a plain Lock would make the calling thread deadlock
    # against itself there.
    _lock = threading.RLock()

    def __new__(cls):
        """Return the single shared instance, creating and initialising it on first use."""
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._init_manager()
                # Publish only after initialisation succeeded so that a failed
                # init never leaves a half-built singleton behind.
                cls._instance = instance
            return cls._instance

    def _init_manager(self):
        """Set up instance state, clean up stale listeners and register the exit hook."""
        self.process: Optional[subprocess.Popen] = None
        self.comm_port = 34566
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None
        # Startup info shared by the helper commands run on Windows
        # (netstat / taskkill); None on every other platform.
        self._si = self._build_startupinfo()
        # Before anything is started, get rid of listeners left on our port.
        self._kill_orphan_flask()
        atexit.register(self.stop)
        LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system")

    @staticmethod
    def _build_startupinfo():
        """Return a ``STARTUPINFO`` that hides the console window on Windows, else ``None``."""
        if os.name != "nt":
            return None
        startup_info = subprocess.STARTUPINFO()
        startup_info.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startup_info.wShowWindow = subprocess.SW_HIDE
        return startup_info

    @staticmethod
    def _parse_netstat_listeners(output: str, port: int) -> List[int]:
        """Extract the PIDs listening on ``127.0.0.1:<port>`` from ``netstat -ano`` output.

        Rows are expected as ``Proto  Local  Foreign  State  PID``.  The local
        address column is compared exactly, so a port that merely shares a
        prefix with ``port`` is never matched.
        """
        wanted = f"127.0.0.1:{port}"
        pids: List[int] = []
        for row in output.splitlines():
            fields = row.split()
            if len(fields) < 5:
                continue
            if fields[1] != wanted or "LISTENING" not in fields:
                continue
            if fields[-1].isdigit():
                pids.append(int(fields[-1]))
        return pids

    def _find_listening_pids(self) -> List[int]:
        """Return the PIDs of processes currently listening on ``comm_port``."""
        if os.name == "nt":
            out = subprocess.check_output(
                ["netstat", "-ano"],
                text=True, errors="replace", startupinfo=self._si
            )
            return self._parse_netstat_listeners(out, self.comm_port)
        # macOS / Linux.  lsof exits non-zero when nothing matches, which is
        # the normal case here, so the exit status is deliberately not checked.
        result = subprocess.run(
            ["lsof", "-t", f"-iTCP:{self.comm_port}", "-sTCP:LISTEN"],
            capture_output=True, text=True, check=False
        )
        return [int(token) for token in result.stdout.split() if token.isdigit()]

    def _kill_orphan_flask(self):
        """Kill every process (other than this one) still listening on ``comm_port``.

        This is best-effort cleanup: failures are logged and never propagate,
        so a missing ``netstat``/``lsof`` cannot prevent the manager from
        being created.
        """
        try:
            pids = self._find_listening_pids()
        except Exception as e:
            LogManager.warning(f"查找残留 Flask 进程失败:{e}", udid="system")
            return
        own_pid = os.getpid()
        for pid in pids:
            if pid == own_pid:
                continue
            try:
                if os.name == "nt":
                    subprocess.run(["taskkill", "/F", "/PID", str(pid)],
                                   startupinfo=self._si,
                                   capture_output=True)
                else:
                    import signal
                    os.kill(pid, signal.SIGKILL)
                LogManager.info(f"已清理残留进程 PID={pid}", udid="system")
            except Exception as e:
                LogManager.warning(f"清理残留进程 PID={pid} 失败:{e}", udid="system")

    # ---------- start ----------
    def start(self):
        """Start the Flask child process and the port monitor.

        Does nothing (apart from a warning) when the child is already running
        and its port answers.

        Raises:
            RuntimeError: the child did not start listening within 10 seconds.
                The child has been terminated again when this is raised.
        """
        with self._lock:
            if self._is_alive():
                LogManager.warning("子进程已在运行,无需重复启动", udid="system")
                return
            self._spawn()
            self._ensure_monitor()

    def _build_command(self) -> tuple:
        """Return ``(cmd, cwd)`` used to launch the child.

        When the running executable is a packaged ``.exe`` the same executable
        is re-invoked; otherwise the interpreter runs ``-m Module.Main``.
        """
        exe_path = Path(sys.executable).resolve()
        if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
            exe_path = Path(sys.argv[0]).resolve()
        is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
        if is_frozen:
            return [str(exe_path), "--role=flask"], str(exe_path.parent)
        # NOTE(review): ``-m Module.Main`` is resolved relative to cwd, and cwd
        # here is this file's own directory - confirm that this is the
        # intended import root.
        cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
        return cmd, str(Path(__file__).resolve().parent)

    def _spawn(self):
        """Launch a fresh child and wait for its port.  Caller must hold ``_lock``.

        Any child that is still tracked is terminated first so that two
        children never compete for the same port.

        Raises:
            RuntimeError: the port did not open within 10 seconds.
        """
        self._terminate_process()
        env = os.environ.copy()
        env["FLASK_COMM_PORT"] = str(self.comm_port)
        cmd, cwd = self._build_command()
        LogManager.info(f"准备启动 Flask 子进程: {cmd} cwd={cwd}", udid="system")
        # No log file is opened here: output is captured through a pipe and
        # forwarded to the system-level log by _flush_stdout.
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,
            env=env,
            cwd=cwd,
            start_new_session=True
        )
        self.process = proc
        # The reader gets its own reference to the process, so it keeps working
        # (and never sees None) after self.process is reset.
        threading.Thread(target=self._flush_stdout, args=(proc,), daemon=True).start()
        LogManager.info(f"Flask 子进程已启动PID={proc.pid},端口={self.comm_port}", udid="system")
        if not self._wait_port_open(timeout=10):
            LogManager.error("等待端口监听超时,启动失败", udid="system")
            self._terminate_process()
            raise RuntimeError("Flask 启动后 10 s 内未监听端口")

    def _ensure_monitor(self):
        """Start the monitor thread unless a live, not-yet-stopped one exists.  Caller must hold ``_lock``."""
        monitor = self._monitor_thread
        if monitor is not None and monitor.is_alive() and not self._stop_event.is_set():
            return
        # Every monitor run gets its own event: a monitor that was told to stop
        # can never be revived by accident, and the new one starts unsignalled.
        self._stop_event = threading.Event()
        self._monitor_thread = threading.Thread(
            target=self._monitor, args=(self._stop_event,), daemon=True
        )
        self._monitor_thread.start()
        LogManager.info("端口守护线程已启动", udid="system")

    # ---------- forward the child's stdout to the system log in real time ----------
    def _flush_stdout(self, proc: Optional[subprocess.Popen] = None):
        """Forward each output line of ``proc`` (default: the current child) to the log until EOF."""
        proc = self.process if proc is None else proc
        if proc is None or proc.stdout is None:
            return
        stream = proc.stdout
        try:
            for line in iter(stream.readline, ""):
                LogManager.info(line.rstrip(), udid="system")
        except (OSError, ValueError):
            # The pipe was closed underneath us while the child was being torn down.
            pass
        finally:
            stream.close()

    # ---------- send ----------
    def send(self, data: Union[str, Dict, List]) -> bool:
        """Send one newline-terminated UTF-8 message to the Flask port.

        Args:
            data: a string sent as is, or a dict/list that is JSON-encoded first.

        Returns:
            True when the data was handed to the socket, False on any failure
            (the failure is logged).
        """
        if isinstance(data, (dict, list)):
            data = json.dumps(data, ensure_ascii=False)
        try:
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s:
                s.sendall((data + "\n").encode("utf-8"))
            LogManager.info(f"数据已成功发送到 Flask 端口:{self.comm_port}", udid="system")
            return True
        except Exception as e:
            LogManager.error(f"发送失败:{e}", udid="system")
            return False

    # ---------- stop ----------
    def stop(self):
        """Stop the port monitor and terminate the child together with its process tree.

        Safe to call repeatedly and when nothing is running.
        """
        with self._lock:
            # Signal the monitor first so it cannot respawn what is killed below.
            self._stop_event.set()
            self._terminate_process()

    def _terminate_process(self):
        """Kill the tracked child and its descendants, then reap it.  Caller must hold ``_lock``."""
        proc = self.process
        if proc is None:
            return
        self.process = None
        pid = proc.pid
        LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system")
        try:
            # Only touch the PID while our child still owns it; once the child
            # has exited the number may already belong to an unrelated process.
            if proc.poll() is None:
                self._kill_tree(pid)
        except Exception as e:
            LogManager.error(f"停止子进程异常:{e}", udid="system")
        finally:
            try:
                proc.wait(timeout=3)
            except Exception as e:
                LogManager.error(f"停止子进程异常:{e}", udid="system")

    @staticmethod
    def _kill_tree(pid: int):
        """Kill ``pid`` and all of its descendants (works on Windows as well)."""
        try:
            parent = psutil.Process(pid)
            # Collect the children before anything is killed; they can no
            # longer be looked up once the parent is gone.
            family = parent.children(recursive=True) + [parent]
        except psutil.NoSuchProcess:
            return
        for member in family:
            try:
                member.kill()
            except psutil.NoSuchProcess:
                pass
        _gone, alive = psutil.wait_procs(family, timeout=3)
        for member in alive:
            # Second attempt for anything that survived the first kill.
            try:
                member.kill()
            except psutil.NoSuchProcess:
                pass

    # ---------- port monitor ----------
    def _monitor(self, stop_event: Optional[threading.Event] = None):
        """Poll the port once per second and respawn the child when it stops answering.

        Runs until ``stop_event`` (default: the manager's current stop event)
        is set.  A failed respawn is logged and retried after a 2 second pause.
        """
        stop_event = self._stop_event if stop_event is None else stop_event
        LogManager.info("守护线程开始运行,周期性检查端口存活", udid="system")
        while not stop_event.wait(1.0):
            if self._port_alive():
                continue
            LogManager.error("检测到端口不通,准备重启 Flask", udid="system")
            restarted = False
            with self._lock:
                if stop_event.is_set():
                    # stop() won the race for the lock; do not resurrect the child.
                    break
                if self._port_alive():
                    # Someone else restarted the child while we waited for the lock.
                    continue
                try:
                    self._spawn()
                    restarted = True
                except Exception as e:
                    LogManager.error(f"自动重启失败:{e}", udid="system")
            if not restarted:
                # Back off outside the lock; wakes up early when stop() is called.
                stop_event.wait(2)

    # ---------- helpers ----------
    def _is_port_busy(self, port: int) -> bool:
        """Return True when something accepts TCP connections on ``127.0.0.1:port``."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.2)
            return s.connect_ex(("127.0.0.1", port)) == 0

    def _port_alive(self) -> bool:
        """Return True when a TCP connection to ``comm_port`` succeeds within 0.5 s."""
        try:
            with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.5):
                return True
        except (OSError, OverflowError):
            return False

    def _wait_port_open(self, timeout: float) -> bool:
        """Poll the port every 0.2 s; return True as soon as it answers, False after ``timeout`` seconds."""
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._port_alive():
                return True
            time.sleep(0.2)
        return False

    def _is_alive(self) -> bool:
        """Return True when a child is tracked, has not exited, and its port answers."""
        return self.process is not None and self.process.poll() is None and self._port_alive()

    @classmethod
    def get_instance(cls) -> 'FlaskSubprocessManager':
        """Return the shared instance (equivalent to calling the class)."""
        return cls()