"""Flask HTTP service for controlling iOS devices and running automation scripts.

The module also starts a background TCP listener (port taken from the
``FLASK_COMM_PORT`` environment variable) that receives newline-delimited JSON
device online/offline messages and feeds them to the ``/deviceList`` route.
"""
import json
import os
import socket
import threading
import time
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Dict
from Entity.AnchorModel import AnchorModel
from Utils.AiUtils import AiUtils
from Utils.LogManager import LogManager
import tidevice
import wda
from flask import Flask, request
from flask_cors import CORS
from Entity.ResultData import ResultData
from Utils.ControlUtils import ControlUtils
from Utils.ThreadManager import ThreadManager
from script.ScriptManager import ScriptManager
from Entity.Variables import anchorList, prologueList, addModelToAnchorList, removeModelFromAnchorList
import Entity.Variables as ev
from Utils.JsonUtils import JsonUtils

app = Flask(__name__)
CORS(app)

# Keep Chinese text / emoji unescaped in jsonify output.
app.config['JSON_AS_ASCII'] = False
app.config['JSONIFY_MIMETYPE'] = "application/json; charset=utf-8"

# Devices currently known to be online; guarded by listLock.
listData = []
listLock = threading.Lock()
# Messages parsed by the socket listener, consumed by /deviceList.
dataQueue = Queue()


def start_socket_listener():
    """Listen on 127.0.0.1:FLASK_COMM_PORT and hand each connection to a worker thread.

    Returns immediately (after logging) when the environment variable is
    missing, not a valid integer, non-positive, or when binding fails.
    Otherwise it loops forever accepting connections.
    """
    raw_port = os.getenv('FLASK_COMM_PORT', 0)
    try:
        port = int(raw_port)
    except (TypeError, ValueError):
        # A malformed value must not kill the thread with an unhandled error.
        LogManager.error(f"[ERROR] FLASK_COMM_PORT 不是合法端口: {raw_port!r}")
        print(f"[ERROR] FLASK_COMM_PORT 不是合法端口: {raw_port!r}")
        return

    LogManager.info(f"Received port from environment: {port}")
    print(f"Received port from environment: {port}")
    if port <= 0:
        LogManager.info("未获取到通信端口,跳过Socket监听")
        print("未获取到通信端口,跳过Socket监听")
        return

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Allow address reuse so a restart can rebind the same port.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            try:
                s.bind(('127.0.0.1', port))
                print(f"[INFO] Socket successfully bound to port {port}")
                LogManager.info(f"[INFO] Socket successfully bound to port {port}")
            except Exception as bind_error:
                print(f"[ERROR]端口绑定失败: {bind_error}")
                LogManager.error(f"[ERROR]端口绑定失败: {bind_error}")
                return

            s.listen()
            LogManager.info(f"[INFO] Socket listener started on port {port}, waiting for connections...")
            print(f"[INFO] Socket listener started on port {port}, waiting for connections...")

            while True:
                try:
                    conn, addr = s.accept()
                except Exception as e:
                    LogManager.error(f"[ERROR] accept 失败: {e}")
                    continue
                # One thread per connection so a misbehaving client cannot
                # take down the accept loop.
                threading.Thread(target=_handle_conn, args=(conn, addr), daemon=True).start()
    except Exception as e:
        LogManager.error(f"[ERROR]Socket服务启动失败: {e}")
        print(f"[ERROR]Socket服务启动失败: {e}")


def _enqueue_json_line(raw_line: bytes) -> None:
    """Decode one message frame as JSON and push it onto ``dataQueue``.

    Blank frames are ignored; frames that are not valid JSON are logged and
    dropped.
    """
    text = raw_line.decode('utf-8', errors='ignore').strip()
    if not text:
        return
    try:
        obj = json.loads(text)
    except json.JSONDecodeError as e:
        LogManager.warning(f"[WARN] 非法 JSON 丢弃: {text[:100]} {e}")
        return
    dataQueue.put(obj)
    LogManager.info(f"[INFO] 收到合法消息: {obj}")


