274 lines
11 KiB
Python
274 lines
11 KiB
Python
import base64
import os
import random
import time

import requests
from DrissionPage import ChromiumOptions
from DrissionPage import ChromiumPage
from DrissionPage.common import Actions
from PIL import Image, ImageDraw
|
||
|
||
|
||
|
||
class batch_check_anchor:
    """Drive the TikTok LIVE Backstage "invite host" dialog in a real browser.

    Constructing an instance opens a Chromium window, logs in, dismisses the
    start-up dialogs, opens the invite dialog and performs one warm-up query.
    `get_anchor` can then be called repeatedly; it returns the body of the
    `batch_check_anchor` API response captured from the page's own traffic.

    NOTE(review): this class was rebuilt from a copy of the file whose
    indentation had been lost. Statement order is preserved, but the nesting
    of a few statements (marked with NOTE(review) below) is a best guess and
    should be confirmed against a pristine copy.
    """

    _PORTAL_URL = 'https://live-backstage.tiktok.com/portal/overview'
    _CHECK_API_URL = 'https://live-backstage.tiktok.com/creators/live/union_platform_api/agency/union_invite/batch_check_anchor/'

    # Locators that are used from more than one place.
    _MODAL_CLOSE = '@class=semi-button semi-button-tertiary semi-button-size-small semi-button-borderless semi-modal-close semi-button-with-icon semi-button-with-icon-only'
    _TEXTAREA = '@class=semi-input-textarea semi-input-textarea-autosize'
    _NEXT_BTN = '@data-id=invite-host-next'
    _BACK_BTN = '@data-id=invite-host-back'

    # Captcha handling.
    _CAPTCHA_FILE = 'tiktok_identifying_code.png'
    _CAPTCHA_MAX_RETRIES = 3
    # Solver coordinates are multiplied by these factors before clicking.
    # Presumably (on-page size) / (solver image size) -- TODO confirm.
    _CAPTCHA_SCALE_X = 348 / 552
    _CAPTCHA_SCALE_Y = 217 / 344

    # Seconds before an outbound HTTP call is abandoned instead of hanging.
    _HTTP_TIMEOUT = 30

    # SECURITY: live credentials are committed in source. They remain here as
    # defaults so existing deployments keep working, but each one can (and
    # should) be supplied through the environment variable named next to it.
    _DEFAULT_PROXY_API_URL = "https://dps.kdlapi.com/api/getdps/?secret_id=ohs2d9r6kqdtt2tau900&signature=gktsvou9sd0nq7o09q3ridg70aal97b1&num=1&pt=1&sep=1"  # KDL_PROXY_API_URL
    _DEFAULT_PROXY_USERNAME = "d2400186192"  # KDL_PROXY_USERNAME
    _DEFAULT_PROXY_PASSWORD = "5yv1r337"  # KDL_PROXY_PASSWORD
    _DEFAULT_SOLVER_TOKEN = "w7hWcZPPOgQG2WTg4os5vD2WnYZ38g9bLWnmBxGUHY4"  # JFBYM_TOKEN

    def __init__(self, username, password, callback):
        """Open the browser and run: login -> close pop-ups -> open invite dialog.

        Args:
            username: Account name typed into the login form.
            password: Password typed into the login form.
            callback: Optional callable invoked once, with no arguments, as
                soon as the login button is no longer on the page. Exceptions
                it raises propagate to the caller.
        """
        start_time = time.time()

        co = ChromiumOptions()
        # Mute audio. (The original note also mentions disabling image
        # loading, but no such option is set.)
        co.mute(True)
        co.headless(False)
        co.auto_port(True)
        self.page = ChromiumPage(co)
        self.page.set.window.max()
        self.page.get(self._PORTAL_URL)

        # Looked up before the cookie banner is handled, as in the original.
        login_btn = self.page.ele("@data-id=login")
        self._accept_cookie_banner()
        if login_btn:
            self._submit_credentials(username, password)

        # NOTE(review): assumed to run whether or not the login form was
        # shown, so `callback` also fires for an already-authenticated profile.
        self._wait_until_logged_in(callback)
        self._dismiss_startup_dialogs()
        self._open_invite_dialog(poll_delay=0.2)

        # The reported time deliberately excludes the warm-up query below.
        elapsed = time.time() - start_time
        self.get_anchor("测试")
        print('初始化完成 耗时:', elapsed)

    # ------------------------------------------------------------------
    # Start-up helpers
    # ------------------------------------------------------------------
    def _accept_cookie_banner(self):
        """Click the last button of the cookie banner if the banner is shown."""
        if not self.page.ele("全部允许"):
            return
        banner_root = self.page.ele('@user-config-ele-id=tiktok-cookie-banner-config').shadow_root
        buttons = banner_root.eles('tag:button')
        if buttons:
            buttons[-1].click()

    def _submit_credentials(self, username, password):
        """Fill in and submit the login form, pausing randomly between steps."""
        self.page.ele("@data-id=login").click()
        time.sleep(random.random())
        self.page.ele("@class=semi-input semi-input-default").input(username, clear=True)
        time.sleep(random.random())
        self.page.ele("@class=semi-input semi-input-default semi-input-sibling-modebtn").input(password, clear=True)
        time.sleep(random.random())
        self.page.ele("@data-id=login-primary-button").click()

    def _wait_until_logged_in(self, callback):
        """Poll until the login button has disappeared, then fire `callback`."""
        while True:
            try:
                still_on_login = bool(self.page.ele("@data-id=login", timeout=1))
            except Exception:
                # The lookup can fail while the page is navigating; retry.
                # `Exception` (not a bare except) keeps Ctrl-C working.
                still_on_login = True
            if not still_on_login:
                break
            time.sleep(0.2)

        print('后台登录成功')
        # Called outside the retry loop so a failing callback surfaces once
        # instead of being swallowed and re-invoked forever.
        if callback:
            callback()

    def _dismiss_startup_dialogs(self):
        """Close the pop-ups that may cover the page right after login."""
        close_btn = self.page.ele(self._MODAL_CLOSE)
        print("3s 点击 关闭页面")
        if close_btn:
            close_btn.click()

        guide_ok = self.page.ele("@data-id=guide-ok")
        if guide_ok:
            guide_ok.click()

        invite_btn = self.page.ele("@class=semi-button semi-button-primary semi-button-with-icon", timeout=1)
        if not invite_btn:
            know_btn = self.page.ele("@data-id=know")
            if know_btn:
                know_btn.click()
            # NOTE(review): assumed to belong to the "invite button missing"
            # branch; confirm the workplace switch is not unconditional.
            switch_btn = self.page.ele("@data-id=workplace-switch-button")
            if switch_btn:
                switch_btn.click()

    def _open_invite_dialog(self, poll_delay=0.0, close_timeout=None):
        """Click "add host" until the invite dialog's second textarea exists.

        Args:
            poll_delay: Seconds to sleep before every attempt.
            close_timeout: Timeout for locating the modal close button after
                a failed attempt; None uses the page's default timeout.
        """
        close_kwargs = {} if close_timeout is None else {'timeout': close_timeout}
        while True:
            if poll_delay:
                time.sleep(poll_delay)
            add_btn = self.page.ele('@data-id=add-host-btn')
            try:
                if add_btn:
                    add_btn.click()
                # Indexing raises IndexError while the dialog is not open yet.
                if self.page.eles(self._TEXTAREA)[1]:
                    return
            except Exception:
                # Something is in the way: close the modal and try again.
                close_btn = self.page.ele(self._MODAL_CLOSE, **close_kwargs)
                if close_btn:
                    close_btn.click()

    # ------------------------------------------------------------------
    # External services
    # ------------------------------------------------------------------
    @staticmethod
    def _build_proxies(proxy_ip, username, password):
        """Return a `requests` proxies mapping for an authenticated HTTP proxy."""
        # The API answer may carry surrounding whitespace / line breaks.
        proxy_url = "http://%s:%s@%s/" % (username, password, proxy_ip.strip())
        return {"http": proxy_url, "https": proxy_url}

    def get_ip(self):
        """Fetch one proxy address from the proxy API and wrap it for `requests`.

        Returns:
            dict with "http" and "https" keys, both pointing at the
            authenticated proxy URL.

        Raises:
            requests.RequestException: If the proxy API cannot be reached
                within the timeout.
        """
        api_url = os.environ.get("KDL_PROXY_API_URL", self._DEFAULT_PROXY_API_URL)
        username = os.environ.get("KDL_PROXY_USERNAME", self._DEFAULT_PROXY_USERNAME)
        password = os.environ.get("KDL_PROXY_PASSWORD", self._DEFAULT_PROXY_PASSWORD)

        proxy_ip = requests.get(api_url, timeout=self._HTTP_TIMEOUT).text
        return self._build_proxies(proxy_ip, username, password)

    def deal_identifying_code(self, b):
        """Send a captcha image to the solving service and return its answer.

        Args:
            b: The captcha image as a base64 string.

        Returns:
            The value found at response['data']['data']. `get_anchor` expects
            a string such as "x1,y1|x2,y2".

        Raises:
            KeyError / TypeError: If the service answer lacks that field.
            requests.RequestException: On network failure or timeout.
        """
        url = "http://api.jfbym.com/api/YmServer/customApi"
        # Parameter names and count depend on the captcha type id.
        data = {
            "token": os.environ.get("JFBYM_TOKEN", self._DEFAULT_SOLVER_TOKEN),
            "type": "30101",
            "image": b,
        }
        _headers = {"Content-Type": "application/json"}
        response = requests.request(
            "POST", url, headers=_headers, json=data,
            proxies=self.get_ip(), timeout=self._HTTP_TIMEOUT,
        ).json()
        print(response)
        return response['data']['data']

    # ------------------------------------------------------------------
    # Captcha
    # ------------------------------------------------------------------
    @classmethod
    def _scale_captcha_points(cls, result):
        """Convert a solver answer "x,y|x,y|..." into scaled (x, y) int tuples.

        Empty segments are skipped, so an empty answer yields an empty list.
        """
        points = []
        for pair in result.split('|'):
            if not pair.strip():
                continue
            parts = pair.split(',')
            points.append((
                int(int(parts[0]) * cls._CAPTCHA_SCALE_X),
                int(int(parts[1]) * cls._CAPTCHA_SCALE_Y),
            ))
        return points

    def _solve_captcha_if_present(self):
        """Solve the on-screen captcha, retrying up to _CAPTCHA_MAX_RETRIES times."""
        attempts = 0
        while True:
            captcha_btn = self.page.ele('@class=TUXButton-content')
            if not captcha_btn:
                # No captcha (any more). Without this exit the loop could
                # only end by exhausting the retries or raising.
                return
            if attempts >= self._CAPTCHA_MAX_RETRIES:
                print("验证失败超过3次,终止流程")
                return

            print(f"第{attempts + 1}次尝试处理验证码")
            attempts += 1

            # Screenshot the captcha image and hand it to the solver.
            image = self.page.ele("@class=cap-rounded-lg cap-cursor-pointer cap-w-full cap-h-auto")
            image.get_screenshot(self._CAPTCHA_FILE)
            with open(self._CAPTCHA_FILE, 'rb') as f:
                encoded = base64.b64encode(f.read()).decode()
            result = self.deal_identifying_code(encoded)
            print(result, type(result))

            # Click every returned point, measured from the image's top-left
            # corner (offset 0, 0).
            for x, y in self._scale_captcha_points(result):
                Actions(self.page).move_to(ele_or_loc=image, offset_x=0, offset_y=0).move(x, y).click()

            self.page.ele(
                "@class=TUXButton TUXButton--default TUXButton--medium TUXButton--primary cap-my-8 cap-w-full").click()

            # NOTE(review): the original clicks "back" here when it is
            # visible and then re-checks for a captcha; kept as-is.
            back_btn = self.page.ele(self._BACK_BTN)
            if back_btn:
                back_btn.click()

    # ------------------------------------------------------------------
    # Query flow
    # ------------------------------------------------------------------
    def _advance_to_result_step(self):
        """Click "next" until the "back" button of the result step is shown."""
        while True:
            # Raises IndexError when the button is missing; get_anchor treats
            # that as a signal to reset the dialog.
            next_btn = self.page.eles(self._NEXT_BTN)[0]
            if next_btn:
                time.sleep(0.5)
                print("点击下一步")
                next_btn.click()
            if self.page.ele(self._BACK_BTN):
                return

            # NOTE(review): the captcha handling is assumed to sit inside
            # this retry loop, between the two "back" checks.
            self._solve_captcha_if_present()
            if self.page.ele(self._BACK_BTN):
                return
            print('重复点击1')

    def _return_to_input_step(self):
        """Click "back" until the "next" button is available again."""
        while True:
            self.page.ele(self._BACK_BTN).click()
            if self.page.eles(self._NEXT_BTN)[0]:
                return
            print('重复点击2')

    def _query_once(self, text):
        """Run one query through the dialog and return the captured packet."""
        textarea = self.page.eles(self._TEXTAREA)[1]
        if textarea:
            textarea.click()
            textarea.input(text, clear=True)
            print("点击了测试")

        self.page.listen.start(self._CHECK_API_URL)
        print("监听url")
        try:
            self._advance_to_result_step()
            packet = self.page.listen.wait(1)
        finally:
            # Never leave the listener running, even when the flow failed.
            self.page.listen.stop()

        self._return_to_input_step()
        return packet

    def _recover_invite_dialog(self):
        """Reload the page and reopen the invite dialog after a failed query."""
        self.page.refresh()
        close_btn = self.page.ele(self._MODAL_CLOSE)
        if close_btn:
            close_btn.click()
        else:
            switch_btn = self.page.ele("@data-id=workplace-switch-button")
            if switch_btn:
                switch_btn.click()
        self._open_invite_dialog(close_timeout=1)

    def get_anchor(self, text):
        """Look up `text` through the invite dialog.

        Any error during the flow is printed, the page is refreshed, the
        dialog is reopened and the query is retried; there is no retry limit.

        Args:
            text: The text typed into the invite textarea.

        Returns:
            The `response.body` of the captured batch_check_anchor packet.
        """
        start_time = time.time()
        while True:
            try:
                res = self._query_once(text)
                break
            except Exception as e:
                print('报错的原因', e)
                self._recover_invite_dialog()

        end_time = time.time()
        print('耗时:', end_time - start_time)
        return res.response.body
|
||
|
||
|