Skip to main content

FastAPI

info

接口是一组定义了不同软件组件之间如何交互的规则和协议。也叫API?

API 允许不同的软件系统通过预定义的接口进行通信,使得开发者能够利用现有的功能和服务,而无需从头开始构建。

在 Python 中,有多种接口开发框架可供选择:FastAPI、Flask、Django等。目前最主流的是 FastAPI。

FastAPI的作者 Tiangolo(真名 Sebastián Ramírez)是一位来自哥伦比亚、目前居住在德国柏林的著名全职开源维护者。

在 7 岁时就从天主教学校退学了。在退学之后,他的父母并没有强迫他回到传统学校,而是支持他居家自学。涵盖了英语、算法、机器学习、密码学和人工智能等核心计算机科学领域。

他在 23 岁左右凭借在 GitHub 上的开源贡献和扎实的技术实力证明了自己,先后在迪拜和德国柏林的知名科技公司(如 Explosion AI 和 Forethought)担任高级/首席工程师。

维度FastAPIDjango
发布时间2018 年2005 年
架构风格微框架,基于 Starlette 和 Pydantic大型框架,支持全栈开发,内置 ORM、Admin 等
数据验证内置 Pydantic,支持强类型和自动验证依赖于表单类(如 Forms 和 DRF 的 serializers)
自动文档生成内置支持 OpenAPI 和 Swagger UI不原生支持,需依赖 Django REST Framework (DRF)
学习曲线中等,需了解类型注解和异步编程较高,包含多种内置组件,适合系统性学习
可扩展性高,强大的原生功能减少了扩展需求中等,内置组件强大,但灵活性相对较低
依赖注入支持原生支持依赖注入无原生支持
Admin 界面无,需要自行开发原生支持,提供强大的 Admin 管理后台
对类型注解的支持原生支持,强类型友好不直接支持,但可结合第三方库使用

安装

FastAPI 是一个运行在 ASGI? 服务器上的高性能 Web 框架,它自身并不是一个大而全的框架,它通过绑定其他框架来实现其功能。

  • 运行服务是是 Uvicorn?,它是目前最流行的轻量级、超快速的 ASGI 服务器。
  • 处理HTTP请求和响应的是starlette?,是FastAPI的底层框架,用于处理HTTP请求和响应。
  • 数据验证是 Pydantic?,是FastAPI的底层框架,通常和标准库typing一起使用。

编写FastAPI程序的过程中你会觉得自己在写通用的Python代码,非常友好,安装时推荐安装标准版。

  • pip install fastapi:只安装核心框架。

  • pip install "fastapi[standard]":安装核心框架与常用工具。

pip包名带方括号语法是 “可选依赖组”(Optional Dependencies / Extras)。你可以把它理解为增强版安装。

如果你已经输入了pip install fastapi,依然可以通过pip install "fastapi[standard]"补全常用工具配件。

info

opencv采取的是发布多个独立包的策略:

  • opencv-python
  • opencv-python-headless
  • opencv-contrib-python
  • opencv-contrib-python-headless

因为opencv的几个包是互斥的。装了 A 就不能装 B。一旦安装了多个,必须全部卸载,再单独安装你需要的版本。

接口文档

FastAPI 内置了两套根据你的代码自动实时生成的交互式文档。它的逻辑是:

tip

如果你忘记了这些地址,可以对app对象使用.openapi_url、.redoc_url、.docs_url属性获取。

同样的,如果你的希望更改这些默认地址的路由使其更有可读性,也可以通过属性设置。

import fastapi

# 更改默认的文档地址
app = fastapi.FastAPI(redoc_url="/test_doc",docs_url="/doc")
print(app.openapi_url)
print(app.redoc_url)
print(app.docs_url)

@app.get("/")
def index():
return {"messages":"Hi"}
"""
/openapi.json
/test_doc
/doc
"""

启动命令:uv run uvicorn main:app --reload

生命周期

从你构建一个一个FastAPI应用开始,到中间收发请求,最后应用关闭,这个过程就是FastAPI的请求生命周期。

