Files
iOSAI/Flask/FlaskService.py
2025-08-14 14:30:36 +08:00

253 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import socket
import threading
import warnings
from queue import Queue
from typing import Any, Dict
from Utils.AiUtils import AiUtils
from Utils.Requester import Requester
import tidevice
import wda
from flask import Flask, request
from flask_cors import CORS
from Entity.ResultData import ResultData
from Utils.ControlUtils import ControlUtils
from Utils.ThreadManager import ThreadManager
from script.ScriptManager import ScriptManager
from Entity.Variables import accountToken
from Entity.Variables import anchorList, addModelToAnchorList
# Flask application object; flask_cors is applied to it on the next line.
app = Flask(__name__)
CORS(app)
# Device records returned by the /deviceList route (mutated in place there).
listData = []
# Messages parsed by the socket listener; drained by the /deviceList route.
dataQueue = Queue()
def start_socket_listener():
    """Accept JSON messages on a local TCP port and push them onto ``dataQueue``.

    The port is read from the ``FLASK_COMM_PORT`` environment variable. If the
    variable is missing, not an integer, or not positive, the listener is
    skipped and the function returns immediately.

    Otherwise the function binds to ``127.0.0.1:<port>`` and loops forever:
    accept one connection, read up to 1024 bytes, decode as UTF-8, parse as
    JSON and enqueue the result. A failure while handling one connection is
    logged and the loop moves on to the next connection.

    Returns:
        None. The function only returns when no usable port is configured,
        the bind fails, or the listening socket raises.
    """
    raw_port = os.getenv('FLASK_COMM_PORT', 0)
    try:
        port = int(raw_port)
    except (TypeError, ValueError):
        # A malformed value used to raise here and kill the listener thread
        # with a traceback; treat it like a missing port instead.
        print(f"[ERROR] Invalid FLASK_COMM_PORT value: {raw_port!r}")
        return
    print(f"Received port from environment: {port}")
    if port <= 0:
        print("⚠️ 未获取到通信端口跳过Socket监听")
        return
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Allow address reuse so a recently released port can be re-bound.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Try to bind the port.
            try:
                s.bind(('127.0.0.1', port))
                print(f"[INFO] Socket successfully bound to port {port}")
            except Exception as bind_error:
                print(f"[ERROR] ❌ 端口绑定失败: {bind_error}")
                return
            # Start listening.
            s.listen()
            print(f"[INFO] Socket listener started on port {port}, waiting for connections...")
            while True:
                try:
                    print(f"[INFO] Waiting for a new connection on port {port}...")
                    conn, addr = s.accept()
                    # Close every client socket once its message is handled;
                    # previously each accepted connection was left open.
                    with conn:
                        print(f"[INFO] Connection accepted from: {addr}")
                        # NOTE(review): a single recv(1024) truncates larger
                        # payloads; kept as-is because the sender's framing
                        # is not visible from this file.
                        raw_data = conn.recv(1024).decode('utf-8').strip()
                        print(f"[INFO] Raw data received: {raw_data}")
                        data = json.loads(raw_data)
                        print(f"[INFO] Parsed data: {data}")
                        dataQueue.put(data)
                except Exception as conn_error:
                    print(f"[ERROR] ❌ 连接处理失败: {conn_error}")
    except Exception as e:
        print(f"[ERROR] ❌ Socket服务启动失败: {e}")
# Start the socket service in its own daemon thread (runs when this module is loaded).
listener_thread = threading.Thread(target=start_socket_listener, daemon=True)
listener_thread.start()
@app.route('/passToken', methods=['POST'])
def passToken():
    """Store the account token sent by the client, then request comments.

    The JSON body must contain a ``token`` key; a missing key raises
    ``KeyError`` exactly as before.

    Returns:
        The JSON produced by ``ResultData(data="").toJson()``.
    """
    # Function-scope import: the module object is needed so the shared
    # variable can be rebound where other modules look it up.
    import Entity.Variables as Variables

    global accountToken
    data = request.get_json()
    token = data['token']
    # The original plain assignment only created a function-local name that
    # was discarded on return, so the token was never actually stored.
    # Rebind both the shared module attribute and this module's imported name.
    # NOTE(review): presumably Requester reads the token from Entity.Variables
    # - confirm against Utils/Requester.py.
    Variables.accountToken = token
    accountToken = token
    print(accountToken)
    Requester.requestComments()
    return ResultData(data="").toJson()