def _handle_conn(conn: socket.socket, addr):
    """Read newline-delimited JSON messages from one client until it disconnects.

    Bytes are buffered and only decoded once a full line is available, so a
    multi-byte UTF-8 character split across two ``recv`` calls is not lost.
    Data left in the buffer when the peer closes (a final message without a
    trailing newline) is parsed as one last frame.
    """
    try:
        with conn:
            pending = b""
            while True:
                chunk = conn.recv(1024)
                if not chunk:  # peer closed the connection
                    break
                pending += chunk
                # Everything before the last b"\n" is complete; the tail
                # stays buffered for the next recv.
                *complete, pending = pending.split(b"\n")
                for raw_line in complete:
                    _enqueue_json_line(raw_line)
            if pending.strip():
                _enqueue_json_line(pending)
    except Exception as e:
        LogManager.error(f"[ERROR] 连接处理异常: {e}")


# Run the socket listener in its own daemon thread.
listener_thread = threading.Thread(target=start_socket_listener, daemon=True)
listener_thread.start()


def _start_script_thread(target, udid, *extra_args):
    """Run ``target(udid, *extra_args, event)`` in a new thread and register it.

    A fresh ``threading.Event`` is created, passed to the target as its last
    argument and handed to ``ThreadManager.add`` together with the thread.
    """
    event = threading.Event()
    thread = threading.Thread(target=target, args=(udid, *extra_args, event))
    thread.start()
    ThreadManager.add(udid, thread, event)


def _open_wda_session(udid, snapshot_max_depth=None):
    """Open a WDA session over USB, optionally setting ``snapshotMaxDepth``."""
    session = wda.USBClient(udid).session()
    if snapshot_max_depth is not None:
        session.appium_settings({"snapshotMaxDepth": snapshot_max_depth})
    return session


# Get the device list
@app.route('/deviceList', methods=['GET'])
def deviceList():
    """Apply queued online/offline messages and return the online devices."""
    try:
        with listLock:
            # 1. Drain the queue.
            while True:
                try:
                    obj = dataQueue.get_nowait()
                except Empty:
                    break
                if not isinstance(obj, dict):
                    # Valid JSON but not a device message; skip it instead of
                    # aborting the whole drain.
                    LogManager.warning(f"[WARN] 非法 JSON 丢弃: {str(obj)[:100]}")
                    continue
                device_id = obj.get("deviceId")
                # Online or offline, any older record for this deviceId goes
                # away first (its port may have changed).
                listData[:] = [d for d in listData if d.get("deviceId") != device_id]
                if obj.get("type") == 1:
                    listData.append(obj)

            # 2. Safety net: keep only records marked online.
            listData[:] = [d for d in listData if d.get('type') == 1]
            return ResultData(data=listData.copy()).toJson()
    except Exception as e:
        LogManager.error(f"获取设备列表失败: {e}")
        return ResultData(data=[]).toJson()


# Get the list of apps installed on a device
@app.route('/deviceAppList', methods=['POST'])
def deviceAppList():
    """Return ``ControlUtils.getDeviceAppList`` for the ``udid`` in the body."""
    param = request.get_json()
    udid = param["udid"]
    apps = ControlUtils.getDeviceAppList(udid)
    return ResultData(data=apps).toJson()


# Launch the given app
@app.route('/launchApp', methods=['POST'])
def launchApp():
    """Start the app ``bundleId`` on device ``udid`` through tidevice."""
    body = request.get_json()
    udid = body.get("udid")
    bundleId = body.get("bundleId")
    tidevice.Device(udid).app_start(bundleId)
    return ResultData(data="").toJson()


# Go back to the home screen
@app.route('/toHome', methods=['POST'])
def toHome():
    """Call ``home()`` on the device's WDA client."""
    body = request.get_json()
    udid = body.get("udid")
    wda.USBClient(udid).home()
    return ResultData(data="").toJson()


# Tap
@app.route('/tapAction', methods=['POST'])
def tapAction():
    """Tap at the ``x``/``y`` coordinates given in the request body."""
    body = request.get_json()
    session = _open_wda_session(body.get("udid"), snapshot_max_depth=0)
    session.tap(body.get("x"), body.get("y"))
    return ResultData(data="").toJson()


