利用fastapi快速开发API接口

行云流水
2023-04-18 / 0 评论 / 252 阅读 / 正在检测是否收录...

前言

在写项目时,经常需要验证前端功能。前端需要获取后端数据,后端还没有写好。需要快速写一个接口,来给前端提供数据。利用fastapi快速给定义一个接口,和前端交互。

相关文章:利用FastAPI快速开发服务模块

过程

需要先注册路由->设计结构体->定义接口->编写处理函数

1. 注册路由

routes/api.py
api_router.include_router(pay.router, tags=["pay"])

2. 结构体

app/schemas/pay.py
from pydantic import BaseModel

class Payment(BaseModel):
    """
    订单请求
    """
    merno: str
    firstname: str
    lastname: str
    url: str = None
    ip: str
    orderno: str
    total: float
    currency: str
    email: str
    notifyurl: str
    returnurl: str
    telephone: str = None
    country: str
    state: str
    city: str
    zipcode: str
    address: str
    useragent: str = None
    goods: str
    sign: str

class PayResponse(BaseModel):
    """
    订单请求返回
    """
    code: int
    msg: str
    data: dict


class Notify(BaseModel):
    """
    订单请求
    """
    orderno: str
    total: float
    status: str
    transno: str
    sign: str

3. api接口

app/http/api/pay.py
from fastapi import APIRouter, Depends

from app.http.deps import get_db
from app.schemas.pay import Payment, PayResponse, Notify
from app.services.pay.payment import ParameterHandle, NotifyHandle

router = APIRouter(
    prefix="/pay"
)


@router.post("/submit", response_model=PayResponse)
def submit(item: Payment):
    """
    生成支付订单接口
    """
    payment = item.dict()
    para = ParameterHandle(payment)
    return para.respond()

@router.post("/notify", response_model=str)
def submit(item: Notify):
    """
    生成支付订单接口
    """
    notify = item.dict()
    noti = NotifyHandle(notify)
    return noti.respond()

4. 逻辑处理

from app.schemas.pay import Payment, Notify
from app.services.pay.superxpay import *

import logging
import json

def create_payment_response_from_submit(code, result):
    """
    构建返回函数
    """
    resp_dict = {
        "code": code,
        "msg": '成功',
        "data": result
        }

    return resp_dict

class ParameterHandle:
    def __init__(self, request_json: Payment):
        self.req_json = request_json

    def respond(self):
        rep_json = {}
        #记录请求
        logging.debug(json.dumps(self.req_json, indent=4))

        if verify_md5_sign(0, self.req_json):
            ref = get_order_ref()

            get_payurl_by_post(self.req_json, ref)

            payurl = 'http://www.testpay.url'
            logging.debug(payurl)

            rep_json['payurl']= payurl

        return create_payment_response_from_submit(1, rep_json)


class NotifyHandle:
    def __init__(self, request_json: Notify):
        self.req_json = request_json

    def respond(self):
        #记录请求
        logging.debug(json.dumps(self.req_json, indent=4))

        if verify_md5_sign(1, self.req_json):
            pay_sucess_notify_aweb(self.req_json)

        return 'SUCCESS'

评论 (0)

取消
只有登录/注册用户才可评论