阶段范畴 (Category)核心组件详细动作说明
1. 应用启动Lifespan (Start)lifespan初始化:建立数据库长连接、预加载 AI 模型、配置全局资源。
2. 请求接收RequestASGI Server接收:通过网络套接字(Socket)接收客户端请求。
3. 请求拦截RequestMiddleware解析与预检:解析 HTTP 报文,处理 CORS、GZip 解压、Session 状态还原。
4. 路由匹配RequestRouter定位:根据 URL 和 Method 找到匹配的路径函数。匹配失败则直接返回 404。
5. 依赖注入RequestDepends前置准备:执行依赖项(如 Auth 校验、DB Session 开启)。若失败则抛出异常。
6. 参数校验RequestPydantic入参验证:将数据转为模型对象并校验格式。校验失败则返回 422。
7. 业务执行ProcessingPath Function逻辑运算:运行你编写的业务代码。若产生错误,将在此处抛出异常。
8. 异常处理ProcessingExc Handler异常补救:捕获业务异常,将其转换为统一格式的 JSONResponse。
9. 数据转换Responsejsonable_encoder序列化:将 Python 对象、ORM 模型转换为 JSON 兼容的基础数据格式。
10. 响应校验Responseresponse_model出参验证:根据 Schema 强制过滤字段(如剔除密码),确保输出安全。
11. 响应拦截ResponseMiddleware后处理:最后一次修改 Header(如加签名)、计算耗时、记录访问日志。
12. 物理发送ResponseASGI Server传输:通过网络套接字(Socket)将字节流正式传回客户端。
13. 后台任务Post-ResponseBackgroundTasks扫尾:在连接断开后,异步执行耗时操作(如发送邮件、统计数据)。
14. 应用关闭Lifespan (End)lifespan释放:关闭数据库连接池,清理内存资源,确保程序优雅退出。

Lifespan

Lifespan可以将应用启动时操作和应用关闭时操作写在同一个块代码中,避免人为的遗忘。

from contextlib import asynccontextmanager
from fastapi import FastAPI

def fake_answer_to_everything_ml_model(x):
"""一个假的机器学习模型,返回一个数的2倍"""
return x * 2
"""
lifespan只接受异步的上下文管理器。

而asynccontextmanager刚好可以把生成器变成上下文管理器

yield 之前的内容等同于 with 语法执行。

yield 之后的内容等同于 with 语法结束。
"""
@asynccontextmanager
async def lifespan(app: FastAPI):
# 加载模型(提前加载好模型,每次请求时就不用再耗时加载了)
# app.state 是一个全局的状态抽屉,你可以把所有全局要用的内容追加到其属性名当中
app.state.model = fake_answer_to_everything_ml_model
yield
del app.state.model
print("安全的关闭数据库并清理内存空间")

app = FastAPI(lifespan=lifespan)

# 请求约定与路由
@app.get("/")
# 自动读取类型标注为格式验证
async def predict(x:float) :
result = app.state.model(x)
return {"result": result}

你可以通过http://127.0.0.1:8000/?x=1访问,结果是{"result": 2}

更推荐通过浏览器访问http://127.0.0.1:8000/docs

点击Try it out按钮,填写参数,点击Execute按钮,查看结果。

接收请求

前端大部分的请求是通过浏览器发送请求到后端,后端接收请求并处理,最后返回响应给前端。

tip

为了最大限度压榨计算的能力。Uvicorn可以启动多个相同的实例.

推荐通过命令行启动多work:uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

Uvicorn多Worker通过多进程,创建多个完全一样的副本,运行时不同副本完全隔离。因此全局变量的正确做法是用Redis等外部存储。

多Worker监听同一个端口,如何调度?不是Uvicorn自己实现,调度由操作系统内核完成。

请求是一种相当复杂的数据类型。

请求方法

请求方法是指浏览器发送请求到后端时,所使用的方法。浏览器直接打开某个地址,就是GET方法。

方法的本身带有一定的语义,比如:GET方法表示获取资源

方法语义带 Body幂等安全可缓存
GET获取资源❌ 否✅ 是✅ 是✅ 是
POST创建资源✅ 是❌ 否❌ 否❌ 否
PUT更新(全量)✅ 是✅ 是❌ 否❌ 否
PATCH更新(局部)✅ 是❌ 否❌ 否❌ 否
DELETE删除资源⚠️ 建议不带✅ 是❌ 否❌ 否
OPTIONS探测支持❌ 否✅ 是✅ 是❌ 否
幂等性 (Idempotence)

同一个操作执行 1 次和执行 100 次,对系统资源的影响是否一致。

  • 幂等 (Safe/Idempotent): GET:查 100 次,数据不会变。

  • 非幂等 (Non-Idempotent): POST:每点一次"提交",后端就创建一个新订单。点 100 次,产生 100 个订单。

