Files
iOSAI/Module/FlaskService.py

774 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import json
import os
import socket
import threading
import time
from pathlib import Path
from queue import Queue
from typing import Any, Dict
from Entity import Variables
from Utils.AiUtils import AiUtils
from Utils.IOSAIStorage import IOSAIStorage
from Utils.LogManager import LogManager
import wda
from flask import Flask, request
from flask_cors import CORS
from Entity.ResultData import ResultData
from Utils.ControlUtils import ControlUtils
from Utils.ThreadManager import ThreadManager
from script.ScriptManager import ScriptManager
from Entity.Variables import addModelToAnchorList, wdaFunctionPort
import Entity.Variables as ev
from Utils.JsonUtils import JsonUtils
app = Flask(__name__)
CORS(app)
app.config['JSON_AS_ASCII'] = False # Flask jsonify 不转义中文/emoji
app.config['JSONIFY_MIMETYPE'] = "application/json; charset=utf-8"
# ============ 设备状态内存表 ============
listData = []
listLock = threading.Lock()
# 历史遗留不再消费队列改为socket线程直接落地
dataQueue = Queue()
# ---- 黏性快照(避免瞬时空) ----
_last_nonempty_snapshot = []
_last_snapshot_ts = 0.0
_STICKY_TTL_SEC = 10.0 # 在瞬时空时回退到上一份非空快照10秒
_empty_logged = False
_recovered_logged = False
# ===== 设备集合变化跟踪 =====
change_version = 0
_device_ids_snapshot = set()
_last_device_count = 0
def _log_device_changes(action: str):
"""记录设备集合增删变化"""
global _device_ids_snapshot, change_version
curr_ids = {d.get("deviceId") for d in listData if _is_online(d)}
added = curr_ids - _device_ids_snapshot
removed = _device_ids_snapshot - curr_ids
if added or removed:
change_version += 1
try:
LogManager.info(f"[DEVICE][CHANGED][{action}] rev={change_version} count={len(curr_ids)} added={list(added)} removed={list(removed)}")
except Exception:
print(f"[DEVICE][CHANGED][{action}] rev={change_version} count={len(curr_ids)} added={list(added)} removed={list(removed)}")
_device_ids_snapshot = curr_ids
def _normalize_type(v) -> int:
"""把各种表示在线/离线的值,规范成 1/0"""
if isinstance(v, bool):
return 1 if v else 0
if isinstance(v, (int, float)):
return 1 if int(v) == 1 else 0
if isinstance(v, str):
s = v.strip().lower()
if s.isdigit():
return 1 if int(s) == 1 else 0
if s in ("true", "online", "on", "yes"):
return 1
return 0
return 1 if v else 0
def _is_online(d: Dict[str, Any]) -> bool:
return _normalize_type(d.get("type", 1)) == 1
def _apply_device_event(obj: Dict[str, Any]):
"""把单条设备上线/下线事件落到 listData并打印关键日志"""
try:
dev_id = obj.get("deviceId")
typ = _normalize_type(obj.get("type", 1))
obj["type"] = typ # 写回规范后的值,避免后续被误判
if dev_id is None:
LogManager.warning(f"[DEVICE][WARN] missing deviceId in obj={obj}")
return
with listLock:
before = len(listData)
# 删除同 udid 旧记录
listData[:] = [d for d in listData if d.get("deviceId") != dev_id]
if typ == 1:
listData.append(obj) # 上线
LogManager.info(f"[DEVICE][UPSERT] id={dev_id} type={typ} size={len(listData)} (replaced={before - (len(listData)-1)})")
_log_device_changes("UPSERT")
else:
LogManager.warning(f"[DEVICE][REMOVE] id={dev_id} type={typ} size={len(listData)} (removed_prev={before - len(listData)})")
_log_device_changes("REMOVE")
except Exception as e:
LogManager.error(f"[DEVICE][APPLY_EVT][ERROR] {e}")
# ============ 设备事件 socket 监听 ============
def _handle_conn(conn: socket.socket, addr):
"""统一的连接处理函数(外部全局,避免内嵌函数覆盖)"""
try:
with conn:
buffer = ""
while True:
data = conn.recv(1024)
if not data: # 对端关闭
break
buffer += data.decode('utf-8', errors='ignore')
# 按行切 JSON发送端每条以 '\n' 结尾
while True:
line, sep, buffer = buffer.partition('\n')
if not sep:
break
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
except json.JSONDecodeError as e:
LogManager.warning(f"[SOCKET][WARN] 非法 JSON 丢弃: {line[:120]} err={e}")
continue
dev_id = obj.get("deviceId")
typ = _normalize_type(obj.get("type", 1))
obj["type"] = typ
LogManager.info(f"[SOCKET][RECV] deviceId={dev_id} type={typ} keys={list(obj.keys())}")
_apply_device_event(obj)
LogManager.info(f"[SOCKET][APPLY] deviceId={dev_id} type={typ}")
except Exception as e:
LogManager.error(f"[SOCKET][ERROR] 连接处理异常: {e}")
def start_socket_listener():
"""启动设备事件监听(与 HTTP 端口无关,走 FLASK_COMM_PORT"""
# 统一使用 FLASK_COMM_PORT默认 34566
port = int(os.getenv('FLASK_COMM_PORT', 34566))
LogManager.info(f"Received port from environment: {port}")
print(f"Received port from environment: {port}")
if port <= 0:
LogManager.info("未获取到通信端口跳过Socket监听")
print("未获取到通信端口跳过Socket监听")
return
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind(('127.0.0.1', port))
print(f"[INFO] Socket successfully bound to port {port}")
LogManager.info(f"[INFO] Socket successfully bound to port {port}")
except Exception as bind_error:
print(f"[ERROR]端口绑定失败: {bind_error}")
LogManager.info(f"[ERROR]端口绑定失败: {bind_error}")
return
s.listen()
LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...")
print(f"[INFO] Socket listener started on port {port}, waiting for connections...")
while True:
try:
conn, addr = s.accept()
except Exception as e:
LogManager.error(f"[ERROR] accept 失败: {e}")
continue
threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start()
# 独立线程启动 Socket 服务 + 看门狗
listener_thread = threading.Thread(target=start_socket_listener, daemon=True)
listener_thread.start()
# ============ API 路由 ============
@app.route('/deviceList', methods=['GET'])
def deviceList():
global _last_device_count, change_version
global _last_nonempty_snapshot, _last_snapshot_ts, _STICKY_TTL_SEC
global _empty_logged, _recovered_logged
try:
with listLock:
# 宽容判定在线(字符串'1'/'true'/True 都算)
data = [d for d in listData if _is_online(d)]
now = time.time()
# 记录最近一次非空快照
if data:
_last_nonempty_snapshot = data.copy()
_last_snapshot_ts = now
if _recovered_logged:
LogManager.info(f"[API][deviceList][RECOVERED] count={len(data)} rev={change_version}")
_recovered_logged = False
_empty_logged = False
else:
# 瞬时空:在 TTL 内回退上一份非空快照
if _last_nonempty_snapshot and (now - _last_snapshot_ts) <= _STICKY_TTL_SEC:
LogManager.warning(f"[API][deviceList][STICKY] serving last non-empty snapshot count={len(_last_nonempty_snapshot)} age={now - _last_snapshot_ts:.1f}s rev={change_version}")
return ResultData(data=_last_nonempty_snapshot).toJson()
if not _empty_logged:
LogManager.error(f"[API][deviceList][DROP_TO_EMPTY] last_count={_last_device_count} rev={change_version}")
_empty_logged = True
_recovered_logged = True
_last_device_count = len(data)
LogManager.info(f"[API][deviceList] return_count={len(data)} rev={change_version}")
return ResultData(data=data).toJson()
except Exception as e:
LogManager.error(f"[API][deviceList] error={e}")
return ResultData(data=[]).toJson()
@app.route('/passToken', methods=['POST'])
def passToken():
data = request.get_json()
print(data)
return ResultData(data="").toJson()
# 获取设备应用列表
@app.route('/deviceAppList', methods=['POST'])
def deviceAppList():
param = request.get_json()
udid = param["udid"]
apps = ControlUtils.getDeviceAppList(udid)
return ResultData(data=apps).toJson()
# 打开指定app
@app.route('/launchApp', methods=['POST'])
def launchApp():
body = request.get_json()
udid = body.get("udid")
bundleId = body.get("bundleId")
t = wda.USBClient(udid, wdaFunctionPort)
t.session().app_start(bundleId)
return ResultData(data="").toJson()
# 回到首页
@app.route('/toHome', methods=['POST'])
def toHome():
body = request.get_json()
udid = body.get("udid")
client = wda.USBClient(udid, wdaFunctionPort)
client.home()
return ResultData(data="").toJson()
# 点击事件
@app.route('/tapAction', methods=['POST'])
def tapAction():
body = request.get_json()
udid = body.get("udid")
client = wda.USBClient(udid, wdaFunctionPort)
print("-----------------------")
print(client)
print("-----------------------")
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0})
x = body.get("x")
y = body.get("y")
session.tap(x, y)
return ResultData(data="").toJson()
# 拖拽事件
@app.route('/swipeAction', methods=['POST'])
def swipeAction():
body = request.get_json()
udid = body.get("udid")
duration = body.get("duration") # 时长
sx = body.get("sx") # 起始X点
sy = body.get("sy") # 起始Y点
ex = body.get("ex") # 结束X点
ey = body.get("ey") # 结束Y点
client = wda.USBClient(udid, wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 0})
session.swipe(sx, sy, ex, ey, duration)
return ResultData(data="").toJson()
# 长按事件
@app.route('/longPressAction', methods=['POST'])
def longPressAction():
body = request.get_json()
udid = body.get("udid")
x = body.get("x")
y = body.get("y")
client = wda.USBClient(udid, wdaFunctionPort)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 5})
session.tap_hold(x, y, 1.0)
return ResultData(data="").toJson()
# 养号
@app.route('/growAccount', methods=['POST'])
def growAccount():
body = request.get_json()
udid = body.get("udid")
Variables.commentList = body.get("comment")
isComment = body.get("isComment")
manager = ScriptManager()
event = threading.Event()
# 启动脚本
thread = threading.Thread(target=manager.growAccount, args=(udid, isComment, event,))
# 添加到线程管理
code, msg = ThreadManager.add(udid, thread, event)
return ResultData(data="", code=code, message=msg).toJson()
# 观看直播
@app.route("/watchLiveForGrowth", methods=['POST'])
def watchLiveForGrowth():
body = request.get_json()
udid = body.get("udid")
manager = ScriptManager()
event = threading.Event()
thread = threading.Thread(target=manager.watchLiveForGrowth, args=(udid, event))
# 添加到线程管理
ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson()
# 停止脚本
@app.route("/stopScript", methods=['POST'])
def stopScript():
body = request.get_json()
udid = body.get("udid")
LogManager.method_info(f"接口收到 /stopScript udid={udid}", method="task")
code, msg = ThreadManager.stop(udid)
return ResultData(code=code, data=[], message=msg).toJson()
# 关注打招呼
@app.route('/passAnchorData', methods=['POST'])
def passAnchorData():
try:
LogManager.method_info("关注打招呼", "关注打招呼")
data: Dict[str, Any] = request.get_json()
# 设备列表
idList = data.get("deviceList", [])
# 主播列表
acList = data.get("anchorList", [])
Variables.commentList = data.get("comment")
isComment = data.get("isComment")
LogManager.info(f"[INFO] 获取数据: {idList} {acList}")
AiUtils.save_aclist_flat_append(acList)
# 是否需要回复
needReply = data.get("needReply", False)
# 获取打招呼数据
ev.prologueList = data.get("prologueList", [])
# 添加主播数据
addModelToAnchorList(acList)
# 启动线程,执行脚本
for udid in idList:
manager = ScriptManager()
event = threading.Event()
# 启动脚本
thread = threading.Thread(target=manager.safe_greetNewFollowers,
args=(udid, needReply, isComment, event,))
# 添加到线程管理
ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson()
except Exception as e:
LogManager.error(e)
return ResultData(data="", code=1001).toJson()
@app.route('/followAndGreetUnion', methods=['POST'])
def followAndGreetUnion():
try:
LogManager.method_info("关注打招呼", "关注打招呼(联盟号)")
data: Dict[str, Any] = request.get_json()
# 设备列表
idList = data.get("deviceList", [])
# 主播列表
acList = data.get("anchorList", [])
LogManager.info(f"[INFO] 获取数据: {idList} {acList}")
AiUtils.save_aclist_flat_append(acList)
# 是否需要回复
needReply = data.get("needReply", True)
# 获取打招呼数据
ev.prologueList = data.get("prologueList", [])
# 添加主播数据
addModelToAnchorList(acList)
# 启动线程,执行脚本
for udid in idList:
manager = ScriptManager()
event = threading.Event()
# 启动脚本
thread = threading.Thread(target=manager.safe_followAndGreetUnion,
args=(udid, needReply, event))
# 添加到线程管理
ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson()
except Exception as e:
LogManager.error(e)
return ResultData(data="", code=1001).toJson()
# 获取私信数据
@app.route("/getPrologueList", methods=['GET'])
def getPrologueList():
import Entity.Variables as Variables
return ResultData(data=Variables.prologueList).toJson()
# 添加临时数据
# 批量追加主播到 JSON 文件
@app.route("/addTempAnchorData", methods=['POST'])
def addTempAnchorData():
"""
请求体支持:
- 单个对象:{"anchorId": "xxx", "country": "CN"}
- 对象数组:[{"anchorId": "xxx", "country": "CN"}, {"anchorId": "yyy", "country": "US"}]
"""
data = request.get_json()
if not data:
return ResultData(code=400, message="请求数据为空").toJson()
# 追加到 JSON 文件
AiUtils.save_aclist_flat_append(data, "data/acList.json")
return ResultData(data="ok").toJson()
# 获取当前屏幕上的聊天信息
@app.route("/getChatTextInfo", methods=['POST'])
def getChatTextInfo():
data = request.get_json()
udid = data.get("udid")
client = wda.USBClient(udid,wdaFunctionPort)
session = client.session()
xml = session.source()
try:
result = AiUtils.extract_messages_from_xml(xml)
last_in = None
last_out = None
for item in reversed(result): # 从后往前找
if item.get('type') != 'msg':
continue
if last_in is None and item['dir'] == 'in':
last_in = item['text']
if last_out is None and item['dir'] == 'out':
last_out = item['text']
if last_in is not None and last_out is not None:
break
print(f"检测出对方的最后一条数据:{last_in},{type(last_in)}")
print(f"检测出我的最后一条数据:{last_out},{type(last_out)}")
return ResultData(data=result).toJson()
except Exception as e:
LogManager.error(f"获取屏幕翻译出现错误:{e}", "获取屏幕翻译")
data = [
{
'type': 'massage',
'dir': 'in',
'text': '当前页面无法获取聊天记录请在tiktok聊天页面进行获取'
},
{
'type': 'massage',
'dir': 'in',
'text': 'Unable to retrieve chat messages on the current screen. Please navigate to the TikTok chat page and try again!!!'
}
]
return ResultData(data=data, message="解析失败").toJson()
# 监控消息
@app.route("/replyMessages", methods=['POST'])
def monitorMessages():
LogManager.method_info("开始监控消息,监控消息脚本启动", "监控消息")
body = request.get_json()
udid = body.get("udid")
# Variables.commentList = body.get("comment")
manager = ScriptManager()
event = threading.Event()
thread = threading.Thread(target=manager.replyMessages, args=(udid, event))
LogManager.method_info("创建监控消息脚本线程成功", "监控消息")
# 添加到线程管理
ThreadManager.add(udid, thread, event)
return ResultData(data="").toJson()
# 上传日志
@app.route("/setLoginInfo", methods=['POST'])
def upLoadLogLogs():
data = request.get_json() # 解析 JSON
token = data.get("token")
userId = data.get("userId")
tenantId = data.get("tenantId")
ok = LogManager.upload_all_logs("http://47.79.98.113:8101/api/log/upload", token, userId, tenantId)
if ok:
return ResultData(data="日志上传成功").toJson()
else:
return ResultData(data="", message="日志上传失败").toJson()
# 获取当前的主播列表数据
@app.route("/anchorList", methods=['POST'])
def queryAnchorList():
# 项目根目录(当前文件在 infos 下,回退两层到根目录)
root_dir = Path(__file__).resolve().parent.parent
file_path = root_dir / "data" / "acList.json"
data = []
if file_path.exists():
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
LogManager.error(f"[anchorList] 读取失败: {e}")
data = []
return ResultData(data=data).toJson()
# 修改当前的主播列表数据
@app.route("/updateAnchorList", methods=['POST'])
def updateAnchorList():
"""
invitationType: 1 普票 2 金票
state: 1 通行(True) / 0 不通行(False)
"""
data = request.get_json(force=True, silent=True) or {}
invitationType = data.get("invitationType")
state = bool(data.get("state")) # 转成布尔
# 要更新成的值
new_status = 1 if state else 0
# 用工具类解析路径,避免 cwd 影响
file_path = AiUtils._resolve_path("data/acList.json")
# 加载 JSON
try:
doc = json.loads(file_path.read_text(encoding="utf-8-sig"))
except Exception as e:
LogManager.error(f"[updateAnchorList] 读取失败: {e}")
return ResultData(code=1001, message=f"暂无数据").toJson()
# 定位 anchorList
if isinstance(doc, list):
acList = doc
wrapper = None
elif isinstance(doc, dict) and isinstance(doc.get("anchorList"), list):
acList = doc["anchorList"]
wrapper = doc
else:
return ResultData(code=500, message="文件格式不合法").toJson()
# 遍历并更新
updated = 0
for item in acList:
if isinstance(item, dict) and item.get("invitationType") == invitationType:
item["state"] = new_status
updated += 1
# 写回(保持原始结构)
try:
file_path.parent.mkdir(parents=True, exist_ok=True)
to_write = wrapper if wrapper is not None else acList
file_path.write_text(json.dumps(to_write, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception as e:
LogManager.error(f"[updateAnchorList] 写入失败: {e}")
return ResultData(code=500, message=f"写入失败: {e}").toJson()
if updated:
return ResultData(data=updated, message=f"已更新 {updated} 条记录").toJson()
else:
return ResultData(data=0, message="未找到符合条件的记录").toJson()
# 删除主播
@app.route("/deleteAnchorWithIds", methods=['POST'])
def deleteAnchorWithIds():
ls: list[dict] = request.get_json() # [{"anchorId": "xxx"}, ...]
ids = [d.get("anchorId") for d in ls if d.get("anchorId")]
deleted = AiUtils.delete_anchors_by_ids(ids)
return ResultData(data={"deleted": deleted}).toJson()
# 配置ai人设
@app.route("/aiConfig", methods=['POST'])
def aiConfig():
data = request.get_json()
agentName = data.get("agentName")
guildName = data.get("guildName")
contactTool = data.get("contactTool")
contact = data.get("contact")
age = data.get("age")
sex = data.get("sex")
height = data.get("height")
weight = data.get("weight")
body_features = data.get("body_features")
nationality = data.get("nationality")
personality = data.get("personality")
strengths = data.get("strengths")
dict = {
"agentName": agentName,
"guildName": guildName,
"contactTool": contactTool,
"contact": contact,
"age": age,
"sex": sex,
"height": height,
"weight": weight,
"body_features": body_features,
"nationality": nationality,
"personality": personality,
"strengths": strengths,
"api-key": "app-sdRfZy2by9Kq7uJg7JdOSVr8"
}
# JsonUtils.write_json("aiConfig", dict)
IOSAIStorage.overwrite(dict, "aiConfig.json")
return ResultData(data="").toJson()
# 查询主播聊天发送的最后一条信息
@app.route("/select_last_message", methods=['GET'])
def select_last_message():
data = JsonUtils.query_all_json_items()
return ResultData(data=data).toJson()
# 修改消息(已读改成未读)
@app.route("/update_last_message", methods=['POST'])
def update_last_message():
data = request.get_json() # 解析 JSON
sender = data.get("sender")
udid = data.get("device")
text = data.get("text")
updated_count = JsonUtils.update_json_items(
match={"sender": sender, "text": text}, # 匹配条件
patch={"status": 1}, # 修改内容
filename="log/last_message.json", # 要修改的文件
multi=True # 只改第一条匹配的
)
if updated_count > 0:
return ResultData(data=updated_count, message="修改成功").toJson()
return ResultData(data=updated_count, message="修改失败").toJson()
# 删除已读消息
@app.route("/delete_last_message", methods=['POST'])
def delete_last_message():
data = request.get_json() # 解析 JSON
sender = data.get("sender")
udid = data.get("device")
text = data.get("text")
updated_count = JsonUtils.delete_json_items(
match={"sender": sender, "text": text}, # 匹配条件
filename="log/last_message.json", # 要修改的文件
multi=True # 只改第一条匹配的
)
if updated_count > 0:
return ResultData(data=updated_count, message="修改成功").toJson()
return ResultData(data=updated_count, message="修改失败").toJson()
# 停止所有任务
@app.route("/stopAllTask", methods=['POST'])
def stopAllTask():
idList = request.get_json()
code, msg = ThreadManager.batch_stop(idList)
time.sleep(2)
return ResultData(code, [], msg).toJson()
# 切换账号
@app.route('/changeAccount', methods=['POST'])
def changeAccount():
body = request.get_json()
udid = body.get("udid")
if not udid:
return ResultData(data="", code=400, message="缺少 udid").toJson()
manager = ScriptManager()
threading.Event()
# 启动脚本
code, msg = manager.changeAccount(udid)
# thread = threading.Thread(target=, args=(udid,))
# # 添加到线程管理
# thread.start()
return ResultData(data="", code=code, message=msg).toJson()
# 查看设备网络状态
@app.route('/getDeviceNetStatus', methods=['POST'])
def getDeviceNetStatus():
body = request.get_json()
udid = body.get("udid")
client = wda.USBClient(udid, wdaFunctionPort)
r = client.getNetWorkStatus()
value = r.get("value")
return ResultData(data=value, code=200).toJson()
@app.route('/test', methods=['POST'])
def test():
import wda
import cv2
import numpy as np
# 设备的UDID
udid = "00008110-000120603C13801E"
# 连接到设备
client = wda.USBClient(udid)
session = client.session()
# 设置Appium的截图深度
session.appium_settings({"snapshotMaxDepth": 15})
# 获取当前屏幕截图
screenshot = session.screenshot()
screenshot = cv2.imdecode(np.frombuffer(screenshot, np.uint8), cv2.IMREAD_COLOR)
# 读取大图和小图
large_image = screenshot # 这里使用截图作为大图
template = cv2.imread(r'E:\python\Scrcpy_test\open-cv-tk\insert_comment.png', 0) # 0 表示以灰度模式读取
# 检查图像是否成功加载
if template is None:
print("小图加载失败,请检查路径")
exit()
# 获取模板的宽度和高度
w, h = template.shape[::-1]
# 使用模板匹配方法
result = cv2.matchTemplate(large_image, template, cv2.TM_CCOEFF_NORMED)
# 设定阈值
threshold = 0.8
loc = np.where(result >= threshold)
# 遍历所有匹配点
if loc[0].size > 0: # 检查是否有匹配点
for pt in zip(*loc[::-1]): # 将坐标转换为 (x, y) 格式
cv2.rectangle(large_image, pt, (pt[0] + w, pt[1] + h), (0, 255, 0), 2)
print(f"找到匹配区域,坐标:{pt},尺寸:{(w, h)}")
else:
print("未找到匹配区域,请检查模板和大图的内容,或调整阈值")
# 保存结果
cv2.imwrite('matched_result.png', large_image)
# 显示结果
cv2.imshow('Matched Result', large_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
# 关闭会话
session.close()
# 获取ai配置
@app.route("/getAiConfig", methods=['GET'])
def getAiConfig():
data = IOSAIStorage.load("aiConfig.json")
return ResultData(data=data).toJson()
# 重新开启tiktok
@app.route("/restartTikTok", methods=['POST'])
def restartTikTok():
json = request.get_json()
udid = json.get("udid")
client = wda.USBClient(udid, wdaFunctionPort)
session = client.session()
ControlUtils.closeTikTok(session, udid)
time.sleep(1)
ControlUtils.openTikTok(session, udid)
return ResultData(data="").toJson()
if __name__ == '__main__':
# 注意:这里建议 debug=False避免未来有人改成 use_reloader=True 导致多进程
app.run("0.0.0.0", port=5000, debug=False, use_reloader=False, threaded=True)