flask-gcp-pubsub

Google Pub/Sub as task manager, like Celery can do


Keywords
cloud, pubsub, tasks, flask
License
GPL-3.0
Install
pip install flask-gcp-pubsub==0.2.11

Documentation

Flask GCP Pub/Sub

Lite distributed task queue using Google Cloud Platform (GCP) Pub/Sub

PyPI version PyPI downloads GNU GPLv3

🤔 What does this package does?

As Celery, but in a lighter version, this package allows you to run operations asynchronously in your Flask project, but without the choice of the broker: it only uses GCP Pub/Sub.

Technically, this package can run without Flask, but, historically, it comes to have a quick-win for migrating to GCP Cloud Run using the Pub/Sub system, from an existing project using Flask + Celery.

This package aims to remove some painful tasks by:

  • Creating one dedicated topic for each function
  • Creating one dedicated reusable subscription for each function

We do not recommand this package for the following cases:

  • You need to reuse your development in a multi-cloud context
  • You have high volume of messages to process (not tested)

This package is given "as it", without garantees, under the GPLv3 License.

🚀 Getting started

Prerequisites

Installation

pip install flask-gcp-pubsub

Full example

demo.py

#!/usr/bin/env python
# coding: utf-8

from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)

WARNING: do not forget to replace <project_id> with you GCP project ID (not number) and to downloed the JSON-formatted key from GCP Console.

wsgi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()

wsgi_delayed.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()

This command will launch the Flask server:

flask run --port 9090

This command will launch the asynchronous tasks manager:

python wsgi_delayed.py

You can now navigate to http://localhost:9090/test And if everything goes OK, you just have to check the content of the output in console, which should look something like that:

Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05

Bucket notification

You can also create a task based on GCP Storage, by receiving a notification on any supported event from a bucket.

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)


@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)

For the specific Storage product, Google create a specific Service Account for specific actions, that you cannot choose. You can found it here.

You have to add the Pub/Sub Admin role for that particular Service Account in IAM.

The kwargs returns all attributes of the Pub/Sub notification.

If you change the function name, the auto-clean included at start-up cannot works. As you cannot excess 10 events per bucket, do not forget to clean previous subscription with commands:

gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>

Configuration

Configuration can be done using keyword arguments in class instantiation and/or flask environment variable (set with config.update). If both method used for one configuration key, the class instanciation is primary.

Flask env variable Keyword argument Usage How-to get?
PUBSUB_PROJECT_ID project_id GCP project ID See console.cloud.google.com
PUBSUB_CREDENTIALS_JSON gcp_credentials_json Service account credentials, as JSON string format See IAM in console.cloud.google.com
PUBSUB_CREDENTIALS_FILE gcp_credentials_file Servicce account credentials, as JSON local file See IAM in console.cloud.google.com
PUBSUB_CONCURRENT_CONSUMERS concurrent_consumers Number of simultaneous consumer (default: 4)
PUBSUB_CONCURRENT_MESSAGES concurrent_messages Number of messages pull from topic per consumer per call (default: 2)
PUBSUB_TOPIC_PREFIX topic_prefix Prefix for all topic used in the instance, useful for feature branches using same project.
PUBSUB_AUTO_SETUP auto_setup Enable the auto-setup for Topics creation and Bucket notifications to Pub/Sub (default: false)
PUBSUB_DEADLINE deadline Set deadline retry for all Pub/Sub operations (default: 300)
PUBSUB_PULL_RETURN_IMMEDIATELY return_immediately Enable return immediately flag for pulling (default: false)

🔮 Roadmap

  • Priority in the treatment of messages per functions
  • Logging instead of print (+ option to format as JSON)
  • Contributing manual
  • Documentation about Flask configuration keys and their counterpart on PubSub direct call

TO BE CONFIRMED

  • Region selection (default: all regions) - can be edited in Storage Rules of Topic for the moment