Functions to simplify day-to-day BI work.


License
MIT
Install
pip install bi_powerhouse==0.5.1

Documentation

bi_powerhouse

Examples

Utilities

get_secret

##########################################
# get secret from AWS credential manager

from bi_powerhouse.utilities.get_secret import get_secret as get_secret

secret = get_secret('secret_name',
                    aws_access_key_id='access_key',
                    aws_secret_access_key='secret_access_key')

run_query

##########################################
# run a query from a specified sql file on a RDS instance
# the query is a simple SELECT * FROM authors limit 10
# where the authors table is just dummy data

from bi_powerhouse.utilities.run_query import run_query as run_query

##########################################
# first return as dictionary

results = run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
                    secret['password'], secret['dbname'], return_as='dict')
                    
>>> results
{'data': [{'id': '1', 'first_name': 'Electa', 'last_name': 'Farrell', 'email': 'tremblay.kallie@example.org', 'birthdate': '1977-11-22', 'added': '1979-01-19 12:34:41'}, {'id': '2', 'first_name': 'Baylee', 'last_name': 'Becker', 'email': 'paucek.mia@example.org', 'birthdate': '1977-10-03', 'added': '1980-11-05 05:29:39'}, {'id': '3', 'first_name': 'Katharina', 'last_name': 'Morissette', 'email': 'okunde@example.org', 'birthdate': '1995-01-02', 'added': '1972-02-27 04:32:07'}, {'id': '4', 'first_name': 'Ebba', 'last_name': 'Metz', 'email': 'joyce.koch@example.net', 'birthdate': '2004-12-29', 'added': '2014-08-31 10:27:43'}, {'id': '5', 'first_name': 'Giovanny', 'last_name': 'Ward', 'email': 'eheller@example.com', 'birthdate': '1975-12-15', 'added': '2010-05-18 18:20:26'}, {'id': '6', 'first_name': 'Lula', 'last_name': 'Volkman', 'email': 'dibbert.layne@example.net', 'birthdate': '1996-08-14', 'added': '2013-09-23 09:09:42'}, {'id': '7', 'first_name': 'Lue', 'last_name': 'Koss', 'email': 'druecker@example.com', 'birthdate': '1994-05-10', 'added': '1971-01-17 15:07:11'}, {'id': '8', 'first_name': 'Gregory', 'last_name': 'Hirthe', 'email': 'veum.merl@example.org', 'birthdate': '1976-10-10', 'added': '2003-06-03 15:20:54'}, {'id': '9', 'first_name': 'Norene', 'last_name': 'Jerde', 'email': 'trinity17@example.net', 'birthdate': '1981-05-09', 'added': '2010-10-24 04:18:08'}, {'id': '10', 'first_name': 'Bradford', 'last_name': 'Jacobson', 'email': 'horace23@example.net', 'birthdate': '2010-02-03', 'added': '1993-07-30 07:29:58'}]}

##########################################              
# then return as a nested list

results = run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
                    secret['password'], secret['dbname'], return_as='nested_list')

>>> results
[['id', 'first_name', 'last_name', 'email', 'birthdate', 'added'], ['1', 'Electa', 'Farrell', 'tremblay.kallie@example.org', '1977-11-22', '1979-01-19 12:34:41'], ['2', 'Baylee', 'Becker', 'paucek.mia@example.org', '1977-10-03', '1980-11-05 05:29:39'], ['3', 'Katharina', 'Morissette', 'okunde@example.org', '1995-01-02', '1972-02-27 04:32:07'], ['4', 'Ebba', 'Metz', 'joyce.koch@example.net', '2004-12-29', '2014-08-31 10:27:43'], ['5', 'Giovanny', 'Ward', 'eheller@example.com', '1975-12-15', '2010-05-18 18:20:26'], ['6', 'Lula', 'Volkman', 'dibbert.layne@example.net', '1996-08-14', '2013-09-23 09:09:42'], ['7', 'Lue', 'Koss', 'druecker@example.com', '1994-05-10', '1971-01-17 15:07:11'], ['8', 'Gregory', 'Hirthe', 'veum.merl@example.org', '1976-10-10', '2003-06-03 15:20:54'], ['9', 'Norene', 'Jerde', 'trinity17@example.net', '1981-05-09', '2010-10-24 04:18:08'], ['10', 'Bradford', 'Jacobson', 'horace23@example.net', '2010-02-03', '1993-07-30 07:29:58']]

##########################################
# you can return as a nested list but without headers

results = run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
                    secret['password'], secret['dbname'], return_as='nested_list', include_headers=False)