缓存机制 (Caching)
  • GET 是可以被浏览器、CDN、代理服务器自动缓存的。

  • POST 默认不会被缓存。这保证了金融交易、账号注册等操作必须实时到达后端。

安全性 (Safe Methods)

这里的"安全"是指"是否会修改服务器数据":

  • 安全:GET理论上只读,不应该引起服务器状态变化。
  • 不安全:POST、PUT、PATCH、DELETE 是不安全的,因为它们会改变数据库。
from fastapi import FastAPI, Request

app = FastAPI()

# 创建接受单一请求方式(GET)的接口
@app.get("/")
async def index():
return {"message": "Hello, GET!"}

# 技术上同个路由可以绑定不同的方法,但并不推荐
@app.delete("/")
async def index():
return {"message": "Hello, DELETE!"}

# 等价于
@app.delete("/")
@app.get("/")
async def method(request: Request):
if request.method == "GET":
return {"message": "这是 GET 请求"}
return {"message": f"这是 {request.method} 请求"}
info

OpenAPI 规范中,在同一个 Path 下,HTTP 方法(Operation)必须是唯一的。

所以并不鼓励在同一个路由下绑定多个方法,也不推荐使用 api_route 来支持多种请求方式。

遵循规范可以避免额外的BUG。

请求URL

请求URL是指浏览器发送请求到后端时的URL。由协议、域名/主机、端口、路径、查询参数组成。

  • 常见的协议有:HTTP、HTTPS、FTP、SMTP、DNS、MQTT。
  • 当在本地开发时,域名/主机通常为localhost127.0.0.10.0.0.0表示所有IP地址。
  • 当协议为HTTP/HTTPS时,端口默认为80/443。可省略不写。
from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/index")
async def catch_all(request: Request):
# 获取请求URL
"""
可以继续获取到请求URL的各个部分:
url.scheme : 协议
url.hostname : 主机
url.port : 端口
url.netloc : 网络位置(主机:端口)
url.path : 路径
url.query : 查询参数
url.is_secure : 是否使用安全连接
url.components : 组件元组(协议、主机、端口、路径、查询参数)
url.fragment : 锚点 (URL的锚点部分,以#开头,快速跳转到页面中的特定位置)

有些数据库连接URL会包含用户名和密码,如:http://admin:123456@192.168.1.10:9200
url.username : 用户名 (http://admin:123456@192.168.1.10:9200) 中的admin
url.password : 密码 (http://admin:123456@192.168.1.10:9200) 中的123456
"""
url = request.url # "http://127.0.0.1:8000/index?id=1"

# 网络与客户端信息
"""
可以获取到客户端的网络与客户端信息:
client.host : 主机
client.port : 端口
"""
client_host = request.client.host # "127.0.0.1"
client_port = request.client.port # 8000
## 获取服务器IP和端口
server_ip, server_port = request.scope.get("server") # ("127.0.0.1", 8000)
return {}

路径与查询参数

路径与查询参数本身是可以传递信息的:

  • 路径参数(强制必填)
  • 查询参数(可设默认参数)
from fastapi import FastAPI,Query
app = FastAPI()

# 路径参数
@app.get("/images_path/{image_width}/{image_height}")
async def get_image(image_width: int, image_height: int):
"""
浏览器参数格式:
http://localhost:8000/images_path/100/100

用于定位特定资源
"""
return {"image_width": image_width, "image_height": image_height}

# 查询参数
@app.get("/images_query/")
async def get_images(width: int = Query(default=100), height: int = Query(default=100)):
"""
浏览器传入参数:
http://localhost:8000/images_query?width=100&height=100

浏览器使用默认值:
http://localhost:8000/images_query/

用于搜索、排序、分页
"""
return {"width": width, "height": height}

请求头字典

除了上面必备的请求信息之外,你还可以额外添加一些内容辅助服务器理解请求。可通过request.headers可以获取到请求头字典。

  • User-Agent:用于标识浏览器的信息,常用于区分移动端和PC端、初级反爬。

  • Referer:用于标识请求的来源,常用于统计分析。

  • Cookie:用于标识请求的Cookie?,常用于用户身份验证。

  • Content-Type:用于标识请求的Content-Type,以便服务器正确解析请求体。

tip

你也可以自定义任何名称的请求头。这些非标准头通常被称为 Custom Headers。

自定义头推荐以 X- 开头(例如 X-My-Token),以区分标准头。

