chain-pymysql

Easy to use PyMySQL.


Keywords
ChainPyMySql
License
MIT
Install
pip install chain-pymysql==1.0.1

Documentation

Chain-PyMySQL

Easy to use PyMySQL.

对 PyMySQL 进行封装,增加链式操作,方便快捷进行 CURD 操作

注:支持断线自动重连


文档目录


一、安装说明(INSTALLATION)

使用 PIP 安装 或 直接下载源码

  • 全自动安装:easy_install chain-pymysql 或者 pip install chain-pymysql / pip3 install chain-pymysql
  • 半自动安装:先下载 https://pypi.org/project/chain-pymysql/#files ,解压后运行 python setup.py install
  • 手动安装:将 chain-pymysql 目录放置于当前目录或者 site-packages 目录
  • 通过 from chain_pymysql import imysql 来引用

二、数据库连接(CONNECTIONS)

2.1 连接数据库

Method: imysql.connect(options: dict, name='default')

from chain_pymysql import imysql

imysql.connect({
    'host': '127.0.0.1',
    'user': 'root',
    'password': 'root',
    'database': 'test'
})

注: options 参数请参考 pymysql 官方文档: https://pymysql.readthedocs.io/en/latest/modules/connections.html

2.2 切换数据库连接

支持连接多个数据库,使用 switch 方法来动态(或永久)切换数据库连接

Method: imysql.switch(name: str, db_name=None, inplace=False)

switch 方法使用示例

  • 临时切换数据库连接 imysql.switch('db2')
  • 永久切换数据库连接:imysql.switch('db2', inplace=True)
  • 切换连接的时候同时切换数据库:imysql.switch('db2', 'test_member')imysql.switch('db2.test_member')
from chain_pymysql import imysql

imysql.connect(default_config, name='default') # 第一个添加的数据库为默认数据库
imysql.connect(other_config, name='other')

# 使用默认数据库连接查询
imysql.table('table1').limit(10).all()
# 临时使用其他数据库连接来查询(只有本次查询是使用db2,其他查询还是db1)
imysql.switch('other').table('table1').limit(10).all()
# 永久切换数据库连接
imysql.switch('other', inplace=True).table('table1').limit(10).all()
# 这时再查询,默认就是 db2 数据库连接了
imysql.table('table1').limit(10).all() # 这里查询的是 db2 连接里的 table1

2.3 使用同连接的其他数据库

Method: table(table: str, alias='')

table 按 db.table 格式传参即可

members = imysql.table('test_member.member').select('member_code').order_by('member_code desc').limit(10).column()
print(members)
orders = imysql.table('test_order.order').select('order_code, total_amount').where({
    'member_code': ('in', members, False)
}).order_by('order_code desc').all(fetch=True)
print(orders)

2.4 关闭数据库连接

Method: imysql.close(name=None)

Since: 1.0.4

关闭指定数据库连接:imysql.close(name='other')

关闭所有数据库连接:imysql.close()


三、增删改查(CURD)

3.1 增

# 插入一行
insert_id = imysql.table('table1').insert_one({'id': 1, 'name': '张三'})

# 插入多行
effected_rows = imysql.table('table1').insert_many([
    {'id': 2, 'name': '李四'},
    {'id': 3, 'name': '王五'},
    {'id': 4, 'name': '赵六'}
])

程序默认会做数据验证,如果表单数据需要提交代码,则可以把 verify 设置为 False,取消数据验证

insert_id = imysql.table('table1').insert_one({'name': '张三<script>alert(1)</script>'}, verify=False)

3.2 删

# 删除记录
effected_rows = imysql.table('table1').delete({'id': 1})

# 限制删除的行数
effected_rows = imysql.table('table1').delete('id>1', limit=1)

3.3 改

# 修改一行记录
effected_rows = imysql.table('table1').update_one({'id': 3}, {'name': '王六'})
# 修改多行记录
effected_rows = imysql.table('table1').update_many('id IN (3,4)', {'name': '匿名'})
effected_rows = imysql.table('table1').update_many({'id': ['IN', (3, 4)]}, {'name': '匿名'})

3.4 查

注:fetch=True 返回 list,fetch=False(默认)返回 cursor,可用于迭代

3.4.1 字符串条件
results = imysql.table('table1').where('id IN (3,4)').all(fetch=True)
3.4.2 字典条件查询
results = imysql.table('table1').where({'id': 3}).all(fetch=True)
3.4.3 IN 查询
results = imysql.table('table1').where({'id': ['in', (3, 4)]}).all(fetch=True)
3.4.4 LIKE 模糊查询
results = imysql.table('table1').where({'name': ('like', '张%')}).all(fetch=True)
3.4.5 BETWEEN 查询
results = imysql.table('table1').where({'id': ['between', (3, 4)]}).all(fetch=True)
3.4.6 追加查询 and_where
# 特殊情况下使用 and_where
results = imysql.table('table1').where({'id': 3}).and_where({'name': '张三'}).all(fetch=True)

