Chain-PyMySQL
Easy to use PyMySQL.
对 PyMySQL 进行封装,增加链式操作,方便快捷进行 CURD 操作
注:支持断线自动重连
文档目录
- 一、安装说明(INSTALLATION)
- 二、数据库连接(CONNECTIONS)
- 三、增删改查(CURD)
- 四、查询构建器(QUERY BUILDER)
- 五、执行原生SQL(RAW SQL)
- 六、返回值(RETURNED VALUE)
- 七、事务支持(TRANSACTION)
- 八、单元测试(UNITTEST)
- 九、防注入(INJECTION)
- 十、内置异常(EXCEPTIONS)
- 十一、请我喝奶茶(DONATION)
一、安装说明(INSTALLATION)
使用 PIP 安装 或 直接下载源码
- 全自动安装:
easy_install chain-pymysql
或者pip install chain-pymysql
/pip3 install chain-pymysql
- 半自动安装:先下载 https://pypi.org/project/chain-pymysql/#files ,解压后运行
python setup.py install
- 手动安装:将 chain-pymysql 目录放置于当前目录或者 site-packages 目录
- 通过
from chain_pymysql import imysql
来引用
二、数据库连接(CONNECTIONS)
2.1 连接数据库
Method: imysql.connect(options: dict, name='default')
from chain_pymysql import imysql
imysql.connect({
'host': '127.0.0.1',
'user': 'root',
'password': 'root',
'database': 'test'
})
注: options 参数请参考 pymysql 官方文档: https://pymysql.readthedocs.io/en/latest/modules/connections.html
2.2 切换数据库连接
支持连接多个数据库,使用 switch 方法来动态(或永久)切换数据库连接
Method: imysql.switch(name: str, db_name=None, inplace=False)
switch 方法使用示例
- 临时切换数据库连接
imysql.switch('db2')
- 永久切换数据库连接:
imysql.switch('db2', inplace=True)
- 切换连接的时候同时切换数据库:
imysql.switch('db2', 'test_member')
或imysql.switch('db2.test_member')
from chain_pymysql import imysql
imysql.connect(default_config, name='default') # 第一个添加的数据库为默认数据库
imysql.connect(other_config, name='other')
# 使用默认数据库连接查询
imysql.table('table1').limit(10).all()
# 临时使用其他数据库连接来查询(只有本次查询是使用db2,其他查询还是db1)
imysql.switch('other').table('table1').limit(10).all()
# 永久切换数据库连接
imysql.switch('other', inplace=True).table('table1').limit(10).all()
# 这时再查询,默认就是 db2 数据库连接了
imysql.table('table1').limit(10).all() # 这里查询的是 db2 连接里的 table1
2.3 使用同连接的其他数据库
Method: table(table: str, alias='')
table 按 db.table 格式传参即可
members = imysql.table('test_member.member').select('member_code').order_by('member_code desc').limit(10).column()
print(members)
orders = imysql.table('test_order.order').select('order_code, total_amount').where({
'member_code': ('in', members, False)
}).order_by('order_code desc').all(fetch=True)
print(orders)
2.4 关闭数据库连接
Method: imysql.close(name=None)
Since: 1.0.4
关闭指定数据库连接:imysql.close(name='other')
关闭所有数据库连接:imysql.close()
三、增删改查(CURD)
3.1 增
# 插入一行
insert_id = imysql.table('table1').insert_one({'id': 1, 'name': '张三'})
# 插入多行
effected_rows = imysql.table('table1').insert_many([
{'id': 2, 'name': '李四'},
{'id': 3, 'name': '王五'},
{'id': 4, 'name': '赵六'}
])
程序默认会做数据验证,如果表单数据需要提交代码,则可以把 verify 设置为 False,取消数据验证
insert_id = imysql.table('table1').insert_one({'name': '张三<script>alert(1)</script>'}, verify=False)
3.2 删
# 删除记录
effected_rows = imysql.table('table1').delete({'id': 1})
# 限制删除的行数
effected_rows = imysql.table('table1').delete('id>1', limit=1)
3.3 改
# 修改一行记录
effected_rows = imysql.table('table1').update_one({'id': 3}, {'name': '王六'})
# 修改多行记录
effected_rows = imysql.table('table1').update_many('id IN (3,4)', {'name': '匿名'})
effected_rows = imysql.table('table1').update_many({'id': ['IN', (3, 4)]}, {'name': '匿名'})
3.4 查
注:fetch=True 返回 list,fetch=False(默认)返回 cursor,可用于迭代
3.4.1 字符串条件
results = imysql.table('table1').where('id IN (3,4)').all(fetch=True)
3.4.2 字典条件查询
results = imysql.table('table1').where({'id': 3}).all(fetch=True)
3.4.3 IN 查询
results = imysql.table('table1').where({'id': ['in', (3, 4)]}).all(fetch=True)
3.4.4 LIKE 模糊查询
results = imysql.table('table1').where({'name': ('like', '张%')}).all(fetch=True)
3.4.5 BETWEEN 查询
results = imysql.table('table1').where({'id': ['between', (3, 4)]}).all(fetch=True)
3.4.6 追加查询 and_where
# 特殊情况下使用 and_where
results = imysql.table('table1').where({'id': 3}).and_where({'name': '张三'}).all(fetch=True)
# 一般情况下,扩充字典条件即可
condition = {'id': 3}
if True:
condition['name'] = '张三'
results = imysql.table('table1').where(condition).all(fetch=True)
3.4.7 OR 查询 or_where
results = imysql.table('table1').where({'id': 3}).or_where({'id': 4}).all(fetch=True)
3.4.8 其他操作符
# 大于等于
results = imysql.table('table1').where({'id': ['>=', 3]}).all(fetch=True)
# 小于
results = imysql.table('table1').where({'id': ['<', 4]}).all(fetch=True)
# 不等于
results = imysql.table('table1').where({'id': ['<>', 3]}).all(fetch=True)
# 不为空
results = imysql.table('table1').where({'id': ['<>', '']}).all(fetch=True)
# NULL
results = imysql.table('table1').where({'name': ['is', None]}).all(fetch=True)
# NOT NULL
results = imysql.table('table1').where({'name': ['is not', None]}).all(fetch=True)
# NOT IN
results = imysql.table('table1').where({'id': ['not in', (3, 4)]}).all(fetch=True)
# NOT LIKE
results = imysql.table('table1').where({'name': ['not like', '张%']}).all(fetch=True)
四、查询构建器(QUERY BUILDER)
4.1 选择字段 select
one = imysql.table('table1').select('id').where('id=3').one()
4.2 联表查询 join
# 默认 LEFT JOIN
results = (
imysql.table('table1', alias='t1')
.select('t1.id,t1.name,t2.age')
.join('table2', alias='t2', on='t1.id=t2.id')
.all(fetch=True)
)
# RIGHT JOIN
results = (
imysql.table('table1 t1')
.select(['t1.id', 't1.name', 't2.age'])
.join('table2 t2', on='t1.id=t2.id', how='right')
.all(fetch=True)
)
# INNER JOIN
results = (
imysql.table('table1 t1')
.join('table2 t2', on='t1.id=t2.id', how='inner')
.all(fetch=True)
)
# ON 也可以使用字典,并且支持多条件
results = (
imysql.table('table1 t1')
.select('t1.id,t1.name,t2.age')
.join('table2 t2', how='inner', on={
# 第三个参数是不要使用引号的意思
't1.id': ['=', 't2.id', False],
't2.age': 20,
})
.all(fetch=True)
)
4.3 分组及排序 group_by order_by
results = (
imysql.table('table2')
.select('age, count(*) as num')
# 可以使用字符串或list
.group_by('age')
# .group_by(['age'])
# 可以使用字符串或list
# .order_by('age asc, num desc')
# .order_by(['age asc', 'num desc'])
# .order_by(['age', 'num'], ascending=True)
.order_by(['age', 'num'], ascending=[True, False])
.all(fetch=True)
)
4.4 结果筛选 having
results = (
imysql.table('table2')
.select('age, count(*) as num')
.group_by('age')
.having('num > 1')
.order_by('num desc')
.all(fetch=True)
)
4.5 分页查询 skip limit
results = imysql.table('table1').order_by('id asc').skip(1).limit(3).all(fetch=True)
五、执行原生SQL(RAW SQL)
注:imysql.execute 返回原生 pymysql.cursors.DictCursor 对象,后继操作须自己处理
5.1 执行原生SQL示例
sql = 'SELECT * FROM table1 WHERE id=%s'
# 解析多行
results = imysql.execute(sql, (3,)).all(fetch=True)
# 解析单行
one = imysql.execute(sql, (3,)).one()
# 其他操作请看 “六、返回值(RETURNED VALUE)”
names = imysql.execute(sql, (3,)).index('id', 'name')
# 切换数据库来执行SQL
imysql.switch('other').execute(sql)
5.2 使用助手函数来拼接SQL(防注入)
gen_condition、gen_order_by、gen_limit 等
condition = dict()
for k, v in request.GET.items():
if k in ['id', 'name']:
condition[f't1.{k}'] = v
elif k in ['age']:
condition[f't2.{k}'] = v
where = imysql.gen_condition(condition)
order_by = imysql.gen_order_by(request.GET.get('order'), request.GET.get('asc') == '1')
page = int(request.GET.get('page', 1))
size = int(request.GET.get('size', 10))
limit = imysql.gen_limit(skip=(page-1)*size, limit=size)
sql = f'SELECT t1.`name`, avg(t2.age) AS age FROM table1 t1 LEFT JOIN table2 t2 ON t1.id = t2.id WHERE {where} GROUP BY t1.`name` {order_by}{limit}'
results = imysql.execute(sql, fetch=True)
for item in results:
print(item)
5.3 跨库(跨实例、跨连接)联表查询
Method: imysql.execute_cross(sql: str, chunk_size=500)
Since: 1.0.4
注:水平有限,不保证数据100%正确;此功能可用于简单跨库查询数据报表的查询及导出,省去手工拼接数据的麻烦。
使用说明:
- 表名必须带连接名前缀,例如:prod1.
prod_order
.dd_order
-
表名必须定义别名,例如:prod1.
prod_order
.dd_order
t1 - 建议只使用主库字段做为查询条件,使用跨实例的关联表字段做条件时,查询结果可能会出现意外错误
- 查询条件不支持 BETWEEN ... AND ...,请使用 >= 和 < 代替
- 支持 LEFT JOIN、RIGHT JOIN 及 INNER JOIN;仅支持主库字段排序,支持 LIMIT
使用示例:
imysql.connect({...}, name='prod1')
imysql.connect({...}, name='prod2')
sql = '''
SELECT
t1.`order_code`,
t2.`out_code`
FROM
prod1.`prod_order`.`dd_order` t1
LEFT JOIN prod2.`prod_logistics`.`wl_storageout` t2 ON t1.`order_code` = t2.`order_code`
WHERE
t1.`create_time` >= '2022-10-01 00:00:00'
AND t1.`create_time` <= '2022-10-31 23:59:59'
ORDER BY
t1.`create_time` ASC
'''
results = imysql.execute_cross(sql, chunk_size=500)
for chunk in results:
for item in chunk:
print(item)
结合 Flask 导出 csv (部分代码):
from flask import Response
def iter_csv(self, results):
flag = False
for chunk in results:
for item in chunk:
if flag is False:
yield ','.join(item.keys()) + '\n'
flag = True
yield ','.join(f'"{x}"' if type(x) is str and x.find('\n')>-1 else f'{x}' for x in item.values()) + '\n'
def export(self):
...
results = imysql.execute_cross(sql, chunk_size=500)
filename = 'Export_%s.csv' % time.strftime('%Y%m%d%H%M%S')
csv_data = self.iter_csv(results)
response = Response(csv_data, mimetype='text/csv')
response.headers['Content-Type'] = 'application/vnd.ms-excel; charset=gb18030'
response.headers['Content-Disposition'] = f'attachment; filename={filename}'
return response
六、返回值(RETURNED VALUE)
6.1 统计 count
count = imysql.table('table1').count()
6.2 多行 all
results = imysql.table('table1').limit(10).all(fetch=True)
6.3 单行 one
one = imysql.table('table1').where('id=3').one()
6.4 单个值 scalar
name = imysql.table('table1').select('name').where('id=3').scalar()
6.5 一列 column
column = imysql.table('table1').select('name').limit(3).column()
6.6 结果索引(一维)
result = imysql.table('table1').limit(10).index(key='id', value='name')
6.7 结果索引(二维)
result = imysql.table('table1').limit(10).index(key='id')
6.8 获取SQL
# 方法一:执行前获取
sql = imysql.table('table1').select('id,name').limit(2).get_raw_sql()
# 方法二:执行后获取
sql = imysql.get_last_sql()
注:多线程高并发情况下,方法二可能不太准确
6.9 获取插入的ID
# 方法一:insert_one 方法返回值
id1 = imysql.table('table1').insert_one({'name': '周八'})
# 方法二:get_insert_id
id2 = imysql.get_insert_id()
注:多线程高并发情况下,方法二可能不太准确
6.10 获取影响的行数
# 方法一:函数返回值
result = imysql.table('table1').insert_many([
{'name': '吴九'},
{'name': '郑十'},
])
# 方法二:get_effected_rows
effected_rows = imysql.get_effected_rows()
注1:insert_many、update_one、update_many、delete 等函数返回的都是影响行数 注2:多线程高并发情况下,方法二可能不太准确
七、事务支持(TRANSACTION)
- insert_one、insert_many、update_one、update_many、delete 等操作默认会自动提交事务
- 如果希望多个操作完成之后再提交事务,上下文管理器 或 装饰器
- 推荐使用上下文管理器(更灵活,可切换数据库连接),装饰器只才使用默认数据库连接
7.1 上下文管理器
from chain_pymysql import imysql, transaction
with transaction.atomic():
imysql.table('table1').delete({'name': '匿名'})
# 下面这句会报错,事务回滚
imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
-
捕获异常
- 方法一:包裹整个上下文管理器,无需自己处理事务回滚
from chain_pymysql import imysql, transaction try: with transaction.atomic(): imysql.table('table1').delete({'name': '匿名'}) # 下面这句会报错,事务回滚 imysql.table('table1').insert_one({'name': '匿名', 'age': 18}) except Exception as e: print(e)
- 方法二:在上下文内部捕获异常,自己处理事务回滚
from chain_pymysql import imysql, transaction with transaction.atomic() as atomic: imysql.table('table1').delete({'name': '匿名'}) try: imysql.table('table1').insert_one({'name': '匿名', 'age': 18}) # 不抛出异常的情况下,上下文管理器会自动提交事务,不用手动处理 # atomic.commit() except Exception as e: # 捕获了异常,需要自己手动回滚事务 atomic.rollback()
-
切换数据库连接
other = imysql.switch('other')
with transaction.atomic(other.conn) as atomic:
other.table('table1').insert_one({'name': '李四'})
other.table('table1').insert_one({'name': '王五'})
atomic.rollback()
- 多个数据库链接
不同连接的事务不能放一个上下文管理器里,要分开写
```python
default = imysql.switch('default')
with transaction.atomic(default.conn):
default.table('table1').insert_one({'name': '吴九'})
default.table('table1').insert_one({'name': '郑十'})
other = imysql.switch('other')
with transaction.atomic(other.conn):
other.table('table1').insert_one({'name': '李四'})
other.table('table1').insert_one({'name': '王五'})
```
7.2 装饰器
@transaction.atomic
def some_operation():
imysql.table('table1').delete({'name': '匿名'})
# 下面这句会报错,事务回滚
imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
some_operation()
-
捕获异常
- 方法一:包裹被装饰的函数,无需自己处理事务回滚
@transaction.atomic def some_operation(): imysql.table('table1').delete({'name': '匿名'}) # 下面这句会报错,事务回滚 imysql.table('table1').insert_one({'name': '匿名', 'age': 18}) try: some_operation() except Exception as e: print(e)
- 方法二:在被装饰的函数内捕获异常,自己处理事务回滚
@transaction.atomic def some_operation(): imysql.table('table1').delete({'name': '匿名'}) try: imysql.table('table1').insert_one({'name': '匿名', 'age': 18}) # 不抛出异常的情况下,装饰器会自动提交事务,不用手动处理 # imysql.default_conn.commit() except Exception as e: # 捕获了异常,需要自己手动回滚事务 imysql.default_conn.rollback() some_operation()
-
切换数据库连接
使用装饰器无法灵活切换数据库连接,如需切换数据库连接,需修改全局默认连接
```python
# 切换默认数据库连接为 other
imysql.switch('other', inplace=True)
@transaction.atomic
def some_operation():
# 这里使用的是 other 连接里默认数据库的 table1 表
imysql.table('table1').delete({'name': '匿名'})
some_operation()
```
八、单元测试(UNITTEST)
详情请看 chain_pymysql/test.py 文件
九、防注入(INJECTION)
经过 sqlmap 测试,安全防注入
注1:不保证所有情况下都防注入,建议接收用户提交的数据时做数据类型及格式的验证
注2:如需自己拼接sql,建议使用助手函数,详情请看“5.2 使用助手函数来拼接SQL(防注入)”
十、内置异常(EXCEPTIONS)
所有内置都提供两个一样的属性及方法
即:code、message,get_code(), get_message()
错误代码:
错误码 | 说明 |
---|---|
400 | 参数错误 |
403 | 存在SQL注入 |
运行时异常:RuntimeError
import chain_pymysql.exceptions
try:
imysql.table('table1').skip(0).limit(-1).all()
except chain_pymysql.exceptions.RuntimeError as e:
assert e.code == 400
print(e.message)
try:
imysql.table('table1').where({
'id': ['=', "3' UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL-- zxmL"]
}).all(fetch=True)
except chain_pymysql.exceptions.RuntimeError as e:
assert e.get_code() == 403
print(e.get_message())
十一、请我喝奶茶(DONATION)
如果你觉得这个工具对你有帮助或启发,也可以请我喝
☕️