kinesis-stream-consumer

Kinesis stream consumer(reader) written in python.


Keywords
kinesis, consumer, kinesis-consumer, python, kinesis-client, client, kinesis-client-library, kinesis-stream
License
GPL-3.0
Install
pip install kinesis-stream-consumer==1.0.1

Documentation

kinesis-stream-consumer

Kinesis stream consumer channelize through redis along with aws autorefreshable session

Usage

Requirements

  • python >= 3.0
  • boto3 >= 1.13.5
  • kinesis-python >= 0.2.1
  • redis >= 3.5.0

Installation

Install with:

pip install kinesis-stream-consumer

Or, if you're using a development version cloned from this repository:

git clone https://github.com/harshittrivedi78/kinesis-stream-consumer.git
python kinesis-stream-consumer/setup.py install

This will install boto3 >= 1.13.5 and kinesis-python >= 0.2.1 and redis >= 3.5.0

How to use it?

There is two consumer which has to be run parallelly one is kinesis consumer and second is records queue consumer (redis). I have added a example.py file in this code base which can be used to check and test the code.

import threading

from kinesis_stream.consumer import KinesisConsumer
from kinesis_stream.record_queue import RecordQueueConsumer
from kinesis_stream.redis_wrapper import get_redis_conn

redis_conn = get_redis_conn(host="localhost", port=6379, db="0")

stream_name = "test-kinesis-stream"
region = "eu-west-1"

kinesis_consumer = KinesisConsumer(stream_name, region, redis_conn)
record_queue_consumer = RecordQueueConsumer(stream_name, redis_conn)

kinesis_consumer_thread = threading.Thread(name='kinesis_consumer', target=kinesis_consumer.start)
kinesis_consumer_thread.start()

record_queue_consumer_thread = threading.Thread(name='record_queue_consumer', target=record_queue_consumer.start)
record_queue_consumer_thread.start()

Override handle_message func to do some stuff with the kinesis messages.

from kinesis_stream.record_queue import RecordQueueConsumer as BaseRecordQueueConsumer

class RecordQueueConsumer(BaseRecordQueueConsumer):
    def handle_message(self, message):
        # your code
        print(message)