# 一般情况下,扩充字典条件即可
condition = {'id': 3}
if True:
    condition['name'] = '张三'

results = imysql.table('table1').where(condition).all(fetch=True)
3.4.7 OR 查询 or_where
results = imysql.table('table1').where({'id': 3}).or_where({'id': 4}).all(fetch=True)
3.4.8 其他操作符
# 大于等于
results = imysql.table('table1').where({'id': ['>=', 3]}).all(fetch=True)
# 小于
results = imysql.table('table1').where({'id': ['<', 4]}).all(fetch=True)
# 不等于
results = imysql.table('table1').where({'id': ['<>', 3]}).all(fetch=True)
# 不为空
results = imysql.table('table1').where({'id': ['<>', '']}).all(fetch=True)
# NULL
results = imysql.table('table1').where({'name': ['is', None]}).all(fetch=True)
# NOT NULL
results = imysql.table('table1').where({'name': ['is not', None]}).all(fetch=True)
# NOT IN
results = imysql.table('table1').where({'id': ['not in', (3, 4)]}).all(fetch=True)
# NOT LIKE
results = imysql.table('table1').where({'name': ['not like', '张%']}).all(fetch=True)

四、查询构建器(QUERY BUILDER)

4.1 选择字段 select
one = imysql.table('table1').select('id').where('id=3').one()
4.2 联表查询 join
# 默认 LEFT JOIN
results = (
    imysql.table('table1', alias='t1')
    .select('t1.id,t1.name,t2.age')
    .join('table2', alias='t2', on='t1.id=t2.id')
    .all(fetch=True)
)

# RIGHT JOIN
results = (
    imysql.table('table1 t1')
    .select(['t1.id', 't1.name', 't2.age'])
    .join('table2 t2', on='t1.id=t2.id', how='right')
    .all(fetch=True)
)

# INNER JOIN
results = (
    imysql.table('table1 t1')
    .join('table2 t2', on='t1.id=t2.id', how='inner')
    .all(fetch=True)
)

# ON 也可以使用字典,并且支持多条件
results = (
    imysql.table('table1 t1')
    .select('t1.id,t1.name,t2.age')
    .join('table2 t2', how='inner', on={
        # 第三个参数是不要使用引号的意思
        't1.id': ['=', 't2.id', False],
        't2.age': 20,
    })
    .all(fetch=True)
)
4.3 分组及排序 group_by order_by
results = (
    imysql.table('table2')
    .select('age, count(*) as num')

    # 可以使用字符串或list
    .group_by('age')
    # .group_by(['age'])

    # 可以使用字符串或list
    # .order_by('age asc, num desc')
    # .order_by(['age asc', 'num desc'])
    # .order_by(['age', 'num'], ascending=True)
    .order_by(['age', 'num'], ascending=[True, False])

    .all(fetch=True)
)
4.4 结果筛选 having
results = (
    imysql.table('table2')
    .select('age, count(*) as num')
    .group_by('age')
    .having('num > 1')
    .order_by('num desc')
    .all(fetch=True)
)
4.5 分页查询 skip limit

results = imysql.table('table1').order_by('id asc').skip(1).limit(3).all(fetch=True)


五、执行原生SQL(RAW SQL)

注:imysql.execute 返回原生 pymysql.cursors.DictCursor 对象,后继操作须自己处理

5.1 执行原生SQL示例
sql = 'SELECT * FROM table1 WHERE id=%s'
# 解析多行
results = imysql.execute(sql, (3,)).all(fetch=True)
# 解析单行
one = imysql.execute(sql, (3,)).one()
# 其他操作请看 “六、返回值(RETURNED VALUE)”
names = imysql.execute(sql, (3,)).index('id', 'name')

# 切换数据库来执行SQL
imysql.switch('other').execute(sql)
5.2 使用助手函数来拼接SQL(防注入)

gen_condition、gen_order_by、gen_limit 等

condition = dict()
for k, v in request.GET.items():
    if k in ['id', 'name']:
        condition[f't1.{k}'] = v
    elif k in ['age']:
        condition[f't2.{k}'] = v

where = imysql.gen_condition(condition)
order_by = imysql.gen_order_by(request.GET.get('order'), request.GET.get('asc') == '1')
page = int(request.GET.get('page', 1))
size = int(request.GET.get('size', 10))
limit = imysql.gen_limit(skip=(page-1)*size, limit=size)

