Python Orator 查询构造器速查手记
orator 是轻量级的数据库查询构造器, 提供便利的接口可以创建和执行查询操作,可以在大多数数据库中使用.
安装
pip install orator
配置, 获取数据库管理器实例
from orator import DatabaseManager
config = {
'mysql': {
'driver': 'mysql',
'host': 'localhost',
'database': 'database',
'user': 'root',
'password': '',
'prefix': ''
}
}
db = DatabaseManager(config)
多重数据库设置
# 设置多个数据库
config = {
'default': 'mysql',
'mysql': {
'driver': 'mysql',
'host': 'localhost',
'database': 'database',
'user': 'root',
'password': '',
'prefix': ''
}
'sqlserver':{
'driver': 'sqlserver',
'host': 'localhost',
'database': 'database',
'user': 'root',
'password': '',
'prefix': ''
}
}
# 设置多个数据库分别做读、写
config = {
'mysql': {
'read': {
'host': '192.168.1.1'
},
'write': {
'host': '192.168.1.2'
},
'driver': 'mysql',
'database': 'database',
'username': 'root',
'password': '',
'prefix': ''
}
}
查询
conn = db.connection()
# 查询 table users 中所有数据
users = conn.table('users').get()
for user in users:
print(user['name'])
# 分片查询表中数据
users = conn.table('users').chunk(100)
# 查询某一条数据
user = conn.table('users').where('name', 'linyang').first()
# 查询某一行某一列的数据
user= conn.table('users').where('name', 'linyang').pluck('email')
# 查询某一列的数据
cols = conn.table('users').lists('name')
# select 语句
users = conn.table('users').select('name', 'email').get()
users = conn.table('users').distinct().get()
users = conn.table('users').select('name as names').get()
# 使用 where
users = conn.table('users').where('high', '>', 160).get()
# 使用or
users = conn.table('users').where('high', '>', 160).or_where('name', 'linyang').get()
#使用where between操作
users = conn.table('users').where_between('age', [25, 35]).get()
#使用where not between操作
users = conn.table('users').where_not_between('age', [25, 35]).get()
#使用where in操作
users = conn.table('users').where_in('id', [1, 2, 3]).get()
users = conn.table('users').where_not_in('id', [1, 2, 3]).get()
#使用where null查询Null记录
users = conn.table('users').where_null('updated_at').get()
#使用order by, group by and having操作
query = conn.table('users').order_by('name', 'desc')
query = query.group_by('count')
query = query.having('count', '>', 100)
users = query.get()
#使用offset and limit操作
# 从第11条开始取5条
users = conn.table('users').skip(10).take(5).get()
users = conn.table('users').offset(10).limit(5).get()
join 操作
#基本join操作
conn.table('users') \
.join('contacts', 'users.id', '=', 'contacts.user_id') \
.join('orders', 'users.id', '=', 'orders.user_id') \
.select('users.id', 'contacts.phone', 'orders.price') \
.get()
#左连接操作left join
conn.table('users').left_join('posts', 'users.id', '=', 'posts.user_id').get()
# 高级join
clause = JoinClause('contacts').on('users.id', '=', 'contacts.user_id').or_on(...)
conn.table('users').join(clause).get()
#可以加入where等条件
clause = JoinClause('contacts').on('users.id', '=', 'contacts.user_id').where('contacts.user_id', '>', 5)
db.table('users').join(clause).get()
高级 where
#参数分组
conn.table('users') \
.where('name', '=', 'John') \
.or_where(
conn.query().where('votes', '>', 100).where('title', '!=', 'admin')
).get()
#这个查询的sql将是这样。
SELECT * FROM users WHERE name = 'John' OR (votes > 100 AND title != 'Admin')
#存在查询
conn.table('users').where_exists(
conn.table('orders').select(conn.raw(1)).where_raw('order.user_id = users.id')
)
#这个查询的sql将是这样。
SELECT * FROM users
WHERE EXISTS (
SELECT 1 FROM orders WHERE orders.user_id = users.id
)
聚合查询
# count, max, min, avg, sum。
users = conn.table('users').count()
price = conn.table('orders').max('price')
price = conn.table('orders').min('price')
price = conn.table('orders').avg('price')
total = conn.table('users').sum('votes')
原生表达式
conn.table('users') \
.select(conn.raw('count(*) as user_count, status')) \
.where('status', '!=', 1) \
.group_by('status') \
.get()
insert
conn.table('users').insert(email='foo@bar.com', votes=0)
conn.table('users').insert({
'email': 'foo@bar.com',
'votes': 0
})
conn.table('users').insert([
{'email': 'foo@bar.com', 'votes': 0},
{'email': 'bar@baz.com', 'votes': 0}
])
update
#更新表中数据。
conn.table('users').where('id', 1).update(votes=1)
conn.table('users').where('id', 1).update({'votes': 1})
#增加或减少某一列的值。
conn.table('users').increment('votes') # Increment the value by 1
conn.table('users').increment('votes', 5) # Increment the value by 5
conn.table('users').decrement('votes') # Decrement the value by 1
conn.table('users').decrement('votes', 5)
# 指定增加某一行记录的值
conn.table('users').increment('votes', 1, name='John')
delete
#删除数据。
conn.table('users').where('age', '<', 25).delete()
#删除所有的数据。
conn.table('users').delete()
#清空表数据
conn.table('users').truncate()
unions
first = conn.table('users').where_null('first_name')
users = conn.table('users').where_null('last_name').union(first).get()
悲观锁
conn.table('users').where('votes', '>', 100).shared_lock().get()
conn.table('users').where('votes', '>', 100).lock_for_update().get()