psycop3 version of https://github.com/gonzalo123/dbutils
I normally need to perform raw queries when I'm working with Python. Even when I'm using Django and it's ORM I need to execute sql against the database. As I use (almost always) PostgreSQL, I use (as we all do) psycopg2. Psycopg2 is very complete and easy to use but sometimes I need a few helpers to make my usual tasks easier. I don't want to create a complex library on top of psycopg2, only a helper.
Something that I miss from PHP and DBAL library is the way that DBAL helps me to create simple CRUD operations. The idea of this helper functions is to do something similar. Let's start
I've created procedural functions to do all and also a Db class.
Normally all functions needs the cursor parameter.
def fetch_all(cursor, sql, params=None):
cursor.execute(
query=sql,
vars={} if params is None else params)
return cursor.fetchall()
The Db class accepts cursor in the constructor
db = Db(cursor=cursor)
data = db.fetch_all("SELECT * FROM table")
Nothing especial in the connection. I like to use "connection_factory=RealDictConnection" to allow me to access to the recordset as an dictionary. I've created simple factory helper to create connections:
def get_conn(dsn, named=False, autocommit=False):
conn = psycopg2.connect(
dsn=dsn,
connection_factory=RealDictConnection if named else None,
)
conn.autocommit = autocommit
return conn
Sometimes I use RealDictCursor in the cursor instead connection, so I've created a simple helper
def get_cursor(conn, named=True):
return conn.cursor(cursor_factory=RealDictCursor if named else None)
db = Db(cursor)
for reg in db.fetch_all(sql="SELECT email, name from users"):
assert 'user1' == reg['name']
assert 'user1@email.com' == reg['email']
data == db.fetch_one(sql=SQL)
data = db.select(
table='users',
where={'email': 'user1@email.com'}
)
This helper only allows me to perform simple where statements (joined with AND). If I need one complex Where, then I use fetch
Insert one row
db.insert(
table='users',
values={'email': 'user1@email.com', 'name': 'user1'})
Or insert multiple rows (using spycopg2's executemany)
db.insert(
table='users',
values=[
{'email': 'user2@email.com', 'name': 'user2'},
{'email': 'user3@email.com', 'name': 'user3'}
])
We also can use insert_batch to insert all rows within one command
db.insert_batch(
table='users',
values=[
{'email': 'user2@email.com', 'name': 'user2'},
{'email': 'user3@email.com', 'name': 'user3'}
])
db.update(
table='users',
data={'name': 'xxxx'},
identifier={'email': 'user1@email.com'},
)
db.delete(table='users', where={'email': 'user1@email.com'})
Sometimes we need to insert one row or update the row if the primary key already exists. We can do that with two statements: One select and if there isn't any result one insert. Else on update. We can do that with one sql statement. I've created a helper to do that:
Now we can execute an upsert in a simple way:
db.upsert(
table='users',
data={'name': 'yyyy'},
identifier={'email': 'user1@email.com'})
data = db.sp_fetch_one(
function='hello',
params={'name': 'Gonzalo'})
data = db.sp_fetch_all(
function='hello',
params={'name': 'Gonzalo'})
In PHP and DBAL to create one transaction we can use this syntax
<?php
$conn->transactional(function($conn) {
// do stuff
});
In Python we can do the same with context management, but I prefer to use this syntax:
with transactional(conn) as db:
assert 1 == db.insert(
table='users',
values={'email': 'user1@email.com', 'name': 'user1'})
The transactional function is like that (I've created two functions. One with raw cursor and another one with my Db class):
def _get_transactional(conn, named, callback):
try:
with conn as connection:
with get_cursor(conn=connection, named=named) as cursor:
yield callback(cursor)
conn.commit()
except Exception as e:
conn.rollback()
raise e
@contextmanager
def transactional(conn, named=False):
return _get_transactional(conn, named, lambda cursor: Db(cursor))
@contextmanager
def transactional_cursor(conn, named=False):
return _get_transactional(conn, named, lambda cursor: cursor)
Select and fetch helpers returns the recordset, but insert, update, delete and upsert returns the number of affected rows. For example that's the insert function:
def _get_insert_sql(table, values):
keys = values[0].keys() if type(values) is list else values.keys()
raw_sql = "insert into {tbl} ({t_fields}) values ({t_values})"
return psycopg_sql.SQL(raw_sql).format(
tbl=psycopg_sql.Identifier(table),
t_fields=psycopg_sql.SQL(', ').join(map(psycopg_sql.Identifier, keys)),
t_values=psycopg_sql.SQL(', ').join(map(psycopg_sql.Placeholder, keys)))
def insert(cursor, table, values):
sql = _get_insert_sql(table=table, values=values)
cursor.executemany(query=sql, vars=values) if type(values) is list else cursor.execute(sql, values)
return cursor.rowcount
You can try the library running the unit tests. I've also provide one docker-compose.yml file to set up a PostgreSQL database to run the tests
version: '3.6'
services:
pg:
build:
context: .docker/pg
dockerfile: Dockerfile
ports:
- 5432:5432
environment:
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_DB: ${POSTGRES_DB}
PGDATA: /var/lib/postgresql/data/pgdata
You can install the library using pip
pip install dbutils3-gonzalo123