【FastAPI】ORM-02.使用 ORM 高效处理数据库逻辑
目录一查询操作1.查询基础与结果处理 (Execution/Scalars)2.条件查询3.聚合操作 (Count / Group By / Avg / Sum)4. 分页查询5.连接查询5.1 relationship的简单说明5.2 连接查询5.2.1 显示JOIN二新增更新和删除操作1. 新增数据 (Create)2.更新数据 (Update)3.删除数据总结 FastAPI ORM 操作速查表一查询操作方便演示下面统一定义一个基础模型fromsqlalchemy.ormimportMapped,mapped_columnfromsqlalchemyimportString,Integer,FloatclassProduct(Base):__tablename__productsid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(50))price:Mapped[float]mapped_column(Float)stock:Mapped[int]mapped_column(Integer)category:Mapped[str]mapped_column(String(20))1.查询基础与结果处理 (Execution/Scalars)在异步中所有的查询都通过 db.execute()db就是我们注入的AsyncSession对象 发出但返回的是一个包装过的 Result 对象。scalars(): 最常用。把原始的行记录转换成模型对象如 Product 对象。first(): 拿第一条没找到返回 None。all(): 拿全部返回列表。基础使用如下fromsqlalchemyimportselectapp.get(/products)asyncdefget_products(db:AsyncSessionDepends(get_db)):# 1. 构造语句stmtselect(Product)# 2. 异步执行得到 Result 对象resultawaitdb.execute(stmt)# 3. 提取模型对象列表productsresult.scalars().all()returnproductsawait db.execute(stmt) 返回的是什么数据库实际跑了一条 SELECT products.id, products.name, … FROM products;SQLAlchemy 把每一行原始数据5 个字段打包成 Row 对象。result 是一个 Result 对象里面装着很多个 Row。此时你还没有得到任何一个 Product 实例只是拿到了一堆长得像元组 字典的混合体Row。select(Product) 写下的“蓝图”决定了什么你在 select() 里把什么放在第一个参数SQLAlchemy 就会把那一行的第一个位置标记成什么类型的东西。写 select(Product)SQLAlchemy 心里说“第一列是一个 Product 实体”。于是当收到数据库返回的原始行时它会用整行数据自动组装出一个 Product 实例塞到第一个位置。写 select(Product.name)第一个参数只是一个字段那第一个位置就是一个普通字符串不会组装对象。这个蓝图在你调用 db.execute(stmt) 之前就已经定好了。scalars() 到底干了什么Result.scalars() 的工作非常简单把每一行的第一个元素拿出来丢掉其他列返回一个 ScalarResult。如果蓝图里第一个位置是 Product 实体那么拿出来的就是 Product 对象。如果蓝图里第一个位置是 Product.name 字段那么拿出来的就是字符串。# 第一个参数是 Product 实体 → scalars() 返回 List[Product]resultawaitdb.execute(select(Product))productsresult.scalars().all()# 每个元素都是 Product 实例# 第一个参数是字段 → scalars() 返回 List[str]resultawaitdb.execute(select(Product.name))namesresult.scalars().all()# 每个元素都是字符串注意不是看什么“主键”或“字段个数”只取决于你在 select() 的第一个参数是字段还是整个模型类。之后用得到的ScalarResult的方法根据需求选择first还是all就行了。其实Result上也可以直接用first或all只不过这样拿到的是Row对象。除去上面这种方式外还能用AsyncSession直接提供的快捷方法通过db直接使用。db.get(Model, primary_key) —— 按主键查一个db.add(instance) —— 把新对象挂到会话db.delete(instance) —— 标记删除db.refresh(instance) —— 重新从数据库拉取最新数据这些方法是直接在会话层提供的“语法糖”省去手写 SQL 的步骤。2.条件查询条件查询是 SQLAlchemy 里最核心的“筛选”技能。无论查单条还是查列表99% 的业务逻辑都要靠它。fromsqlalchemyimportselect,and_,or_# 基础条件stmtselect(Product).where(Product.price1000)# AND多次 where 或用 and_stmtselect(Product).where(Product.price1000,Product.stock50)# 逗号默认 ANDstmtselect(Product).where(and_(Product.price1000,Product.stock50))# 显式 and_# OR必须用 or_stmtselect(Product).where(or_(Product.category电子,Product.category家电))# AND 与 OR 混合stmtselect(Product).where(and_(or_(Product.category电子,Product.category家电),Product.price1000))# SQL: WHERE (category 电子 OR category 家电) AND price 1000# INstmtselect(Product).where(Product.id.in_([1,3,5]))# BETWEENstmtselect(Product).where(Product.price.between(500,2000))# 模糊查询%代表多个字符_代表一个字符stmtselect(Product).where(Product.name.contains(手机))# LIKE %手机%stmtselect(Product).where(Product.name.startswith(华为))# LIKE 华为%stmtselect(Product).where(Product.name.endswith(Pro))# LIKE %Prostmtselect(Product).where(Product.name.ilike(%phone%))# ILIKE (不区分大小写)# 动态条件conditions[]ifmin_price:conditions.append(Product.pricemin_price)ifcategory:conditions.append(Product.categorycategory)ifconditions:stmtstmt.where(and_(*conditions))其它! 比较运算符的用法和python差不多。需要注意条件本身是同步构建的Product.price 1000 只是个表达式对象不涉及 IO所以不需要 await。真正异步的只有 db.execute(stmt) 这一步。所有条件运算符都返回新的条件对象可任意组合、赋值给变量再动态拼进 where()。还有SQLAlchemy 支持用 | 和 来表示 OR 和 AND因为条件对象重载了这些位运算符。如stmtselect(Product).where(((Product.category电子)|(Product.category家电))(Product.price1000))这会生成 WHERE (category‘电子’ OR category‘家电’) AND price 1000看起来比 or_、and_ 更直观。呃虽然这里很方便但貌似会有些陷阱之类的更推荐使用or_和and_再补充一个one_or_none()方法 是 scalars() 之后才能用的结果提取方法之一返回 0 条或 1 条记录。如果查出多条直接抛异常。3.聚合操作 (Count / Group By / Avg / Sum)聚合查询和普通查询最本质的区别在于它返回的不再是完整的模型对象而是经过计算后的统计值数字、分组字段等。因此整个处理流程都不同。如下fromsqlalchemyimportfuncapp.get(/stats)asyncdefget_stats(db:AsyncSessionDepends(get_db)):# 统计每个类别的平均价格和总数stmt(select(Product.category,func.avg(Product.price).label(avg_price),func.count(Product.id).label(total)).group_by(Product.category).having(func.count(Product.id)1)# 过滤分组)resultawaitdb.execute(stmt)# 使用 mappings() 可以直接转换成字典格式非常方便返回 JSONreturnresult.mappings().all()为什么不能用 scalars()像上面说的scalars() 的工作原理是提取每一行中的第一个元素并期望这个元素是 ORM 实体。但聚合查询的 select() 里放的是select(Product.category,func.avg(...),func.count(...))第一个元素是 Product.category只是一个普通字符串字段。如果强行调用 scalars()你只能拿到每行的第一个字段category后面的平均值、计数全部丢失。更关键的是聚合查询的结果没有 ORM 实体所以本来就不该用 scalars()。正确的处理方式mappings()mappings() 把每一行结果转换成一个 RowMapping 对象行为类似字典。调用 .all() 后得到字典列表直接就能被 FastAPI 序列化成 JSON。resultawaitdb.execute(stmt)dataresult.mappings().all()# 返回示例# [# {category: 电子, avg_price: 3500.0, total: 12},# {category: 家电, avg_price: 2100.0, total: 8}# ]如果你想直接获取元组也可以用 result.all()但处理起来不如字典方便所以 mappings() 是聚合查询的标配。func 是什么func 是 SQLAlchemy 提供的SQL 函数生成器。你调用 func.xxx() 就相当于生成一个 SQL 函数表达式。fromsqlalchemyimportfunc func.count(Product.id)# COUNT(products.id)func.avg(Product.price)# AVG(products.price)func.sum(Product.stock)# SUM(products.stock)func.min(Product.price)# MIN(products.price)func.max(Product.price)# MAX(products.price)func.now()# 数据库当前时间不是聚合函数但同样用法重要这些函数的返回值就是计算后的数值不是 ORM 对象再次印证不能用 scalars()。label()——给计算结果起别名.group_by(Product.category)聚合函数的返回值在行里默认没有易读的名字必须用 .label(“别名”) 给它命名这样到了 mappings() 返回的字典里键名就是你指定的别名。如果不加 label()字典的键会直接是数据库生成的奇怪名字如 avg_1不利于前端对接。group_by() 分组告诉数据库按哪个字段分组进行统计。例如.group_by(Product.category)SQL 会变成 GROUP BY products.category。你可以在 select() 中同时选择分组字段和多个聚合函数但要注意非聚合字段必须出现在 group_by 里SQL 严格模式要求否则数据库会报错。having()——过滤分组结果.having(func.count(Product.id)1)和数据库中的类似。。。4. 分页查询基本公式分页由offset和limit配合实现offset(page-1)*size stmtselect(Product).offset(offset).limit(size)page页码从 1 开始 如果是第一页那就不用跳过数据所以offset是0以此类推size每页条数offset跳过前 N 行limit最多取 M 行如果只写offset不加limit数据库会跳过前 N 行后返回所有剩余行无法控制返回量。limit和size的数值一样它们的意义也一样。必须加上order_by()保证结果稳定没有排序的分页是“随机分页”。数据库在不指定ORDER BY时返回顺序不确定可能导致同一记录出现在不同页或分页结果重复、遗漏。务必加上稳定的排序字段如主键id或创建时间created_atstmt(select(Product).order_by(Product.id.asc()).offset((page-1)*size).limit(size))排序字段应唯一若业务排序字段可能重复可以用第二个字段兜底例如.order_by(Product.price.desc(), Product.id.asc())。desc降序asc升序5.连接查询5.1 relationship的简单说明多表关系在 SQLAlchemy 里靠两个东西表达ForeignKey在数据库层面建立外键约束说明“这列引用另一张表的哪一列”。relationship在 Python 层面声明模型之间的关联让你能通过属性直接访问关联对象如 product.category、category.products。像下面这样写fromsqlalchemyimportString,ForeignKeyfromsqlalchemy.ormimportMapped,mapped_column,relationshipclassCategory(Base):__tablename__categoriesid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(50),uniqueTrue)# relationship让 Category 能直接拿到它的所有 Productproducts:Mapped[list[Product]]relationship(back_populatescategory,lazyselectin,# 异步安全)classProduct(Base):__tablename__productsid:Mapped[int]mapped_column(primary_keyTrue)name:Mapped[str]mapped_column(String(100))price:Mapped[float]# 外键 —— 数据库里实际存在的列category_id:Mapped[int]mapped_column(ForeignKey(categories.id))# relationship让 Product 能直接拿到它所属的 Categorycategory:Mapped[Category]relationship(back_populatesproducts,lazyselectin,)relationship的back_populates参数后面填的是对方模型relationship的属性名字符串形式连接。ForeignKey 是数据库里实际存储的“硬约束”relationship 是 Python 代码里让你方便导航对象图的“软通道”。relationship 完全不影响表结构它只影响 Python 对象的属性导航。比如说在上述例子中建立了这两个连接之后我们在路由或者其它地方中就可以像下面这样使用。fromfastapiimportDepends,HTTPExceptionfromsqlalchemy.ext.asyncioimportAsyncSessionapp.get(/categories/{category_id})asyncdefget_category(category_id:int,db:AsyncSessionDepends(get_db)):# 用 db.get 按主键查分类categoryawaitdb.get(Category,category_id)ifnotcategory:raiseHTTPException(status_code404,detail分类不存在)# 直接通过 category.products 就能拿到商品列表已经预加载好了return{id:category.id,name:category.name,products:[{id:p.id,name:p.name,price:p.price}forpincategory.products]}5.2 连接查询ORM 中的连接查询本质上就是在映射 SQL 的多表查询。在 SQLAlchemy 异步里我们会用不同方式来实现 JOIN、子查询、IN 过滤等目的都是把原本分散在多条 SQL 中的数据用尽量少的数据库往返拿回来。连接查询分为两类关系预加载selectinload / joinedload和显式 JOIN。5.2.1 显示JOIN不仅可以连表还能只投影部分字段、使用复杂连接条件、基于关联表过滤主表等。它就是传统 SQL 里 JOIN 的直接映射。stmtselect(要查的实体或列).join(目标表/实体,连接条件).where(...)内连接.join(Post, User.id Post.user_id) 就对应 SQL 的 INNER JOIN … ON。外连接只需要在join中添加isouterTrue就表示外连接左外连接也可以使用 .outerjoin(Post, 条件) 简写效果相同。stmt(select(User.name,Post.title).join(Post,User.idPost.user_id,isouterTrue)# isouterTrue 表示外连接)resultawaitsession.execute(stmt)rowsresult.all()需要 右外连接Rare交换表位置即可。自连接都是一样的用aliased取个别名就行。fromsqlalchemy.ormimportaliased UserAliasaliased(User)# 相当于起别名 u2stmt(select(User.name,UserAlias.name).join(UserAlias,(User.nameUserAlias.name)(User.id!UserAlias.id)))resultawaitsession.execute(stmt)rowsresult.all()二新增更新和删除操作1. 新增数据 (Create)新增数据通常分为三步创建实例、添加到 Session、提交事务。fromsqlalchemy.ext.asyncioimportAsyncSessionfrom.modelsimportUserfrom.schemasimportUserCreateasyncdefcreate_user(db:AsyncSession,user_in:UserCreate):# 1. 创建模型实例同步不需要 awaitnew_userUser(nameuser_in.name,emailuser_in.email,hashed_passwordfake_password# 实际开发中需加密)# 2. 将对象添加到数据库会话同步操作不需要 awaitdb.add(new_user)# 3. 提交事务异步需要 awaitawaitdb.commit()# 4. 刷新对象获取数据库生成的字段如自增 ID异步需要 awaitawaitdb.refresh(new_user)returnnew_user如果需要提前拿到数据库生成的id等字段才需要refresh。而且如果使用注入依赖的方法且依赖会自动提交事务就需要flush才能返回又自动生成的字段的对象。像是下面这样写router.post(/users)asyncdefcreate_user(user_in:UserCreate,db:AsyncSessionDepends(get_db)):new_userUser(name...,email...,hashed_password...)db.add(new_user)awaitdb.flush()# ① 发送 INSERTid 被数据库生成awaitdb.refresh(new_user)# ② 从数据库重新读取对象id 被填充returnnew_user# 此时 new_user.id 有值2.更新数据 (Update)更新通常有两种方式先查询后修改最常用或 直接批量更新。方式 A先查询再修改属性推荐用于单个对象fromsqlalchemyimportselectfromsqlalchemy.ext.asyncioimportAsyncSessionasyncdefupdate_user_email(db:AsyncSession,user_id:int,new_email:str):stmtselect(User).where(User.iduser_id)# 2. 执行查询获取结果resultawaitdb.execute(stmt)db_userresult.scalar_one_or_none()# 返回单个对象或 None# 3. 如果存在修改属性ifdb_user:db_user.emailnew_email# 4. 提交事务自动检测变更并生成 UPDATEawaitdb.commit()# 5. 刷新对象如有必要例如获取数据库默认值awaitdb.refresh(db_user)returndb_user方式 B使用 update() 语句批量或高性能语法如下stmtupdate(表模型).where(条件).values(要更新的字段新值)fromsqlalchemyimportupdatefromsqlalchemy.ext.asyncioimportAsyncSessionasyncdefupdate_multiple_users(db:AsyncSession):stmt(update(User).where(User.name张三).values(name老张))awaitdb.execute(stmt)# 异步执行更新语句awaitdb.commit()# 异步提交事务3.删除数据删除同样分为“查询后删除”和“批量删除”。方式 A先查询后删除fromsqlalchemyimportselectfromsqlalchemy.ext.asyncioimportAsyncSessionasyncdefdelete_user(db:AsyncSession,user_id:int)-bool:# 1. 构建查询语句查找用户stmtselect(User).where(User.iduser_id)resultawaitdb.execute(stmt)db_userresult.scalar_one_or_none()# 获取单个对象或 None# 2. 如果存在删除ifdb_user:awaitdb.delete(db_user)# 异步删除标记删除awaitdb.commit()# 提交事务returnTruereturnFalse方式 B使用 delete() 语句fromsqlalchemyimportdeletefromsqlalchemy.ext.asyncioimportAsyncSessionasyncdefbulk_delete_users(db:AsyncSession)-int:stmtdelete(User).where(User.id100)resultawaitdb.execute(stmt)# 异步执行awaitdb.commit()# 异步提交returnresult.rowcount# 可选返回被删除的行数总结 FastAPI ORM 操作速查表操作类型核心函数/方法关键返回处理注意点查询对象select(Model).scalars().all() / .first()第一个参数决定了scalars()拿到的类型条件过滤.where()支持和 符号聚合统计func.count() / avg().mappings().all()必须配合.label()否则 JSON 键名会乱码分页.limit().offset()配合.order_by()没有排序的分页是“逻辑炸弹”结果会随机跳变新增/删除db.add()/db.delete()await db.commit()不 commit 不生效想拿新 ID 就refresh更新obj.attr valueawait db.commit()推荐“先查后改”这样能触发 SQLAlchemy 的属性监听