ctodd-python-lib-kafka

Python utilities used for interacting with Apache Kafka


Keywords
python, libraries, Kafka, consumer, producer
License
MIT
Install
pip install ctodd-python-lib-kafka==1.0.2

Documentation

Christopher H. Todd's Python Library For Interacting With Kafka

The ctodd-python-lib-kafka project is responsible for interacting with Apache Kafka. This includes producing records to and consuming records from topics, working with the Avro format, and other tasks involved in building event-driven applications with Python.


Dependencies

Python Packages

Libraries

kafka_consumer_helpers.py

This library is used to aid in creating Kafka consumers.

Functions:

def get_kafka_consumer(
    kafka_brokers,
    consumer_group="default",
    timeout=6000,
    offset_start="latest",
    get_stats=True
):
    """
    Purpose:
        Get a Kafka Consumer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for Kafka brokers
        consumer_group (String): Consumer group to consume as. Default is "default"
        timeout (Int): Timeout in ms if no messages are found (during poll). Default
            is 6000
        offset_start (String): Where to start consuming with respect to the consumer
            group/topic offset. Default is "latest", which ignores any messages in the
            topic before the consumer begins consuming
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
    """
def consume_topic(kafka_consumer, kafka_topics):
    """
    Purpose:
        Consume Kafka Topics
    Args:
        kafka_consumer (Kafka Consumer Obj): Kafka Consumer Object
        kafka_topics (List of Strings): List of Kafka Topics to Consume.
    Yields:
        msg (Kafka Message Obj): Message Obj returned from the topic
    """
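The docstrings above do not name the underlying Kafka client. Assuming it is confluent-kafka (librdkafka), here is a sketch of how the get_kafka_consumer arguments might map onto consumer configuration keys, plus a generator loop in the spirit of consume_topic. `build_consumer_config`, `consume_topic_sketch`, `FakeConsumer` and `FakeMessage` are hypothetical names, not part of this library. The fakes mimic confluent-kafka's `subscribe()`/`poll()`/`close()` and `value()`/`error()` methods so the example runs without a broker, and the 5000 ms statistics interval is an arbitrary choice.

```python
from itertools import islice
from typing import Any, Dict, Iterator, List, Optional


def build_consumer_config(kafka_brokers: List[str], consumer_group: str = "default",
                          offset_start: str = "latest", get_stats: bool = True) -> Dict[str, Any]:
    """Hypothetical mapping of get_kafka_consumer() arguments onto librdkafka keys."""
    config: Dict[str, Any] = {
        "bootstrap.servers": ",".join(kafka_brokers),  # one comma-separated string
        "group.id": consumer_group,
        # Only used when the group has no committed offset; "latest" starts at the end
        "auto.offset.reset": offset_start,
    }
    if get_stats:
        config["statistics.interval.ms"] = 5000  # interval chosen for illustration
        config["stats_cb"] = print  # receives the statistics as a JSON string
    return config


class FakeMessage:
    """Stand-in for a Kafka message with value()/error() accessors."""

    def __init__(self, value: bytes, error: Optional[str] = None) -> None:
        self._value, self._error = value, error

    def value(self) -> bytes:
        return self._value

    def error(self) -> Optional[str]:
        return self._error


class FakeConsumer:
    """Stand-in consumer that replays a fixed list of poll() results."""

    def __init__(self, results: List[Optional[FakeMessage]]) -> None:
        self._results = list(results)
        self.subscribed: List[str] = []
        self.closed = False

    def subscribe(self, topics: List[str]) -> None:
        self.subscribed = list(topics)

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        # None means "nothing arrived within the timeout"
        return self._results.pop(0) if self._results else None

    def close(self) -> None:
        self.closed = True


def consume_topic_sketch(consumer, kafka_topics: List[str],
                         timeout_ms: int = 6000) -> Iterator[FakeMessage]:
    """Yield good messages indefinitely; close the consumer when the generator closes."""
    consumer.subscribe(kafka_topics)
    try:
        while True:
            msg = consumer.poll(timeout_ms / 1000)  # poll() takes seconds
            if msg is None:
                continue  # timed out with no message; keep polling
            if msg.error():
                continue  # a real implementation might log or raise instead
            yield msg
    finally:
        consumer.close()


config = build_consumer_config(["localhost:9092"], consumer_group="test-env-consumer")
fake = FakeConsumer([FakeMessage(b"first"), None,
                     FakeMessage(b"", error="transient error"), FakeMessage(b"second")])
stream = consume_topic_sketch(fake, ["test-env-topic"])
print([m.value() for m in islice(stream, 2)])  # [b'first', b'second']
stream.close()  # triggers the finally block, which closes the consumer
```

With confluent-kafka installed, `confluent_kafka.Consumer(config)` would take the place of `FakeConsumer`. How the real consume_topic handles message errors and shutdown is not documented here.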

kafka_exceptions.py

File for holding custom exception types raised by the kafka_helpers libraries

Classes:

class TopicNotFound(Exception):
    """
    Purpose:
        TopicNotFound is raised when attempting to consume a topic that
        does not exist
    """
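One plausible place to raise TopicNotFound is a pre-flight check before subscribing. The sketch below assumes the set of existing topic names has already been fetched (with confluent-kafka, `list_topics().topics` is a dict keyed by topic name). `ensure_topics_exist` is a hypothetical helper, and the exception class is redefined locally so the example runs on its own.

```python
from typing import Iterable, List


class TopicNotFound(Exception):
    """Local copy of the library's exception, for a self-contained example."""


def ensure_topics_exist(requested: Iterable[str], available: Iterable[str]) -> List[str]:
    """Return the requested topics, or raise TopicNotFound naming any that are missing."""
    requested = list(requested)
    existing = set(available)
    missing = [topic for topic in requested if topic not in existing]
    if missing:
        raise TopicNotFound(f"Topic(s) not found: {', '.join(missing)}")
    return requested


try:
    ensure_topics_exist(["orders", "audit"], available=["orders", "payments"])
except TopicNotFound as exc:
    print(exc)  # Topic(s) not found: audit
```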

kafka_general_helpers.py

This library is used for Kafka interactions that are not specifically related to consuming or producing messages

Functions:

N/A

kafka_producer_helpers.py

This library is used to aid in creating Kafka producers.

Functions:

def get_kafka_producer(kafka_brokers, get_stats=True):
    """
    Purpose:
        Get a Kafka Producer Object (not yet connected to a topic)
    Args:
        kafka_brokers (List of Strings): List of host:port combinations for Kafka brokers
        get_stats (Bool): Whether or not to print statistics. Default is True
    Return:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
    """
def produce_message(kafka_producer, kafka_topic, msg):
    """
    Purpose:
        Produce a message to a Kafka topic
    Args:
        kafka_producer (Kafka Producer Obj): Kafka Producer Object
        kafka_topic (String): Kafka Topic to Produce message to.
        msg (String): Message to produce to Kafka
    Returns:
        N/A
    """
def produce_results_callback(err, msg):
    """
    Purpose:
        Optional per-message delivery callback (triggered by poll() or
        flush()) when a message has been successfully delivered or
        permanently failed delivery (after retries).
    Args:
        err (String): Error Message
        msg (Object): Kafka Callback Message Object
    Return:
        N/A
    """
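The produce_results_callback docstring appears to follow confluent-kafka's delivery-callback contract: `err` is `None` on success, and the message exposes `topic()`, `partition()` and `offset()`. Assuming that contract, and assuming a producer whose `produce(topic, value=..., on_delivery=...)` queues asynchronously while `flush()` waits for callbacks, produce_message and the callback could fit together as sketched below. `FakeProducer`, `FakeMessage`, `format_delivery_report` and the `*_sketch` functions are hypothetical; whether the library flushes after every message is not documented.

```python
from typing import Callable, List, Optional


class FakeMessage:
    """Stand-in for a delivered message exposing topic()/partition()/offset()."""

    def __init__(self, topic: str, partition: int, offset: int) -> None:
        self._topic, self._partition, self._offset = topic, partition, offset

    def topic(self) -> str:
        return self._topic

    def partition(self) -> int:
        return self._partition

    def offset(self) -> int:
        return self._offset


class FakeProducer:
    """Stand-in producer: queues messages and fires delivery callbacks on flush()."""

    def __init__(self) -> None:
        self.pending: List[tuple] = []
        self.log: List[bytes] = []

    def produce(self, topic: str, value: bytes, on_delivery: Callable) -> None:
        self.pending.append((topic, value, on_delivery))

    def flush(self) -> int:
        while self.pending:
            topic, value, callback = self.pending.pop(0)
            self.log.append(value)
            # Pretend every message lands on partition 0 at the next offset
            callback(None, FakeMessage(topic, 0, len(self.log) - 1))
        return 0  # number of messages still queued


def format_delivery_report(err: Optional[object], msg: FakeMessage) -> str:
    if err is not None:
        return f"Delivery failed: {err}"
    return f"Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}"


def produce_results_callback_sketch(err, msg) -> None:
    """Delivery callback: err is None on success, otherwise describes the failure."""
    print(format_delivery_report(err, msg))


def produce_message_sketch(producer, kafka_topic: str, msg: str) -> None:
    # Encode explicitly so the payload is bytes
    producer.produce(kafka_topic, value=msg.encode("utf-8"),
                     on_delivery=produce_results_callback_sketch)
    # Block until the queued message's delivery callback has run
    producer.flush()


producer = FakeProducer()
produce_message_sketch(producer, "test-env-topic", "hello")
# Delivered to test-env-topic [0] @ offset 0
```

Flushing after every message keeps the sketch simple but limits throughput; confluent-kafka producers more commonly call `poll(0)` after each `produce()` to serve callbacks and `flush()` once before shutdown.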

Example Scripts

Example executable Python scripts/modules for testing and interacting with the library. They demonstrate use cases for the libraries and can serve as templates for development or be used for one-off tasks.

consume_from_kafka_topic.py

    Purpose:
        Consume from a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Consumer Object
        - Poll Topic
        - Parse Message
        - Print Message

    example script call:
        python3 consume_from_kafka_topic.py --topic="test-env-topic" \
            --broker="0.0.0.0:9092" --consumer-group="test-env-consumer"
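The example call implies three command-line flags. A minimal argparse sketch for them follows; only the flag names come from the call above, while `build_parser`, `parse_args`, the `required`/`default` settings, the repeatable `--broker`, and the help text are assumptions.

```python
import argparse
from typing import List, Optional


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Consume from a Kafka topic")
    parser.add_argument("--topic", required=True, help="Topic to consume from")
    # action="append" lets --broker be repeated for several brokers (an assumption)
    parser.add_argument("--broker", action="append", required=True,
                        help="host:port of a Kafka broker")
    # Default mirrors get_kafka_consumer's consumer_group default (an assumption)
    parser.add_argument("--consumer-group", default="default", help="Consumer group ID")
    return parser


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    return build_parser().parse_args(argv)


args = parse_args([
    "--topic=test-env-topic",
    "--broker=0.0.0.0:9092",
    "--consumer-group=test-env-consumer",
])
print(args.topic, args.broker, args.consumer_group)
# test-env-topic ['0.0.0.0:9092'] test-env-consumer
```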

produce_to_kafka_topic.py

    Purpose:
        Produce to a Kafka Topic

    Steps:
        - Connect to Kafka
        - Create Producer Object
        - Prompt for Input
        - Parse Input
        - Produce Input to Kafka

    example script call:
        python3 produce_to_kafka_topic.py --topic="test-env-topic" \
            --broker="localhost:9092"
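The prompt, parse and produce steps can be sketched as a loop over input lines. To keep it testable, `produce_lines` takes any iterable of strings and a `send` callable. Both names, and the choice to strip whitespace, skip blank lines, and stop on `exit`, are assumptions about what "Parse Input" means in the actual script.

```python
from typing import Callable, Iterable


def produce_lines(lines: Iterable[str], send: Callable[[str], None]) -> int:
    """Send each non-blank line (stripped) until 'exit' is seen; return how many were sent."""
    sent = 0
    for line in lines:
        text = line.strip()
        if text.lower() == "exit":
            break  # hypothetical quit command
        if not text:
            continue  # skip empty input
        send(text)
        sent += 1
    return sent


collected = []
count = produce_lines(["hello\n", "\n", "  world ", "exit\n", "ignored\n"], collected.append)
print(count, collected)  # 2 ['hello', 'world']
```

In the script itself, `sys.stdin` (which yields one line per Enter key press) could be passed as `lines`, and a small wrapper around produce_message as `send`.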

Notes

  • Relies on f-string notation, which requires Python 3.6 or later. A refactor to remove f-strings could allow development with Python 3.0.x through 3.5.x
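For reference, that refactor amounts to rewriting each f-string with `str.format()`. The variable names below are illustrative only. Explicit field indexes are used because automatic `{}` numbering only arrived in Python 3.1.

```python
topic = "test-env-topic"
offset = 42

# Python 3.6+ only: f-strings are a SyntaxError on 3.5 and earlier
with_fstring = f"Consumed from {topic} at offset {offset}"

# Equivalent that also parses on Python 3.0 through 3.5
with_format = "Consumed from {0} at offset {1}".format(topic, offset)

print(with_fstring == with_format)  # True
```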

TODO

  • Unittest framework in place, but lacking tests