# Get the device list
@app.route('/deviceList', methods=['GET'])
def deviceList():
    """Apply all queued device events to ``listData`` and return the list.

    Every object waiting in ``dataQueue`` must carry a ``type`` key
    (``KeyError`` otherwise). ``type == 1`` appends the object to
    ``listData``; any other value removes the entries whose ``deviceId``
    and ``screenPort`` both equal those of the event.

    Returns:
        The JSON produced by ``ResultData(data=listData).toJson()``.
    """
    while not dataQueue.empty():
        event = dataQueue.get()
        if event["type"] == 1:
            listData.append(event)
            continue
        device_id = event.get("deviceId")
        screen_port = event.get("screenPort")
        # Rebuild the list in place rather than calling remove() while
        # iterating it: removing during iteration skips the element that
        # follows each removed one, so some matching entries survived.
        # Slice assignment keeps the same list object for other references.
        listData[:] = [
            item for item in listData
            if not (item.get("deviceId") == device_id
                    and item.get("screenPort") == screen_port)
        ]
    return ResultData(data=listData).toJson()
# Get the app list of a device
@app.route('/deviceAppList', methods=['POST'])
def deviceAppList():
    """Return what ``ControlUtils.getDeviceAppList`` reports for a device.

    The JSON body must contain ``udid``; a missing key raises ``KeyError``.
    """
    device_udid = request.get_json()["udid"]
    return ResultData(data=ControlUtils.getDeviceAppList(device_udid)).toJson()
# Launch the specified app
@app.route('/launchApp', methods=['POST'])
def launchApp():
    """Start the app named by ``bundleId`` on the device named by ``udid``.

    Both values are read from the JSON body with ``.get`` and are therefore
    ``None`` when absent.
    """
    payload = request.get_json()
    device = tidevice.Device(payload.get("udid"))
    device.app_start(payload.get("bundleId"))
    return ResultData(data="").toJson()
# Go back to the home screen
@app.route('/toHome', methods=['POST'])
def toHome():
    """Call ``home()`` on a WDA USB client opened for the requested ``udid``."""
    device_udid = request.get_json().get("udid")
    wda.USBClient(device_udid).home()
    return ResultData(data="").toJson()
# Tap event
@app.route('/tapAction', methods=['POST'])
def tapAction():
    """Tap at the ``x``/``y`` coordinates from the JSON body on the device ``udid``."""
    payload = request.get_json()
    device_udid, tap_x, tap_y = (payload.get(key) for key in ("udid", "x", "y"))
    wda_session = wda.USBClient(device_udid).session()
    # Settings are applied before the gesture, as in every gesture route here.
    wda_session.appium_settings({"snapshotMaxDepth": 0})
    wda_session.tap(tap_x, tap_y)
    return ResultData(data="").toJson()
# Swipe event
@app.route('/swipeAction', methods=['POST'])
def swipeAction():
    """Swipe on the device ``udid`` in the direction given by ``direction``.

    ``direction`` 1 swipes up, 2 left, 3 down; every other value (including
    a missing one) swipes right.
    """
    payload = request.get_json()
    device_udid = payload.get("udid")
    direction = payload.get("direction")
    wda_session = wda.USBClient(device_udid).session()
    wda_session.appium_settings({"snapshotMaxDepth": 0})
    # Compare with == (not a dict lookup) so unhashable or odd values still
    # fall through to the swipe_right default.
    gesture = "swipe_right"
    for code, method_name in ((1, "swipe_up"), (2, "swipe_left"), (3, "swipe_down")):
        if direction == code:
            gesture = method_name
            break
    getattr(wda_session, gesture)()
    return ResultData(data="").toJson()
# Long-press event
@app.route('/longPressAction', methods=['POST'])
def longPressAction():
    """Hold a touch at ``x``/``y`` for 1.0 on the device ``udid``.

    The values come from the JSON body; ``1.0`` is passed as the third
    argument of ``tap_hold``.
    """
    payload = request.get_json()
    device_udid, press_x, press_y = (payload.get(key) for key in ("udid", "x", "y"))
    wda_session = wda.USBClient(device_udid).session()
    wda_session.appium_settings({"snapshotMaxDepth": 5})
    wda_session.tap_hold(press_x, press_y, 1.0)
    return ResultData(data="").toJson()
