合并代码后提交

This commit is contained in:
zw
2025-08-13 20:20:13 +08:00
parent cd61c2138a
commit 33610b27b0
6 changed files with 418 additions and 67 deletions

View File

@@ -1,10 +1,18 @@
# 设备模型
class DeviceModel(object):
def __init__(self, deviceId, screenPort, type):
def __init__(self, deviceId, screenPort, width, height, scale, type):
super(DeviceModel, self).__init__()
# 设备id
self.deviceId = deviceId
# 投屏端口
self.screenPort = screenPort
# 屏幕宽度
self.width = width
# 屏幕高度
self.height = height
# 物理分辨率和实际分辨率的倍数
self.scale = scale
# 1 添加 2删除
self.type = type
@@ -14,5 +22,8 @@ class DeviceModel(object):
return {
'deviceId': self.deviceId,
'screenPort': self.screenPort,
"width": self.width,
"height": self.height,
"scale": self.scale,
'type': self.type
}

View File

@@ -16,8 +16,8 @@ accountToken = "xHtil6YiAH2QxDgAYVwCfVafx7xkOoeHVfiVgfqfdwe88KZW5jbRsjDS9ZGFILJS
prologueList = []
# 评论列表
commentsList = []
# 存储主播名和session_id的字典
anchorWithSession = {}
# 安全删除数据
def removeModelFromAnchorList(model: AnchorModel):

View File