# Swipe
@app.route('/swipeAction', methods=['POST'])
def swipeAction():
    """Swipe according to ``direction``: 1 up, 2 left, 3 down, anything else right."""
    body = request.get_json()
    udid = body.get("udid")
    direction = body.get("direction")
    session = _open_wda_session(udid, snapshot_max_depth=0)

    if direction == 1:
        session.swipe_up()
    elif direction == 2:
        session.swipe_left()
    elif direction == 3:
        session.swipe_down()
    else:
        session.swipe_right()
    return ResultData(data="").toJson()


# Long press
@app.route('/longPressAction', methods=['POST'])
def longPressAction():
    """Press and hold at ``x``/``y`` for one second."""
    body = request.get_json()
    udid = body.get("udid")
    x = body.get("x")
    y = body.get("y")
    session = _open_wda_session(udid, snapshot_max_depth=5)
    session.tap_hold(x, y, 1.0)
    return ResultData(data="").toJson()


# Grow account
@app.route('/growAccount', methods=['POST'])
def growAccount():
    """Start ``ScriptManager.growAccount`` for the device in a background thread."""
    body = request.get_json()
    udid = body.get("udid")
    _start_script_thread(ScriptManager().growAccount, udid)
    return ResultData(data="").toJson()


# Watch live streams
@app.route("/watchLiveForGrowth", methods=['POST'])
def watchLiveForGrowth():
    """Start ``ScriptManager.watchLiveForGrowth`` for the device in a background thread."""
    body = request.get_json()
    udid = body.get("udid")
    _start_script_thread(ScriptManager().watchLiveForGrowth, udid)
    return ResultData(data="").toJson()


# Stop the running script
@app.route("/stopScript", methods=['POST'])
def stopScript():
    """Ask ``ThreadManager`` to stop the script for ``udid`` and relay its result."""
    body = request.get_json()
    udid = body.get("udid")
    LogManager.method_info(f"接口收到 /stopScript udid={udid}", method="task")
    code, msg = ThreadManager.stop(udid)
    return ResultData(code=code, data="", massage=msg).toJson()


# Follow and greet
@app.route('/passAnchorData', methods=['POST'])
def passAnchorData():
    """Store the posted anchor data and start a greeting thread per device.

    Body keys read: ``deviceList``, ``anchorList``, ``needReply`` (default
    True) and ``prologueList``. On any exception the error is logged and a
    code-500 result is returned.
    """
    try:
        LogManager.method_info("关注打招呼", "关注打招呼")
        data: Dict[str, Any] = request.get_json()
        # Device list
        idList = data.get("deviceList", [])
        # Anchor list
        acList = data.get("anchorList", [])
        LogManager.info(f"[INFO] 获取数据: {idList} {acList}")
        AiUtils.save_aclist_flat_append(acList)

        # Whether replies are needed
        needReply = data.get("needReply", True)
        # Greeting messages
        ev.prologueList = data.get("prologueList", [])
        # Register the anchors
        addModelToAnchorList(acList)

        # One script thread per device
        for udid in idList:
            _start_script_thread(ScriptManager().safe_greetNewFollowers, udid, needReply)
        return ResultData(data="").toJson()
    except Exception as e:
        LogManager.error(f"关注打招呼启动失败: {e}")
        # A Flask view must return a response; falling through would return
        # None and make Flask raise on top of the original error.
        return ResultData(code=500, data="", massage=f"关注打招呼启动失败: {e}").toJson()


# Get the greeting (private message) data
@app.route("/getPrologueList", methods=['GET'])
def getPrologueList():
    """Return the current ``Entity.Variables.prologueList``."""
    return ResultData(data=ev.prologueList).toJson()