>>> results
[['1', 'Electa', 'Farrell', 'tremblay.kallie@example.org', '1977-11-22', '1979-01-19 12:34:41'], ['2', 'Baylee', 'Becker', 'paucek.mia@example.org', '1977-10-03', '1980-11-05 05:29:39'], ['3', 'Katharina', 'Morissette', 'okunde@example.org', '1995-01-02', '1972-02-27 04:32:07'], ['4', 'Ebba', 'Metz', 'joyce.koch@example.net', '2004-12-29', '2014-08-31 10:27:43'], ['5', 'Giovanny', 'Ward', 'eheller@example.com', '1975-12-15', '2010-05-18 18:20:26'], ['6', 'Lula', 'Volkman', 'dibbert.layne@example.net', '1996-08-14', '2013-09-23 09:09:42'], ['7', 'Lue', 'Koss', 'druecker@example.com', '1994-05-10', '1971-01-17 15:07:11'], ['8', 'Gregory', 'Hirthe', 'veum.merl@example.org', '1976-10-10', '2003-06-03 15:20:54'], ['9', 'Norene', 'Jerde', 'trinity17@example.net', '1981-05-09', '2010-10-24 04:18:08'], ['10', 'Bradford', 'Jacobson', 'horace23@example.net', '2010-02-03', '1993-07-30 07:29:58']]

##########################################
# you can also have the results go straight into a csv file with or without headers

run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
          secret['password'], secret['dbname'], return_as='csv_file', file_name='test.csv')
          
run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
          secret['password'], secret['dbname'], return_as='csv_file', file_name='test.csv', 
          include_headers=False)

##########################################          
# ... you can also have that csv file go straight into S3

run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
          secret['password'], secret['dbname'], return_as='s3_csv_file', file_name='test.csv',
          aws_bucket_name='ksco92',
          aws_access_key_id='access_key',
          aws_secret_access_key='secret_access_key')

##########################################     
# ... or put it in a pandas dataframe

results = run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
                    secret['password'], secret['dbname'], return_as='pd_dataframe')
>>> results
   id first_name   last_name                        email   birthdate  \
0   1     Electa     Farrell  tremblay.kallie@example.org  1977-11-22   
1   2     Baylee      Becker       paucek.mia@example.org  1977-10-03   
2   3  Katharina  Morissette           okunde@example.org  1995-01-02   
3   4       Ebba        Metz       joyce.koch@example.net  2004-12-29   
4   5   Giovanny        Ward          eheller@example.com  1975-12-15   
5   6       Lula     Volkman    dibbert.layne@example.net  1996-08-14   
6   7        Lue        Koss         druecker@example.com  1994-05-10   
7   8    Gregory      Hirthe        veum.merl@example.org  1976-10-10   
8   9     Norene       Jerde        trinity17@example.net  1981-05-09   
9  10   Bradford    Jacobson         horace23@example.net  2010-02-03   

                 added  
0  1979-01-19 12:34:41  
1  1980-11-05 05:29:39  
2  1972-02-27 04:32:07  
3  2014-08-31 10:27:43  
4  2010-05-18 18:20:26  
5  2013-09-23 09:09:42  
6  1971-01-17 15:07:11  
7  2003-06-03 15:20:54  
8  2010-10-24 04:18:08  
9  1993-07-30 07:29:58  

sqs_send

##########################################
# after using run_query with return_as='dict', you can use this to send
# each row of the results as a JSON message to an SQS queue

from bi_powerhouse.utilities.sqs_send import sqs_send as sqs_send
from bi_powerhouse.utilities.run_query import run_query as run_query
from bi_powerhouse.utilities.get_secret import get_secret as get_secret
import json

secret = get_secret('secret_name',
                    aws_access_key_id='access_key',
                    aws_secret_access_key='secret_access_key')

results = run_query(secret['engine'], 'mysql_test.sql', secret['host'], secret['port'], secret['username'],
                    secret['password'], secret['dbname'], return_as='dict')

sqs_url = 'sqs_url'

for message in results['data']:
    sqs_send(sqs_url, json.dumps(message),
             aws_access_key_id='access_key',
             aws_secret_access_key='secret_access_key')

sqs_receive

##########################################
# to pull JSON messages from an SQS queue and collect them in a list

from bi_powerhouse.utilities.sqs_receive import sqs_receive as sqs_receive

sqs_url = 'sqs_url'

messages = sqs_receive(sqs_url, max_messages=10, 
                       aws_access_key_id='access_key', 
                       aws_secret_access_key='secret_access_key')

message_bodies = []
      
for message in messages['Messages']:
    message_bodies.append(message['Body'])

s3_upload

##########################################
# to upload a file to a S3 bucket

from bi_powerhouse.utilities.s3_upload import s3_upload as s3_upload

s3_upload('test.csv', 'bucket_name', '/sub/folder/in/bucket/', aws_access_key_id='access_key', 
          aws_secret_access_key='secret_access_key')

write_to_csv

##########################################
# after running run_query with return_as='nested_list'

from bi_powerhouse.utilities.write_to_csv import write_to_csv as write_to_csv

write_to_csv('test.csv', results)

About running on EC2s

If you want to run this code on an AWS EC2 instance, you just need to omit the aws_access_key_id and aws_secret_access_key kwargs. The attached IAM role needs to have full S3, SQS and Secrets Manager permissions.

For example, if you want to use get_secret you'll just have to do this:

##########################################
# get secret from AWS credential manager when running on EC2

from bi_powerhouse.utilities.get_secret import get_secret as get_secret

secret = get_secret('secret_name')