查询
Cherry
提供了非常丰富的查询方式以及两种查询style,能够满足大部分的查询需求。
get
根据查询条件,获取指定模型数据,如不存在则抛出异常
get_or_none
根据查询条件,获取指定模型数据,如不存在则返回 None
get_or_create
根据查询条件,获取指定模型数据,如不存在则使用查询条件和 default
中的值来创建它。
返回值类型为 Tuple[Model, bool]
,第一个值为模型实例,第二个值是 bool
类型,True
表示获取到了模型实例,False
表示创建了模型实例。
filter
根据查询条件来进行进一步的复杂查询。
filter
实际上返回的是一个 QuerySet
对象,它可以继续做链式调用来进行更复杂的查询,最终通过调用 first
, all
, get
, random_one
或 paginate
来返回结果。
first
返回查询结果的第一个值,如无则返回 None
all
返回查询结果的所有值
get
返回查询结果的值,如果结果有多个,或者没有结果,均会抛出异常
random_one
返回查询结果的随机一个值,如无则返回 None
paginate
根据给定的页数和每页数量,返回查询结果的分页值列表
order_by
根据给定的字段对查询结果进行排序
| user1 = await User.filter(User.age <= 15).order_by(User.age).first()
|
limit
根据给定的值对查询结果的数量进行限制
| users = await User.filter(User.age > 15).limit(3).all()
|
offset
根据给定的值对查询结果进行偏移
| users = await User.filter(User.age > 15).offset(1).all()
|
values
以元组的形式返回模型部分字段。
values
接受多个位置参数,即要获取的模型字段。
values
还有一个关键字参数 flatten
,默认为 False
,当设置为 True
时,位置参数必须有且仅有一个,返回结果元组会被展平。
| user_name_and_age: Optional[tuple[str, int]] = (
await User.filter().values(User.name, User.age).first()
)
user_name_list: list[str] = (
await User.filter().values(User.name, flatten=True).all()
)
user_name: str = (
await User.filter(User.age == 15).values(User.name, flatten=True).get()
)
|
value_dict
以字典的形式返回模型的部分字段。
value_dict
接受多个位置参数,即要获取的模型字段,若不传入,则为模型全部非关系字段。
| user_dict: Optional[dict[str, Any]] = await User.filter().value_dict().first()
user_name_and_age_dict: list[dict[str, Any]] = (
await User.filter().value_dict(User.name, User.age).all()
)
|
select
select
是 filter
的无查询条件的版本,支持与 filter
一样的功能。
all
它是 Model.filter().all()
的简写,返回该模型所有模型数据。
| all_user: list[User] = await User.all()
|
完整代码
本章完整示例代码
| from typing import Any, Optional
import cherry
db = cherry.Database("sqlite+aiosqlite:///:memory:")
class User(cherry.Model):
id: int = cherry.Field(primary_key=True)
name: str = cherry.Field(unique=True)
age: int
cherry_config = cherry.CherryConfig(tablename="user", database=db)
async def main():
await db.init()
users = [User(id=i, name=f"user {i}", age=i * 5) for i in range(1, 11)]
await User.insert_many(*users)
# Pythonic style
user: User = await User.get(User.id == 1)
# Django style
user: User = await User.get(id=1)
# Pythonic style
user_or_none: Optional[User] = await User.get_or_none(User.name == "user 2")
# Django style
user_or_none: Optional[User] = await User.get_or_none(name="user 2")
# Pythonic style
user, is_get = await User.get_or_create(
User.name == "user 3",
defaults={"id": 3, "age": 15},
)
# Django style
user, is_get = await User.get_or_create(
name="user 3",
defaults={"id": 3, "age": 15},
)
# Pythonic style
user1: Optional[User] = await User.filter(User.age <= 15).first()
users: list[User] = await User.filter(User.age > 10).all()
user2: User = await User.filter(User.name == "user 1").get()
user3: Optional[User] = await User.filter(User.age >= 5).random_one()
usersp: list[User] = await User.filter(User.age <= 5).paginate(page=1, page_size=3)
# Django style
user1: Optional[User] = await User.filter(age__le=15).first()
users: list[User] = await User.filter(age__gt=10).all()
user2: User = await User.filter(name="user 1").get()
user3: Optional[User] = await User.filter(age__ge=5).random_one()
usersp: list[User] = await User.filter(age__ge=5).paginate(page=1, page_size=3)
user1 = await User.filter(User.age <= 15).order_by(User.age).first()
users = await User.filter(User.age > 15).limit(3).all()
users = await User.filter(User.age > 15).offset(1).all()
user_name_and_age: Optional[tuple[str, int]] = (
await User.filter().values(User.name, User.age).first()
)
user_name_list: list[str] = (
await User.filter().values(User.name, flatten=True).all()
)
user_name: str = (
await User.filter(User.age == 15).values(User.name, flatten=True).get()
)
user_dict: Optional[dict[str, Any]] = await User.filter().value_dict().first()
user_name_and_age_dict: list[dict[str, Any]] = (
await User.filter().value_dict(User.name, User.age).all()
)
all_user: list[User] = await User.all()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
|