Skip to main content

SQLAlchemy

SQLAlchemy 是 Python 下成熟的 ORM 与 SQL 工具库,支持多种数据库方言,换库(如从 SQLite 到 PostgreSQL)时几乎不用改业务代码。详见 SQLAlchemy 2.0 官方文档

安装与引擎

pip install sqlalchemy
# 使用 PostgreSQL 时通常还需:pip install psycopg2-binary

所有连接都从 Engine 开始。Engine 是连接池与数据库 URL 的入口,应用内通常全局只建一个

from sqlalchemy import create_engine

engine = create_engine("sqlite:///:memory:", echo=True)
# PostgreSQL: "postgresql+psycopg2://user:pass@host:5432/dbname"
# MySQL: "mysql+pymysql://user:pass@host:3306/dbname"
  • URL 含义方言+驱动://用户:密码@主机:端口/库名。SQLite 无服务端,用 /:memory: 表示内存库。
  • echo=True:把发出的 SQL 打印到标准输出,调试用;生产应关闭。
  • 连接池:默认每个 Engine 自带连接池(如 QueuePool),无需手写连接管理;高并发时可调 pool_sizemax_overflow
tip

Engine 不持有“当前连接”;占用连接的是 engine.connect() 或通过 Session 发起的操作。请求级、短生命周期的 Session 配全局单例 Engine 即可。

基本运行示例

下面这段可在本机直接运行(内存 SQLite,无需装数据库)。后文中的 engineBaseUser 均沿用此处定义。

from sqlalchemy import create_engine, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

engine = create_engine("sqlite:///:memory:", echo=False)

class Base(DeclarativeBase):
pass

class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30), nullable=False)
email: Mapped[str] = mapped_column(String(255), nullable=False)

Base.metadata.create_all(engine)

with Session(engine) as session:
session.add(User(name="alice", email="alice@example.com"))
session.add(User(name="bob", email="bob@example.com"))
session.commit()

with Session(engine) as session:
stmt = select(User).where(User.name == "alice")
user = session.scalars(stmt).first()
print(user.name, user.email) # alice alice@example.com

类型对应与转换

ORM 把 Python 类型 映射到 数据库列类型,读写时做序列化/反序列化。对应关系搞错会报错或产生静默错误。

类型对应表

Python 类型(业务层)SQLAlchemy 类型(列定义)常见数据库列类型
intInteger / BigIntegerINTEGER, BIGINT
floatFloat / DoubleREAL, DOUBLE
strString(length) / TextVARCHAR(n), TEXT
boolBooleanBOOLEAN(部分数据库用 SMALLINT)
datetime.datetimeDateTime / TIMESTAMPTIMESTAMP, DATETIME
datetime.dateDateDATE
datetime.timeTimeTIME
decimal.DecimalNumeric(precision, scale)NUMERIC/DECIMAL
bytesLargeBinary / BLOBBLOB, BYTEA
dict / list(JSON)JSONJSON, JSONB
uuid.UUIDUuid(2.0) / CHAR(36)UUID, CHAR(36)

声明式 ORM 中通常用 Mapped[python_type] + mapped_column(类型或留空),SQLAlchemy 会根据 Mapped 的注解和 mapped_column 的参数决定最终列类型:

from datetime import date, datetime
from decimal import Decimal
from typing import Optional
from uuid import UUID
from sqlalchemy import String, Text, Numeric, DateTime, Date, Boolean, JSON, LargeBinary
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
pass

class Product(Base):
__tablename__ = "product"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(255))
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
tags: Mapped[Optional[list]] = mapped_column(JSON, nullable=True) # 存 list/dict
binary_data: Mapped[Optional[bytes]] = mapped_column(LargeBinary, nullable=True)
  • nullable:数据库允许 NULL 时,Python 侧用 Optional[...]mapped_column(..., nullable=True),否则写入 None 可能违反非空约束。
  • 默认值default=... 在 Python 层赋默认值;server_default=text("...") 在数据库层(如 CURRENT_TIMESTAMP)。

