Merge remote-tracking branch 'origin/main'

# Conflicts:
#	.idea/workspace.xml
This commit is contained in:
2025-09-15 22:41:12 +08:00
8 changed files with 284 additions and 155 deletions

View File

@@ -119,7 +119,6 @@ class Deviceinfo(object):
continue
if len(self.deviceModelList) >= self.maxDeviceCount:
continue
port = self._alloc_port()
try:
self.connectDevice(udid) # 内部会 _add_model
except Exception as e:

View File

@@ -62,27 +62,66 @@ def start_socket_listener():
print(f"[INFO] Socket listener started on port {port}, waiting for connections...")
while True:
try:
LogManager.info(f"[INFO] Waiting for a new connection on port {port}...")
print(f"[INFO] Waiting for a new connection on port {port}...")
conn, addr = s.accept()
LogManager.info(f"[INFO] Connection accepted from: {addr}")
print(f"[INFO] Connection accepted from: {addr}")
LogManager.info(f"[INFO] Connection from {addr}")
except Exception as e:
LogManager.error(f"[ERROR] accept 失败: {e}")
continue
raw_data = conn.recv(1024).decode('utf-8').strip()
LogManager.info(f"[INFO] Raw data received: {raw_data}")
print(f"[INFO] Raw data received: {raw_data}")
data = json.loads(raw_data)
LogManager.info(f"[INFO] Parsed data: {data}")
print(f"[INFO] Parsed data: {data}")
dataQueue.put(data)
except Exception as conn_error:
LogManager.error(f"[ERROR]连接处理失败: {conn_error}")
print(f"[ERROR]连接处理失败: {conn_error}")
# 独立线程处理单条连接,避免单客户端异常拖垮监听线程
threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start()
# while True:
# try:
# LogManager.info(f"[INFO] Waiting for a new connection on port {port}...")
# print(f"[INFO] Waiting for a new connection on port {port}...")
# conn, addr = s.accept()
# LogManager.info(f"[INFO] Connection accepted from: {addr}")
# print(f"[INFO] Connection accepted from: {addr}")
#
# raw_data = conn.recv(1024).decode('utf-8').strip()
# LogManager.info(f"[INFO] Raw data received: {raw_data}")
# print(f"[INFO] Raw data received: {raw_data}")
#
# data = json.loads(raw_data)
# LogManager.info(f"[INFO] Parsed data: {data}")
# print(f"[INFO] Parsed data: {data}")
# dataQueue.put(data)
# except Exception as conn_error:
# LogManager.error(f"[ERROR]连接处理失败: {conn_error}")
# print(f"[ERROR]连接处理失败: {conn_error}")
except Exception as e:
LogManager.error(f"[ERROR]Socket服务启动失败: {e}")
print(f"[ERROR]Socket服务启动失败: {e}")
def _handle_conn(conn: socket.socket, addr):
try:
with conn:
# 1. 循环收包直到拿到完整 JSON
buffer = ""
while True:
data = conn.recv(1024)
if not data: # 对端关闭
break
buffer += data.decode('utf-8', errors='ignore')
# 2. 尝试切出完整 JSON简单按行也可按长度头、分隔符
while True:
line, sep, buffer = buffer.partition('\n')
if not sep: # 没找到完整行
break
line = line.strip()
if not line: # 空行跳过
continue
try:
obj = json.loads(line)
except json.JSONDecodeError as e:
LogManager.warning(f"[WARN] 非法 JSON 丢弃: {line[:100]} {e}")
continue
# 3. 收到合法数据,塞进队列
dataQueue.put(obj)
LogManager.info(f"[INFO] 收到合法消息: {obj}")
except Exception as e:
LogManager.error(f"[ERROR] 连接处理异常: {e}")
# 在独立线程中启动Socket服务
listener_thread = threading.Thread(target=start_socket_listener, daemon=True)

View File

