SQLAlchemy 2.0 Async ORM 在 FastAPI 中的完整 CRUD 操作指南

张开发
2026/4/10 16:47:47 15 分钟阅读

分享文章

SQLAlchemy 2.0 Async ORM 在 FastAPI 中的完整 CRUD 操作指南
在 FastAPI 项目开发中SQLAlchemy 2.0 AsyncSession 已成为异步数据库操作的主流方案。本文从零到一、由浅入深系统讲解 ORM 的查询、新增、更新、删除全流程全部代码均可直接复制使用助力你快速掌握生产级 CRUD 操作。SQLAlchemy 2.0 Async ORM 在 FastAPI 中的完整 CRUD 操作指南2026 最新版 · 推荐收藏在 FastAPI 项目开发中SQLAlchemy 2.0 AsyncSession已成为异步数据库操作的主流方案。本文从零到一、由浅入深系统讲解 ORM 的查询、新增、更新、删除全流程全部代码均可直接复制使用助力你快速掌握生产级 CRUD 操作。一、核心执行流程所有 ORM 操作都遵循以下核心模式resultawaitdb.execute(模型类)# 执行 SQL 语句常用结果处理方式result.scalars().all()—— 返回模型对象列表最常用result.scalars().first()—— 返回第一条数据result.scalar()—— 返回单个标量值如 count、max二、查询操作# 创建异步引擎ASYNC_DATABASE_URLmysqlaiomysql://root:rootlocalhost:3306/FastAPI_first?charsetutf8async_enginecreate_async_engine(ASYNC_DATABASE_URL,echoTrue,pool_size10,max_overflow20)asynccontextmanagerasyncdeflifespan(app:FastAPI):print(正在创建数据库表...)try:awaitcreate_table()print(数据库表创建成功)exceptExceptionase:print(f创建数据库表时出错{e})yieldprint(应用关闭)appFastAPI(lifespanlifespan)# 定义基类classBase(DeclarativeBase):create_time:Mapped[datetime]mapped_column(DateTime,insert_defaultfunc.now(),comment创建时间)update_time:Mapped[datetime]mapped_column(DateTime,insert_defaultfunc.now(),onupdatefunc.now(),comment更新时间)# 定义Book模型classBook(Base):__tablename__bookid:Mapped[int]mapped_column(primary_keyTrue,comment书籍id)bookname:Mapped[str]mapped_column(String(255),comment书名)author:Mapped[str]mapped_column(String(255),comment作者)price:Mapped[float]mapped_column(Float,comment价格)publisher:Mapped[str]mapped_column(String(255),comment出版社)# 建表函数asyncdefcreate_table():asyncwithasync_engine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)AysyncSessionLocalasync_sessionmaker(bindasync_engine,class_AsyncSession,expire_on_commitFalse)asyncdefget_date():asyncwithAysyncSessionLocal()assession:try:yieldsession#这是一个返回数据库会话给路由处理函数类似returnexceptException:awaitsession.rollback()#有异常回滚raisefinally:awaitsession.close()2.1 基础查询app.get(/book/books)asyncdefget_book_list(db:AsyncSessionDepends(get_data)):resultawaitdb.execute(select(Book))bookresult.scalars().all()#book result.first().all()#book await db.get(Book,5)#获取单条数据-》根据主键returnbook2.2 条件查询where# 基础条件#需求路径参数 书idapp.get(/book/{book_id})asyncdefget_book_lists(book_id:int,db:AsyncSessionDepends(get_data)):resultawaitdb.execute(select(Book).where(Book.idbook_id))bookresult.scalars().all()returnbook# 复杂条件与、或、非#%多个字匹配只匹配一个_app.get(/book/author/search_book)asyncdefget_search_book(db:AsyncSessionDepends(get_data)):resultawaitdb.execute(select(Book).where(Book.author.like(张_)(Book.price100)))bookresult.scalars().all()returnbook常用条件方法like(张%)、like(张_)in_([1, 3, 5])between(50, 150)is_(None)/is_not(None)2.3 聚合查询fromsqlalchemyimportfunc count(awaitdb.execute(select(func.count(Book.id)))).scalar()max_price(awaitdb.execute(select(func.max(Book.price)))).scalar()avg_price(awaitdb.execute(select(func.avg(Book.price)))).scalar()为什么需要分页假设数据库有 10000 条书籍记录❌ 不分页一次性返回 10000 条服务器内存爆炸网络传输慢前端渲染卡死用户体验极差✅ 分页每次只返回 10-20 条服务器压力小加载速度快用户体验好分页的核心参数参数含义示例page当前页码第1页、第2页page_size / limit每页条数每页10条、20条skip / offset跳过多少条第2页跳过10条示例假设 page_size 10第1页skip (1-1) * 10 0 → 取第 1-10 条第2页skip (2-1) * 10 10 → 取第 11-20 条第3页skip (3-1) * 10 20 → 取第 21-30 条分页公式核心skip(page-1)*page_size### 2.4 分页查询pythonapp.get(/book/get_book_list)asyncdefget_book_list(page:int1,# 用户想看第几页page_size:int2,# 每页显示几条db:AsyncSessionDepends(get_data),):skip(page-1)*page_size# 计算跳过多少条stmtselect(Book).offset(skip).limit(page_size)# SQL分页resultawaitdb.execute(stmt)booksresult.scalars().all()returnbooks三、新增操作Create#需求用户输入图书信息id,书名作者 价格出版社#用户输入-》参数-》请求体classBookBase(BaseModel):id:intbookname:strauthor:strprice:floatpublisher:strapp.post(/book/add_book)asyncdefadd_book(book:BookBase,db:AsyncSessionDepends(get_date)):#orm对象 去 add 然后commit#1、创建orm对象book_objBook(**book.__dict__)db.add(book_obj)awaitdb.commit()returnbook_obj批量新增使用db.add_all([obj1, obj2, ...])四、更新操作Update推荐写法先查后改最清晰、安全#需求 修改图书的信息 先查再改#设计思路:路径参数书籍id 作用是查找 请求体参数作用是新数据书名、作者、价格、出版社#路径参数的在函数有一个同名的形参classBookUpdate(BaseModel):bookname:strauthor:strprice:floatpublisher:strapp.put(/book/update_book/{book_id})asyncdefupdata_book(book_id:int,data:BookUpdate,db:AsyncSessionDepends(get_data),):db_bookawaitdb.get(Book,book_id)ifdb_bookisNone:raiseHTTPException(status_code404,detail查无此书)# 修改其实是查询赋值db_book.booknamedata.bookname db_book.authordata.author db_book.pricedata.price db_book.publisherdata.publisherawaitdb.commit()return{message:修改成功}五、删除操作Deleteapp.delete(/book/{book_id})asyncdefdelete_book(book_id:int,db:AsyncSessionDepends(get_db)):db_bookawaitdb.get(Book,book_id)ifnotdb_book:raiseHTTPException(status_code404,detail图书不存在)awaitdb.delete(db_book)awaitdb.commit()return{message:删除成功}批量删除awaitdb.execute(delete(Book).where(Book.price20))awaitdb.commit()六、最佳实践与注意事项✅ 推荐做法始终搭配await db.commit()和await db.refresh()更新时使用exclude_unsetTrue避免覆盖默认值分页必须添加order_by()防止结果乱序路径参数用于查询请求体用于新增/修改所有接口做好 404 判断和异常处理⚠️ 注意事项生产环境关闭 SQL 日志echoFalse复杂查询建议使用selectinload解决 N1 问题批量操作后一定要commit()

更多文章