类型不一致时

  1. 数据库是字符串,业务要 int/float/date

    • 读:在 Python 里转,如 int(row.count_str),或用 TypeDecorator 在 ORM 层统一转(见下)。
    • 写:传入目标类型(如 int),由驱动/ORM 转成库能接受的格式;若库是 VARCHAR,会按 String 存成 str(123)
  2. 数据库是 NUMERIC/DECIMAL,Python 用 float

    • 金额等用 decimal.Decimal + Numeric(precision, scale) 更稳妥;用 float 会有精度问题,必要时先 round(x, 2)
  3. 数据库是 JSON/JSONB,Python 用 dict/list

    • 用 SQLAlchemy 的 JSON 类型:写 dict/list,ORM 序列化为 JSON 字符串;读时自动反序列化。
  4. 数据库是 BYTEA/BLOB,Python 用 bytes

    • LargeBinary,读写都是 bytes。若存的是 UTF-8 字符串,取回后 value.decode('utf-8')
  5. 时区:datetime 无时区 vs 有时区

    • 库侧:TIMESTAMP WITH TIME ZONE(如 PostgreSQL timestamptz)存 UTC;WITHOUT TIME ZONE 只存字面值。
    • Python:datetime.utcnow() 无时区;datetime.now(timezone.utc) 带时区。存库统一用 UTC,展示再转本地;列可用 DateTime(timezone=True)(库支持时)。

TypeDecorator

库里存的是某种格式(如字符串),业务想用另一种 Python 类型(enum、自定义类)时,用 TypeDecorator 统一“存什么/读成什么”:

import json
from sqlalchemy import TypeDecorator, String

class JSONListType(TypeDecorator):
impl = String
cache_ok = True

def process_bind_param(self, value, dialect):
if value is not None:
value = json.dumps(value, ensure_ascii=False)
return value

def process_result_value(self, value, dialect):
if value is not None:
value = json.loads(value)
return value
  • process_bind_param:Python → 数据库(写)。
  • process_result_value:数据库 → Python(读)。

列在库里仍是字符串,在 ORM 上读写则是 list/dict。


模型与表

DeclarativeBase 定义模型,类对应表、属性对应列:

from typing import Optional
from sqlalchemy import String, Index
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
pass

class User(Base):
__tablename__ = "user_account"
__table_args__ = (
Index("ix_user_email", "email", unique=True),
)

id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
email: Mapped[str] = mapped_column(String(255), unique=True)
fullname: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
  • 主键:至少一列 primary_key=True。自增:多数数据库对整数主键默认自增;显式可用 mapped_column(Integer, primary_key=True, autoincrement=True)
  • 唯一 / 索引unique=True 单列唯一;多列唯一或复合索引用 __table_args__ 中的 UniqueConstraintIndex
  • 建表Base.metadata.create_all(engine) 只创建不存在的表,不改已有表;上线改表结构用 Alembic 做迁移。

Session 与事务

Session 是 ORM 的工作单元:跟踪对象变更,在 commit 时一次性刷到数据库,并代表一个事务边界。

基本用法

from sqlalchemy.orm import Session

with Session(engine) as session:
user = User(name="alice", email="alice@example.com")
session.add(user)
session.commit()
  • with:退出时自动关闭 Session;commit 后事务结束,连接归还连接池。
  • add:把实例加入 Session,在 commit 时生成 INSERT。
  • commit:先 flush(把挂起的 INSERT/UPDATE/DELETE 发到库),再提交事务。

事务边界

  • with Session(engine): 时,进入 with 会隐式开始事务,commit() 提交;若未 commit 就退出则会 rollback。也可显式处理异常:
with Session(engine) as session:
try:
session.add(User(name="bob", email="bob@example.com"))
session.commit()
except Exception:
session.rollback()
raise
  • commit 之后不要继续用同一 Session 做写操作而不再次 begin;下一次读写会开启新事务。