from fastapi import FastAPI, Request
import secrets

app = FastAPI()
"""
secret_key选择足够复杂的即可。
我这里生成一个 32 字节(256 位)的十六进制随机字符串作为secret_key,用来加密和解密 Session 数据。
不要频繁更换:一旦你修改了 secret_key,所有之前发给用户的 Session Cookie 都会失效。用户会发现自己莫名其妙被踢出登录了。

FastAPI 为了保持轻量和安全,默认不开启Session功能,因为 Session 涉及到 Cookie 的签名加密和安全性设置。
"""
from starlette.middleware.sessions import SessionMiddleware
app.add_middleware(SessionMiddleware,
secret_key=secrets.token_hex(32),
session_cookie="session_cookie", # 浏览器中显示的 Cookie 名称
max_age=1209600 # Session 有效期 2 周(1209600秒)
)

@app.get("/index")
async def catch_all(request: Request):
# 你可以通过request.cookies获取,但是要手动解析。
"""
csrftoken=9VZsSS9jnRLIopC5TbdIXRl3yMKhbq5c; sessionid=h8ofhxjbfysrbzbbmcnlo601zav5qxhl
"""
cookies1 = dict(request.headers).get("cookie")

# cookie作为常用的请求头字段,有一个专门的属性cookies对象
cookies2 = request.cookies
"""
{
"csrftoken": "9VZsSS9jnRLIopC5TbdIXRl3yMKhbq5c",
"sessionid": "h8ofhxjbfysrbzbbmcnlo601zav5qxhl"
}
"""
# 方便获取解析后的cookie字典。
cookies2.get("sessionid")

## 获取session
session = request.session
# 获取session中的views,如果为空则初始化为0
views = request.session.get("views", 1)
# 将views加1
request.session["views"] = views + 1
if request.session["views"] > 3:
# 手动清空session,可用于用户点击退出之后
request.session.clear()

return {
"message": f"欢迎回来!这是你第 {views} 次访问。",
"session":session,
}

"""
{
"message": "欢迎回来!这是你第 2 次访问。",
"session": {
"views": 3
}
}
"""

base64图片处理

有时候用户登录之后,你希望获取它的用户信息,包含:用户名、头像。一个是文本、一个是小图片。

此时我们一般会选择把图片编码为base64字符串,然后和用户名一起返回给前端。这样可以减少一次请求。此时base64字符串可当作JSON数据之一传输。

import base64
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

# 模拟读取本地头像文件并转换为 Base64
def get_image_base64(file_path: str):
try:
with open(file_path, "rb") as image_file:
# 读取二进制数据 -> 转为 Base64 编码 -> 转为 UTF-8 字符串
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return f"data:image/png;base64,{encoded_string}"
except FileNotFoundError:
return None

@app.get("/user/profile")
async def get_user_profile():
# 模拟从数据库获取数据
user_info = {
"username": "Gemini_User",
"user_id": 1024,
"email": "hello@example.com"
}

# 获取图片的 Base64 字符串(假设文件名为 avatar.png)
avatar_base64 = get_image_base64("1.png")

# 合并返回
return {
"code": 200,
"data": {
**user_info,
"avatar": avatar_base64 # 前端拿到后可以直接放在 <img src="..."> 中
}
}

Content-Type实现兼容处理

form表单是互联网的“元老”,诞生于 JSON 之前。它不能处理复杂的嵌套(比如列表里套对象)。如果你要编写一个具有兼容性的后台,支持form表单和JSON请求。

你可以通过判断Content-Type来完成。

from fastapi import FastAPI,Query,Request
app = FastAPI()

@app.post("/")
async def get_image(request: Request):
"""
Content-Type、Authorization 还是你自己定义的 X-Custom-Token

都可以通过 request.headers.get(key, "") 来获取键值。
"""
content_type = request.headers.get("Content-type", "")
xheader = request.headers.get("X-Content-type", "")
if xheader:
return {"message": "X-header"}
if content_type.startswith("multipart/form-data"):
return {"message": "From"}
elif content_type.startswith("application/json"):
return {"message": "JSON"}

return {"message": f"UNknow: {content_type}"}

Body

数据来源 (Location)浏览器/HTTP 格式FastAPI 参数定义方式Python 转换后的类型典型用途
Body (JSON){"id": 1, "name": "AI"}item: Item(BaseModel)Pydantic Model提交复杂业务数据
Body (Form)user=me&pw=123user: str = Form()str传统表单登录
File (文件)二进制流 (Multipart)file: UploadFileSpooledTemporaryFile上传头像、文档

