Skip to main content

接口开发

API概述

API,全称为 应用程序编程接口(Application Programming Interface),是一组定义了不同软件组件之间如何交互的规则和协议。API 允许不同的软件系统通过预定义的接口进行通信,使得开发者能够利用现有的功能和服务,而无需从头开始构建。这种模块化的设计不仅提高了开发效率,还促进了软件生态系统的扩展和创新。

一个典型的 API 包含以下几个关键组成部分:

  • 请求(Request): 客户端向服务器发送的调用请求,通常包含请求的方法、URL、头信息和参数。
  • 响应(Response): 服务器对客户端请求的返回结果,包含状态码、头信息和数据载体。
  • 端点(Endpoint): API 提供的具体功能接口的 URL 地址。
  • 方法(Method): 定义在特定端点上可以执行的操作,如 GET、POST、PUT、DELETE 等。
  • 数据格式(Data Format): 传输数据的格式,常见的有 JSON、XML 等。

接口标准

特性OpenAI API 风格MCPOpenAPI
类型商业 API 设计开放通信协议开放 API 描述规范
是否开放治理❌(OpenAI 控制)✅(社区驱动)✅(Linux 基金会)
是否可自由实现⚠️(法律风险模糊)✅(MIT 许可)
是否有正式规范文档✅(但非标准组织发布)✅(mcp.spec)✅(openapis.org)
目标调用 LLMLLM 与工具通信描述 RESTful API

开发框架

任何一家公司都可以定义符合自己公司业务需求的 API,但是他们可能使用的都是同一套开发框架在 Python 中,有多种接口开发框架可供选择:

维度FastAPIDjango
发布时间2018 年2005 年
开发语言PythonPython
架构风格微框架,基于 Starlette 和 Pydantic大型框架,支持全栈开发,内置 ORM、Admin 等
性能高,基于 ASGI,支持异步特性中等,依赖于 WSGI
异步支持原生支持,使用 async/await通过 Django 3.0+ 开始支持异步(仍在逐步完善)
数据验证内置 Pydantic,支持强类型和自动验证依赖于表单类(如 Forms 和 DRF 的 serializers)
自动文档生成内置支持 OpenAPI 和 Swagger UI不原生支持,需依赖 Django REST Framework (DRF)
学习曲线中等,需了解类型注解和异步编程较高,包含多种内置组件,适合系统性学习
社区和生态新兴框架,社区成长迅速,但生态略小最悠久且成熟,生态完善,适合大型项目
可扩展性高,强大的原生功能减少了扩展需求中等,内置组件强大,但灵活性相对较低
依赖注入支持原生支持依赖注入无原生支持
生产就绪稳定性较高,但相比 Flask 和 Django 使用年限较短成熟稳定,被广泛应用于大型生产系统
调试工具支持 Starlette 的调试功能内置调试模式,适合开发测试
使用场景高性能、异步处理或自动文档的场景大型系统开发,支持复杂业务逻辑和多用户系统
ORM 支持无原生支持,需借助 Tortoise-ORM 等原生支持,提供 Django ORM
Admin 界面无,需要自行开发原生支持,提供强大的 Admin 管理后台
支持的 Python 版本Python 3.6+Python 3.8+
对类型注解的支持原生支持,强类型友好不直接支持,但可结合第三方库使用

FastAPI

info

FastAPI的作者 Tiangolo(真名 Sebastián Ramírez)是一位来自哥伦比亚、目前居住在德国柏林的著名开源软件工程师。在 7 岁时就从天主教学校退学了。当时的原因非常有趣:因为他总是问太多的问题,被老师认为“太难对付”。

在退学之后,他的父母并没有强迫他回到传统学校,而是支持他居家自学。涵盖了算法、机器学习、密码学和人工智能等核心计算机科学领域。甚至连英语也是通过在线资源自学的。

他在 23 岁左右曾感到有些迷茫,因为当时他虽然已经掌握了大量深奥的知识,但由于没有正式学历和文凭,他在进入职场初期感到有些不自信。然而,他最终凭借在 GitHub 上的开源贡献和扎实的技术实力证明了自己,先后在迪拜和德国柏林的知名科技公司(如 Explosion AI 和 Forethought)担任高级/首席工程师。

目前的核心工作是全职开源维护者。

安装

  • pip install fastapi:只安装核心框架。

  • pip install "fastapi[standard]":安装核心框架的同时,自动捆绑安装开发一个标准项目所需的常用工具,包括:Uvicorn、FastAPI CLI、HTTPX、Pydantic

方括号语法是 “可选依赖组”(Optional Dependencies / Extras)。你可以把它理解为增强版安装。如果你已经输入了pip install fastapi,依然可以通过pip install "fastapi[standard]"补全配件。