flush 与 commit

  • flush():把当前 Session 里挂起的变更发到数据库(执行 SQL),但不提交事务。同一事务里先 INSERT 再拿自增 id、或触发约束/触发器时可用。
  • commit():先隐式 flush,再 COMMIT,事务结束。
with Session(engine) as session:
user = User(name="alice", email="alice@example.com")
session.add(user)
session.flush()
print(user.id) # 若 id 为自增,此时已可拿到
session.commit()

同一事务内多次读写

在一个 with 里连续读写,都在同一事务中,要么全部提交要么全部回滚:

from sqlalchemy import select

with Session(engine) as session:
u = session.scalars(select(User).where(User.email == "alice@example.com")).one()
u.name = "Alice Smith"
session.add(User(name="new_user", email="new@example.com"))
session.commit()

CRUD

  • 单条session.add(实例)多条session.add_all([实例1, 实例2])
  • 主键为自增时,commit 或 flush 后实例上会填上生成的 id。
  • 若希望“无则插入、有则忽略”可配合数据库的 ON CONFLICT(如 PostgreSQL 的 insert().on_conflict_do_nothing())。
from sqlalchemy.orm import Session

with Session(engine) as session:
# 单条
u1 = User(name="alice", email="alice@example.com")
session.add(u1)

# 多条
u2 = User(name="bob", email="bob@example.com")
u3 = User(name="carol", email="carol@example.com")
session.add_all([u2, u3])

session.flush()
print(u1.id, u2.id, u3.id) # 自增 id 已生成,如 1 2 3

session.commit()

“有则忽略”示例(PostgreSQL,需表上有唯一约束如 email):

from sqlalchemy.dialects.postgresql import insert

with Session(engine) as session:
stmt = insert(User).values(
name="alice",
email="alice@example.com",
).on_conflict_do_nothing(index_elements=["email"])
session.execute(stmt)
session.commit()

2.0 风格用 select() + session.scalars()session.execute()

from sqlalchemy import select

with Session(engine) as session:
stmt = select(User).where(User.email == "alice@example.com")
user = session.scalars(stmt).first()

stmt = select(User.id, User.name).where(User.id > 0).limit(10)
rows = session.execute(stmt).all()
  • select(User):选整个实体,返回 ORM 对象。
  • select(User.id, User.name):只选列,返回 Row,用 row.idrow.name 访问。
  • scalars(stmt):返回“标量”迭代器(每行一个对象);.first().one().all() 取单条或列表。
  • execute(stmt):返回 Result,.scalars().all() 等价于上面标量结果。

  • 方式一:查出对象,改属性,commit。Session 会跟踪变更并生成 UPDATE。
from sqlalchemy import select

with Session(engine) as session:
u = session.scalars(select(User).where(User.id == 1)).one()
u.name = "New Name"
session.commit()
  • 方式二:批量更新用 Core 的 update()
from sqlalchemy import update

with Session(engine) as session:
session.execute(update(User).where(User.id == 1).values(name="New Name"))
session.commit()

  • 按对象删session.delete(实例),再 commit。
  • 按条件删:用 Core 的 delete()
from sqlalchemy import delete

with Session(engine) as session:
session.execute(delete(User).where(User.id == 1))
session.commit()

查询条件与聚合

  • where:链式多个条件为 AND;User.name == "x"User.id.in_([1,2,3])User.name.like("a%") 等。
  • order_byorder_by(User.name) 升序,order_by(User.name.desc()) 降序。
  • limit / offset:分页。
  • 聚合func.count()func.sum() 等:
from sqlalchemy import select, func

with Session(engine) as session:
stmt = select(func.count(User.id)).select_from(User)
total = session.scalars(stmt).scalar()
print(total)

关系与预加载

一对多、多对一等用 ForeignKey + relationship()

