Python Orator 查询构造器速查手记

orator 是轻量级的数据库查询构造器, 提供便利的接口可以创建和执行查询操作,可以在大多数数据库中使用.

安装

pip install orator

配置, 获取数据库管理器实例

from orator import DatabaseManager

config = {
    'mysql': {
        'driver': 'mysql',
        'host': 'localhost',
        'database': 'database',
        'user': 'root',
        'password': '',
        'prefix': ''
    }
}

db = DatabaseManager(config)

多重数据库设置

# 设置多个数据库
config = {
    'default': 'mysql',
    'mysql': {
        'driver': 'mysql',
        'host': 'localhost',
        'database': 'database',
        'user': 'root',
        'password': '',
        'prefix': ''
    }
    'sqlserver':{
        'driver': 'sqlserver',
        'host': 'localhost',
        'database': 'database',
        'user': 'root',
        'password': '',
        'prefix': ''
    }
}
# 设置多个数据库分别做读、写
config = {
    'mysql': {
        'read': {
            'host': '192.168.1.1'
        },
        'write': {
            'host': '192.168.1.2'
        },
        'driver': 'mysql',
        'database': 'database',
        'username': 'root',
        'password': '',
        'prefix': ''
    }
}

查询

conn = db.connection()

# 查询 table users 中所有数据
users = conn.table('users').get()

for user in users:
    print(user['name'])

# 分片查询表中数据
users = conn.table('users').chunk(100)

# 查询某一条数据
user = conn.table('users').where('name', 'linyang').first()

# 查询某一行某一列的数据
user= conn.table('users').where('name', 'linyang').pluck('email')

# 查询某一列的数据
cols = conn.table('users').lists('name')

# select 语句
users = conn.table('users').select('name', 'email').get()

users = conn.table('users').distinct().get()

users = conn.table('users').select('name as names').get()

# 使用 where
users = conn.table('users').where('high', '>', 160).get()

# 使用or
users = conn.table('users').where('high', '>', 160).or_where('name', 'linyang').get() 

#使用where between操作
users = conn.table('users').where_between('age', [25, 35]).get()

#使用where not between操作
users = conn.table('users').where_not_between('age', [25, 35]).get()

#使用where in操作
users = conn.table('users').where_in('id', [1, 2, 3]).get()

users = conn.table('users').where_not_in('id', [1, 2, 3]).get()

#使用where null查询Null记录
users = conn.table('users').where_null('updated_at').get()

#使用order by, group by and having操作
query = conn.table('users').order_by('name', 'desc')
query = query.group_by('count')
query = query.having('count', '>', 100)

users = query.get()

#使用offset and limit操作
# 从第11条开始取5条
users = conn.table('users').skip(10).take(5).get()

users = conn.table('users').offset(10).limit(5).get()

join 操作


#基本join操作
conn.table('users') \
    .join('contacts', 'users.id', '=', 'contacts.user_id') \
    .join('orders', 'users.id', '=', 'orders.user_id') \
    .select('users.id', 'contacts.phone', 'orders.price') \
    .get()

#左连接操作left join
conn.table('users').left_join('posts', 'users.id', '=', 'posts.user_id').get()

# 高级join
clause = JoinClause('contacts').on('users.id', '=', 'contacts.user_id').or_on(...)

conn.table('users').join(clause).get()

#可以加入where等条件
clause = JoinClause('contacts').on('users.id', '=', 'contacts.user_id').where('contacts.user_id', '>', 5)

db.table('users').join(clause).get()

高级 where

#参数分组
conn.table('users') \
    .where('name', '=', 'John') \
    .or_where(
        conn.query().where('votes', '>', 100).where('title', '!=', 'admin')
    ).get()
#这个查询的sql将是这样。
SELECT * FROM users WHERE name = 'John' OR (votes > 100 AND title != 'Admin')

#存在查询
conn.table('users').where_exists(
    conn.table('orders').select(conn.raw(1)).where_raw('order.user_id = users.id')
)

#这个查询的sql将是这样。
SELECT * FROM users
WHERE EXISTS (
    SELECT 1 FROM orders WHERE orders.user_id = users.id
)


聚合查询

# count, max, min, avg, sum。

users = conn.table('users').count()

price = conn.table('orders').max('price')

price = conn.table('orders').min('price')

price = conn.table('orders').avg('price')

total = conn.table('users').sum('votes')

原生表达式

conn.table('users') \
    .select(conn.raw('count(*) as user_count, status')) \
    .where('status', '!=', 1) \
    .group_by('status') \
    .get()

insert

conn.table('users').insert(email='foo@bar.com', votes=0)

conn.table('users').insert({
    'email': 'foo@bar.com',
    'votes': 0
})

conn.table('users').insert([
    {'email': 'foo@bar.com', 'votes': 0},
    {'email': 'bar@baz.com', 'votes': 0}
])

update

#更新表中数据。

conn.table('users').where('id', 1).update(votes=1)

conn.table('users').where('id', 1).update({'votes': 1})

#增加或减少某一列的值。

conn.table('users').increment('votes')  # Increment the value by 1

conn.table('users').increment('votes', 5)  # Increment the value by 5

conn.table('users').decrement('votes')  # Decrement the value by 1

conn.table('users').decrement('votes', 5)

# 指定增加某一行记录的值
conn.table('users').increment('votes', 1, name='John')

delete

#删除数据。
conn.table('users').where('age', '<', 25).delete()

#删除所有的数据。
conn.table('users').delete()

#清空表数据
conn.table('users').truncate()

unions

first = conn.table('users').where_null('first_name')

users = conn.table('users').where_null('last_name').union(first).get()

悲观锁

conn.table('users').where('votes', '>', 100).shared_lock().get()

conn.table('users').where('votes', '>', 100).lock_for_update().get()

ref

orator