@@ -27,104 +27,150 @@ class FlaskSubprocessManager:
self.process: Optional[subprocess.Popen] = None
self.comm_port = 34566
self._stop_event = threading.Event()
self._monitor_thread: Optional[threading.Thread] = None
atexit.register(self.stop)
LogManager.info("FlaskSubprocessManager 单例已初始化", udid="system")
# 可以把 _find_available_port 留着备用,但 start 前先校验端口是否被占用
# ---------- 启动 ----------
def start(self):
with self._lock:
if self._is_alive():
LogManager.warning("子进程已在运行,无需重复启动", udid="system")
return
env = os.environ.copy()
env["FLASK_COMM_PORT"] = str(self.comm_port)
exe_path = Path(sys.executable).resolve()
if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
exe_path = Path(sys.argv[0]).resolve()
is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
if is_frozen:
cmd = [str(exe_path), "--role=flask"]
cwd = str(exe_path.parent)
else:
cmd = [sys.executable, "-u", "-m", "Module.Main", "--role=flask"]
cwd = str(Path(__file__).resolve().parent)
LogManager.info(f"准备启动 Flask 子进程: {cmd} cwd={cwd}", udid="system")
# 关键:不再自己 open 文件,直接走 LogManager
# 用 PIPE 捕获,再转存到 system 级日志
self.process = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
bufsize=1,
env=env,
cwd=cwd,
start_new_session=True
)
# 守护线程:把子进程 stdout → LogManager.info/system
threading.Thread(target=self._flush_stdout, daemon=True).start()
LogManager.info(f"Flask 子进程已启动PID={self.process.pid},端口={self.comm_port}", udid="system")
if not self._wait_port_open(timeout=10):
LogManager.error("等待端口监听超时,启动失败", udid="system")
self.stop()
raise RuntimeError("Flask 启动后 10 s 内未监听端口")
self._monitor_thread = threading.Thread(target=self._monitor, daemon=True)
self._monitor_thread.start()
LogManager.info("端口守护线程已启动", udid="system")
# ---------- 实时把子进程 stdout 刷到 system 日志 ----------
def _flush_stdout(self):
for line in iter(self.process.stdout.readline, ""):
if line:
LogManager.info(line.rstrip(), udid="system")
self.process.stdout.close()
# ---------- 发送 ----------
def send(self, data: Union[str, Dict, List]) -> bool:
if isinstance(data, (dict, list)):
data = json.dumps(data, ensure_ascii=False)
try:
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=3.0) as s:
s.sendall((data + "\n").encode("utf-8"))
LogManager.info(f"数据已成功发送到 Flask 端口:{self.comm_port}", udid="system")
return True
except Exception as e:
LogManager.error(f"发送失败:{e}", udid="system")
return False
# ---------- 停止 ----------
def stop(self):
with self._lock:
if getattr(self, 'process', None) is None:
LogManager.info("无子进程需要停止", udid="system")
return
pid = self.process.pid
LogManager.info(f"正在停止 Flask 子进程 PID={pid}", udid="system")
try:
self.process.terminate()
try:
self.process.wait(timeout=3)
except subprocess.TimeoutExpired:
LogManager.warning("软杀超时,强制杀进程树", udid="system")
import psutil
parent = psutil.Process(pid)
for child in parent.children(recursive=True):
child.kill()
parent.kill()
self.process.wait()
LogManager.info("Flask 子进程已停止", udid="system")
except Exception as e:
LogManager.error(f"停止子进程时异常:{e}", udid="system")
finally:
self.process = None
self._stop_event.set()
# ---------- 端口守护 ----------
def _monitor(self):
LogManager.info("守护线程开始运行,周期性检查端口存活", udid="system")
while not self._stop_event.wait(1.0):
if not self._port_alive():
LogManager.error("检测到端口不通,准备重启 Flask", udid="system")
with self._lock:
if self.process and self.process.poll() is None:
self.stop()
try:
self.start()
except Exception as e:
LogManager.error(f"自动重启失败:{e}", udid="system")
time.sleep(2)
# ---------- 辅助 ----------
def _is_port_busy(self, port: int) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.2)
return s.connect_ex(("127.0.0.1", port)) == 0
# 启动flask
def start(self):
"""启动 Flask 子进程(兼容打包后的 exe 和源码运行)"""
with self._lock:
if self.process is not None:
LogManager.warning("子进程正在运行中!")
raise RuntimeError("子进程已在运行中!")
env = os.environ.copy()
env["FLASK_COMM_PORT"] = str(self.comm_port)
# —— 解析打包 exe 的稳健写法 ——
exe_path = Path(sys.executable).resolve()
if exe_path.name.lower() in ("python.exe", "pythonw.exe"):
# Nuitka 某些场景里 sys.executable 可能指向 dist\python.exe并不存在
exe_path = Path(sys.argv[0]).resolve()
is_frozen = exe_path.suffix.lower() == ".exe" and exe_path.exists()
if is_frozen:
# 打包后的 exe用当前 exe 自举
cmd = [str(exe_path), "--role=flask"]
cwd = str(exe_path.parent)
else:
# 源码运行:模块方式更稳
cmd = [sys.executable, "-m", "Module.Main", "--role=flask"]
cwd = str(Path(__file__).resolve().parent) # Module 目录
LogManager.info(f"[DEBUG] spawn: {cmd} (cwd={cwd}) exists(exe)={os.path.exists(cmd[0])}")
print(f"[DEBUG] spawn: {cmd} (cwd={cwd}) exists(exe)={os.path.exists(cmd[0])}")
self.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace", # 新增:遇到非 UTF-8 字节用 <20> 代替,避免崩溃
bufsize=1,
env=env,
cwd=cwd,
)
LogManager.info(f"Flask子进程启动 (PID: {self.process.pid}, 端口: {self.comm_port})")
print(f"Flask子进程启动 (PID: {self.process.pid}, 端口: {self.comm_port})")
def print_output(stream, stream_name):
while True:
line = stream.readline()
if not line:
break
print(f"{stream_name}: {line.strip()}")
threading.Thread(target=print_output, args=(self.process.stdout, "STDOUT"), daemon=True).start()
threading.Thread(target=print_output, args=(self.process.stderr, "STDERR"), daemon=True).start()
def send(self, data: Union[str, Dict, List]) -> bool:
"""通过Socket发送数据"""
def _port_alive(self) -> bool:
try:
if not isinstance(data, str):
data = json.dumps(data)
# 等待子进程启动并准备好
time.sleep(1) # 延时1秒根据实际情况调整
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', self.comm_port))
s.sendall((data + "\n").encode('utf-8'))
return True
except ConnectionRefusedError:
LogManager.error(f"连接被拒绝,确保子进程在端口 {self.comm_port} 上监听")
print(f"连接被拒绝,确保子进程在端口 {self.comm_port} 上监听")
return False
except Exception as e:
LogManager.error(f"发送失败: {e}")
print(f"发送失败: {e}")
with socket.create_connection(("127.0.0.1", self.comm_port), timeout=0.5):
return True
except Exception:
return False
def stop(self):
with self._lock:
if self.process and self.process.poll() is None:
print(f"[INFO] Stopping Flask child process (PID: {self.process.pid})...")
LogManager.info(f"[INFO] Stopping Flask child process (PID: {self.process.pid})...")
self.process.terminate()
self.process.wait()
print("[INFO] Flask child process stopped.")
LogManager.info("[INFO] Flask child process stopped.")
self._stop_event.set()
else:
LogManager.info("[INFO] No Flask child process to stop.")
print("[INFO] No Flask child process to stop.")
def _wait_port_open(self, timeout: float) -> bool:
t0 = time.time()
while time.time() - t0 < timeout:
if self._port_alive():
return True
time.sleep(0.2)
return False
def _is_alive(self) -> bool:
return self.process is not None and self.process.poll() is None and self._port_alive()
@classmethod
def get_instance(cls) -> 'FlaskSubprocessManager':