tom-alertstreams
is a reusable TOM Toolkit app for listening to kafka streams.
tom-alertstreams
provides a management command, readstreams
. There are no urlpatterns
,
no Views, and no templates. The readstreams
management command reads the settings.py
ALERT_STREAMS
configuration and starts listening to each configured Kafka stream. It is not expected
to return, and is intended to run along side your TOM's server component. The ALERT_STREAMS
configuration (see below) tells readstreams
what streams to access, how to access them,
what topics to listen to, and what to do with messages that arrive on a given topic.
-
Install the package into your TOM environment:
pip install tom-alertstreams
-
In your project
settings.py
, addtom_alertstreams
to yourINSTALLED_APPS
setting:INSTALLED_APPS = [ ... 'tom_alertstreams', ]
At this point you can verify the installation by running ./manage.py
to list the available
management commands and see
[tom_alertstreams]
readstreams
in the output.
Each Kafka stream that your TOM listens to (via readstreams
) will have a configuration dictionary
in your settings.py
ALERT_STREAMS
. ALERT_STREAMS
is a list of configuration dictionaries, one
dictionary for each Kafka stream. Here's an example ALERT_STREAMS
configuration for two Kafka streams:
SCiMMA Hopskotch and
GCN Classic over Kafka.
ALERT_STREAMS = [
{
'ACTIVE': True,
'NAME': 'tom_alertstreams.alertstreams.hopskotch.HopskotchAlertStream',
'OPTIONS': {
'URL': 'kafka://kafka.scimma.org/',
# The hop-client requires that the GROUP_ID prefix match the SCIMMA_AUTH_USERNAME
'GROUP_ID': os.getenv('SCIMMA_AUTH_USERNAME', "") + '-' + 'uniqueidforyourapp12345',
'USERNAME': os.getenv('SCIMMA_AUTH_USERNAME', None),
'PASSWORD': os.getenv('SCIMMA_AUTH_PASSWORD', None),
'TOPIC_HANDLERS': {
'sys.heartbeat': 'tom_alertstreams.alertstreams.hopskotch.heartbeat_handler',
'tomtoolkit.test': 'tom_alertstreams.alertstreams.hopskotch.alert_logger',
'hermes.test': 'tom_alertstreams.alertstreams.hopskotch.alert_logger',
'hermes.*': 'regex match public topics here, requires * handler to be defined'
'*': 'default_handler_here'
},
},
},
{
'ACTIVE': True,
'NAME': 'tom_alertstreams.alertstreams.gcn.GCNClassicAlertStream',
# The keys of the OPTIONS dictionary become (lower-case) properties of the AlertStream instance.
'OPTIONS': {
# see https://github.com/nasa-gcn/gcn-kafka-python#to-use for configuration details.
'GCN_CLASSIC_CLIENT_ID': os.getenv('GCN_CLASSIC_CLIENT_ID', None),
'GCN_CLASSIC_CLIENT_SECRET': os.getenv('GCN_CLASSIC_CLIENT_SECRET', None),
'DOMAIN': 'gcn.nasa.gov', # optional, defaults to 'gcn.nasa.gov'
'CONFIG': { # optional
# 'group.id': 'tom_alertstreams-my-custom-group-id',
# 'auto.offset.reset': 'earliest',
# 'enable.auto.commit': False
},
'TOPIC_HANDLERS': {
'gcn.classic.text.LVC_INITIAL': 'tom_alertstreams.alertstreams.alertstream.alert_logger',
'gcn.classic.text.LVC_PRELIMINARY': 'tom_alertstreams.alertstreams.alertstream.alert_logger',
'gcn.classic.text.LVC_RETRACTION': 'tom_alertstreams.alertstreams.alertstream.alert_logger',
},
},
}
]
The configuration dictionary for each AlertStream
subclass will contain these key-value pairs:
-
ACTIVE
: Boolean which tellsreadstreams
to access this stream. Should beTrue
, unless you want to keep a configuration dictionary, but ignore the stream. -
NAME
: The name of theAlertStream
subclass that implements the interface to this Kafka stream.tom_alertstreams
will provideAlertStream
subclasses for major astromical Kafka streams. See below for instructions on Subclassing theAlertStream
base class. -
OPTIONS
: A dictionary of key-value pairs specific to theAlertStream
subclass given byNAME
. The doc string for theAlertStream
subclass should document what is expected. Typically, a URL, authentication information, and a dictionary,TOPIC_HANDLERS
, will be required. See "SubclassingAlertStream
" below. TheAlertStream
subclass will convert the key-value pairs of theOPTIONS
dictionary into properties (and values) of theAlertStream
subclass instance.- The hopskotch alert stream supports a wildcard of
*
for an alert handler topic name. If specified, ALL public topics will be subscribed and use that handler function. A directly specified topic handler will always be used before the*
handler for any topic that is covered twice.
- The hopskotch alert stream supports a wildcard of
As part of your OPTIONS
for each Kafka stream, you need to configure access credentials. Visit these links
to get credentials for Hopskotch and GCN Classic over Kafka.
Set the environment variables with the username and passwords obtained. Do not check them in to your code repository.
Assuming that an AlertStream
subclass exists for the Kafka stream of interest,
the keys of the TOPIC_HANDLERS
dictionary are the topics that will be subscribed to. The values
of the TOPIC_HANDLERS
dictionary specify alert handling methods that will be imported and called
for each alert recieved on that topic. An example is provided,
tom_alerts.alertstreams.alertstream.alert_logger
, which simply logs the alert.
To customize this behaviour according to the needs of your TOM, define an alert handling function for each
topic that you wish to subscribe to. Your TOPIC_HANDLERS
dictionary will have a an entry for each topic
whose key is the topic name and whose value is a string indicating the dot-path to the alert handling function.
When the AlertStream
subclass is instanciated, the OPTIONS
dictionary is read and an alert_handler
dictionary is created. It is keyed by topic name and it's values are the imported callable functions specified by the
dot-path strings. readstreams
will call the alert handler for each alert that comes in on the topic. The signiture
of the alert handling function is specific to the AlertStream
subclasss.
Ideally, As a TOM developer, there is already an AlertStream
-subclass for the alert stream that you
want your TOM to listen to. If so, you need only to configure your TOM to use it in settings.py
ALERT_STREAMS
. If you must implement your own AlertStream
subclass, please get in touch. In the meantime, here's a brief outline:
-
Create subclass of
AlertStream
. -
Create
required_keys
andallowed_keys
class variables in yourAlertStream
-subclass.These are lists of strings refering to the keys of the
OPTIONS
dictionary. The purpose of these is to help TOM developers using yourAlertStream
-subclass with the key-value pairs in theirALERT_STREAMS
OPTIONS
configuration dictionary. -
Implement the
listen()
method.This method will be called by the
readstreams
management command and is not expected to return. It should instanciate your consumer, subscribe to the topics configured inALERT_STREAMS
, and start consuming. The detail of this will depend on the kafka-client used. Seealertstreams.gcn.listen()
andalertstreams.hopskotch.listen()
for examples to follow.The loop which consumes messages in your
listen()
method should extract the topic from each message and callself.alert_handler[topic]()
with the message or message-derived arguments specific to your kafka client. Users of yourAlertStream
-subclass will write these topic-specific alert handling methods and configure them in theTOPIC_HANLDERS
dictionary of theirALERT_STREAMS
configuration. TheAlertStream
base class will set up thealert_handler
dictionary according to your users' configuration. It helps your users to provide an examplealert_hander()
function in your module as an example. (Again, seealertstreams.gcn.listen()
andalertstreams.hopskotch.listen()
, their configurations insettings.py
, and thealertstreams.gcn.alert_logger()
and `alertstreams.hopskotch.alert_logger() methods, for example).