sql = f'SELECT t1.`name`, avg(t2.age) AS age FROM table1 t1 LEFT JOIN table2 t2 ON t1.id = t2.id WHERE {where} GROUP BY t1.`name` {order_by}{limit}'
results = imysql.execute(sql, fetch=True)
for item in results:
    print(item)
5.3 跨库(跨实例、跨连接)联表查询

Method: imysql.execute_cross(sql: str, chunk_size=500)

Since: 1.0.4

注:水平有限,不保证数据100%正确;此功能可用于简单跨库查询数据报表的查询及导出,省去手工拼接数据的麻烦。

使用说明:

  1. 表名必须带连接名前缀,例如:prod1.prod_order.dd_order
  2. 表名必须定义别名,例如:prod1.prod_order.dd_order t1
  3. 建议只使用主库字段做为查询条件,使用跨实例的关联表字段做条件时,查询结果可能会出现意外错误
  4. 查询条件不支持 BETWEEN ... AND ...,请使用 >= 和 < 代替
  5. 支持 LEFT JOIN、RIGHT JOIN 及 INNER JOIN;仅支持主库字段排序,支持 LIMIT

使用示例:

imysql.connect({...}, name='prod1')
imysql.connect({...}, name='prod2')

sql = '''
SELECT
    t1.`order_code`,
    t2.`out_code`
FROM
    prod1.`prod_order`.`dd_order` t1
LEFT JOIN prod2.`prod_logistics`.`wl_storageout` t2 ON t1.`order_code` = t2.`order_code`
WHERE
    t1.`create_time` >= '2022-10-01 00:00:00'
AND t1.`create_time` <= '2022-10-31 23:59:59'
ORDER BY
    t1.`create_time` ASC
'''

results = imysql.execute_cross(sql, chunk_size=500)
for chunk in results:
    for item in chunk:
        print(item)

结合 Flask 导出 csv (部分代码):

from flask import Response

def iter_csv(self, results):
    flag = False
    for chunk in results:
        for item in chunk:
            if flag is False:
                yield ','.join(item.keys()) + '\n'
                flag = True
            yield ','.join(f'"{x}"' if type(x) is str and x.find('\n')>-1 else f'{x}' for x in item.values()) + '\n'

def export(self):
    ...
    results = imysql.execute_cross(sql, chunk_size=500)

    filename = 'Export_%s.csv' % time.strftime('%Y%m%d%H%M%S')

    csv_data = self.iter_csv(results)
    response = Response(csv_data, mimetype='text/csv')
    response.headers['Content-Type'] = 'application/vnd.ms-excel; charset=gb18030'
    response.headers['Content-Disposition'] = f'attachment; filename={filename}'
    return response

六、返回值(RETURNED VALUE)

6.1 统计 count
count = imysql.table('table1').count()
6.2 多行 all
results = imysql.table('table1').limit(10).all(fetch=True)
6.3 单行 one
one = imysql.table('table1').where('id=3').one()
6.4 单个值 scalar
name = imysql.table('table1').select('name').where('id=3').scalar()
6.5 一列 column
column = imysql.table('table1').select('name').limit(3).column()
6.6 结果索引(一维)
result = imysql.table('table1').limit(10).index(key='id', value='name')
6.7 结果索引(二维)
result = imysql.table('table1').limit(10).index(key='id')
6.8 获取SQL
# 方法一:执行前获取
sql = imysql.table('table1').select('id,name').limit(2).get_raw_sql()

# 方法二:执行后获取
sql = imysql.get_last_sql()

注:多线程高并发情况下,方法二可能不太准确

6.9 获取插入的ID
# 方法一:insert_one 方法返回值
id1 = imysql.table('table1').insert_one({'name': '周八'})
# 方法二:get_insert_id
id2 = imysql.get_insert_id()

注:多线程高并发情况下,方法二可能不太准确

6.10 获取影响的行数
# 方法一:函数返回值
result = imysql.table('table1').insert_many([
    {'name': '吴九'},
    {'name': '郑十'},
])
# 方法二:get_effected_rows
effected_rows = imysql.get_effected_rows()

注1:insert_many、update_one、update_many、delete 等函数返回的都是影响行数 注2:多线程高并发情况下,方法二可能不太准确


七、事务支持(TRANSACTION)

  • insert_one、insert_many、update_one、update_many、delete 等操作默认会自动提交事务
  • 如果希望多个操作完成之后再提交事务,上下文管理器 或 装饰器
  • 推荐使用上下文管理器(更灵活,可切换数据库连接),装饰器只才使用默认数据库连接

