Files
ca_auto_table/spider/main.py

480 lines
18 KiB
Python
Raw Normal View History

2025-11-20 11:42:18 +08:00
from math import log
import random
from re import S
import time
from tkinter import N
from DrissionPage import Chromium
from loguru import logger
from work import get_random_canada_info
from mail_ import mail_
from bit_browser import bit_browser
2025-11-20 21:42:20 +08:00
from api import api
2025-11-20 11:42:18 +08:00
class Auto:
2025-11-20 21:42:20 +08:00
def __init__(self,http:str=None):
2025-11-20 11:42:18 +08:00
self.browser = Chromium(http)
self.tab = self.browser.latest_tab
pass
# cf打码
def solve_cloudflare(self):
tab = self.browser.latest_tab
2025-11-20 21:42:20 +08:00
for _ in range(5):
self.tab.wait(0.5)
2025-11-20 11:42:18 +08:00
try:
shadow1 = tab.ele(
'x://*[@name="cf-turnstile-response"]').parent().shadow_root
iframe = shadow1.get_frame(1)
if iframe:
logger.debug("找到Cloudflare iframe")
shadow2 = iframe.ele('x:/html/body').shadow_root
if shadow2:
logger.debug("找到Cloudflare iframe body shadow root")
2025-11-20 23:39:38 +08:00
status = shadow2.ele(
'x://span[text()="Verifying..."]', timeout=0.5)
if status:
tab.wait(3)
continue
2025-11-20 11:42:18 +08:00
status = shadow2.ele(
2025-11-20 21:42:20 +08:00
'x://span[text()="Success!"]', timeout=0.5)
2025-11-20 11:42:18 +08:00
if status:
logger.debug("Cloudflare验证成功")
return True
checkbox = shadow2.ele(
2025-11-20 21:42:20 +08:00
'x://input[@type="checkbox"]', timeout=0.5)
2025-11-20 11:42:18 +08:00
if checkbox:
checkbox.click()
logger.debug("点击Cloudflare复选框")
2025-11-20 21:42:20 +08:00
tab.wait(3)
2025-11-20 11:42:18 +08:00
logger.debug("重新获取状态")
# return False
except Exception as e:
# logger.error(f"处理Cloudflare异常: {e}")
logger.debug(f"cloudflare处理通过: {e}")
return True
tab.wait(1)
return False
# 打开URL
def open_url(self, url: str):
self.tab.get(url)
# 等待进入首页
def wait_home(self):
logger.debug("等待进入首页")
jc = 0
while True:
2025-11-20 21:42:20 +08:00
if jc > 3:
2025-11-20 11:42:18 +08:00
logger.error("等待进入首页超过5次未成功")
return False
self.tab.wait(1)
# 判断cf是否通过
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败.")
continue
else:
logger.debug("Cloudflare验证成功.")
self.tab.wait(1.5)
2025-11-20 23:39:38 +08:00
bol = self.tab.ele('t:h1@text():Sorry, you have been blocked', timeout=1)
if bol:
logger.debug("ip被ban秒")
return False
2025-11-20 11:42:18 +08:00
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us' == html:
logger.debug("成功进入首页")
return True
jc += 1
# 点击continue按钮
def click_continue(self, bl: bool = False):
logger.debug("点击Continue按钮")
jc = 0
while True:
2025-11-20 21:42:20 +08:00
if jc > 3:
2025-11-20 11:42:18 +08:00
logger.error("点击Continue按钮超过5次未成功")
return False
try:
continue_button = self.tab.ele(
't:button@text():Continue', timeout=1)
if continue_button:
# 判断cf是否通过
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败..")
continue
else:
logger.debug("Cloudflare验证成功..")
continue_button.click()
logger.debug("点击Continue按钮成功")
self.tab.wait(1.5)
2025-11-20 21:42:20 +08:00
bol = self.tab.ele('t:div@text():Loading...',timeout=1)
if bol:
logger.debug("Loading...")
if bl:
logger.debug("多次异常界面, 结束继续点击")
return False
logger.debug("异常界面")
self.tab.wait(1)
return self.click_continue(bl=True)
bol = self.tab.ele('t:h2@text()=You are being rate limited', timeout=1)
if bol:
logger.debug("被限流, 退出")
return False
2025-11-20 11:42:18 +08:00
bol = self.tab.ele(
't:li@text():There was a problem, please try again.', timeout=1)
if bol:
if bl:
logger.debug("多次异常界面, 结束继续点击")
return False
logger.debug("异常界面")
self.tab.wait(1)
return self.click_continue(bl=True)
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm' in html:
logger.debug("成功进入问卷界面")
return True
except Exception as e:
logger.error(f"点击Continue按钮异常: {e}")
self.tab.wait(1)
return False
# 随机取城市
def get_random_city(self, province: str|None=None):
cities = {
"Alberta": ["Calgary", "Edmonton"],
"British Columbia": ["Vancouver"],
# "Manitoba": ["Winnipeg", "Rochester"],
# "New Brunswick": ["Fredericton", "Moncton"],
# "Newfoundland and Labrador": ["St. John's", "Halifax"],
"Nova Scotia": ["Halifax"],
"Ontario": ["Toronto"],
# "Prince Edward Island": ["Charlottetown", "St. John's"],
# "Quebec": ["Quebec City", "Montreal"],
# "Saskatchewan": ["Saskatoon", "Regina"],
}
if province is None:
province = random.choice(list(cities.keys()))
return province,random.choice(cities.get(province, []))
2025-11-20 21:42:20 +08:00
def get_province_by_city(self, city: str) -> str | None:
"""
根据城市名称解析对应省份
参数:
city (str): 城市名称例如 `Calgary``Edmonton`
返回值:
str | None: 对应的省份名称未匹配返回 None
"""
mapping = {
"Calgary": "Alberta",
"Edmonton": "Alberta",
"Vancouver": "British Columbia",
"Halifax": "Nova Scotia",
"Toronto": "Ontario",
}
return mapping.get(city)
# 随机实物
def get_random_food(self, shop: str) -> list[str]:
"""
随机选择 1~2 种食物类别并为每个类别至少选择 1 个具体产品
参数:
shop (str): 商店名称当前未使用占位参数
返回值:
list[str]: 随机选取的产品名称列表
"""
categories = [
[
'Wonder Bread White',
'Villaggio White Bread',
'No Name Sliced White Bread',
"President's Choice White Sliced Bread",
],
[
"Ben's Original Whole Wheat Bread",
"POM Whole Wheat Bread",
"Silver Hills Bakery Whole Wheat Sliced Bread",
"Country Harvest Whole Wheat Bread",
],
[
"Wonder Bread Hot Dog Buns",
"Villaggio Hamburger Buns",
"Dempster's Dinner Rolls",
"No Frills Hot Dog Buns",
],
[
"Stonemill Bakehouse Bagels",
"Wonder Bagels",
"Montreal Bagels (pre-packaged, e.g., St. Lawrence brand)",
"President's Choice Bagels",
],
[
"Silver Hills Multi-Grain Sliced Bread",
"POM Multi-Grain Bread",
"Country Harvest Multi-Grain Loaf",
],
[
"President's Choice French Stick",
"Dempster's Italian Style Bread",
"Wonder Italian Bread",
"Villaggio Country Style Loaf",
],
]
# 随机选择 1~2 个类别(不重复)
category_count = random.randint(1, 2)
chosen_categories = random.sample(categories, k=category_count)
# 每个类别至少选择 1 个产品,最多选择 3 个以避免过多
selected_products: list[str] = []
for cat in chosen_categories:
max_pick = min(3, len(cat))
pick_count = random.randint(1, max_pick)
selected_products.extend(random.sample(cat, k=pick_count))
logger.debug(f"随机选择的产品: {selected_products}")
text = f'{shop} buy: '
for p in selected_products:
text += f'{p} * {random.randint(1, 3)}, '
text = text[:-2]
text = text + '.'
logger.debug(f'随机选择的产品文本: {text}')
return text
2025-11-20 11:42:18 +08:00
# 填写问卷
2025-11-20 21:42:20 +08:00
def fill_questionnaire(self, city: str):
"""
根据传入的城市解析省份并完成问卷填写
参数:
city (str): 线程启动时传入的城市名称用于匹配省份并填写数据
"""
try:
province = self.get_province_by_city(city)
if province is None:
logger.error(f"未找到城市对应省份: {city}")
return
j = 0
while True:
if j >3:
return False
info = get_random_canada_info(province, city)
if len(info.get('postcode')) > 5:
break
j += 1
first_name = info["firstname"]
last_name = info["lastname"]
# 将生日格式从 '8/28/1995' 转为 'yyyy-mm-dd'日月不足两位补0
birthday = info["birthday"]
current_address = info["address_str"]
# 保持使用线程传入的城市与解析出的省份
postal_code = info["postcode"]
email = mail_.email_create_random()
phone = info["phone"]
shop = api.get_random_shop()
if shop is None:
return None
street = shop.get('street')
if street is None:
return None
text = self.get_random_food(street)
# 人数
person_count = str(random.randint(3, 5))
logger.debug("填写问卷")
self.tab.wait(0.1)
logger.debug(f"填写first_name: {first_name}")
self.tab.ele('t:input@id=FirstName').set.value(first_name)
self.tab.wait(0.1)
logger.debug(f"填写last_name: {last_name}")
self.tab.ele('t:input@id=LastName').set.value(last_name)
self.tab.wait(0.1)
logger.debug(f"填写birthday: {birthday}")
self.tab.ele('t:input@id=DateOfBirth').set.value(birthday)
self.tab.wait(0.1)
logger.debug(f"填写current_address: {current_address}")
self.tab.ele('t:input@id=AddressLine1').set.value(current_address)
self.tab.wait(0.1)
logger.debug(f"填写city: {city}")
self.tab.ele('t:input@id=City').set.value(city)
self.tab.wait(0.1)
logger.debug(f"填写province: {province}")
self.tab.ele(
't:select@id=CanProv').ele(f't:option@text()={province}').click()
self.tab.wait(0.1)
logger.debug(f"填写postal_code: {postal_code}")
self.tab.ele('t:input@id=CanPostal').set.value(postal_code)
self.tab.wait(0.1)
logger.debug(f"填写NumberOfAdults: {person_count}")
self.tab.ele(
't:select@id=NumberOfAdults').ele(f't:option@text()={person_count}').click()
self.tab.wait(0.1)
logger.debug(f"选择地址没变")
self.tab.eles('t:input@id=IsDifferentAddress')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写email: {email}")
self.tab.ele('t:input@id=EmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写ConfirmEmailAddress: {email}")
self.tab.ele('t:input@id=ConfirmEmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写phone: {phone}")
self.tab.ele('t:input@id=PhoneNumber').set.value(phone)
self.tab.wait(0.1)
logger.debug(f"选择同意条款")
self.tab.ele('t:input@id=IVerify').click()
self.tab.wait(0.1)
logger.debug(f"选择没有申请过")
self.tab.eles('t:input@id=IsCompensated')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写text: {text}")
self.tab.ele('t:textarea@id=MetaAnswerA').set.value(text)
self.tab.wait(0.1)
logger.debug(f"勾选同意我的名字")
self.tab.ele('t:input@id=IDeclare').click()
self.tab.wait(0.1)
logger.debug(f"填写PrintName: {last_name+' '+first_name}")
self.tab.ele(
't:input@id=PrintName').set.value(last_name+' '+first_name)
self.tab.wait(0.1)
for i in range(3):
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败.")
self.tab.wait(0.1)
else:
logger.debug("Cloudflare验证成功.")
logger.debug(f"点击Submit按钮")
self.tab.ele('t:button@text():Submit').click()
break
api.create_info(
first_name=first_name,
last_name=last_name,
birthday=birthday,
current_address=current_address,
city=city,
phone=phone,
postal_code=postal_code,
province=province,
email=email,
text=text
)
self.tab.wait(2)
except Exception as e:
logger.error(f"填写问卷失败: {e}")
2025-11-20 11:42:18 +08:00
# 取对应城市的代理
def get_proxy( city: str):
if city == "Calgary":
return "us.novproxy.io:1000:uwqr8065-region-CA-st-Alberta-city-Calgary:d6vqwerx".split(':')
elif city =='Edmonton':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-Alberta-city-Edmonton:d6vqwerx'.split(':')
elif city =='Vancouver':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-British Columbia-city-Vancouver:d6vqwerx'.split(':')
elif city =='Halifax':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-Nova Scotia-city-Halifax:d6vqwerx'.split(':')
elif city == 'Toronto':
return 'us.novproxy.io:1000:uwqr8065-region-CA-st-Ontario-city-Toronto:d6vqwerx'.split(':')
else:
return None
"""指纹浏览器操作"""
# 创建指纹浏览器
def create_fingerprint_browser(city: str):
"""
根据城市创建指纹浏览器并执行问卷流程
参数:
city (str): 城市名称例如 `Calgary``Edmonton`
"""
browser_id = None
try:
proxy = get_proxy(city)
2025-11-20 21:42:20 +08:00
if proxy is None:
logger.error(f"{city} 未配置对应代理,结束该线程")
return
2025-11-20 11:42:18 +08:00
logger.info(f"{city} 准备创建指纹浏览器")
browser_id = bit_browser.bit_browser_create(
remark=city,
host=proxy[0],
port=proxy[1],
proxy_user=proxy[2],
proxy_pwd=proxy[3],
proxy_type='socks5'
)
logger.debug(browser_id)
# 打开指纹浏览器
http = bit_browser.bit_browser_open(browser_id)
logger.debug(http)
auto = Auto(http)
auto.open_url(
"https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm")
bol = auto.wait_home()
if not bol:
logger.error(f"{city} 进入首页失败,结束该线程")
return
bol = auto.click_continue()
if not bol:
logger.error(f"{city} 点击 Continue 失败,结束该线程")
return
2025-11-20 21:42:20 +08:00
auto.fill_questionnaire(city)
2025-11-20 11:42:18 +08:00
time.sleep(5)
finally:
if browser_id:
# 关闭指纹浏览器
try:
bit_browser.bit_browser_close(browser_id)
except Exception as e:
logger.error(f"{city} 关闭浏览器异常: {e}")
# 删除指纹浏览器
try:
bit_browser.bit_browser_delete(browser_id)
except Exception as e:
logger.error(f"{city} 删除浏览器异常: {e}")
def run_city_forever(city: str):
"""
持续循环运行指定城市流程完成一次即关闭并删除浏览器然后重新创建继续运行
参数:
city (str): 城市名称
"""
while True:
try:
create_fingerprint_browser(city)
except Exception as e:
logger.error(f"{city} 流程异常: {e}")
time.sleep(2)
def run_all_cities_concurrently():
"""
多线程并发运行所有城市流程
"""
import threading
cities = ['Calgary', 'Edmonton', 'Vancouver', 'Halifax', 'Toronto']
2025-11-20 21:42:20 +08:00
# cities = ['Calgary']
2025-11-20 11:42:18 +08:00
threads = []
for city in cities:
t = threading.Thread(target=run_city_forever, args=(city,), name=f"{city}-thread")
t.start()
threads.append(t)
logger.info(f"{city} 线程已启动")
2025-11-20 21:42:20 +08:00
# time.sleep(2)
2025-11-20 11:42:18 +08:00
for t in threads:
t.join()
logger.info("所有城市流程执行完成")
if __name__ == "__main__":
2025-11-20 21:42:20 +08:00
# auto = Auto()
# auto.get_random_food('a')
2025-11-20 11:42:18 +08:00
run_all_cities_concurrently()