接口除了可以传输文本数据,还可传输图片、视频等数据,OpenCV 可以捕获屏幕,将两者结合起来,实现内网直播功能,可以在局域网内通过浏览器观看屏幕共享。

Middleware中间件

中间件是一个在 每个请求 被路径函数处理之前,以及在 每个响应 返回之后,发出响应前运行的逻辑。

import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
# === 1. 请求到达前执行 ===
start_time = time.perf_counter()

# === 2. 传递给后面的逻辑 (路径函数或其他中间件) ===
response = await call_next(request)

# === 3. 响应返回前执行 ===
process_time = time.perf_counter() - start_time
# 在响应头里加个“耗时”字段
response.headers["X-Process-Time"] = str(process_time)

return response

中间件的执行顺序是:最后添加的中间件最先处理请求,最先添加的中间件最后处理响应

如果一个中间件直接返回响应(没有执行 await call_next(request)),那么内部的中间件和路径函数都不会被执行。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def blacklist_middleware(request: Request, call_next):
user_ip = request.client.host
if user_ip in ["1.2.3.4"]:
# 如果是黑名单,直接返回 403,不会执行 call_next
return JSONResponse(content={"detail": "You are banned"}, status_code=403)

# 如果不是黑名单,继续执行
response = await call_next(request)
return response
tip

如果黑名单是基于 IP(简单判断),用 Middleware 或 Nginx 层解决。

如果黑名单是基于用户 ID(涉及业务逻辑),用 Depends 挂在全局路由上。

Router路由

同步路由

使用普通的 def(同步函数)时,FastAPI 会自动将该函数放入 External Thread Pool(外部线程池)执行。

默认线程池大小为40。在线程池未满时不影响性能,线程池满时会排队等待。

虽然会阻塞线程池中的线程,但不会阻塞事件循环,其他异步请求仍可正常处理。

在 FastAPI 中,使用同步库(如 requestspyodbc)时,应使用 def 而不是 async def

异步路由

async def 函数中使用同步的操作时会发生阻塞。会阻塞整个事件循环(Event Loop)。async def 函数在事件循环中执行,而同步操作是阻塞的。

FastAPI 使用单线程事件循环处理所有异步任务,在异步函数中执行同步阻塞操作会导致整个服务器的其他异步请求都被阻塞。

在异步函数中阻塞 2 秒,所有其他异步请求也会被阻塞 2 秒。

tip

对于 IO 密集型任务(如网络请求、数据库查询),优先使用 async def 配合异步库(如 httpxasyncpg)。

对于 CPU 密集型任务或必须使用同步库的场景,使用 def 让 FastAPI 自动将其放入线程池执行。

from fastapi import FastAPI
import time
import asyncio

app = FastAPI()

@app.get("/sleep_async")
async def sleep():
# 应该使用await asyncio.sleep(3)避免阻塞事件循环
time.sleep(3)
return {"message": f"Sleeping for 3 seconds...{time.time()}"}

@app.get("/sleep")
def sleep():
time.sleep(3)
return {"message": f"Sleeping for 3 seconds...{time.time()}"}

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
tip

验证 FastAPI 的真实并发能力应该通过代码而不是浏览器。如果你在 同一个浏览器 (比如 Chrome)中打开50个标签页,同时请求同一个 URL:

  • 浏览器的锁定机制 :大多数现代浏览器为了防止过度消耗服务器资源,会对完全相同的 URL 进行连接锁定。它会等待第一个请求完成后,才发出第二个。

处处异步才能提升性能。承载 10k+ QPS 时,应选择异步 ORM(如 Tortoise-ORM 或 SQLAlchemy 2.0 async)而非同步 SQLAlchemy。

  • 同步 ORM:必须配合 def 使用。FastAPI 会使用线程池,但线程资源昂贵(每个线程占用数 MB 内存),大量线程可能导致服务器内存不足。
  • 异步 ORM:配合 async def 使用。不占用额外线程,在等待数据库返回时让出 CPU 处理其他请求。这是 FastAPI 高性能的关键。

Depends依赖

Depends 是 FastAPI 的核心特性,用于实现代码解耦和业务逻辑分离。

最佳实践是将鉴权、数据库连接等横切关注点从业务逻辑函数中分离,抽取为依赖函数,通过 Depends 注入。