tip

相比之下,opencv采取的是发布多个独立包的策略:

  • opencv-python
  • opencv-python-headless
  • opencv-contrib-python
  • opencv-contrib-python-headless

opencv的几个包是互斥的。装了 A 就不能装 B。一旦安装了多个,必须全部卸载,再单独安装你需要的版本。

ASGI

FastAPI 是一个现代的、快速的(高性能)、功能强大的 Web 框架,用于构建 API。运行在 ASGI 服务器上。

  • A (Asynchronous) :异步。支持 async/await 特性,允许程序在等待 IO(如读数据库)时去处理其他请求。
  • S (Server) :服务器。
  • G (Gateway) :网关。
  • I (Interface) :接口/标准。

ASGI 服务器技术选型

Uvicorn:目前最流行的轻量级、超快速的 ASGI 服务器。

单独安装方式:pip install uvicorn

Daphne:最早的 ASGI 服务器(由 Django 团队开发)。

Hypercorn:支持 HTTP/2 和 HTTP/3 的 ASGI 服务器。

接口文档

FastAPI 最受开发者欢迎的特性之一就是它原生内置了两套完全自动生成的交互式文档。你不需要写一行额外的文档代码,只要你写好了 API 逻辑,文档就实时生成了。

这两套文档分别是 Swagger UI 和 ReDoc。

