Skip to main content

sqlite

info

SQLite 是一个开源的嵌入式关系型数据库,以其轻量级、零配置、无服务器架构而闻名。Python 通过标准库 sqlite3 模块提供了完整的 SQLite 支持。

基础操作

连接数据库

connect() 函数用于连接到 SQLite 数据库文件。

函数签名:sqlite3.connect(database, timeout=5.0, **kwargs) -> Connection

参数说明:

  • database:数据库文件路径,可以是文件路径或 :memory: (内存数据库)
  • timeout:数据库被锁定时的等待超时时间(秒),默认 5.0
  • check_same_thread:是否检查连接是否在同一线程中使用,默认 True
  • isolation_level:事务隔离级别,默认为 "DEFERRED"

返回值:

  • 返回一个 Connection 对象
import sqlite3

# 连接到磁盘上的数据库(如果不存在则创建)
conn = sqlite3.connect('example.db')

# 连接到内存数据库(适合临时数据或测试)
memory_conn = sqlite3.connect(':memory:')

# 连接后记得关闭(推荐使用上下文管理器)
conn.close()

# 推荐方式:使用上下文管理器自动管理连接
with sqlite3.connect('example.db') as conn:
# 在这里进行数据库操作
pass
# 退出时自动提交事务并关闭连接
tip

内存数据库的使用场景

  1. 单元测试:快速创建临时测试环境
  2. 数据处理:临时存储中间结果
  3. 缓存:比字典更强大的内存缓存

注意:内存数据库在连接关闭后数据会丢失。

执行SQL语句

游标(Cursor)对象用于执行 SQL 语句和获取查询结果。

execute() 方法

函数签名:cursor.execute(sql, parameters=()) -> Cursor

参数说明:

  • sql:要执行的 SQL 语句
  • parameters:SQL 参数,用于参数化查询(防止 SQL 注入)

返回值:

  • 返回 Cursor 对象本身
import sqlite3

# 创建连接和游标
conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
age INTEGER,
email TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')

# 插入单条数据 - 使用参数化查询(推荐)
cursor.execute(
"INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
('张三', 25, 'zhangsan@example.com')
)

# 插入数据 - 使用命名参数
cursor.execute(
"INSERT INTO users (name, age, email) VALUES (:name, :age, :email)",
{'name': '李四', 'age': 30, 'email': 'lisi@example.com'}
)

# 提交事务
conn.commit()

# 查询数据
cursor.execute("SELECT * FROM users WHERE age > ?", (20,))
results = cursor.fetchall()
print(results)

# 关闭连接
conn.close()
warning

永远不要使用字符串拼接构建 SQL 语句!

# ❌ 危险的做法 - 容易受到 SQL 注入攻击
user_input = "admin' OR '1'='1"
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")

# ✅ 安全的做法 - 使用参数化查询
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))

executemany() 方法

用于批量执行相同的 SQL 语句。

函数签名:cursor.executemany(sql, parameters) -> Cursor

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 批量插入数据
users = [
('王五', 28, 'wangwu@example.com'),
('赵六', 35, 'zhaoliu@example.com'),
('孙七', 22, 'sunqi@example.com')
]

cursor.executemany(
"INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
users
)
conn.commit()

print(f"插入了 {cursor.rowcount} 行数据")

conn.close()

executescript() 方法

执行包含多条 SQL 语句的脚本。

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 执行多条 SQL 语句
cursor.executescript('''
DROP TABLE IF EXISTS products;

CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
price REAL,
stock INTEGER DEFAULT 0
);

INSERT INTO products (name, price, stock) VALUES
('笔记本电脑', 5999.99, 10),
('机械键盘', 399.99, 50),
('鼠标', 99.99, 100);
''')

conn.commit()
conn.close()

获取查询结果

fetchone() 方法

获取查询结果的下一行。

函数签名:cursor.fetchone() -> tuple | None

返回值:

  • 返回一行结果(元组格式),如果没有更多行则返回 None
import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

cursor.execute("SELECT * FROM users WHERE age > ?", (25,))

# 获取第一行
row = cursor.fetchone()
if row:
print(f"用户: {row}") # (1, '张三', 25, 'zhangsan@example.com', ...)

# 再次调用获取下一行
next_row = cursor.fetchone()
if next_row:
print(f"下一个用户: {next_row}")

conn.close()

fetchmany() 方法

获取查询结果的多行。

函数签名:cursor.fetchmany(size=cursor.arraysize) -> list[tuple]

参数说明:

  • size:要获取的行数,默认为 cursor.arraysize(通常为 1)