7.1 上下文管理器
from chain_pymysql import imysql, transaction

with transaction.atomic():
    imysql.table('table1').delete({'name': '匿名'})
    # 下面这句会报错,事务回滚
    imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
  • 捕获异常

    • 方法一:包裹整个上下文管理器,无需自己处理事务回滚
    from chain_pymysql import imysql, transaction
    
    try:
        with transaction.atomic():
            imysql.table('table1').delete({'name': '匿名'})
            # 下面这句会报错,事务回滚
            imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
    except Exception as e:
        print(e)
    • 方法二:在上下文内部捕获异常,自己处理事务回滚
    from chain_pymysql import imysql, transaction
    
    with transaction.atomic() as atomic:
        imysql.table('table1').delete({'name': '匿名'})
        try:
            imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
            # 不抛出异常的情况下,上下文管理器会自动提交事务,不用手动处理
            # atomic.commit()
        except Exception as e:
            # 捕获了异常,需要自己手动回滚事务
            atomic.rollback()
  • 切换数据库连接

other = imysql.switch('other')
with transaction.atomic(other.conn) as atomic:
    other.table('table1').insert_one({'name': '李四'})
    other.table('table1').insert_one({'name': '王五'})
    atomic.rollback()
  • 多个数据库链接

不同连接的事务不能放一个上下文管理器里,要分开写

```python
default = imysql.switch('default')
with transaction.atomic(default.conn):
    default.table('table1').insert_one({'name': '吴九'})
    default.table('table1').insert_one({'name': '郑十'})

other = imysql.switch('other')
with transaction.atomic(other.conn):
    other.table('table1').insert_one({'name': '李四'})
    other.table('table1').insert_one({'name': '王五'})
```
7.2 装饰器
@transaction.atomic
def some_operation():
    imysql.table('table1').delete({'name': '匿名'})
    # 下面这句会报错,事务回滚
    imysql.table('table1').insert_one({'name': '匿名', 'age': 18})

some_operation()
  • 捕获异常

    • 方法一:包裹被装饰的函数,无需自己处理事务回滚
    @transaction.atomic
    def some_operation():
        imysql.table('table1').delete({'name': '匿名'})
        # 下面这句会报错,事务回滚
        imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
    
    try:
        some_operation()
    except Exception as e:
        print(e)
    • 方法二:在被装饰的函数内捕获异常,自己处理事务回滚
    @transaction.atomic
    def some_operation():
        imysql.table('table1').delete({'name': '匿名'})
        try:
            imysql.table('table1').insert_one({'name': '匿名', 'age': 18})
            # 不抛出异常的情况下,装饰器会自动提交事务,不用手动处理
            # imysql.default_conn.commit()
        except Exception as e:
            # 捕获了异常,需要自己手动回滚事务
            imysql.default_conn.rollback()
    
    some_operation()
  • 切换数据库连接

使用装饰器无法灵活切换数据库连接,如需切换数据库连接,需修改全局默认连接

```python
# 切换默认数据库连接为 other
imysql.switch('other', inplace=True)

@transaction.atomic
def some_operation():
    # 这里使用的是 other 连接里默认数据库的 table1 表
    imysql.table('table1').delete({'name': '匿名'})

some_operation()
```

八、单元测试(UNITTEST)

详情请看 chain_pymysql/test.py 文件

https://github.com/Tiacx/china-pymysql/blob/master/test.py


九、防注入(INJECTION)

经过 sqlmap 测试,安全防注入

注1:不保证所有情况下都防注入,建议接收用户提交的数据时做数据类型及格式的验证
注2:如需自己拼接sql,建议使用助手函数,详情请看“5.2 使用助手函数来拼接SQL(防注入)”


十、内置异常(EXCEPTIONS)

所有内置都提供两个一样的属性及方法

即:code、message,get_code(), get_message()


错误代码:

错误码 说明
400 参数错误
403 存在SQL注入

运行时异常:RuntimeError

import chain_pymysql.exceptions

try:
    imysql.table('table1').skip(0).limit(-1).all()
except chain_pymysql.exceptions.RuntimeError as e:
    assert e.code == 400
    print(e.message)

try:
    imysql.table('table1').where({
        'id': ['=', "3' UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL-- zxmL"]
    }).all(fetch=True)
except chain_pymysql.exceptions.RuntimeError as e:
    assert e.get_code() == 403
    print(e.get_message())

十一、请我喝奶茶(DONATION)

如果你觉得这个工具对你有帮助或启发,也可以请我喝☕️

支付宝