# ca_auto_table/spider/api.py

import requests
from loguru import logger
import csv
import os
import random


class Api:
    def __init__(self) -> None:
        # self.base_url = 'http://127.0.0.1:6060'
        self.base_url = 'http://192.168.11.67:6060'

    # Create a shop
    def create_shop(self, city: str, street: str, shop_name: str) -> dict:
        url = f'{self.base_url}/country/shop'
        item = {
            'city': city,
            'street': street,
            'shop_name': shop_name,
        }
        response = requests.post(url, json=item).json()
        logger.info(response)
        return response

    # Query shops (note: the `city` parameter is currently not sent with the request)
    def get_shop(self, city: str) -> dict:
        url = f'{self.base_url}/country/shop'
        response = requests.get(url).json()
        # logger.info(response)
        return response

    # Create a personal info record
    def create_info(self, first_name: str, last_name: str, birthday: str, current_address: str, city: str, phone: str, postal_code: str, province: str, email: str, text: str, status: bool = False, email_content: str | None = None) -> dict:
        url = f'{self.base_url}/country/info'
        item = {
            "first_name": first_name,
            "last_name": last_name,
            "birthday": birthday,
            "current_address": current_address,
            "city": city,
            "phone": phone,
            "postal_code": postal_code,
            "province": province,
            "status": status,
            "email": email,
            "email_content": email_content,
            "text": text
        }
        response = requests.post(url, json=item).json()
        logger.info(response)
        return response
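
    # Example call (a minimal sketch; the field formats below — birthday string,
    # phone, postal code — are illustrative assumptions, not documented by the
    # API in this file):
    #
    #     api = Api()
    #     api.create_info(
    #         first_name='Jane', last_name='Doe', birthday='1990-01-01',
    #         current_address='123 Queen St W', city='Toronto',
    #         phone='4165550100', postal_code='M5H 2N2', province='ON',
    #         email='jane@example.com', text='hello',
    #     )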

    # Randomly fetch one shop (no city filter is actually applied here)
    def get_random_shop(self) -> dict | None:
        url = f'{self.base_url}/country/shop/random'
        response = requests.get(url).json()
        # logger.info(response)
        if not response.get('street'):
            logger.error('no shop found')
            return None
        return response
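

# Example usage of the client (a sketch only; apart from 'street', the response
# fields used here, such as 'shop_name', are assumptions about the API):
#
#     api = Api()
#     shop = api.get_random_shop()
#     if shop:
#         logger.info(f"picked {shop.get('shop_name')} on {shop.get('street')}")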


def main():
    """
    Read bakery data from `data.csv` in the same directory and create a shop
    for each row.

    Column order (as indexed below): `City,Name,Address`.
    """
    api = Api()

    csv_path = os.path.join(os.path.dirname(__file__), 'data.csv')

    if not os.path.exists(csv_path):
        logger.error(f'CSV file does not exist: {csv_path}')
        return

    with open(csv_path, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        header = next(reader, None)  # skip the header row
        for row in reader:
            if len(row) < 3:
                logger.warning(f'Row has too few columns, skipping: {row}')
                continue

            shop_name, street, city = row[1], row[2], row[0]

            # Normalize the city value; skip Quebec entries
            if ' (city)' in city:
                city = city.replace(' (city)', '')
            if 'Quebec' in city:
                continue
            if ',' in city:
                city = city.split(',')[0]
            logger.info(f'city: {city}, street: {street}, shop_name: {shop_name}')
            api.create_shop(city, street, shop_name)
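

# For reference, a data.csv layout that main() would accept (header names and
# values are illustrative; only the City, Name, Address ordering is implied by
# the row indexing above):
#
#     City,Name,Address
#     Toronto (city),Sample Bakery,123 Queen St W
#     "Ottawa, Ontario",Another Bakery,45 Rideau St
#
# 'Toronto (city)' is normalized to 'Toronto', 'Ottawa, Ontario' to 'Ottawa',
# and any row whose city contains 'Quebec' is skipped.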


# def main2():
#     api = Api()
#     city = 'Toronto'
#     shop = api.get_random_shop()
#     if shop:
#         logger.info(shop)


if __name__ == '__main__':
    # main()
    api = Api()