import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

cursor.execute("SELECT * FROM users")

# 每次获取 2 行
while True:
rows = cursor.fetchmany(2)
if not rows:
break
for row in rows:
print(row)

conn.close()

fetchall() 方法

获取查询结果的所有行。

函数签名:cursor.fetchall() -> list[tuple]

返回值:

  • 返回所有剩余行的列表
import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

cursor.execute("SELECT name, age FROM users ORDER BY age DESC")
all_users = cursor.fetchall()

for name, age in all_users:
print(f"{name}: {age}岁")

conn.close()
tip

性能建议

  • 如果结果集很大,使用 fetchmany() 分批处理,避免内存占用过高
  • 如果只需要一行结果,使用 fetchone()
  • 如果确定结果集不大,使用 fetchall() 最方便

使用 Row 工厂

默认情况下,查询结果返回元组。可以设置 row_factory 来改变返回格式。

import sqlite3

conn = sqlite3.connect('example.db')

# 使用 Row 工厂,可以通过列名访问数据
conn.row_factory = sqlite3.Row

cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (1,))
row = cursor.fetchone()

if row:
# 通过索引访问
print(row[0])

# 通过列名访问(推荐)
print(row['name'])
print(row['email'])

# 转换为字典
user_dict = dict(row)
print(user_dict) # {'id': 1, 'name': '张三', ...}

# 获取所有列名
print(row.keys()) # ['id', 'name', 'age', 'email', ...]

conn.close()

事务管理

SQLite 默认支持事务。

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

try:
# 开始事务(隐式开始)
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('测试用户1', 20))
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('测试用户2', 21))

# 提交事务
conn.commit()
print("事务提交成功")

except sqlite3.Error as e:
# 发生错误时回滚
conn.rollback()
print(f"事务回滚: {e}")

finally:
conn.close()

# 推荐方式:使用上下文管理器自动管理事务
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('测试用户3', 22))
# 退出时自动提交,发生异常时自动回滚
info

事务隔离级别

  • None:自动提交模式,每条语句立即提交
  • "DEFERRED":默认值,在首次读写时开始事务
  • "IMMEDIATE":立即开始事务
  • "EXCLUSIVE":立即获取排他锁
conn = sqlite3.connect('example.db', isolation_level='IMMEDIATE')

数据类型映射

SQLite 只有 5 种存储类型,Python 的 sqlite3 模块会自动进行类型转换。

Python 类型SQLite 类型说明
NoneNULL空值
intINTEGER整数
floatREAL浮点数
strTEXT文本
bytesBLOB二进制数据
import sqlite3
import datetime

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建测试表
cursor.execute('''
CREATE TABLE IF NOT EXISTS data_types (
id INTEGER PRIMARY KEY,
integer_col INTEGER,
real_col REAL,
text_col TEXT,
blob_col BLOB,
null_col TEXT
)
''')

# 插入不同类型的数据
cursor.execute('''
INSERT INTO data_types (integer_col, real_col, text_col, blob_col, null_col)
VALUES (?, ?, ?, ?, ?)
''', (42, 3.14, 'Hello', b'binary data', None))

conn.commit()

# 查询数据
cursor.execute("SELECT * FROM data_types")
row = cursor.fetchone()
print(row)
# (1, 42, 3.14, 'Hello', b'binary data', None)

conn.close()

日期和时间处理

SQLite 没有专门的日期时间类型,通常使用 TEXT、REAL 或 INTEGER 存储。

import sqlite3
from datetime import datetime, date

# 注册适配器和转换器
sqlite3.register_adapter(datetime, lambda val: val.isoformat())
sqlite3.register_adapter(date, lambda val: val.isoformat())

sqlite3.register_converter("timestamp", lambda val: datetime.fromisoformat(val.decode()))
sqlite3.register_converter("date", lambda val: date.fromisoformat(val.decode()))

