Files
ca_auto_table/spider/main.py
2025-11-28 16:02:13 +08:00

804 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import time
from datetime import datetime
from DrissionPage import Chromium
from loguru import logger
from work import get_random_canada_info
from mail_ import mail_
from bit_browser import bit_browser
from api import api
from proxys import proxy_list
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class Auto:
def __init__(self, http: str = None):
self.browser = Chromium(http)
self.tab = self.browser.latest_tab
pass
# cf打码
def solve_cloudflare(self, is_ok: bool = False):
tab = self.browser.latest_tab
for _ in range(5):
tab.wait(1)
res = tab.ele(
't:h1@text()=Sorry, you have been blocked', timeout=1)
if res:
logger.error("Cloudflare验证失败")
return False
try:
shadow1 = tab.ele(
'x://*[@name="cf-turnstile-response"]').parent().shadow_root
iframe = shadow1.get_frame(1)
if iframe:
logger.debug("找到Cloudflare iframe")
shadow2 = iframe.ele('x:/html/body').shadow_root
if shadow2:
logger.debug("找到Cloudflare iframe body shadow root")
status = shadow2.ele(
'x://span[text()="Verifying..."]', timeout=1.5)
if status:
tab.wait(3)
status = shadow2.ele(
'x://span[text()="Success!"]', timeout=1.5)
if status:
logger.debug("Cloudflare验证成功")
return True
checkbox = shadow2.ele(
'x://input[@type="checkbox"]', timeout=1.5)
if checkbox:
checkbox.click()
logger.debug("点击Cloudflare复选框")
tab.wait(3)
logger.debug("重新获取状态")
# return False
except Exception as e:
# logger.error(f"处理Cloudflare异常: {e}")
if is_ok:
logger.debug(f"cloudflare处理通过: {e}")
return True
return self.solve_cloudflare(is_ok=True)
tab.wait(1)
return False
# 打开URL
def open_url(self, url: str):
self.tab.get(url)
# 等待进入首页
def wait_home(self):
logger.debug("等待进入首页")
jc = 0
while True:
if jc > 3:
logger.error("等待进入首页超过5次未成功")
return False
self.tab.wait(1)
# 判断cf是否通过
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败.")
# 刷新网页
self.tab.refresh()
self.tab.wait(1.5)
jc += 1
continue
else:
logger.debug("Cloudflare验证成功.")
self.tab.wait(1.5)
bol = self.tab.ele(
't:h1@text()=Sorry, you have been blocked', timeout=1)
if bol:
logger.debug("ip被ban秒")
return False
bol = self.tab.ele(
't:div@text():ERR_TIMED_OUT', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SSL_PROTOCOL_ERROR', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SOCKS_CONNECTION_FAILED', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us' == html:
logger.debug("成功进入首页")
return True
jc += 1
# 点击continue按钮
def click_continue(self, bl: bool = False):
logger.debug("点击Continue按钮")
jc = 0
while True:
if jc > 3:
logger.error("点击Continue按钮超过5次未成功")
return False
try:
continue_button = self.tab.ele(
't:button@text():Continue', timeout=1)
if continue_button:
jc += 1
# 滚动到最底部
self.tab.scroll.to_bottom()
self.tab.wait(1)
# 判断cf是否通过
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败..")
self.tab.refresh()
self.tab.wait(1.5)
continue
else:
logger.debug("Cloudflare验证成功..")
self.tab.wait(3)
continue_button.click()
logger.debug("点击Continue按钮成功")
self.tab.wait(1.5)
# bol = self.tab.ele('@text():Loading', timeout=1)
# if bol:
# logger.debug("Loading...")
# if bl:
# logger.debug("多次异常界面, 结束继续点击")
# return False
# logger.debug("异常界面")
# self.tab.wait(1)
# return self.click_continue(bl=True)
bol = self.tab.ele(
't:div@text():ERR_TIMED_OUT', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SSL_PROTOCOL_ERROR', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SOCKS_CONNECTION_FAILED', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:h1@text()=Sorry, you have been blocked', timeout=1)
if bol:
logger.debug("ip被ban秒")
# 刷新网页
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:h2@text()=You are being rate limited', timeout=1)
if bol:
logger.debug("被限流, 退出")
return False
bol = self.tab.ele(
't:li@text():There was a problem, please try again.', timeout=1)
if bol:
if bl:
logger.debug("多次异常界面, 结束继续点击")
return False
logger.debug("异常界面")
self.tab.wait(1)
return self.click_continue(bl=True)
html = self.tab.url
logger.debug(f"当前URL: {html}")
if 'https://veritaconnect.ca/canadianbreadsettlement/en-us/Claimant/UnknownClaimForm' in html:
logger.debug("成功进入问卷界面")
return True
jc += 1
except Exception as e:
logger.error(f"点击Continue按钮异常: {e}")
self.tab.wait(1)
# 随机取城市
def get_random_city(self, province: str | None = None):
cities = {
"Alberta": ["Calgary", "Edmonton"],
"British Columbia": ["Vancouver"],
# "Manitoba": ["Winnipeg", "Rochester"],
# "New Brunswick": ["Fredericton", "Moncton"],
# "Newfoundland and Labrador": ["St. John's", "Halifax"],
"Nova Scotia": ["Halifax"],
"Ontario": ["Toronto"],
# "Prince Edward Island": ["Charlottetown", "St. John's"],
# "Quebec": ["Quebec City", "Montreal"],
# "Saskatchewan": ["Saskatoon", "Regina"],
}
if province is None:
province = random.choice(list(cities.keys()))
return province, random.choice(cities.get(province, []))
def get_province_by_city(self) -> str | None:
"""
根据城市名称解析对应省份
参数:
city (str): 城市名称,例如 `Calgary`、`Edmonton` 等
返回值:
str | None: 对应的省份名称;未匹配返回 None
"""
mapping = {
"Calgary": "Alberta",
"Edmonton": "Alberta",
"Vancouver": "British Columbia",
"Halifax": "Nova Scotia",
"Toronto": "Ontario",
"Ottawa": "Ontario",
"Mississauga": "Ontario",
"Brampton": "Ontario",
"Hamilton": "Ontario",
"Kitchener": "Ontario",
"London": "Ontario",
"Markham": "Ontario",
"Vaughan": "Ontario",
"Windsor": "Ontario",
"Oshawa": "Ontario",
"Brantford": "Ontario",
"Barrie": "Ontario",
"Sudbury": "Ontario",
"Kingston": "Ontario",
"Guelph": "Ontario",
"Cambridge": "Ontario",
"Sarnia": "Ontario",
"Peterborough": "Ontario",
"Waterloo": "Ontario",
"Belleville": "Ontario",
"Brockville": "Ontario",
"Burlington": "Ontario",
"Cornwall": "Ontario",
"Kawartha Lakes": "Ontario",
"North Bay": "Ontario",
"Orillia": "Ontario",
"Pickering": "Ontario",
"Sault Ste. Marie": "Ontario",
"Stratford": "Ontario",
"Durham": "Ontario",
"Norfolk County": "Ontario",
"Prince Edward County": "Ontario",
"Quinte West": "Ontario",
"St. Catharines": "Ontario",
"Welland": "Ontario",
"Thorold": "Ontario",
"Niagara Falls": "Ontario",
"Pelham": "Ontario",
"Port Colborne": "Ontario",
}
# 随机返回一条 key 和 value
return random.choice(list(mapping.items()))
# 随机实物
def get_random_food(self, city: str, shop: str) -> list[str]:
"""
随机选择 1~2 种食物类别,并为每个类别至少选择 1 个具体产品
参数:
shop (str): 商店名称(当前未使用,占位参数)
返回值:
list[str]: 随机选取的产品名称列表
"""
categories = [
[
'Wonder Bread White',
'Villaggio White Bread',
'No Name Sliced White Bread',
"President's Choice White Sliced Bread",
],
[
"Ben's Original Whole Wheat Bread",
"POM Whole Wheat Bread",
"Silver Hills Bakery Whole Wheat Sliced Bread",
"Country Harvest Whole Wheat Bread",
],
[
"Wonder Bread Hot Dog Buns",
"Villaggio Hamburger Buns",
"Dempster's Dinner Rolls",
"No Frills Hot Dog Buns",
],
[
"Stonemill Bakehouse Bagels",
"Wonder Bagels",
"Montreal Bagels (pre-packaged, e.g., St. Lawrence brand)",
"President's Choice Bagels",
],
[
"Silver Hills Multi-Grain Sliced Bread",
"POM Multi-Grain Bread",
"Country Harvest Multi-Grain Loaf",
],
[
"President's Choice French Stick",
"Dempster's Italian Style Bread",
"Wonder Italian Bread",
"Villaggio Country Style Loaf",
],
]
# 随机选择 1~2 个类别(不重复)
category_count = random.randint(1, 2)
chosen_categories = random.sample(categories, k=category_count)
# 每个类别至少选择 1 个产品,最多选择 3 个以避免过多
selected_products: list[str] = []
for cat in chosen_categories:
max_pick = min(3, len(cat))
pick_count = random.randint(1, max_pick)
selected_products.extend(random.sample(cat, k=pick_count))
logger.debug(f"随机选择的产品: {selected_products}")
text = f'{shop}, {city} buy: '
for p in selected_products:
text += f'{p} * {random.randint(1, 3)}, '
text = text[:-2]
text = text + '.'
logger.debug(f'随机选择的产品文本: {text}')
return text
# 填写问卷
def fill_questionnaire(self):
"""
完成问卷填写
参数:
city (str): 线程启动时传入的城市名称,用于匹配省份并填写数据
"""
try:
city, province = self.get_province_by_city()
if province is None:
logger.error(f"未找到城市对应省份")
return
j = 0
while True:
if j > 3:
return False
info = get_random_canada_info(province, city)
if len(info.get('postcode')) > 5:
break
j += 1
first_name = info["firstname"]
last_name = info["lastname"]
# 将生日格式从 '8/28/1995' 转为 'yyyy-mm-dd'日月不足两位补0
birthday = info["birthday"]
current_address = info["address_str"]
# 保持使用线程传入的城市与解析出的省份
postal_code = info["postcode"]
email = mail_.email_create_random()
phone = info["phone"]
shop = api.get_random_shop()
if shop is None:
return None
street = shop.get('street')
if street is None:
return None
text = self.get_random_food(shop.get('city'), street)
# 人数
person_count = str(random.randint(3, 5))
logger.debug("填写问卷")
self.tab.wait(0.1)
logger.debug(f"填写first_name: {first_name}")
self.tab.ele('t:input@id=FirstName').set.value(first_name)
self.tab.wait(0.1)
logger.debug(f"填写last_name: {last_name}")
self.tab.ele('t:input@id=LastName').set.value(last_name)
self.tab.wait(0.1)
logger.debug(f"填写birthday: {birthday}")
self.tab.ele('t:input@id=DateOfBirth').set.value(birthday)
self.tab.wait(0.1)
logger.debug(f"填写current_address: {current_address}")
self.tab.ele('t:input@id=AddressLine1').set.value(current_address)
self.tab.wait(0.1)
logger.debug(f"填写city: {city}")
self.tab.ele('t:input@id=City').set.value(city)
self.tab.wait(0.1)
logger.debug(f"填写province: {province}")
self.tab.ele(
't:select@id=CanProv').ele(f't:option@text()={province}').click()
self.tab.wait(0.1)
logger.debug(f"填写postal_code: {postal_code}")
self.tab.ele('t:input@id=CanPostal').set.value(postal_code)
self.tab.wait(0.1)
logger.debug(f"填写NumberOfAdults: {person_count}")
self.tab.ele(
't:select@id=NumberOfAdults').ele(f't:option@text()={person_count}').click()
self.tab.wait(0.1)
logger.debug(f"选择地址没变")
self.tab.eles('t:input@id=IsDifferentAddress')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写email: {email}")
self.tab.ele('t:input@id=EmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写ConfirmEmailAddress: {email}")
self.tab.ele('t:input@id=ConfirmEmailAddress').set.value(email)
self.tab.wait(0.1)
logger.debug(f"填写phone: {phone}")
self.tab.ele('t:input@id=PhoneNumber').set.value(phone)
self.tab.wait(0.1)
logger.debug(f"选择同意条款")
self.tab.ele('t:input@id=IVerify').click()
self.tab.wait(0.1)
logger.debug(f"选择没有申请过")
self.tab.eles('t:input@id=IsCompensated')[1].click()
self.tab.wait(0.1)
logger.debug(f"填写text: {text}")
self.tab.ele('t:textarea@id=MetaAnswerA').set.value(text)
self.tab.wait(0.1)
logger.debug(f"勾选同意我的名字")
self.tab.ele('t:input@id=IDeclare').click()
self.tab.wait(0.1)
logger.debug(f"填写PrintName: {last_name+' '+first_name}")
self.tab.ele(
't:input@id=PrintName').set.value(last_name+' '+first_name)
self.tab.wait(0.1)
return self.submit_file(first_name, last_name, birthday, current_address, city, phone, postal_code, province, email, text)
except Exception as e:
logger.error(f"填写问卷失败: {e}")
# 提交问卷
def submit_file(self, first_name: str, last_name: str, birthday: str, current_address: str, city: str, phone: str, postal_code: str, province: str, email: str, text: str):
jc = 0
while True:
if jc >= 3:
logger.error("提交问卷失败")
return False
res = self.tab.ele(
't:h2@text()=CLAIM SUBMISSION CONFIRMATION', timeout=3)
if res:
logger.info("提交问卷成功")
res = self.tab.ele('@text():Your claim number: ')
if res:
logger.info(f"反馈地址: {res.text}")
text =f"{text}----{res.text}"
status = True
else:
status=False
api.create_info(
first_name=first_name,
last_name=last_name,
birthday=birthday,
current_address=current_address,
city=city,
phone=phone,
postal_code=postal_code,
province=province,
email=email,
text=text,
status=status
)
return True
bol = self.tab.ele(
't:div@text():ERR_TIMED_OUT', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SSL_PROTOCOL_ERROR', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.tab.ele(
't:div@text():ERR_SOCKS_CONNECTION_FAILED', timeout=1)
if bol:
logger.debug("刷新网页")
self.tab.refresh()
self.tab.wait(1.5)
bol = self.solve_cloudflare()
if not bol:
logger.debug("Cloudflare验证失败.")
self.tab.wait(1)
else:
logger.debug("Cloudflare验证成功.")
logger.debug(f"点击Submit按钮")
self.tab.ele('t:button@text():Submit').click()
self.tab.wait(3)
jc += 1
def parse_proxy(proxy: str) -> tuple[str, int, str, str] | None:
"""
解析代理字符串为四元组 `(host, port, user, pwd)`
参数:
proxy: 形如 `host:port:user:pwd`
返回值:
(host, port, user, pwd) 或 None格式错误
"""
try:
host, port, user, pwd = proxy.split(":", 3)
return host, int(port), user, pwd
except Exception:
logger.error(f"代理格式错误: {proxy}")
return None
def create_fingerprint_browser(proxy: str) -> tuple[str, str] | None:
"""
创建指纹浏览器并打开窗口,返回 `(browser_id, debugger_http)`
参数:
proxy: 代理字符串
返回值:
(browser_id, http) 或 None失败
"""
info = parse_proxy(proxy)
if info is None:
return None
host, port, user, pwd = info
try:
browser_id = bit_browser.bit_browser_create(
remark=f"{user}",
proxy_type="socks5",
host=host,
port=str(port),
proxy_user=user,
proxy_pwd=pwd,
)
if not browser_id:
return None
logger.info(f"创建指纹浏览器成功: {browser_id}")
time.sleep(0.1)
http = bit_browser.bit_browser_open(browser_id)
if not http:
return None
logger.info(f"打开指纹浏览器成功: {browser_id}")
return browser_id, http
except Exception as e:
logger.error(f"创建指纹浏览器失败: {e}")
return None
def close_and_delete_browser(browser_id: str) -> None:
"""
关闭并删除指定指纹浏览器
参数:
browser_id: 指纹浏览器ID
"""
try:
bit_browser.bit_browser_close(browser_id)
except Exception as e:
logger.warning(f"关闭浏览器失败或已关闭: {browser_id} - {e}")
time.sleep(0.1)
try:
bit_browser.bit_browser_delete(browser_id)
except Exception as e:
logger.warning(f"删除浏览器失败或已删除: {browser_id} - {e}")
def run_task_with_proxy(proxy: str, stop_event: threading.Event) -> None:
"""
使用代理创建指纹浏览器、执行自动化,并在结束后清理
参数:
proxy: 代理字符串
"""
browser_id: str | None = None
try:
created = create_fingerprint_browser(proxy)
if not created:
return
browser_id, http = created
if stop_event.is_set():
return
auto = Auto(http=http)
auto.open_url('https://veritaconnect.ca/canadianbreadsettlement/en-us')
if stop_event.is_set():
return
if not auto.wait_home():
return
if stop_event.is_set():
return
if not auto.click_continue():
return
if stop_event.is_set():
return
auto.fill_questionnaire()
except Exception as e:
logger.error(f"执行任务异常: {e}")
finally:
if browser_id:
try:
close_and_delete_browser(browser_id)
except Exception:
pass
def proxy_loop(proxy: str, stop_event: threading.Event) -> None:
"""
为单个代理保持持续运行:任务结束后立即重建并再次执行
参数:
proxy: 代理字符串
stop_event: 停止事件,用于外部触发退出循环
"""
while not stop_event.is_set():
try:
if is_forbidden_time():
cleanup_all_browsers()
secs = seconds_until(20, 0)
if stop_event.wait(timeout=secs):
break
continue
run_task_with_proxy(proxy, stop_event)
except Exception as e:
logger.error(f"代理循环异常: {proxy} - {e}")
if stop_event.is_set():
break
if stop_event.wait(timeout=0.1):
break
def is_forbidden_time() -> bool:
"""
判断当前是否处于禁跑时段(每日 18:30 ~ 20:00本地时间
返回值:
bool: True 表示处于禁跑时段
"""
now = datetime.now()
start = now.replace(hour=18, minute=30, second=0, microsecond=0)
end = now.replace(hour=20, minute=0, second=0, microsecond=0)
return start <= now < end
def seconds_until(hour: int, minute: int) -> float:
"""
计算到今天指定时间点的剩余秒数
参数:
hour: 目标小时24小时制
minute: 目标分钟
返回值:
float: 剩余秒数,若目标时间已过则为 0
"""
now = datetime.now()
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
return 0.0
return (target - now).total_seconds()
def count_fingerprint_browsers() -> int:
"""
统计当前指纹浏览器数量
返回值:
int: 当前总数量
"""
try:
res = bit_browser.bit_browser_get(0, 100)
data = res.get("data", {}) if isinstance(res, dict) else {}
total = data.get("totalNum")
lst = data.get("list", [])
if isinstance(total, int) and total >= 0:
return total
return len(lst)
except Exception as e:
logger.warning(f"统计指纹浏览器数量失败: {e}")
return 0
def cleanup_all_browsers() -> None:
"""
关闭并删除所有指纹浏览器
"""
try:
res = bit_browser.bit_browser_get(0, 100)
data = res.get("data", {}) if isinstance(res, dict) else {}
lst = data.get("list", [])
ids = [i.get("id") for i in lst if i.get("id")]
for bid in ids:
close_and_delete_browser(bid)
except Exception as e:
logger.warning(f"清理所有指纹浏览器失败: {e}")
def monitor_browsers_and_restart(limit: int, stop_event: threading.Event, restart_event: threading.Event) -> None:
"""
每 30 秒检测指纹浏览器数量,超过 `limit` 则触发重启事件并清理所有浏览器
参数:
limit: 允许的最大浏览器数量(通常为代理数量)
restart_event: 触发重启的事件
"""
while not stop_event.is_set():
time.sleep(30)
count = count_fingerprint_browsers()
if count > limit:
logger.warning(f"指纹浏览器数量 {count} 超过限制 {limit},执行重启")
restart_event.set()
stop_event.set()
cleanup_all_browsers()
break
def main():
"""
多线程并发管理:按代理数量并发创建指纹浏览器并执行任务;每 30 秒监控数量,超限则重启。
"""
proxies = list(proxy_list)
while True:
stop_event = threading.Event()
restart_event = threading.Event()
if is_forbidden_time():
cleanup_all_browsers()
secs = seconds_until(20, 0)
logger.info(f"处于禁跑时段休眠至20:00剩余 {int(secs)}")
time.sleep(secs)
continue
with ThreadPoolExecutor(max_workers=len(proxies)) as executor:
futures_map = {executor.submit(proxy_loop, p, stop_event): p for p in proxies}
monitor_thread = threading.Thread(
target=monitor_browsers_and_restart,
args=(len(proxies), stop_event, restart_event),
daemon=True,
)
monitor_thread.start()
while True:
if restart_event.is_set():
stop_event.set()
try:
executor.shutdown(wait=False)
except Exception:
pass
break
# 进入禁跑时段时,立即停止并清理浏览器
if is_forbidden_time():
logger.info("进入禁跑时段,停止当前批次并清理指纹浏览器")
stop_event.set()
try:
executor.shutdown(wait=False)
except Exception:
pass
cleanup_all_browsers()
break
time.sleep(0.2)
try:
monitor_thread.join(timeout=5)
except Exception:
pass
continue
if __name__ == "__main__":
main()