Depends 的执行顺序是先运行依赖函数,再进入路径函数。

  1. 扫描依赖:FastAPI 看到路径函数声明了 Depends(verify_admin_token)
  2. 前置执行:FastAPI 先调用 verify_admin_token,检查 Header,运行函数逻辑
  3. 异常中断:如果在 verify_admin_token 里触发了 raise HTTPException,流程会直接中断,返回响应给前端。路径函数不会被执行
  4. 注入结果:如果运行成功,verify_admin_token 的返回值会被赋值给路径函数的参数
  5. 业务逻辑:最后才运行路径函数内部的代码
  • Depends 支持嵌套。verify_admin_token 也可以依赖其他函数(如 get_db)。
  • 当接口依赖 A 和 B,且 A 和 B 都依赖数据库连接 get_db 时,FastAPI 默认在同一请求中只执行一次 get_db,并将结果缓存后分发给 A 和 B,节省资源。
特性中间件 (Middleware)依赖注入 (Depends)
执行范围全局。只要挂载了,所有接口都会跑。局部或特定路由。可以精细控制哪个接口用。
访问权限只能拿到原始 Request 对象,很难拿到 Pydantic 解析后的数据。全能。能拿到解析后的参数、模型、甚至其他依赖。
灵活性适合做:CORS(跨域)、Gzip压缩、日志统计、全局异常捕获。适合做:身份验证、数据库连接、权限校验。
异步支持必须是异步的。可以是同步 def 也可以是异步 async def。

Pydantic参数校验

在注册用户接口中,需要接收 passwordconfirm_password,并在 Pydantic 模型中校验这两个字段必须完全一致。

校验失败时,FastAPI 应返回自定义错误信息(如 "两次密码不一致")。

实现字段间校验需要使用 model_validator

from pydantic import BaseModel, model_validator

class RegisterUser(BaseModel):
username: str
password: str
confirm_password: str

# mode='after' 表示数据初步解析完后再跑这个校验
@model_validator(mode='after')
def check_passwords_match(self):
pw1 = self.password
pw2 = self.confirm_password
if pw1 != pw2:
raise ValueError("两次输入的密码不一致!")
return self

当 Pydantic 抛出 ValueError 时,FastAPI 会自动捕获并转换成 422 Unprocessable Entity 错误返回给前端,并附带清晰的 JSON 错误说明。

Response响应

用户返回的内容有:Python对象、HTML字符串、图片、视频、文件、JSON数据等。继承关系如下:

Response (Base Class)
├── JSONResponse
│ ├── ORJSONResponse
│ └── UJSONResponse
├── HTMLResponse
├── PlainTextResponse
├── FileResponse
├── StreamingResponse
└── RedirectResponse

下面是一个实现屏幕共享的示例,使用StreamingResponse实现流式响应:


import os
from importlib import import_module
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
import uvicorn

from io import BytesIO
import cv2
from PIL import ImageGrab, Image
import time
import threading
from threading import get_ident


class CameraEvent(object):
def __init__(self):
self.events = {}

def wait(self):
ident = get_ident()
if ident not in self.events:
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()

def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
event[0].set()
event[1] = now
else:
if now - event[1] > 5:
remove = ident
if remove:
del self.events[remove]

def clear(self):
self.events[get_ident()][0].clear()


class BaseCamera(object):
thread = None
frame = None
last_access = 0
event = CameraEvent()

def __init__(self):
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()

BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()

while self.get_frame() is None:
time.sleep(0)

def get_frame(self):
BaseCamera.last_access = time.time()

BaseCamera.event.wait()
BaseCamera.event.clear()

return BaseCamera.frame

@staticmethod
def frames():
raise RuntimeError('Must be implemented by subclasses.')

@classmethod
def _thread(cls):
print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()
time.sleep(0)
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None



class Camera(BaseCamera):
video_source = 0

@staticmethod
def set_video_source(source):
Camera.video_source = source

@staticmethod
def frames():
camera = cv2.VideoCapture(Camera.video_source)
if not camera.isOpened():
raise RuntimeError('Error')

while True:
image = ImageGrab.grab() # 获取屏幕数据
# w, h = image.size
image = image.resize((1366, 750), Image.LANCZOS) # 图片缩放
output_buffer = BytesIO() # 创建二进制对象
image.save(output_buffer, format='JPEG', quality=100) # quality提升图片分辨率
frame = output_buffer.getvalue() # 获取二进制数据
yield frame # 生成器返回一张图片的二进制数据