# 连接时启用类型检测
conn = sqlite3.connect('example.db', detect_types=sqlite3.PARSE_DECLTYPES)
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY,
title TEXT,
event_date date,
created_at timestamp
)
''')

# 插入日期时间
now = datetime.now()
today = date.today()

cursor.execute(
"INSERT INTO events (title, event_date, created_at) VALUES (?, ?, ?)",
('会议', today, now)
)
conn.commit()

# 查询会自动转换回 Python 对象
cursor.execute("SELECT * FROM events")
row = cursor.fetchone()
print(f"事件: {row[1]}, 日期: {row[2]}, 创建时间: {row[3]}")
print(f"类型: {type(row[2])}, {type(row[3])}")

conn.close()

实用技巧

获取插入的自增 ID

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('新用户', 25))
conn.commit()

# 获取最后插入的行 ID
last_id = cursor.lastrowid
print(f"新插入的用户 ID: {last_id}")

conn.close()

检查表是否存在

import sqlite3

def table_exists(conn, table_name):
"""检查表是否存在"""
cursor = conn.cursor()
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", (table_name,))
return cursor.fetchone() is not None

conn = sqlite3.connect('example.db')

if table_exists(conn, 'users'):
print("users 表存在")
else:
print("users 表不存在")

conn.close()

获取表结构信息

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 获取表的列信息
cursor.execute("PRAGMA table_info(users)")
columns = cursor.fetchall()

print("表结构:")
for col in columns:
cid, name, col_type, notnull, default, pk = col
print(f" {name}: {col_type}", end='')
if pk:
print(" [PRIMARY KEY]", end='')
if notnull:
print(" [NOT NULL]", end='')
print()

# 获取所有表名
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
print("\n数据库中的表:")
for table in tables:
print(f" - {table[0]}")

conn.close()

备份数据库

import sqlite3

def backup_database(source_db, target_db):
"""备份数据库到另一个文件"""
source = sqlite3.connect(source_db)
target = sqlite3.connect(target_db)

# 使用 backup API
source.backup(target)

source.close()
target.close()
print(f"数据库已备份到 {target_db}")

# 备份到文件
backup_database('example.db', 'example_backup.db')

# 或者直接复制文件(确保数据库没有活动连接)
import shutil
shutil.copy2('example.db', 'example_backup2.db')

使用装饰器简化数据库操作

import sqlite3
from functools import wraps

def with_database(db_path):
"""数据库操作装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
result = func(conn, *args, **kwargs)
conn.commit()
return result
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
return wrapper
return decorator

@with_database('example.db')
def get_user_by_id(conn, user_id):
"""根据 ID 获取用户"""
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
return cursor.fetchone()

@with_database('example.db')
def create_user(conn, name, age, email):
"""创建新用户"""
cursor = conn.cursor()
cursor.execute(
"INSERT INTO users (name, age, email) VALUES (?, ?, ?)",
(name, age, email)
)
return cursor.lastrowid

# 使用
user = get_user_by_id(1)
if user:
print(dict(user))

new_id = create_user('新用户', 30, 'new@example.com')
print(f"创建了用户,ID: {new_id}")

常见陷阱与最佳实践

陷阱 1:线程安全问题

import sqlite3
import threading

# ❌ 错误:多个线程共享同一个连接
conn = sqlite3.connect('example.db')

def worker():
cursor = conn.cursor() # 多线程共享连接会出问题
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('用户', 20))
conn.commit()

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()

# ✅ 正确:每个线程创建自己的连接
def worker_correct():
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('用户', 20))
conn.commit()
conn.close()

# 或者使用 check_same_thread=False(谨慎使用)
conn = sqlite3.connect('example.db', check_same_thread=False)

陷阱 2:忘记提交事务

import sqlite3

# ❌ 错误:修改数据后忘记提交
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('测试', 25))
conn.close() # 数据未提交,丢失了!

# ✅ 正确:记得提交
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('测试', 25))
conn.commit() # 提交事务
conn.close()

# ✅ 更好:使用上下文管理器
with sqlite3.connect('example.db') as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('测试', 25))
# 自动提交

陷阱 3:频繁的小事务导致性能差

import sqlite3
import time

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# ❌ 慢:每次插入都提交
start = time.time()
for i in range(1000):
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", (f'用户{i}', 20))
conn.commit() # 每次都提交,很慢!
print(f"耗时: {time.time() - start:.2f}秒")

# ✅ 快:批量提交
start = time.time()
for i in range(1000):
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", (f'用户{i}', 20))
conn.commit() # 一次性提交
print(f"耗时: {time.time() - start:.2f}秒")

# ✅ 更快:使用 executemany
start = time.time()
data = [(f'用户{i}', 20) for i in range(1000)]
cursor.executemany("INSERT INTO users (name, age) VALUES (?, ?)", data)
conn.commit()
print(f"耗时: {time.time() - start:.2f}秒")

conn.close()

陷阱 4:数据库锁定

import sqlite3

# SQLite 在写入时会锁定整个数据库

# ❌ 可能导致死锁
conn1 = sqlite3.connect('example.db')
conn2 = sqlite3.connect('example.db')

cursor1 = conn1.cursor()
cursor2 = conn2.cursor()

cursor1.execute("BEGIN")
cursor1.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('用户1', 20))

# conn2 尝试写入会被阻塞,直到 conn1 提交或回滚
try:
cursor2.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('用户2', 21))
conn2.commit()
except sqlite3.OperationalError as e:
print(f"数据库被锁定: {e}")

conn1.commit()
conn1.close()
conn2.close()

# ✅ 解决方案:设置更长的超时时间
conn = sqlite3.connect('example.db', timeout=30.0)

性能优化

使用索引

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建索引以加速查询
cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_users_age ON users(age)")

# 查看查询计划
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ('test@example.com',))
print(cursor.fetchall())

conn.close()

使用 WAL 模式

import sqlite3

conn = sqlite3.connect('example.db')

# 启用 WAL(Write-Ahead Logging)模式
# 优点:读写可以并发,性能更好
conn.execute("PRAGMA journal_mode=WAL")

# 其他性能优化设置
conn.execute("PRAGMA synchronous = NORMAL") # 平衡安全性和性能
conn.execute("PRAGMA cache_size = -64000") # 设置缓存大小(KB)
conn.execute("PRAGMA temp_store = MEMORY") # 临时表存储在内存中

conn.close()

批量操作技巧

import sqlite3

conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 大量插入时暂时关闭同步
cursor.execute("PRAGMA synchronous = OFF")
cursor.execute("PRAGMA journal_mode = MEMORY")

# 使用事务
cursor.execute("BEGIN TRANSACTION")

# 批量插入
data = [(f'用户{i}', i % 100) for i in range(10000)]
cursor.executemany("INSERT INTO users (name, age) VALUES (?, ?)", data)

cursor.execute("COMMIT")

# 恢复正常设置
cursor.execute("PRAGMA synchronous = FULL")

conn.close()

完整示例:简单的任务管理系统

import sqlite3
from datetime import datetime
from typing import List, Optional

class TaskManager:
"""简单的任务管理系统"""

def __init__(self, db_path: str = 'tasks.db'):
self.db_path = db_path
self._init_database()

def _init_database(self):
"""初始化数据库"""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
status TEXT DEFAULT 'pending',
priority INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
)
''')
conn.execute('CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)')