# Add temporary data: append anchors to the JSON file in bulk
@app.route("/addTempAnchorData", methods=['POST'])
def addTempAnchorData():
    """
    Accepted request bodies:
    - a single object: {"anchorId": "xxx", "country": "CN"}
    - an array of objects: [{"anchorId": "xxx", "country": "CN"}, {"anchorId": "yyy", "country": "US"}]
    """
    data = request.get_json()
    if not data:
        return ResultData(code=400, massage="请求数据为空").toJson()
    # Append to the JSON file
    AiUtils.save_aclist_flat_append(data, "log/acList.json")
    return ResultData(data="ok").toJson()


# Get the chat messages currently on screen
@app.route("/getChatTextInfo", methods=['POST'])
def getChatTextInfo():
    """Extract chat messages from the device's current page source.

    When extraction raises, two placeholder messages (Chinese and English)
    are returned instead, telling the user to open the TikTok chat page.
    """
    data = request.get_json()
    udid = data.get("udid")
    session = _open_wda_session(udid)
    xml = session.source()
    try:
        result = AiUtils.extract_messages_from_xml(xml)
        print(result)
        return ResultData(data=result).toJson()
    except Exception as e:
        LogManager.error(f"获取屏幕翻译出现错误:{e}", "获取屏幕翻译")
        fallback = [
            {
                'type': 'massage',
                'dir': 'in',
                'text': '当前页面无法获取聊天记录,请在tiktok聊天页面进行获取!!!'
            },
            {
                'type': 'massage',
                'dir': 'in',
                'text': 'Unable to retrieve chat messages on the current screen. Please navigate to the TikTok chat page and try again!!!'
            }
        ]
        return ResultData(data=fallback, massage="解析失败").toJson()


# Monitor messages
@app.route("/replyMessages", methods=['POST'])
def monitorMessages():
    """Start ``ScriptManager.replyMessages`` for the device in a background thread."""
    body = request.get_json()
    udid = body.get("udid")
    _start_script_thread(ScriptManager().replyMessages, udid)
    return ResultData(data="").toJson()


@app.route("/setLoginInfo", methods=['POST'])
def upLoadLogLogs():
    """Upload all logs using the ``token``, ``userId`` and ``tenantId`` from the body."""
    data = request.get_json()
    token = data.get("token")
    userId = data.get("userId")
    tenantId = data.get("tenantId")
    ok = LogManager.upload_all_logs("http://47.79.98.113:8101/api/log/upload", token, userId, tenantId)
    if ok:
        return ResultData(data="日志上传成功").toJson()
    return ResultData(data="", massage="日志上传失败").toJson()


# Get the current anchor list
@app.route("/anchorList", methods=['POST'])
def queryAnchorList():
    """Return the parsed contents of ``log/acList.json``, or ``[]`` if missing/unreadable."""
    # NOTE(review): this reads "log/acList.json" relative to the cwd, while
    # updateAnchorList resolves "Module/log/acList.json" - confirm both are
    # meant to address the same file.
    file_path = "log/acList.json"
    data = []
    if Path(file_path).exists():
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            LogManager.error(f"[anchorList] 读取失败: {e}")
            data = []
    return ResultData(data=data).toJson()


# Update the current anchor list
@app.route("/updateAnchorList", methods=['POST'])
def updateAnchorList():
    """
    Set ``status`` on every stored anchor whose ``invitationType`` matches.

    invitationType: 1 regular ticket, 2 gold ticket
    state:          1 allowed (True) / 0 not allowed (False)
    """
    data = request.get_json(force=True, silent=True) or {}
    invitationType = data.get("invitationType")
    state = bool(data.get("state"))

    # Value to write into each matching record
    new_status = 1 if state else 0

    # Resolve through the helper so the cwd does not matter
    file_path = AiUtils._resolve_path("Module/log/acList.json")

    # Load the JSON document
    try:
        doc = json.loads(file_path.read_text(encoding="utf-8-sig"))
    except Exception as e:
        LogManager.error(f"[updateAnchorList] 读取失败: {e}")
        return ResultData(code=500, massage=f"读取失败: {e}").toJson()

    # Locate the anchor list: either the document itself or doc["anchorList"]
    if isinstance(doc, list):
        acList = doc
        wrapper = None
    elif isinstance(doc, dict) and isinstance(doc.get("anchorList"), list):
        acList = doc["anchorList"]
        wrapper = doc
    else:
        return ResultData(code=500, massage="文件格式不合法").toJson()

    # Update matching records in place
    updated = 0
    for item in acList:
        if isinstance(item, dict) and item.get("invitationType") == invitationType:
            item["status"] = new_status
            updated += 1

    # Write back, preserving the original top-level structure
    try:
        file_path.parent.mkdir(parents=True, exist_ok=True)
        to_write = wrapper if wrapper is not None else acList
        file_path.write_text(json.dumps(to_write, ensure_ascii=False, indent=2), encoding="utf-8")
    except Exception as e:
        LogManager.error(f"[updateAnchorList] 写入失败: {e}")
        return ResultData(code=500, massage=f"写入失败: {e}").toJson()

    if updated:
        return ResultData(data=updated, massage=f"已更新 {updated} 条记录").toJson()
    return ResultData(data=0, massage="未找到符合条件的记录").toJson()