from typing import List
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
addresses: Mapped[List["Address"]] = relationship(
"Address", back_populates="user", lazy="selectin"
)

class Address(Base):
__tablename__ = "address"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
email_address: Mapped[str] = mapped_column(String(255))
user: Mapped["User"] = relationship(back_populates="addresses")

关系在 Python 中的类型

  • 一对多:一方 Mapped[List["Child"]],多方 Mapped["Parent"]
  • 多对多:要中间表,两边 Mapped[List[...]],用 secondary=association_table

懒加载与 N+1

  • lazy="select"(默认):首次访问 user.addresses 再发 SELECT,容易 N+1。
  • lazy="selectin":加载 User 时用一条 WHERE user_id IN (...) 批量加载 Address,列表页可用。
  • lazy="joined":用 JOIN 一次查出 User 和 Address。

不依赖默认 lazy 时,用 joinedload / selectinload 显式预加载:

from sqlalchemy import select
from sqlalchemy.orm import selectinload

with Session(engine) as session:
stmt = select(User).options(selectinload(User.addresses)).where(User.id == 1)
user = session.scalars(stmt).one()
# 访问 user.addresses 不再触发额外 SQL
for addr in user.addresses:
print(addr.email_address)

原生 SQL 与 text()

需要手写 SQL 时用 text(),参数用 :name 占位、字典传入,避免拼接导致 SQL 注入:

from sqlalchemy import text

with engine.connect() as conn:
result = conn.execute(
text("SELECT id, name FROM user_account WHERE id > :id"),
{"id": 0}
)
for row in result:
print(row.id, row.name)

与 Session 一起用时:

from sqlalchemy import text

with Session(engine) as session:
session.execute(
text("UPDATE user_account SET name = :name WHERE id = :id"),
{"name": "x", "id": 1}
)
session.commit()

注意事项

对象只属于一个 Session

一个 ORM 实例只应属于一个 Session。从 Session A 查出来再放到 Session B 的 add 里,会报“对象已绑定到另一 Session”。应在新 Session 里按 id 重新查,或传值建新对象。

commit 后对象过期

默认 commit 后 Session 会把已加载对象标为 expired,下次访问属性时会再查一次库(refresh)。若不想要这种行为,可设 expire_on_commit=False(可能读到旧数据,需心里有数)。

字符串与编码

  • String(length) 对应 VARCHAR(length),超长会报错或截断;大段文本用 Text
  • 存中文时,库与连接都用 UTF-8(PostgreSQL 的 encoding、MySQL 的 charset)。

整数与时区

  • Integer 一般 32 位;主键或大数用 BigInteger
  • 时间统一用 UTC 存,展示再转本地;列用 DateTime(timezone=True)(库支持时),Python 用 datetime.now(timezone.utc)

上线前后

  • 关闭 echo
  • Alembic 做 schema 迁移,不依赖 create_all 改表。
  • 常用查询字段加索引;唯一约束与业务一致。

和数据库概念的对应

  • ACID:Session 的 commit/rollback 即事务提交与回滚;单次 commit 内要么全成功要么全回滚。
  • 隔离级别:由库与连接配置决定;SQLAlchemy 可在 engine 或连接上设 isolation_level="READ COMMITTED" 等。
  • 连接池:Engine 自带连接池;Session 按请求创建、用后关闭,连接会归还池子。

小结

概念用途
Engine全局单例,连接池 + 数据库 URL
Mapped / mapped_columnPython 类型 ↔ 数据库列类型
TypeDecorator自定义“存什么/读成什么”
Session工作单元与事务边界,用 with Session(engine)
flush / commitflush 发 SQL 不提交,commit 提交事务
select().where().order_by()2.0 风格查询
relationship + lazy / selectinload关系与 N+1 避免
text()原生 SQL,参数 :name

异步 asyncio、Alembic、FastAPI 集成等见 官方教程ORM 快速入门