# Grow (warm up) an account
@app.route('/growAccount', methods=['POST'])
def growAccount():
    """Run ``ScriptManager.growAccount`` for a device on a new thread.

    The thread receives the device ``udid`` and a fresh ``threading.Event``;
    the same thread and event are then registered with ``ThreadManager``
    under that ``udid``.
    """
    device_udid = request.get_json().get("udid")
    script_runner = ScriptManager()
    script_event = threading.Event()
    # Launch the script.
    worker = threading.Thread(
        target=script_runner.growAccount,
        args=(device_udid, script_event),
    )
    worker.start()
    # Register with the thread manager.
    ThreadManager.add(device_udid, worker, script_event)
    return ResultData(data="").toJson()
# Watch live streams
@app.route("/watchLiveForGrowth", methods=['POST'])
def watchLiveForGrowth():
    """Run ``ScriptManager.watchLiveForGrowth`` for a device on a new thread.

    The thread receives the device ``udid`` and a fresh ``threading.Event``;
    the same thread and event are then registered with ``ThreadManager``
    under that ``udid``.
    """
    device_udid = request.get_json().get("udid")
    script_runner = ScriptManager()
    script_event = threading.Event()
    worker = threading.Thread(
        target=script_runner.watchLiveForGrowth,
        args=(device_udid, script_event),
    )
    worker.start()
    # Register with the thread manager.
    ThreadManager.add(device_udid, worker, script_event)
    return ResultData(data="").toJson()
# Stop a script
@app.route("/stopScript", methods=['POST'])
def stopScript():
    """Call ``ThreadManager.stop`` with the ``udid`` from the JSON body."""
    ThreadManager.stop(request.get_json().get("udid"))
    return ResultData(data="").toJson()
# Receive anchor (streamer) data
@app.route('/passAnchorData', methods=['POST'])
def passAnchorData():
    """Register anchors and start ``greetNewFollowers`` on each listed device.

    Reads three optional keys from the JSON body: ``deviceList`` (default
    ``[]``), ``anchorList`` (default ``[]``) and ``needReply`` (default
    ``False``). The anchor list is handed to ``addModelToAnchorList``; then
    one thread per device is started and registered with ``ThreadManager``.
    """
    payload: Dict[str, Any] = request.get_json()
    device_ids = payload.get("deviceList", [])   # devices to run on
    anchors = payload.get("anchorList", [])      # anchors to add
    need_reply = payload.get("needReply", False)  # whether a reply is needed
    # Add the anchor data.
    addModelToAnchorList(anchors)
    # One script thread per device.
    for device_udid in device_ids:
        script_runner = ScriptManager()
        script_event = threading.Event()
        worker = threading.Thread(
            target=script_runner.greetNewFollowers,
            args=(device_udid, need_reply, script_event),
        )
        worker.start()
        # Register with the thread manager.
        ThreadManager.add(device_udid, worker, script_event)
    return ResultData(data="").toJson()
# Add temporary data
@app.route("/addTempAnchorData", methods=['POST'])
def addTempAnchorData():
    """Pass the request's JSON body unchanged to ``addModelToAnchorList``."""
    addModelToAnchorList(request.get_json())
    return ResultData(data="").toJson()
# Get the chat text currently on screen
@app.route("/getChatTextInfo", methods=['POST'])
def getChatTextInfo():
    """Return messages extracted from the device's current page source.

    Fetches ``session.source()`` for the device ``udid`` and feeds it to
    ``AiUtils.extract_messages_from_xml``; the result is wrapped in
    ``ResultData``.
    """
    device_udid = request.get_json().get("udid")
    page_source = wda.USBClient(device_udid).session().source()
    messages = AiUtils.extract_messages_from_xml(page_source)
    return ResultData(data=messages).toJson()
if __name__ == '__main__':
    # The server listens on every interface (0.0.0.0). With debug=True the
    # Werkzeug interactive debugger is reachable by anyone who can connect,
    # and it allows arbitrary code execution. Debug mode is therefore opt-in:
    # set FLASK_DEBUG=1 for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "0") == "1"
    app.run("0.0.0.0", port=5000, debug=debug_enabled, use_reloader=False)