2025-11-20 21:42:20 +08:00
|
|
|
|
import requests
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
import csv
|
|
|
|
|
|
import os
|
|
|
|
|
|
import random
|
|
|
|
|
|
class Api:
|
|
|
|
|
|
def __init__(self) -> None:
|
2025-11-20 22:56:11 +08:00
|
|
|
|
# self.base_url = 'http://127.0.0.1:6060'
|
|
|
|
|
|
self.base_url = 'http://192.168.11.67:6060'
|
2025-11-20 21:42:20 +08:00
|
|
|
|
|
|
|
|
|
|
# 创建店铺
|
|
|
|
|
|
def create_shop(self, city: str, street: str, shop_name: str) -> dict:
|
|
|
|
|
|
url = f'{self.base_url}/country/shop'
|
|
|
|
|
|
item = {
|
|
|
|
|
|
'city': city,
|
|
|
|
|
|
'street': street,
|
|
|
|
|
|
'shop_name': shop_name,
|
|
|
|
|
|
}
|
|
|
|
|
|
response = requests.post(url, json=item).json()
|
|
|
|
|
|
logger.info(response)
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
# 查询店铺
|
|
|
|
|
|
def get_shop(self, city: str) -> dict:
|
|
|
|
|
|
url = f'{self.base_url}/country/shop'
|
|
|
|
|
|
response = requests.get(url).json()
|
|
|
|
|
|
# logger.info(response)
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
# 创建信息
|
|
|
|
|
|
def create_info(self, first_name: str, last_name: str, birthday: str, current_address: str, city: str, phone: str, postal_code: str, province: str, email: str, text: str,status: bool=False, email_content: str|None=None) -> dict:
|
|
|
|
|
|
url = f'{self.base_url}/country/info'
|
|
|
|
|
|
item = {
|
|
|
|
|
|
"first_name": first_name,
|
|
|
|
|
|
"last_name": last_name,
|
|
|
|
|
|
"birthday": birthday,
|
|
|
|
|
|
"current_address": current_address,
|
|
|
|
|
|
"city": city,
|
|
|
|
|
|
"phone": phone,
|
|
|
|
|
|
"postal_code": postal_code,
|
|
|
|
|
|
"province": province,
|
|
|
|
|
|
"status": status,
|
|
|
|
|
|
"email": email,
|
|
|
|
|
|
"email_content": email_content,
|
|
|
|
|
|
"text": text
|
|
|
|
|
|
}
|
|
|
|
|
|
response = requests.post(url, json=item).json()
|
|
|
|
|
|
logger.info(response)
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
|
|
|
|
|
# 根据城市 随机获取一个店铺
|
|
|
|
|
|
def get_random_shop(self) -> dict:
|
|
|
|
|
|
url = f'{self.base_url}/country/shop/random'
|
|
|
|
|
|
response = requests.get(url).json()
|
|
|
|
|
|
# logger.info(response)
|
|
|
|
|
|
if not response.get('street'):
|
|
|
|
|
|
logger.error(f'没有店铺')
|
|
|
|
|
|
return None
|
|
|
|
|
|
return response
|
|
|
|
|
|
|
2025-11-21 01:26:38 +08:00
|
|
|
|
def main():
|
|
|
|
|
|
"""
|
|
|
|
|
|
从同目录的 `bakeries.csv` 读取面包店数据,按列映射输出或创建店铺
|
2025-11-20 21:42:20 +08:00
|
|
|
|
|
2025-11-21 01:26:38 +08:00
|
|
|
|
列顺序:`Name,Address,City`
|
|
|
|
|
|
"""
|
|
|
|
|
|
api = Api()
|
2025-11-21 09:58:05 +08:00
|
|
|
|
csv_path = os.path.join(os.path.dirname(__file__), 'data.csv')
|
2025-11-21 01:26:38 +08:00
|
|
|
|
if not os.path.exists(csv_path):
|
|
|
|
|
|
logger.error(f'CSV 文件不存在: {csv_path}')
|
|
|
|
|
|
return
|
2025-11-20 21:42:20 +08:00
|
|
|
|
|
2025-11-21 01:26:38 +08:00
|
|
|
|
with open(csv_path, 'r', encoding='utf-8') as file:
|
|
|
|
|
|
reader = csv.reader(file)
|
|
|
|
|
|
header = next(reader, None)
|
|
|
|
|
|
for row in reader:
|
|
|
|
|
|
if len(row) < 3:
|
|
|
|
|
|
logger.warning(f'行列数不足,跳过: {row}')
|
|
|
|
|
|
continue
|
2025-11-21 09:58:05 +08:00
|
|
|
|
shop_name, street, city = row[1], row[2], row[0]
|
2025-11-21 01:26:38 +08:00
|
|
|
|
if ' (city)' in city:
|
|
|
|
|
|
city = city.replace(' (city)', '')
|
|
|
|
|
|
if 'Quebec' in city:
|
|
|
|
|
|
continue
|
|
|
|
|
|
if ',' in city:
|
|
|
|
|
|
city = city.split(',')[0]
|
|
|
|
|
|
logger.info(f'city: {city}, street: {street}, shop_name: {shop_name}')
|
|
|
|
|
|
api.create_shop(city, street, shop_name)
|
2025-11-20 21:42:20 +08:00
|
|
|
|
|
|
|
|
|
|
# def main2():
|
|
|
|
|
|
# api = Api()
|
|
|
|
|
|
# city = 'Toronto'
|
|
|
|
|
|
# shop = api.get_random_shop()
|
|
|
|
|
|
# if shop:
|
|
|
|
|
|
# logger.info(shop)
|
|
|
|
|
|
|
|
|
|
|
|
# if __name__ == '__main__':
|
2025-11-21 09:58:05 +08:00
|
|
|
|
# main()
|
2025-11-20 21:42:20 +08:00
|
|
|
|
|
|
|
|
|
|
api = Api()
|