# Delete anchors
@app.route("/deleteAnchorWithIds", methods=['POST'])
def deleteAnchorWithIds():
    """Delete the anchors whose ids are listed in the body: ``[{"anchorId": "xxx"}, ...]``."""
    payload = request.get_json() or []
    ids = [d.get("anchorId") for d in payload if isinstance(d, dict) and d.get("anchorId")]
    deleted = AiUtils.delete_anchors_by_ids(ids)
    return ResultData(data={"deleted": deleted}).toJson()


@app.route("/aiConfig", methods=['POST'])
def aiConfig():
    """Persist the four AI config fields from the body under the name ``aiConfig``."""
    data = request.get_json()
    config = {
        "agentName": data.get("agentName"),
        "guildName": data.get("guildName"),
        "contactTool": data.get("contactTool"),
        "contact": data.get("contact")
    }
    JsonUtils.write_json("aiConfig", config)
    return ResultData(data="").toJson()


# Query the last message sent in each anchor chat
@app.route("/select_last_message", methods=['GET'])
def select_last_message():
    """Return everything from ``JsonUtils.query_all_json_items()``."""
    data = JsonUtils.query_all_json_items()
    return ResultData(data=data).toJson()


@app.route("/update_last_message", methods=['POST'])
def update_last_message():
    """Set ``status`` to 1 on the first stored message matching ``sender`` and ``text``."""
    data = request.get_json()
    sender = data.get("sender")
    text = data.get("text")

    updated_count = JsonUtils.update_json_items(
        match={"sender": sender, "text": text},  # match condition
        patch={"status": 1},                     # fields to change
        filename="log/last_message.json",        # file to modify
        multi=False                              # only the first match
    )
    if updated_count > 0:
        return ResultData(data=updated_count, massage="修改成功").toJson()
    return ResultData(data=updated_count, massage="修改失败").toJson()


@app.route("/delete_last_message", methods=['POST'])
def delete_last_message():
    """Delete the first stored message matching ``sender`` and ``text``."""
    data = request.get_json()
    sender = data.get("sender")
    text = data.get("text")

    updated_count = JsonUtils.delete_json_items(
        match={"sender": sender, "text": text},  # match condition
        filename="log/last_message.json",        # file to modify
        multi=False                              # only the first match
    )
    # NOTE(review): the messages below say "modified" although this route
    # deletes; left unchanged in case a client depends on the text.
    if updated_count > 0:
        return ResultData(data=updated_count, massage="修改成功").toJson()
    return ResultData(data=updated_count, massage="修改失败").toJson()


# @app.route("/killWda", methods=['POST'])
# def killWda():
#     data = request.get_json()  # parse JSON
#     udid = data.get("device")
#     print(udid)
#
#     # AiUtils.kill_wda(udid)
#     time.sleep(10)
#     AiUtils.launch_wda(udid)
#
#     return ResultData(data="", msg="WDA重新启动").toJson()


if __name__ == '__main__':
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the
    # Werkzeug debugger on every interface - confirm this is intended.
    app.run("0.0.0.0", port=5000, debug=True, use_reloader=False)