FastAPI 并不是硬生生写了两套网页。它的逻辑是:

  1. 解析代码 :FastAPI 扫描你的路径、参数、Pydantic 模型。
  2. 生成 OpenAPI 规范 :它会自动生成一个符合国际标准 (OpenAPI/Swagger) 的 JSON 架构文件 (你可以访问 http://127.0.0.1:8000/openapi.json 看到它)。
  3. 渲染 UI :Swagger UI 和 ReDoc 只是两个“皮肤”,它们读取这个 JSON 文件并将其可视化。

Swagger UI

访问地址:http://127.0.0.1:8000/docs

核心用途:调试与测试。

直接测试 (Try it out):你不需要安装 Postman 或使用 curl 命令。点击 API 展开,点击 "Try it out",填入参数,直接就能看到服务器返回的结果。

实时校验:如果你定义的 Pydantic 模型要求某个字段是 int,你在 Swagger 里输入字符串,它会立刻变红提示你错误。

ReDoc

访问地址:http://127.0.0.1:8000/redoc

核心用途:阅读与归档。

极其整洁:它采用经典的三栏式布局(导航、说明、代码示例),非常适合给前端同学或第三方合作伙伴阅读。

基础用法

from fastapi import FastAPI, Header, HTTPException, Query, Path
from pydantic import BaseModel
from typing import Optional, Dict

app = FastAPI()

# Pydantic Models
class PostExampleModel(BaseModel):
title: Optional[str] = "默认标题"
content: Optional[str] = "默认内容"

class PutExampleModel(BaseModel):
name: Optional[str] = "默认名称"
description: Optional[str] = "默认描述"

class ApiResourcePostModel(BaseModel):
key: str

class ApiResourcePutModel(BaseModel):
field: str

# GET /get_example/{user_id}
@app.get("/get_example/{user_id}")
def get_example(
user_id: int,
name: Optional[str] = Query("默认名字"),
user_agent: Optional[str] = Header("未知")
):
"""
**方法:** GET

**示例请求:**
http:day3.ipynb
GET /get_example/123?name=Alice HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0

**示例响应:**
json
{
"user_id": 123,
"name": "Alice",
"user_agent": "Mozilla/5.0"
}

"""
return {"user_id": user_id, "name": name, "user_agent": user_agent}

# POST /post_example
@app.post("/post_example")
def post_example(
data: PostExampleModel,
authorization: Optional[str] = Header("无授权")
):
"""
**方法:** POST

**示例请求:**
http
POST /post_example HTTP/1.1
Host: example.com
Content-Type: application/json
Authorization: Bearer token

{
"title": "示例标题",
"content": "示例内容"
}

**示例响应:**
json
{
"title": "示例标题",
"content": "示例内容",
"authorization": "Bearer token"
}

"""
return {"title": data.title, "content": data.content, "authorization": authorization}

# PUT /put_example/{item_id}
@app.put("/put_example/{item_id}")
def put_example(
item_id: int,
data: PutExampleModel,
x_custom_header: Optional[str] = Header("无自定义头")
):
"""
**方法:** PUT

**示例请求:**
http
PUT /put_example/456 HTTP/1.1
Host: example.com
Content-Type: application/x-www-form-urlencoded
X-Custom-Header: 自定义值

name=新名称&description=新描述

**示例响应:**
json
{
"item_id": 456,
"name": "新名称",
"description": "新描述",
"custom_header": "自定义值"
}

"""
return {
"item_id": item_id,
"name": data.name,
"description": data.description,
"custom_header": x_custom_header,
}

# DELETE /delete_example
@app.delete("/delete_example")
def delete_example(
confirm: bool = Query(False),
authorization: Optional[str] = Header("无授权")
):
"""
**方法:** DELETE

**示例请求(确认删除):**
http
DELETE /delete_example?confirm=true HTTP/1.1
Host: example.com
Authorization: Bearer token

**示例响应(确认删除):**
json
{
"deleted": true,
"authorization": "Bearer token"
}


**示例请求(未确认删除):**
http
DELETE /delete_example HTTP/1.1
Host: example.com

**示例响应(未确认删除):**
json
{
"message": "删除未确认"
}

HTTP 状态码:400
"""
if not confirm:
raise HTTPException(status_code=400, detail="删除未确认")
return {"deleted": confirm, "authorization": authorization}

# GET /api/resource/{resource_id}
@app.get("/api/resource/{resource_id}")
def api_resource_get(
resource_id: str,
detail: Optional[str] = Query("basic")
):
"""
**方法:** GET

**请求示例:**
GET /api/resource/abc123?detail=full HTTP/1.1
Host: example.com

**响应示例:**
json
{
"method": "GET",
"resource_id": "abc123",
"detail": "full"
}

"""
return {"method": "GET", "resource_id": resource_id, "detail": detail}

# POST /api/resource/{resource_id}
@app.post("/api/resource/{resource_id}")
def api_resource_post(
resource_id: str,
data: ApiResourcePostModel
):
"""
**方法:** POST

**请求示例:**
POST /api/resource/abc123 HTTP/1.1
Host: example.com
Content-Type: application/json

{
"key": "value"
}

**响应示例:**
json
{
"method": "POST",
"resource_id": "abc123",
"data": {
"key": "value"
}
}

"""
return {"method": "POST", "resource_id": resource_id, "data": data.dict()}

# PUT /api/resource/{resource_id}
@app.put("/api/resource/{resource_id}")
def api_resource_put(
resource_id: str,
data: ApiResourcePutModel
):
"""
**方法:** PUT

**请求示例:**
PUT /api/resource/abc123 HTTP/1.1
Host: example.com
Content-Type: application/x-www-form-urlencoded

field=new%20value

**响应示例:**
json
{
"method": "PUT",
"resource_id": "abc123",
"data": {
"field": "new value"
}
}

"""
return {"method": "PUT", "resource_id": resource_id, "data": data.dict()}

# DELETE /api/resource/{resource_id}
@app.delete("/api/resource/{resource_id}")
def api_resource_delete(
resource_id: str,
confirm: bool = Query(False)
):
"""
**方法:** DELETE

**请求示例:**
DELETE /api/resource/abc123?confirm=true HTTP/1.1
Host: example.com

**响应示例:**
json
{
"method": "DELETE",
"resource_id": "abc123",
"confirm": true
}

"""
return {"method": "DELETE", "resource_id": resource_id, "confirm": confirm}

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

下面是异步函数作为 FastAPI 的返回值的示例

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import random

app = FastAPI()

async def async_range(n):
for i in range(n):
yield f"<p>{i}</p>"
await asyncio.sleep(random.randint(1, 3) * 1)

@app.get("/stream")
async def stream_numbers():
return StreamingResponse(async_range(5), media_type="html", background=None)

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
tip

如何验证 FastAPI 的真实并发能力?

通过代码而不是浏览器。如果你在 同一个浏览器 (比如 Chrome)中打开两个标签页,同时请求同一个 URL:

  • 浏览器的锁定机制 :大多数现代浏览器为了防止过度消耗服务器资源,会对完全相同的 URL 进行连接锁定。它会等待第一个请求完成后,才发出第二个。

数据流

在 Web 接口请求中,数据主要分布

数据来源 (Location)浏览器/HTTP 格式FastAPI 参数定义方式Python 转换后的类型典型用途
Path (路径)/users/10user_id: intint定位特定资源
Query (查询)?q=fast&page=1q: str, page: intstr,int搜索、排序、分页
Body (JSON){"id": 1, "name": "AI"}item: Item(BaseModel)Pydantic Model提交复杂业务数据
Body (Form)user=me&pw=123user: str = Form()str传统表单登录
File (文件)二进制流 (Multipart)file: UploadFileSpooledTemporaryFile上传头像、文档
Header (请求头)Authorization: Bearer...token: str = Header()str鉴权、设备信息
Cookiesession=xyzid: str = Cookie()str身份追踪

接口除了可以传输文本数据,还可传输图片、视频等数据,OpenCV 可以捕获屏幕,将两者结合起来,实现内网直播功能,可以在局域网内通过浏览器观看屏幕共享。


import os
from importlib import import_module
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
import uvicorn

from io import BytesIO
import cv2
from PIL import ImageGrab, Image
import time
import threading
from threading import get_ident


class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()

BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()

while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()

BaseCamera.event.wait()
BaseCamera.event.clear()

return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError('Must be implemented by subclasses.')

@classmethod
def _thread(cls):
print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None



class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError('Error')

while True:
image = ImageGrab.grab() # 获取屏幕数据
# w, h = image.size
image = image.resize((1366, 750), Image.LANCZOS) # 图片缩放
output_buffer = BytesIO() # 创建二进制对象
image.save(output_buffer, format='JPEG', quality=100) # quality提升图片分辨率
frame = output_buffer.getvalue() # 获取二进制数据
yield frame # 生成器返回一张图片的二进制数据

app = FastAPI()


@app.get('/', response_class=HTMLResponse)
async def index():
"""
视图函数
:return:
"""
return '''<html>

<head>
<title>屏幕共享</title>
</head>

<body>
<img src="/video_feed">
</body>

</html>'''


def gen(camera):
"""
流媒体发生器
"""
while True:
frame = camera.get_frame()

yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.get('/video_feed')
async def video_feed():
"""流媒体数据"""
return StreamingResponse(gen(Camera()),
media_type='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
ip_host = '127.0.0.1' # 本机ip地址
ip_host2 = '0.0.0.0' # 内网ip地址
import webbrowser
webbrowser.open(f'http://{ip_host}:80')
uvicorn.run(app, host=ip_host2, port=80)

并发执行环境

async def 内部使用 同步库

async def 函数中使用同步的 requests.get() 而非 httpx.get() 时会发生阻塞。

会阻塞整个事件循环(Event Loop)。async def 函数在事件循环中执行,而 requests 是同步阻塞库。FastAPI 使用单线程事件循环处理所有异步任务,在异步函数中执行同步阻塞操作会导致整个服务器的其他异步请求都被阻塞。在异步函数中阻塞 2 秒,所有其他异步请求也会被阻塞 2 秒。

普通的 def 内部使用 同步库

使用普通的 def(同步函数)时,FastAPI 会自动将该函数放入 External Thread Pool(外部线程池)执行。

在线程池未满时不影响性能,线程池满时会排队等待。虽然会阻塞线程池中的线程,但不会阻塞事件循环,其他异步请求仍可正常处理。

在 FastAPI 中,使用同步库(如 requestspyodbc)时,应使用 def 而不是 async def

tip

对于 IO 密集型任务(如网络请求、数据库查询),优先使用 async def 配合异步库(如 httpxasyncpg)。

对于 CPU 密集型任务或必须使用同步库的场景,使用 def 让 FastAPI 自动将其放入线程池执行。

依赖注入

Depends 是 FastAPI 的核心特性,用于实现代码解耦和业务逻辑分离。

最佳实践是将鉴权、数据库连接等横切关注点从业务逻辑函数中分离,抽取为依赖函数,通过 Depends 注入。

用户鉴权示例

为 API 添加用户鉴权功能:

所有 /admin 开头的接口必须在 Header 中带上 X-Token

Token 不正确时返回 403。

Token 正确时,路径函数可直接获取解析后的 user_name

from fastapi import FastAPI, Depends, Header, HTTPException

app = FastAPI()

# 1. 定义依赖项
async def verify_admin_token(x_token: str = Header(...)):
if x_token != "super-secret-token":
raise HTTPException(status_code=403, detail="Token 错误!")
return "Admin_User" # 返回值可以被后续函数直接使用

# 2. 在接口中注入
@app.get("/admin/db-backup")
async def backup(user: str = Depends(verify_admin_token)):
# 只有通过了 verify_admin_token,才会执行到这里
# user 的值就是 "Admin_User"
return {"message": f"Hello {user}, backup started..."}

优势:对于多个管理接口,只需在每个函数中添加 Depends,也可全局挂载到路由组(Router)。

Depends 的执行顺序

Depends 的执行顺序是先运行依赖函数,再进入路径函数。

  1. 扫描依赖:FastAPI 看到路径函数声明了 Depends(verify_admin_token)
  2. 前置执行:FastAPI 先调用 verify_admin_token,检查 Header,运行函数逻辑
  3. 异常中断:如果在 verify_admin_token 里触发了 raise HTTPException,流程会直接中断,返回响应给前端。路径函数不会被执行
  4. 注入结果:如果运行成功,verify_admin_token 的返回值会被赋值给路径函数的参数
  5. 业务逻辑:最后才运行路径函数内部的代码

依赖缓存机制

当接口依赖 A 和 B,且 A 和 B 都依赖数据库连接 get_db 时,FastAPI 默认在同一请求中只执行一次 get_db,并将结果缓存后分发给 A 和 B,节省资源。

Depends 支持嵌套。verify_admin_token 也可以依赖其他函数(如 get_db)。

数据库选型建议

承载 10k+ QPS 时,应选择异步 ORM(如 Tortoise-ORM 或 SQLAlchemy 2.0 async)而非同步 SQLAlchemy。

  • 同步 ORM:必须配合 def 使用。FastAPI 会使用线程池,但线程资源昂贵(每个线程占用数 MB 内存),大量线程可能导致服务器内存不足。
  • 异步 ORM:配合 async def 使用。不占用额外线程,在等待数据库返回时让出 CPU 处理其他请求。这是 FastAPI 高性能的关键。

Pydantic 的进阶数据校验

在注册用户接口中,需要接收 passwordconfirm_password,并在 Pydantic 模型中校验这两个字段必须完全一致。

校验失败时,FastAPI 应返回自定义错误信息(如 "两次密码不一致")。

实现字段间校验需要使用 model_validator

from pydantic import BaseModel, model_validator

class RegisterUser(BaseModel):
username: str
password: str
confirm_password: str

# mode='after' 表示数据初步解析完后再跑这个校验
@model_validator(mode='after')
def check_passwords_match(self):
pw1 = self.password
pw2 = self.confirm_password
if pw1 != pw2:
raise ValueError("两次输入的密码不一致!")
return self

当 Pydantic 抛出 ValueError 时,FastAPI 会自动捕获并转换成 422 Unprocessable Entity 错误返回给前端,并附带清晰的 JSON 错误说明。

中间件

中间件在请求被路径函数处理之前,以及响应返回之后运行。

中间件是一个在 每个请求 被路径函数处理之前,以及在 每个响应 返回之后运行的逻辑。

它的执行逻辑像一个“洋葱”:

Request 进来 \rightarrow 经过中间件(记录时间、检查黑名单)。

到达目的地 \rightarrow 执行你的路径函数(Business Logic)。

Response 出去 \rightarrow 再次经过中间件(添加自定义 Header、计算耗时)。

import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
# === 1. 请求到达前执行 ===
start_time = time.perf_counter()

# === 2. 传递给后面的逻辑 (路径函数或其他中间件) ===
response = await call_next(request)

# === 3. 响应返回前执行 ===
process_time = time.perf_counter() - start_time
# 在响应头里加个“耗时”字段
response.headers["X-Process-Time"] = str(process_time)

return response
特性中间件 (Middleware)依赖注入 (Depends)
执行范围全局。只要挂载了,所有接口都会跑。局部或特定路由。可以精细控制哪个接口用。
访问权限只能拿到原始 Request 对象,很难拿到 Pydantic 解析后的数据。全能。能拿到解析后的参数、模型、甚至其他依赖。
灵活性适合做:CORS(跨域)、Gzip压缩、日志统计、全局异常捕获。适合做:身份验证、数据库连接、权限校验。
异步支持必须是异步的。可以是同步 def 也可以是异步 async def。

中间件的执行顺序

中间件的执行顺序是:最后添加的中间件最先处理请求,最先添加的中间件最后处理响应

如果一个中间件直接返回响应(没有执行 await call_next(request)),那么内部的中间件和路径函数都不会被执行。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def blacklist_middleware(request: Request, call_next):
user_ip = request.client.host
if user_ip in ["1.2.3.4"]:
# 如果是黑名单,直接返回 403,不会执行 call_next
return JSONResponse(content={"detail": "You are banned"}, status_code=403)

# 如果不是黑名单,继续执行
response = await call_next(request)
return response
tip

如果黑名单是基于 IP(简单判断),用 Middleware 或 Nginx 层解决。

如果黑名单是基于用户 ID(涉及业务逻辑),用 Depends 挂在全局路由上。

异常处理器

在项目中,为了避免在每个地方都写 try...except,需要在程序崩溃或报错时返回统一格式的 JSON。

FastAPI 提供了 @app.exception_handler 装饰器来实现统一异常捕获。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

# 定义一个自定义异常类
class MyBusinessException(Exception):
def __init__(self, name: str):
self.name = name

# 统一捕获这个异常
@app.exception_handler(MyBusinessException)
async def my_exception_handler(request: Request, exc: MyBusinessException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} 触发了业务逻辑错误"},
)

@app.get("/test/{name}")
async def test_exception(name: str):
if name == "error":
# 抛出异常后,FastAPI 会自动去上面找对应的 handler
raise MyBusinessException(name=name)
return {"message": "Success"}

BackgroundTasks

高并发文件处理系统场景:用户上传大视频文件(如 1GB),系统需要对视频进行转码(CPU 密集型操作,耗时数分钟)。

系统需要立即返回"正在处理中",而非让用户等待转码完成。

BackgroundTasks vs Celery

对于这类场景,主要有以下几种选择:

  • A. 在 async def 里直接写转码逻辑:❌ 不适合,会阻塞整个事件循环
  • B. 在 def 里写转码逻辑,靠线程池:⚠️ 不能立刻返回"正在处理中"
  • C. 使用 BackgroundTasks(FastAPI 内置后台任务):✅ 适合中小型项目
  • D. 使用 Celery + Redis(分布式任务队列):✅ 适合大型/生产项目

BackgroundTasks 的优势与局限

BackgroundTasks 是 FastAPI 内置的功能,非常轻量。

在 Response("正在处理中")发送给用户之后,在同一 Python 进程内启动任务继续执行。

优点:

  • 不需要额外安装 Redis 或 RabbitMQ
  • 代码简洁
  • 可以立刻返回响应给用户

局限性:任务仍在 Web 服务器进程内执行。转码视频占用 CPU 较高,多个并发转码任务可能导致 Web 服务器 CPU 占用率升至 100%,影响其他 API 请求的响应速度。

代码示例

from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from fastapi.responses import JSONResponse
import time
import os

app = FastAPI()

def process_video(file_path: str, output_path: str):
"""
模拟视频转码任务(CPU 密集型操作)
注意:这里使用 def 而不是 async def,因为转码是 CPU 密集型任务
"""
print(f"开始处理视频: {file_path}")
try:
# 模拟耗时操作(实际场景中这里会是视频转码逻辑,如使用 ffmpeg)
time.sleep(5) # 模拟转码耗时

# 模拟转码后的操作(如保存转码后的文件)
print(f"视频转码完成: {file_path} -> {output_path}")

# 实际场景中可能还需要:
# - 更新数据库状态
# - 发送通知给用户
# - 清理临时文件等
except Exception as e:
print(f"转码失败: {e}")

@app.post("/upload-video")
async def upload_video(
background_tasks: BackgroundTasks,
file: UploadFile = File(...)
):
"""
上传视频文件,立即返回响应,后台处理转码

工作流程:
1. 接收文件并保存到临时目录
2. 将转码任务添加到后台任务队列
3. 立即返回"正在处理中"的响应
4. 后台任务在响应发送后开始执行
"""
# 保存上传的文件
file_path = f"/tmp/{file.filename}"
os.makedirs("/tmp", exist_ok=True)

with open(file_path, "wb") as f:
content = await file.read()
f.write(content)

# 定义输出路径
output_path = f"/tmp/transcoded_{file.filename}"

# 添加后台任务:在响应返回后执行
background_tasks.add_task(process_video, file_path, output_path)

# 立即返回响应,不等待转码完成
return JSONResponse({
"message": "视频已上传,正在处理中",
"filename": file.filename,
"status": "processing",
"note": "转码完成后会通过其他方式通知您"
})

关键点说明:

  1. background_tasks: BackgroundTasks:通过依赖注入获取 BackgroundTasks 实例
  2. background_tasks.add_task():添加后台任务,第一个参数是函数,后续参数是函数的参数
  3. 立即返回响应:在添加后台任务后立即返回,不等待任务完成
  4. 任务执行时机:后台任务会在响应发送给客户端之后才开始执行

测试方法:

# 使用 curl 上传文件
curl -X POST "http://127.0.0.1:8000/upload-video" \
-F "file=@your_video.mp4"

# 响应会立即返回,但转码任务会在后台继续执行,可观察fastapi输出。

完成任务后通知用户的方式可以有多种,可以设置进程轮询定时查询任务状态,也可以使用WebSocket实时通知。

注意事项:

  • 后台任务函数可以是 defasync def
  • 如果任务失败,不会影响已返回的响应
  • 任务在同一个 Python 进程中执行,不会跨进程或跨服务器
  • 适合轻量级任务,对于重量级任务建议使用 Celery

在上面的示例中,upload_video 函数接收文件后立即返回响应,而 process_video 函数会在响应发送给客户端后,在后台执行。

Celery + Redis 的优势

Web 服务器将任务指令放入 Redis 队列后立即返回。由专门的 Worker 进程(可在其他服务器上)处理耗时的转码任务。

优点:

  • 解耦:无论视频转码多慢,都不会影响 API 服务器的响应速度
  • 分布式:支持多台服务器处理任务,抗压能力强
  • 可扩展:可以根据任务量动态增加 Worker 数量

适用场景:

  • 中小型项目、任务量不大:使用 BackgroundTasks
  • 大型项目、高并发、需要分布式处理:使用 Celery + Redis

MCP

Model Context Protocol(MCP) 是一种开放协议,旨在标准化应用程序与大型语言模型(LLM)之间的上下文交互。

MCP最初设计的对象是各类AI客户端(如 Claude Desktop、Cursor),客户端通过配置文件连接 MCP 服务器

MCP 服务器可以暴露多种类型的功能:

  • Resources(资源): 只读数据源,如文件内容、数据库查询结果等
  • Tools(工具): LLM 可以调用的函数,用于执行操作或获取动态数据
  • Prompts(提示词): 预定义的提示词模板,可供 LLM 使用
  • Sampling(采样): 工具可反向调用大模型为自己生成一些内容

并不是所有的主要类型功能都被支持,其中Tools支持范围最广。

为了更快速的构建MCP,可以使用FastMCP 处理所有复杂的协议细节,专注于构建。大多数情况下,只需装饰一个 Python 函数即可,FastMCP 会处理剩下的工作。

参考地址:https://github.com/jlowin/fastmcp

代码示例

file_server.py
"""
FastMCP Echo Server
"""

from fastmcp import FastMCP

# Create server
mcp = FastMCP("Echo Server")


@mcp.tool
def echo_tool(text: str) -> str:
"""Echo the input text"""
return text


@mcp.resource("echo://static")
def echo_resource() -> str:
return "Echo!"


@mcp.resource("echo://{text}")
def echo_template(text: str) -> str:
"""Echo the input text"""
return f"Echo: {text}"


@mcp.prompt("echo")
def echo_prompt(text: str) -> str:
return text

配置文件

上面的代码我部署在https://fastmcp.cloud/中,下面的地址是其为我随机分配的。填入cursor后登录验证即可。

{
"mcpServers": {
"fastmcp": {
"url": "https://magnificent-crimson-antlion.fastmcp.app/mcp",
"transport": "http"
}
}
}

如果是通过放在本地,则只需要配置全局的fastmcp,确保路径正确即可。

{
"mcpServers": {
"local-echo-server": {
"command": "fastmcp",
"args": ["run", "C:\\Users\\allen\\Desktop\\MCP\\server.py"],
"transport": "stdio"
}
}
}

更多信息请参考 MCP 官方文档PyPI 包页面

Django

基础用法

Django 的接口开发,需要先创建项目和应用,然后创建视图函数,最后在 urls.py 中配置路由。

虽然 Django 的接口开发比较繁琐,但是 Django 的接口开发比较规范,易于维护。

安装 Django 和 Django REST framework

pip install django djangorestframework

创建项目和应用

django-admin startproject myproject
cd myproject
django-admin startapp apiapp

项目结构如下:

myproject/
├── apiapp/
│ ├── __init__.py
│ ├── views.py
│ ├── urls.py
│ └── serializers.py
├── myproject/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── manage.py

myproject/settings.py 中添加 rest_frameworkapiappINSTALLED_APPS 列表中:

INSTALLED_APPS = [
... # 保持原有配置
'rest_framework',
'apiapp',
]

定义序列化器,有助于验证和解析输入数据。

apiapp/serializers.py
from rest_framework import serializers

class PostExampleSerializer(serializers.Serializer):
title = serializers.CharField(default="默认标题", required=False)
content = serializers.CharField(default="默认内容", required=False)

class PutExampleSerializer(serializers.Serializer):
name = serializers.CharField(default="默认名称", required=False)
description = serializers.CharField(default="默认描述", required=False)

class ApiResourcePostSerializer(serializers.Serializer):
key = serializers.CharField()

class ApiResourcePutSerializer(serializers.Serializer):
field = serializers.CharField()

定义视图函数

apiapp/views.py
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework.views import APIView

from .serializers import (
PostExampleSerializer,
PutExampleSerializer,
ApiResourcePostSerializer,
ApiResourcePutSerializer,
)

@api_view(['GET'])
def get_example(request, user_id):
"""
**方法:** GET

**示例请求:**
GET /get_example/123?name=Alice HTTP/1.1
Host: example.com
User-Agent: Mozilla/5.0

**示例响应:** ```json
{
"user_id": 123,
"name": "Alice",
"user_agent": "Mozilla/5.0"
}
"""
name = request.query_params.get('name', '默认名字')
user_agent = request.headers.get('User-Agent', '未知')
return Response({"user_id": user_id, "name": name, "user_agent": user_agent})

@api_view(['POST'])
def post_example(request):
"""
**方法:** POST

**示例请求:**
POST /post_example HTTP/1.1
Host: example.com
Content-Type: application/json
Authorization: Bearer token

{
"title": "示例标题",
"content": "示例内容"
}

**示例响应:** ```json
{
"title": "示例标题",
"content": "示例内容",
"authorization": "Bearer token"
}
"""
serializer = PostExampleSerializer(data=request.data)
if serializer.is_valid():
title = serializer.validated_data.get('title', '默认标题')
content = serializer.validated_data.get('content', '默认内容')
auth = request.headers.get('Authorization', '无授权')
return Response({"title": title, "content": content, "authorization": auth})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

@api_view(['PUT'])
def put_example(request, item_id):
"""
**方法:** PUT

**示例请求:**
PUT /put_example/456 HTTP/1.1
Host: example.com
Content-Type: application/x-www-form-urlencoded
X-Custom-Header: 自定义值

name=新名称&description=新描述

**示例响应:** ```json
{
"item_id": 456,
"name": "新名称",
"description": "新描述",
"custom_header": "自定义值"
}
"""
serializer = PutExampleSerializer(data=request.data)
if serializer.is_valid():
name = serializer.validated_data.get('name', '默认名称')
description = serializer.validated_data.get('description', '默认描述')
custom_header = request.headers.get('X-Custom-Header', '无自定义头')
return Response({
"item_id": item_id,
"name": name,
"description": description,
"custom_header": custom_header,
})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

@api_view(['DELETE'])
def delete_example(request):
"""
**方法:** DELETE

**示例请求(确认删除):**
DELETE /delete_example?confirm=true HTTP/1.1
Host: example.com
Authorization: Bearer token

**示例响应(确认删除):** ```json
{
"deleted": true,
"authorization": "Bearer token"
} ```

**示例请求(未确认删除):**
DELETE /delete_example HTTP/1.1
Host: example.com

**示例响应(未确认删除):** ```json
{
"message": "删除未确认"
} ```
HTTP 状态码:400
"""
confirm = request.query_params.get('confirm', 'false').lower() == 'true'
auth = request.headers.get('Authorization', '无授权')
if not confirm:
return Response({"message": "删除未确认"}, status=status.HTTP_400_BAD_REQUEST)
return Response({"deleted": confirm, "authorization": auth})

class ApiResourceView(APIView):
"""
**方法:** GET, POST, PUT, DELETE

**GET 请求示例:**
GET /api/resource/abc123?detail=full HTTP/1.1
Host: example.com

**GET 响应示例:** ```json
{
"method": "GET",
"resource_id": "abc123",
"detail": "full"
}
"""

def get(self, request, resource_id):
detail = request.query_params.get('detail', 'basic')
return Response({"method": "GET", "resource_id": resource_id, "detail": detail})

def post(self, request, resource_id):
"""
**POST 请求示例:**
POST /api/resource/abc123 HTTP/1.1
Host: example.com
Content-Type: application/json

{
"key": "value"
}

**POST 响应示例:** ```json
{
"method": "POST",
"resource_id": "abc123",
"data": {
"key": "value"
}
} ```
"""
serializer = ApiResourcePostSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data
return Response({"method": "POST", "resource_id": resource_id, "data": data})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def put(self, request, resource_id):
"""
**PUT 请求示例:**
PUT /api/resource/abc123 HTTP/1.1
Host: example.com
Content-Type: application/x-www-form-urlencoded

field=new%20value

**PUT 响应示例:** ```json
{
"method": "PUT",
"resource_id": "abc123",
"data": {
"field": "new value"
}
} ```
"""
serializer = ApiResourcePutSerializer(data=request.data)
if serializer.is_valid():
data = serializer.validated_data
return Response({"method": "PUT", "resource_id": resource_id, "data": data})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def delete(self, request, resource_id):
"""
**DELETE 请求示例:**
DELETE /api/resource/abc123?confirm=true HTTP/1.1
Host: example.com

**DELETE 响应示例:** ```json
{
"method": "DELETE",
"resource_id": "abc123",
"confirm": true
} ```
"""
confirm = request.query_params.get('confirm', 'false').lower() == 'true'
return Response({"method": "DELETE", "resource_id": resource_id, "confirm": confirm})

apiapp/urls.py 中配置路由

apiapp/urls.py
from django.urls import path
from . import views

urlpatterns = [
path('get_example/<int:user_id>/', views.get_example, name='get_example'),
path('post_example/', views.post_example, name='post_example'),
path('put_example/<int:item_id>/', views.put_example, name='put_example'),
path('delete_example/', views.delete_example, name='delete_example'),
path('api/resource/<str:resource_id>/', views.ApiResourceView.as_view(), name='api_resource'),
]

myproject/urls.py 中包含 apiapp.urls

myproject/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
path('admin/', admin.site.urls),
path('api/', include('apiapp.urls')),
]

运行项目

python manage.py runserver