app = FastAPI()


@app.get('/', response_class=HTMLResponse)
async def index():
"""
视图函数
:return:
"""
return '''<html>

<head>
<title>屏幕共享</title>
</head>

<body>
<img src="/video_feed">
</body>

</html>'''


def gen(camera):
"""
流媒体发生器
"""
while True:
frame = camera.get_frame()

yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')


@app.get('/video_feed')
async def video_feed():
"""流媒体数据"""
return StreamingResponse(gen(Camera()),
media_type='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
ip_host = '127.0.0.1' # 本机ip地址
ip_host2 = '0.0.0.0' # 内网ip地址
import webbrowser
webbrowser.open(f'http://{ip_host}:80')
uvicorn.run(app, host=ip_host2, port=80)

exception_handler异常处理器

在项目中,为了避免在每个地方都写 try...except,需要在程序崩溃或报错时返回统一格式的 JSON。

FastAPI 提供了 @app.exception_handler 装饰器来实现统一异常捕获。

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

# 定义一个自定义异常类
class MyBusinessException(Exception):
def __init__(self, name: str):
self.name = name

# 统一捕获这个异常
@app.exception_handler(MyBusinessException)
async def my_exception_handler(request: Request, exc: MyBusinessException):
return JSONResponse(
status_code=418,
content={"message": f"Oops! {exc.name} 触发了业务逻辑错误"},
)

@app.get("/test/{name}")
async def test_exception(name: str):
if name == "error":
# 抛出异常后,FastAPI 会自动去上面找对应的 handler
raise MyBusinessException(name=name)
return {"message": "Success"}

jsonable_encoder数据转换

在业务执行(第 6 步)完成后,你可能返回的是一个 SQLAlchemy 模型对象、一个普通的 Python 类实例、或者是包含 datetime 对象的字典。但是,JSON 格式只支持字符串、数字、布尔值、数组、对象和 null。

jsonable_encoder 的作用是将 Python 的复杂数据类型(非 JSON 原生支持的类型)转换为 JSON 兼容的基础类型。

  • 日期/时间 (datetime, date): 转换为 ISO 格式的字符串。
  • 数据库模型 (ORM Objects): 提取其属性并转换为字典。
  • UUID/Decimal: 转换为字符串或浮点数。
  • Set/Tuple: 转换为 List。
info

json标准库:遇到 datetime 对象、UUID、或者 Pydantic 模型、SQLAlchemy 数据库对象时,它会直接抛出 TypeError。

jsonable_encoder:它是一个预处理器。它能识别这些复杂的 Python 对象,并递归地将它们转换为 JSON 兼容的格式(例如把 datetime 转为 ISO 字符串,把 Pydantic 模型转为 dict)。

from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from datetime import datetime
from pydantic import BaseModel

class Item(BaseModel):
name: str
timestamp: datetime

# 模拟业务返回
item = Item(name="AI Assistant", timestamp=datetime.now())

# 直接用标准库,报错:Object of type Order is not JSON serializable
# import json
# json.dumps(my_order)
json_compatible_data = jsonable_encoder(item)

print(type(json_compatible_data))
# 输出: <class 'dict'>

print(json_compatible_data)
# 输出: {"name": "AI Assistant", "timestamp": "2023-10-27T10:00:00.000000"}
# 注意:datetime 成功变成了字符串,没有报错

response_model响应模型

from fastapi import FastAPI
from pydantic import BaseModel, EmailStr
from typing import Optional

app = FastAPI()

# 定义响应模型:只有用户名和邮箱,没有密码
class UserOut(BaseModel):
username: str
email: EmailStr
nickname: Optional[str] = None

# 模拟数据库中的完整原始数据
fake_db_user = {
"username": "Gemini_User",
"password": "hashed_password_123456", # 敏感字段
"email": "hello@example.com",
"is_admin": True, # 内部字段
"nickname": "小杰"
}

@app.get("/user", response_model=UserOut)
async def get_user():
# 哪怕我返回了包含密码的完整字典,FastAPI 也会根据 UserOut 进行过滤
return fake_db_user

# 结果:{"username": "Gemini_User", "email": "hello@example.com", "nickname": "小杰"}

BackgroundTasks

高并发文件处理系统场景:用户上传大视频文件(如 1GB),系统需要对视频进行转码(CPU 密集型操作,耗时数分钟)。

系统需要立即返回"正在处理中",而非让用户等待转码完成。

对于这类场景,主要有以下几种选择:

  • async def 里直接写转码逻辑会阻塞整个事件循环
  • def 里写转码逻辑,靠线程池不能立刻返回"正在处理中"
  • 使用 BackgroundTasks(FastAPI 内置后台任务):✅ 适合中小型项目
  • 使用 Celery + Redis(分布式任务队列):✅ 适合大型/生产项目

BackgroundTasks 是 FastAPI 内置的功能,非常轻量。

在 Response("正在处理中")发送给用户之后,在同一 Python 进程内启动任务继续执行。

优点:不需要额外安装 Redis 或 RabbitMQ、代码简洁、可以立刻返回响应给用户

局限性:任务仍在 Web 服务器进程内执行。转码视频占用 CPU 较高,多个并发转码任务可能导致 Web 服务器 CPU 占用率升至 100%,影响其他 API 请求的响应速度。

from fastapi import FastAPI, BackgroundTasks, UploadFile, File
from fastapi.responses import JSONResponse
import time
import os

app = FastAPI()

def process_video(file_path: str, output_path: str):
"""
模拟视频转码任务(CPU 密集型操作)
注意:这里使用 def 而不是 async def,因为转码是 CPU 密集型任务
"""
print(f"开始处理视频: {file_path}")
try:
# 模拟耗时操作(实际场景中这里会是视频转码逻辑,如使用 ffmpeg)
time.sleep(5) # 模拟转码耗时

# 模拟转码后的操作(如保存转码后的文件)
print(f"视频转码完成: {file_path} -> {output_path}")

# 实际场景中可能还需要:
# - 更新数据库状态
# - 发送通知给用户
# - 清理临时文件等
except Exception as e:
print(f"转码失败: {e}")

@app.post("/upload-video")
async def upload_video(
background_tasks: BackgroundTasks,
file: UploadFile = File(...)
):
"""
上传视频文件,立即返回响应,后台处理转码

工作流程:
1. 接收文件并保存到临时目录
2. 将转码任务添加到后台任务队列
3. 立即返回"正在处理中"的响应
4. 后台任务在响应发送后开始执行
"""
# 保存上传的文件
file_path = f"/tmp/{file.filename}"
os.makedirs("/tmp", exist_ok=True)

with open(file_path, "wb") as f:
content = await file.read()
f.write(content)

# 定义输出路径
output_path = f"/tmp/transcoded_{file.filename}"

# 添加后台任务:在响应返回后执行
background_tasks.add_task(process_video, file_path, output_path)

# 立即返回响应,不等待转码完成
return JSONResponse({
"message": "视频已上传,正在处理中",
"filename": file.filename,
"status": "processing",
"note": "转码完成后会通过其他方式通知您"
})

关键点说明:

  1. background_tasks: BackgroundTasks:通过依赖注入获取 BackgroundTasks 实例
  2. background_tasks.add_task():添加后台任务,第一个参数是函数,后续参数是函数的参数
  3. 立即返回响应:在添加后台任务后立即返回,不等待任务完成
  4. 任务执行时机:后台任务会在响应发送给客户端之后才开始执行

测试方法:

# 使用 curl 上传文件
curl -X POST "http://127.0.0.1:8000/upload-video" \
-F "file=@your_video.mp4"

# 响应会立即返回,但转码任务会在后台继续执行,可观察fastapi输出。

完成任务后通知用户的方式可以有多种,可以设置进程轮询定时查询任务状态,也可以使用WebSocket实时通知。

注意事项:

  • 后台任务函数可以是 defasync def
  • 如果任务失败,不会影响已返回的响应
  • 任务在同一个 Python 进程中执行,不会跨进程或跨服务器
  • 适合轻量级任务,对于重量级任务建议使用 Celery

在上面的示例中,upload_video 函数接收文件后立即返回响应,而 process_video 函数会在响应发送给客户端后,在后台执行。

Celery + Redis 的优势

Web 服务器将任务指令放入 Redis 队列后立即返回。由专门的 Worker 进程(可在其他服务器上)处理耗时的转码任务。

优点:

  • 解耦:无论视频转码多慢,都不会影响 API 服务器的响应速度
  • 分布式:支持多台服务器处理任务,抗压能力强
  • 可扩展:可以根据任务量动态增加 Worker 数量

适用场景:

  • 中小型项目、任务量不大:使用 BackgroundTasks
  • 大型项目、高并发、需要分布式处理:使用 Celery + Redis