跳转至

查询

Cherry 提供了非常丰富的查询方式以及两种查询style,能够满足大部分的查询需求。

get

根据查询条件,获取指定模型数据,如不存在则抛出异常

    user: User = await User.get(User.id == 1)
    user: User = await User.get(id=1)

get_or_none

根据查询条件,获取指定模型数据,如不存在则返回 None

    user_or_none: Optional[User] = await User.get_or_none(User.name == "user 2")
    user_or_none: Optional[User] = await User.get_or_none(name="user 2")

get_or_create

根据查询条件,获取指定模型数据,如不存在则使用查询条件和 default 中的值来创建它。

返回值类型为 Tuple[Model, bool],第一个值为模型实例,第二个值是 bool 类型,True 表示获取到了模型实例,False 表示创建了模型实例。

1
2
3
4
    user, is_get = await User.get_or_create(
        User.name == "user 3",
        defaults={"id": 3, "age": 15},
    )
1
2
3
4
    user, is_get = await User.get_or_create(
        name="user 3",
        defaults={"id": 3, "age": 15},
    )

filter

根据查询条件来进行进一步的复杂查询。

filter 实际上返回的是一个 QuerySet 对象,它可以继续做链式调用来进行更复杂的查询,最终通过调用 first, all, get, random_onepaginate 来返回结果。

1
2
3
4
5
    user1: Optional[User] = await User.filter(User.age <= 15).first()
    users: list[User] = await User.filter(User.age > 10).all()
    user2: User = await User.filter(User.name == "user 1").get()
    user3: Optional[User] = await User.filter(User.age >= 5).random_one()
    usersp: list[User] = await User.filter(User.age <= 5).paginate(page=1, page_size=3)
1
2
3
4
5
    user1: Optional[User] = await User.filter(age__le=15).first()
    users: list[User] = await User.filter(age__gt=10).all()
    user2: User = await User.filter(name="user 1").get()
    user3: Optional[User] = await User.filter(age__ge=5).random_one()
    usersp: list[User] = await User.filter(age__ge=5).paginate(page=1, page_size=3)

first

返回查询结果的第一个值,如无则返回 None

all

返回查询结果的所有值

get

返回查询结果的值,如果结果有多个,或者没有结果,均会抛出异常

random_one

返回查询结果的随机一个值,如无则返回 None

paginate

根据给定的页数和每页数量,返回查询结果的分页值列表

order_by

根据给定的字段对查询结果进行排序

    user1 = await User.filter(User.age <= 15).order_by(User.age).first()

limit

根据给定的值对查询结果的数量进行限制

    users = await User.filter(User.age > 15).limit(3).all()

offset

根据给定的值对查询结果进行偏移

    users = await User.filter(User.age > 15).offset(1).all()

values

以元组的形式返回模型部分字段。

values 接受多个位置参数,即要获取的模型字段。

values 还有一个关键字参数 flatten,默认为 False,当设置为 True 时,位置参数必须有且仅有一个,返回结果元组会被展平。

1
2
3
4
5
6
7
8
9
    user_name_and_age: Optional[tuple[str, int]] = (
        await User.filter().values(User.name, User.age).first()
    )
    user_name_list: list[str] = (
        await User.filter().values(User.name, flatten=True).all()
    )
    user_name: str = (
        await User.filter(User.age == 15).values(User.name, flatten=True).get()
    )

value_dict

以字典的形式返回模型的部分字段。

value_dict 接受多个位置参数,即要获取的模型字段,若不传入,则为模型全部非关系字段。

1
2
3
4
    user_dict: Optional[dict[str, Any]] = await User.filter().value_dict().first()
    user_name_and_age_dict: list[dict[str, Any]] = (
        await User.filter().value_dict(User.name, User.age).all()
    )

select

selectfilter 的无查询条件的版本,支持与 filter 一样的功能。

all

它是 Model.filter().all() 的简写,返回该模型所有模型数据。

    all_user: list[User] = await User.all()

完整代码

本章完整示例代码
from typing import Any, Optional

import cherry

db = cherry.Database("sqlite+aiosqlite:///:memory:")


class User(cherry.Model):
    id: int = cherry.Field(primary_key=True)
    name: str = cherry.Field(unique=True)
    age: int

    cherry_config = cherry.CherryConfig(tablename="user", database=db)


async def main():
    await db.init()

    users = [User(id=i, name=f"user {i}", age=i * 5) for i in range(1, 11)]
    await User.insert_many(*users)

    # Pythonic style
    user: User = await User.get(User.id == 1)
    # Django style
    user: User = await User.get(id=1)

    # Pythonic style
    user_or_none: Optional[User] = await User.get_or_none(User.name == "user 2")
    # Django style
    user_or_none: Optional[User] = await User.get_or_none(name="user 2")

    # Pythonic style
    user, is_get = await User.get_or_create(
        User.name == "user 3",
        defaults={"id": 3, "age": 15},
    )
    # Django style
    user, is_get = await User.get_or_create(
        name="user 3",
        defaults={"id": 3, "age": 15},
    )

    # Pythonic style
    user1: Optional[User] = await User.filter(User.age <= 15).first()
    users: list[User] = await User.filter(User.age > 10).all()
    user2: User = await User.filter(User.name == "user 1").get()
    user3: Optional[User] = await User.filter(User.age >= 5).random_one()
    usersp: list[User] = await User.filter(User.age <= 5).paginate(page=1, page_size=3)

    # Django style
    user1: Optional[User] = await User.filter(age__le=15).first()
    users: list[User] = await User.filter(age__gt=10).all()
    user2: User = await User.filter(name="user 1").get()
    user3: Optional[User] = await User.filter(age__ge=5).random_one()
    usersp: list[User] = await User.filter(age__ge=5).paginate(page=1, page_size=3)

    user1 = await User.filter(User.age <= 15).order_by(User.age).first()
    users = await User.filter(User.age > 15).limit(3).all()
    users = await User.filter(User.age > 15).offset(1).all()

    user_name_and_age: Optional[tuple[str, int]] = (
        await User.filter().values(User.name, User.age).first()
    )
    user_name_list: list[str] = (
        await User.filter().values(User.name, flatten=True).all()
    )
    user_name: str = (
        await User.filter(User.age == 15).values(User.name, flatten=True).get()
    )

    user_dict: Optional[dict[str, Any]] = await User.filter().value_dict().first()
    user_name_and_age_dict: list[dict[str, Any]] = (
        await User.filter().value_dict(User.name, User.age).all()
    )

    all_user: list[User] = await User.all()


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())