Files
ca_auto_table/back/apis/country/info/view.py
2025-11-20 11:42:18 +08:00

172 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Query, Body, HTTPException
import random
from uuid import UUID
from .schema import Create, Update, Out, OutList
from ..models import Info
from utils.decorators import handle_exceptions_unified
from utils.time_tool import parse_time
from utils.out_base import CommonOut
from tortoise.transactions import in_transaction
# Router that the endpoint decorators in this module register on.
# NOTE(review): despite the name `app`, this is an APIRouter rather than a
# FastAPI application — presumably it is mounted by the application elsewhere.
app = APIRouter()
# Create a record
@app.post("", response_model=Out, description='创建信息', summary='创建信息')
@handle_exceptions_unified()
async def post(item: Create = Body(..., description='创建数据')):
    """
    Create a new Info record from the request body.

    The validated body is dumped to a dict and passed as keyword arguments to
    ``Info.create``. The created record is returned; if the ORM hands back a
    falsy result, an HTTP 400 is raised instead.
    """
    payload = item.model_dump()
    created = await Info.create(**payload)
    if created:
        return created
    raise HTTPException(status_code=400, detail='创建失败')
# Query records
@app.get("", response_model=OutList, description='获取信息', summary='获取信息')
@handle_exceptions_unified()
async def gets(
    id: UUID | None = Query(None, description='主键ID'),
    first_name: str | None = Query(None, description=''),
    last_name: str | None = Query(None, description=''),
    birthday: str | None = Query(None, description='生日'),
    current_address: str | None = Query(None, description='街道地址'),
    city: str | None = Query(None, description='城市'),
    phone: str | None = Query(None, description='电话'),
    postal_code: str | None = Query(None, description='邮编'),
    province: str | None = Query(None, description='州全称'),
    status: bool | None = Query(None, description='状态'),
    email: str | None = Query(None, description='邮箱'),
    # `pattern` replaces the `regex` keyword, which is deprecated when FastAPI
    # runs on Pydantic v2 (this module already relies on v2's `model_dump`).
    order_by: str | None = Query('create_time', description='排序字段',
                                 pattern='^(-)?(id|first_name|last_name|city|postal_code|province|create_time|update_time)$'),
    res_count: bool = Query(False, description='是否返回总数'),
    create_time_start: str | int | None = Query(
        None, description='创建时间开始 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
    create_time_end: str | int | None = Query(
        None, description='创建时间结束 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
    update_time_start: str | int | None = Query(
        None, description='更新时间开始 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
    update_time_end: str | int | None = Query(
        None, description='更新时间结束 (支持 YYYY-MM-DD / YYYY-MM-DD HH:mm:ss / 13位时间戳)'),
    page: int = Query(1, ge=1, description='页码'),
    limit: int = Query(10, ge=1, le=1000, description='每页数量'),
):
    """
    List Info records with optional filters, ordering and pagination.

    Filtering:
        * Every supplied (truthy) text/ID parameter becomes an exact-match
          filter; all filters are combined with AND.
        * ``status`` is applied whenever it is not None, so ``False`` is a
          valid filter value.
        * The ``*_time_start`` / ``*_time_end`` parameters bound
          ``create_time`` / ``update_time`` inclusively (``__gte`` / ``__lte``)
          after conversion by ``parse_time``.

    Ordering and paging:
        ``order_by`` accepts one of the whitelisted field names, optionally
        prefixed with ``-``. ``page`` is 1-based and ``limit`` is the page size.

    Returns:
        ``OutList`` where ``items`` is the requested page, ``num`` is the
        number of items on that page, and ``count`` is the total number of
        matching rows when ``res_count`` is true, otherwise ``-1``.

    Raises:
        HTTPException: 404 when the requested page contains no rows.
    """
    query = Info.all()

    # Exact-match filters; falsy values (None, empty string) are skipped.
    exact_filters = {
        'id': id,
        'first_name': first_name,
        'last_name': last_name,
        'birthday': birthday,
        'current_address': current_address,
        'city': city,
        'phone': phone,
        'postal_code': postal_code,
        'province': province,
        'email': email,
    }
    for field_name, value in exact_filters.items():
        if value:
            query = query.filter(**{field_name: value})

    # `status` is a tri-state flag: None means "do not filter".
    if status is not None:
        query = query.filter(status=status)

    # Inclusive time ranges. End bounds are parsed with is_end=True
    # (presumably so a date-only value covers the whole day — confirm in
    # utils.time_tool.parse_time).
    if create_time_start:
        query = query.filter(create_time__gte=parse_time(create_time_start))
    if create_time_end:
        query = query.filter(
            create_time__lte=parse_time(create_time_end, is_end=True))
    if update_time_start:
        query = query.filter(update_time__gte=parse_time(update_time_start))
    if update_time_end:
        query = query.filter(
            update_time__lte=parse_time(update_time_end, is_end=True))

    if order_by:
        query = query.order_by(order_by)

    # The total is computed before paging is applied, and only on request,
    # to avoid an extra COUNT query; -1 signals "not computed".
    count = await query.count() if res_count else -1

    offset = (page - 1) * limit
    res = await query.limit(limit).offset(offset)
    if not res:
        raise HTTPException(status_code=404, detail='信息不存在')
    return OutList(count=count, num=len(res), items=res)
# Update a record
@app.put("", response_model=Out, description='更新信息', summary='更新信息')
@handle_exceptions_unified()
async def put(id: UUID = Query(..., description='主键ID'),
              item: Update = Body(..., description='更新数据'),
              ):
    """
    Partially update one Info record.

    Only fields explicitly present in the request body are written
    (``exclude_unset=True``). Note that a field explicitly sent as ``null``
    counts as "set" and is therefore included in the update.

    Returns:
        The updated record.

    Raises:
        HTTPException: 404 when no record has the given ``id``;
            400 when the body sets no fields at all.
    """
    # Make sure the record exists before touching it.
    record = await Info.get_or_none(id=id)
    if not record:
        raise HTTPException(status_code=404, detail='信息不存在')

    # Collect only the fields the client actually sent.
    update_data = item.model_dump(exclude_unset=True)
    if not update_data:
        raise HTTPException(status_code=400, detail='没有要更新的字段')

    # update_from_dict is a synchronous helper that mutates the instance in
    # memory; the database write happens in save().
    record.update_from_dict(update_data)
    await record.save()
    return record
# Delete a record
@app.delete("", response_model=CommonOut, description='删除信息', summary='删除信息')
@handle_exceptions_unified()
async def delete(id: UUID = Query(..., description='主键ID'),
                 ):
    """
    Delete the Info record with the given ``id``.

    Returns ``CommonOut(count=1)`` on success and raises HTTP 404 when no such
    record exists.
    """
    target = await Info.get_or_none(id=id)
    if target:
        await target.delete()
        # Deleting a single Tortoise ORM instance returns None rather than a
        # deleted-row count, so a count of 1 is reported manually. Failures
        # surface as exceptions (presumably handled by the decorator).
        return CommonOut(count=1)
    raise HTTPException(status_code=404, detail='信息不存在')
# Randomly pick one record with status False and flip it to True
@app.get("/one", response_model=Out, description='随机获取一条状态修改为True的记录', summary='随机获取一条状态修改为True的记录')
@handle_exceptions_unified()
async def random_update_status():
    """
    Pick a random record whose status is False and set it to True.

    Inside one transaction the pending rows are counted, a random offset into
    them (ordered by ``create_time``) is fetched, and that row is updated with
    a ``status=False`` guard so an already-flipped row is never returned.

    Returns:
        The selected record, with ``status`` reflecting the persisted value
        (True).

    Raises:
        HTTPException: 404 when no record has status False;
            400 when the chosen row disappeared or was already updated by a
            concurrent request.
    """
    async with in_transaction() as conn:
        pending = Info.filter(status=False).using_db(conn)
        pending_count = await pending.count()
        if pending_count == 0:
            raise HTTPException(status_code=404, detail='没有状态为False的记录')

        pick_index = random.randrange(pending_count)
        item = await pending.order_by('create_time').offset(pick_index).first()
        if item is None:
            # The pending set shrank between the count and the fetch, so the
            # offset no longer points at a row; treat it as a conflict instead
            # of failing with AttributeError on `item.id`.
            raise HTTPException(status_code=400, detail='并发冲突,未更新')

        # Conditional update: matches zero rows if another request flipped
        # this record first.
        updated = await Info.filter(id=item.id, status=False).using_db(conn).update(status=True)
        if updated == 0:
            raise HTTPException(status_code=400, detail='并发冲突,未更新')

        # The queryset update does not touch the loaded instance; sync it so
        # the response shows the status that was just persisted.
        item.status = True
        return item