def add_task(self, title: str, description: str = '', priority: int = 0) -> int:
"""添加新任务"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
'INSERT INTO tasks (title, description, priority) VALUES (?, ?, ?)',
(title, description, priority)
)
return cursor.lastrowid

def get_task(self, task_id: int) -> Optional[dict]:
"""获取单个任务"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute('SELECT * FROM tasks WHERE id = ?', (task_id,))
row = cursor.fetchone()
return dict(row) if row else None

def list_tasks(self, status: Optional[str] = None) -> List[dict]:
"""列出任务"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

if status:
cursor.execute(
'SELECT * FROM tasks WHERE status = ? ORDER BY priority DESC, created_at',
(status,)
)
else:
cursor.execute('SELECT * FROM tasks ORDER BY priority DESC, created_at')

return [dict(row) for row in cursor.fetchall()]

def complete_task(self, task_id: int) -> bool:
"""完成任务"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
'UPDATE tasks SET status = ?, completed_at = ? WHERE id = ?',
('completed', datetime.now().isoformat(), task_id)
)
return cursor.rowcount > 0

def delete_task(self, task_id: int) -> bool:
"""删除任务"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM tasks WHERE id = ?', (task_id,))
return cursor.rowcount > 0

# 使用示例
if __name__ == '__main__':
tm = TaskManager()

# 添加任务
task1_id = tm.add_task('学习 SQLite', '完成 SQLite 教程', priority=1)
task2_id = tm.add_task('写代码', '实现任务管理功能', priority=2)
task3_id = tm.add_task('测试', '编写单元测试', priority=1)

# 列出所有任务
print("所有任务:")
for task in tm.list_tasks():
print(f" [{task['id']}] {task['title']} (优先级: {task['priority']}, 状态: {task['status']})")

# 完成一个任务
tm.complete_task(task1_id)

# 列出待完成的任务
print("\n待完成的任务:")
for task in tm.list_tasks(status='pending'):
print(f" [{task['id']}] {task['title']}")

# 获取单个任务
task = tm.get_task(task2_id)
if task:
print(f"\n任务详情: {task}")

相关资源推荐

tip

如果项目需求超出 SQLite 的能力范围(如需要高并发写入、网络访问、用户权限管理等),建议迁移到 PostgreSQL 或 MySQL。