@@ -30,15 +30,11 @@ class Deviceinfo(object):
lists = Usbmux().device_list()
# 添加设备逻辑
for device in lists:
print(device)
if device not in self.deviceArray:
self.screenProxy += 1
self.connectDevice(device.udid)
self.deviceArray.append(device)
# 创建模型
model = DeviceModel(device.udid,self.screenProxy,type=1)
self.deviceModelList.append(model)
# 发送数据
self.manager.send(model.toDict())
# 处理拔出设备的逻辑
def removeDevice():
@@ -75,6 +71,17 @@ class Deviceinfo(object):
try:
d = wda.USBClient(identifier, 8100)
LogManager.info("启动wda成功", identifier)
size = d.window_size()
width = size.width
height = size.height
scale = d.scale
# 创建模型
model = DeviceModel(identifier, self.screenProxy, width, height, scale, type=1)
self.deviceModelList.append(model)
# 发送数据
self.manager.send(model.toDict())
except Exception as e:
LogManager.error("启动wda失败。请检查wda是否正常", identifier)
return
@@ -82,7 +89,7 @@ class Deviceinfo(object):
d.app_start(WdaAppBundleId)
d.home()
time.sleep(2)
target = self.relayDeviceScreenPort()
target = self.relayDeviceScreenPort(identifier)
self.pidList.append({
"target": target,
"id": identifier
@@ -91,9 +98,9 @@ class Deviceinfo(object):
# 转发设备端口
def relayDeviceScreenPort(self):
def relayDeviceScreenPort(self, udid):
try:
command = f"iproxy.exe {self.screenProxy} 9100"
command = f"iproxy.exe -u {udid} {self.screenProxy} 9100"
# 创建一个没有窗口的进程
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

View File

@@ -6,6 +6,7 @@ import numpy as np
import wda
from Utils.LogManager import LogManager
import xml.etree.ElementTree as ET
from lxml import etree
from wda import Client
# 工具类
@@ -280,3 +281,146 @@ class AiUtils(object):
def getUnReadMsgCount(cls, session: Client):
btn = cls.getMsgBoxButton(session)
return cls.findNumber(btn.label)
# 获取聊天页面的聊天信息
@classmethod
def extract_messages_from_xml(cls, xml: str):
"""
输入 WDA 的页面 XML输出按时间顺序的消息列表
每项形如:
{'type': 'time', 'text': '昨天 下午8:48'}
{'type': 'msg', 'dir': 'in'|'out', 'text': 'hello'}
"""
root = etree.fromstring(xml.encode("utf-8"))
items = []
# 屏幕宽度(用于右对齐判断)
app = root.xpath('/XCUIElementTypeApplication')
screen_w = cls.parse_float(app[0], 'width', 414.0) if app else 414.0
# 1) 时间分隔
for t in root.xpath('//XCUIElementTypeStaticText[contains(@traits, "Header")]'):
txt = t.get('label') or t.get('name') or t.get('value') or ''
y = cls.parse_float(t, 'y')
if txt.strip():
items.append({'type': 'time', 'text': txt.strip(), 'y': y})
# 2) 消息气泡
msg_nodes = root.xpath(
'//XCUIElementTypeTable//XCUIElementTypeCell'
'//XCUIElementTypeOther[@name or @label]'
)
EXCLUDES = {'Heart', 'Lol', 'ThumbsUp', '分享发布内容', '视频贴纸标签页', '双击发送表情'}
for o in msg_nodes:
text = (o.get('label') or o.get('name') or '').strip()
if not text or text in EXCLUDES:
continue
# 拿到所在的 Cell用来找头像按钮
cell = o.getparent()
while cell is not None and cell.get('type') != 'XCUIElementTypeCell':
cell = cell.getparent()
x = cls.parse_float(o, 'x')
y = cls.parse_float(o, 'y')
w = cls.parse_float(o, 'width')
right_edge = x + w
direction = None
# 2.1 依据同 Cell 内“图片头像”的位置判定(优先,最稳)
if cell is not None:
avatar_btns = cell.xpath('.//XCUIElementTypeButton[@name="图片头像" or @label="图片头像"]')
if avatar_btns:
ax = cls.parse_float(avatar_btns[0], 'x')
# 头像在左侧 → 对方;头像在右侧 → 自己
if ax < screen_w / 2:
direction = 'in'
else:
direction = 'out'
# 2.2 退化规则:看是否右对齐
if direction is None:
# 离右边 <= 20px 视为右对齐(自己发的)
if right_edge > screen_w - 20:
direction = 'out'
else:
direction = 'in'
items.append({'type': 'msg', 'dir': direction, 'text': text, 'y': y})
# 3) 按 y 排序并清理
items.sort(key=lambda i: i['y'])
for it in items:
it.pop('y', None)
return items
@classmethod
def parse_float(cls, el, attr, default=0.0):
try:
return float(el.get(attr, default))
except Exception:
return default
# 从导航栏读取主播名称。找不到时返回空字符串
@classmethod
def get_navbar_anchor_name(cls, session, timeout: float = 2.0) -> str:
"""只从导航栏读取主播名称。找不到时返回空字符串。"""
# 可选:限制快照深度,提升解析速度/稳定性
try:
session.appium_settings({"snapshotMaxDepth": 22})
except Exception:
pass
def _text_of(el) -> str:
info = getattr(el, "info", {}) or {}
return (info.get("label") or info.get("name") or info.get("value") or "").strip()
def _clean_tail(s: str) -> str:
return re.sub(r"[,、,。.\s]+$", "", s).strip()
# 导航栏容器:从“返回”按钮向上找最近祖先,且该祖先内包含“更多/举报”(多语言兜底)
NAV_CONTAINER = (
"//XCUIElementTypeButton"
"[@name='返回' or @label='返回' or @name='Back' or @label='Back' or @name='戻る' or @label='戻る']"
"/ancestor::XCUIElementTypeOther"
"[ .//XCUIElementTypeButton"
" [@name='更多' or @label='更多' or @name='More' or @label='More' or "
" @name='その他' or @label='その他' or @name='詳細' or @label='詳細' or "
" @name='举报' or @label='举报' or @name='Report' or @label='Report' or "
" @name='報告' or @label='報告']"
"][1]"
)
# ① 优先:可访问的 Other自身有文本且不含子 Button更贴近 TikTok 的实现
XPATH_TITLE_OTHER = (
NAV_CONTAINER +
"//XCUIElementTypeOther[@accessible='true' and count(.//XCUIElementTypeButton)=0 "
" and (string-length(@name)>0 or string-length(@label)>0 or string-length(@value)>0)"
"][1]"
)
# ② 退路:第一个 StaticText
XPATH_TITLE_STATIC = (
NAV_CONTAINER +
"//XCUIElementTypeStaticText[string-length(@value)>0 or string-length(@label)>0 or string-length(@name)>0][1]"
)
# 尝试 ①
q = session.xpath(XPATH_TITLE_OTHER)
if q.wait(timeout):
t = _clean_tail(_text_of(q.get()))
if t:
return t
# 尝试 ②
q2 = session.xpath(XPATH_TITLE_STATIC)
if q2.wait(1.0):
t = _clean_tail(_text_of(q2.get()))
if t:
return t
return ""
# AiUtils.getCurrentScreenSource()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 945 KiB

After

Width:  |  Height:  |  Size: 899 KiB

View File

@@ -1,4 +1,5 @@
import random
import re
import threading
import time
from enum import Enum
@@ -7,7 +8,7 @@ import os
from Utils.AiUtils import AiUtils
from Utils.ControlUtils import ControlUtils
from Utils.LogManager import LogManager
from Entity.Variables import anchorList, removeModelFromAnchorList, prologueList
from Entity.Variables import anchorList, removeModelFromAnchorList, prologueList, anchorWithSession
from Utils.Requester import Requester
@@ -199,24 +200,12 @@ class ScriptManager():
# 返回上一步
def goBack(count):
# 如需回复消息
if needReply:
count += 1
for i in range(count):
ControlUtils.clickBack(session)
time.sleep(2)
# 循环条件。1、 循环关闭 2、 数据处理完毕
while not event.is_set() or len(anchorList) > 0:
if needReply:
print("如果需要回复主播消息。走此逻辑")
if AiUtils.getUnReadMsgCount(session) > 0:
# 此处调用另外一个方法处理
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
# 查找输入框
input = session.xpath('//XCUIElementTypeSearchField')
@@ -237,15 +226,18 @@ class ScriptManager():
# 切换UI查找深度
session.appium_settings({"snapshotMaxDepth": 25})
# 定位 "关注" 按钮 通过关注按钮的位置点击主播首页
follow_button = session.xpath('//XCUIElementTypeOther[@name="zhang1231511"]//XCUIElementTypeButton[@enabled="true" and @visible="true"]')
follow_button = session.xpath("//XCUIElementTypeButton[@traits='Button' and @index='1']")
time.sleep(2)
if follow_button.exists:
print(follow_button.bounds)
# session.appium_settings({"snapshotMaxDepth": 10})
print("找到关注按钮!")
x = follow_button.bounds.x - 200
x = follow_button.bounds.x - 100
y = follow_button.bounds.y
print(x, y)
client.click(x, y)
print("进入主播首页啦")
else:
goBack(1)
removeModelFromAnchorList(anchor)
@@ -253,98 +245,295 @@ class ScriptManager():
continue
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 25})
time.sleep(2)
# 找到并点击第一个视频
cellClickResult = ControlUtils.clickFirstVideoFromDetailPage(session)
time.sleep(2)
# 观看主播视频
def viewAnchorVideo():
print("开始查看视频")
count = 2
print("开始查看视频,并且重新调整查询深度")
session.appium_settings({"snapshotMaxDepth": 5})
count = 3
while count != 0:
time.sleep(5)
print("条件满足,继续查看")
img = client.screenshot()
time.sleep(1)
filePath = f"resources/{udid}/bgv.png"
img.save(filePath)
LogManager.info("保存屏幕图像成功", udid)
print("保存了背景图")
time.sleep(2)
# 查找add图标
addX, addY = AiUtils.findImageInScreen("add", udid)
if addX != -1:
r = ControlUtils.clickLike(session, udid)
# 点赞成功。总数量减1否则就滑到下一个视频。原因太多。不好判断。不如直接下一个视频
if r == True:
count -= 1
else:
client.swipe_up()
else:
print("没找有效视频")
client.swipe_up()
break
r = ControlUtils.clickLike(session, udid)
# 点赞成功。
if r == True:
count -= 1
# 假装看几秒视频
time.sleep(5)
client.swipe_up()
if count == 0:
ControlUtils.clickBack(session)
break
# 右滑返回
client.swipe_right()
# 如果打开视频失败。说明该主播没有视频
if cellClickResult == True:
print("点击了视频")
session.appium_settings({"snapshotMaxDepth": 15})
time.sleep(2)
print("重新设置了匹配深度")
# 观看主播视频
LogManager.info("去查看主播视频",udid)
viewAnchorVideo()
time.sleep(2)
print("视频看完了")
time.sleep(3)
LogManager.info("视频看完了,重置试图查询深度", udid)
session.appium_settings({"snapshotMaxDepth": 25})
# 点击关注按钮
followButton = AiUtils.getFollowButton(session)
if followButton.exists:
if followButton is not None:
LogManager.info("找到关注按钮了", udid)
followButton.click()
else:
goBack(2)
return
LogManager.info("没找到关注按钮", udid)
removeModelFromAnchorList(anchor)
goBack(3)
continue
time.sleep(2)
AiUtils.getSendMesageButton(session)
msgButton = AiUtils.getSendMesageButton(session)
time.sleep(2)
if msgButton is not None:
print("找到发消息按钮了")
# 进入聊天页面
msgButton.click()
else:
print("没有识别出发消息按钮")
removeModelFromAnchorList(anchor)
goBack(3)
continue
time.sleep(3)
# 查找聊天界面中的输入框节点
chatInput = session.xpath("//*[className='XCUIElementTypeTextView']")
chatInput = session.xpath("//TextView")
if chatInput.exists:
print("找到")
print("找到输入框了, 准备发送一条打招呼消息")
# 准备打招呼的文案
# text = random.choice(prologueList)
text = "你好"
# 翻译成主播国家的语言
msg = Requester.translation(text, "法国")
# 准备发送一条信息
chatInput.click()
time.sleep(2)
text = random.choice(prologueList)
msg = Requester.translation(text, anchorCountry)
# 发送消息
chatInput.set_text(msg + "\n")
time.sleep(1)
else:
print("无法发送信息,换个主播")
# 接着下一个主播
goBack(3)
print("无法发送信息")
# 接着下一个主播
removeModelFromAnchorList(anchor)
goBack(4)
else:
print("没找到主播的第一个视频")
print(f"{anchor.anchorId}:该主播没有视频")
# 删除当前数据
removeModelFromAnchorList(anchor)
goBack(2)
goBack(3)
continue
# 设置查找深度
session.appium_settings({"snapshotMaxDepth": 15})
time.sleep(2)
if needReply:
print("如果需要回复主播消息。走此逻辑")
if AiUtils.getUnReadMsgCount(session) > 0:
# 执行回复消息逻辑
self.monitorMessages(session, udid)
homeButton = AiUtils.findHomeButton(udid)
if homeButton.exists:
homeButton.click()
else:
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.openTikTok(session, udid)
time.sleep(3)
# 执行完成之后。继续点击搜索
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
else:
session.appium_settings({"snapshotMaxDepth": 15})
# 点击搜索按钮
ControlUtils.clickSearch(session)
# 检查未读消息并回复
def monitorMessages(self, session, udid):
ControlUtils.closeTikTok(session, udid)
time.sleep(2)
ControlUtils.closeTikTok(session, udid)
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 7})
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
# 如果收件箱有消息 则进行点击
if el.exists:
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
if count:
el.click()
time.sleep(3)
session.appium_settings({"snapshotMaxDepth": 22})
while True:
el = session(xpath='//XCUIElementTypeButton[@name="a11y_vo_inbox"]')
m = re.search(r'(\d+)', el.label) # 抓到的第一个数字串
count = int(m.group(1)) if m else 0
print("count", count)
if not count:
break
# 新粉丝
xp_new_fan_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='新粉丝']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 活动
xp_activity_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='活动']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 系统通知
xp_system_badge = (
"//XCUIElementTypeCell[.//XCUIElementTypeLink[@name='系统通知']]"
"//XCUIElementTypeStaticText[string-length(@value)>0 and translate(@value,'0123456789','')='']"
)
# 用户消息
xp_badge_numeric = (
'//XCUIElementTypeOther[@name="AWEIMChatListCellUnreadCountViewComponent"]'
'//XCUIElementTypeStaticText[@value and translate(@value,"0123456789","")=""]'
)
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_new_fan_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("新粉丝未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
print("当前屏幕没有找到 新粉丝 未读徽标数字")
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_activity_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("活动未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
print("当前屏幕没有找到 活动 未读徽标数字")
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_system_badge).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("系统通知未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(1)
ControlUtils.clickBack(session)
time.sleep(1)
except Exception:
print("当前屏幕没有找到 系统通知 未读徽标数字")
badge_text = None
try:
# 如果 2 秒内找不到,会抛异常
badge_text = session.xpath(xp_badge_numeric).get(timeout=2.0)
val = (badge_text.info.get("value") or
badge_text.info.get("label") or
badge_text.info.get("name"))
print("用户未读数量:", val)
if badge_text:
badge_text.tap()
time.sleep(3)
xml = session.source()
msgs = AiUtils.extract_messages_from_xml(xml)
# 检测出对方发的最后一条信息
last_msg_text = next(item['text'] for item in reversed(msgs) if item['type'] == 'msg')
# 向ai发送信息
# 获取主播的名称
anchor_name = AiUtils.get_navbar_anchor_name(session)
# 找到输入框
sel = session.xpath(
"//XCUIElementTypeTextView[@name='消息...' or @label='消息...' or @value='消息...']")
if anchor_name not in anchorWithSession:
# 如果是第一次发消息(没有sessionId的情况)
response = Requester.chatToAi({"msg": last_msg_text})
aiResult = response['result']
sessionId = response['session_id']
anchorWithSession[anchor_name] = sessionId
# 找到输入框输入ai返回出来的消息
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
else:
# 如果不是第一次发消息证明存储的有sessionId
sessionId = anchorWithSession[anchor_name]
response = Requester.chatToAi({"msg": last_msg_text, "sid": sessionId})
aiResult = response['result']
if sel.exists:
sel.click() # 聚焦
time.sleep(1)
sel.clear_text()
sel.set_text(aiResult + "\n")
time.sleep(1)
# 返回
ControlUtils.clickBack(session)
except Exception:
print("当前屏幕没有找到 用户 未读徽标数字")
badge_text = None
def test(self, udid):
client = wda.USBClient(udid)
session = client.session()
session.appium_settings({"snapshotMaxDepth": 10})
print(client.source())
# manager = ScriptManager()
# manager.test("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")
# manager.growAccount("eca000fcb6f55d7ed9b4c524055214c26a7de7aa")