rest-solace

REST API library for Solace Message Broker. Publish, Consume, & Manage!!


Keywords
solace, rest, API, rest-solace, rest_solace, python, pythonic, rest-api, restful
License
Apache-2.0
Install
pip install rest-solace==0.2.1

Documentation

rest-solace

rest-solace is a REST-based Python library for Solace Message Broker that allows you to Publish, Consume, & Manage!!

It is written with the intent to be easy to understand, functional, and Pythonic. The input and output values of almost every function are one of int, float, str, bool, list, dict, or None, which makes them directly compatible with JSON data types.
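Because these are plain built-in types, values can pass through the standard `json` module unchanged. Here is a minimal sketch using only the standard library; the `payload` dict is a made-up example, not output from this library:

```python
import json

# Hypothetical payload built only from JSON-compatible Python types.
payload = {
    "queue_name": "my_queue",
    "message": "hello world!!",
    "request_reply": False,
    "retries": 3,
    "timeout_seconds": 1.5,
    "tags": ["demo", "rest"],
    "reply": None,
}

# Serialize to a JSON string and parse it back.
encoded = json.dumps(payload)
decoded = json.loads(encoded)

# The round trip preserves both values and types.
print(decoded == payload)
```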

Note:
Right now the focus of this library is on the 'messaging' mode for Solace message VPNs.
In the future I plan to add better support for 'gateway' mode as well.
This library currently uses SEMPv2 for management and only supports basic/common management functions. You are encouraged to contribute to the repo if you don't find something you need.

Check it out at PyPI.
View the code at GitHub.
Read the docs here.

If you like my work, please give a star to the official repository to show support!!

Getting started with Solace:

You can find the docs for this library on this page.

If you are new to Solace and confused about its terminology and workflows, it is highly recommended that you read this document first. It briefly explains the different components of Solace within the context of this library.


Sending messages (for message-VPN in messaging mode):

Creating a publisher object:

from rest_solace import MessagingPublisher

publish = MessagingPublisher(user_name= "admin",
                             password= "admin",
                             host= BROKER_IP,
                             rest_vpn_port= VPN_PORT #For 'default' VPN it is 9000
                            )
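The uppercase names in the snippets on this page (`BROKER_IP`, `VPN_PORT`, and later `SEMP_PORT`, `CONSUMER_HOST`, `CONSUMER_PORT`, `NEW_VPN_NAME`) are placeholders you define yourself. One possible way to define them is from environment variables, as sketched below. The environment variable names and the fallback values for the consumer port and VPN name are assumptions made for illustration:

```python
import os

# Hypothetical environment variable names; adjust them to your deployment.
BROKER_IP = os.environ.get("SOLACE_BROKER_IP", "localhost")
VPN_PORT = int(os.environ.get("SOLACE_VPN_PORT", "9000"))     # 9000 for the 'default' VPN
SEMP_PORT = int(os.environ.get("SOLACE_SEMP_PORT", "8080"))   # default management port
CONSUMER_HOST = os.environ.get("CONSUMER_HOST", "localhost")
CONSUMER_PORT = int(os.environ.get("CONSUMER_PORT", "5000"))  # arbitrary example port
NEW_VPN_NAME = os.environ.get("SOLACE_VPN_NAME", "my_vpn")    # arbitrary example name

print(BROKER_IP, VPN_PORT)
```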

Publish to a queue and confirm if the message was received by the broker:

#Synchronous method
publish.direct_message_to_queue(queue_name= "my_queue",
                                message= "hello world!!")

#Asynchronous method
import asyncio
coroutine_obj= publish.async_direct_message_to_queue(queue_name= "my_queue",
                                                     message= "hello world!!")
asyncio.run(coroutine_obj)
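Since the async methods return coroutines, several sends can be awaited together instead of one at a time. The sketch below shows the pattern with `asyncio.gather`. `fake_publish` is a hypothetical stand-in for `publish.async_direct_message_to_queue` so that the example runs without a broker, and its return value is invented:

```python
import asyncio

async def fake_publish(queue_name: str, message: str) -> dict:
    """Hypothetical stand-in for publish.async_direct_message_to_queue."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {"queue_name": queue_name, "message": message}

async def send_all(messages: list) -> list:
    # Create one coroutine per message and wait for all of them together.
    coroutines = [fake_publish("my_queue", m) for m in messages]
    return await asyncio.gather(*coroutines)

results = asyncio.run(send_all(["one", "two", "three"]))
print(len(results))
```

`asyncio.gather` returns results in the same order as the coroutines it was given, so each result can be matched back to its message.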

Publish for a topic string and confirm if the message was received by the broker:

#Synchronous method
publish.direct_message_for_topic(topic_string= "test_topic",
                                 message= "hello world!!")

#Asynchronous method
import asyncio
coroutine_obj= publish.async_direct_message_for_topic(topic_string= "test_topic",
                                                      message= "hello world!!")
asyncio.run(coroutine_obj)

Publish to a queue and confirm if the message was received by the broker and spooled into the queue:

#Synchronous method
publish.persistent_message_to_queue(queue_name= "my_queue",
                                    message= "hello world!!",
                                    request_reply= False)

#Asynchronous method
import asyncio
coroutine_obj= publish.async_persistent_message_to_queue(queue_name= "my_queue",
                                                         message= "hello world!!",
                                                         request_reply= False)
asyncio.run(coroutine_obj)

Publish for a topic string and confirm if the message was received by the broker and spooled into a queue:

#Synchronous method
publish.persistent_message_for_topic(topic_string= "test_topic",
                                     message= "hello world!!",
                                     request_reply= False)

#Asynchronous method
import asyncio
coroutine_obj= publish.async_persistent_message_for_topic(topic_string= "test_topic",
                                                          message= "hello world!!",
                                                          request_reply= False)
asyncio.run(coroutine_obj)

Publish to a queue and confirm if the message was received by a consumer by requesting a reply:

#Synchronous method
response = publish.persistent_message_to_queue(queue_name= "my_queue",
                                               message= "hello world!!",
                                               request_reply= True)
print(response)

#Asynchronous method
import asyncio
coroutine_obj= publish.async_persistent_message_to_queue(queue_name= "my_queue",
                                                         message= "hello world!!",
                                                         request_reply= True)
response= asyncio.run(coroutine_obj)
print(response)

Publish for a topic string and confirm if the message was received by a consumer by requesting a reply:

#Synchronous method
response = publish.persistent_message_for_topic(topic_string= "test_topic",
                                                message= "hello world!!",
                                                request_reply= True)
print(response)

#Asynchronous method
import asyncio
coroutine_obj= publish.async_persistent_message_for_topic(topic_string= "test_topic",
                                                          message= "hello world!!",
                                                          request_reply= True)
response= asyncio.run(coroutine_obj)
print(response)
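For context, the sketch below builds (but does not send) the kind of HTTP request that Solace REST messaging accepts. It is not this library's implementation. It assumes the broker's REST conventions of `/QUEUE/<name>` and `/TOPIC/<topic>` paths plus the `Solace-Delivery-Mode` and `Solace-Reply-Wait-Time-In-ms` headers. The helper name `build_rest_request` and its parameters are hypothetical:

```python
import base64
import urllib.request

def build_rest_request(host, port, destination_type, destination, message,
                       user_name, password, persistent=False, reply_wait_ms=None):
    """Hypothetical helper: build an HTTP POST for Solace REST messaging."""
    # destination_type is "QUEUE" or "TOPIC".
    url = f"http://{host}:{port}/{destination_type}/{destination}"
    token = base64.b64encode(f"{user_name}:{password}".encode()).decode()
    headers = {
        "Content-Type": "text/plain",
        "Authorization": f"Basic {token}",
        "Solace-Delivery-Mode": "Persistent" if persistent else "Direct",
    }
    if reply_wait_ms is not None:
        # Ask the broker to hold the HTTP request open while waiting for a reply.
        headers["Solace-Reply-Wait-Time-In-ms"] = str(reply_wait_ms)
    return urllib.request.Request(url, data=message.encode("utf-8"),
                                  headers=headers, method="POST")

request = build_rest_request("localhost", 9000, "QUEUE", "my_queue",
                             "hello world!!", "admin", "admin",
                             persistent=True, reply_wait_ms=5000)
print(request.full_url)
```

Under these assumptions, direct, persistent, and request-reply sends share the same URL and differ only in their headers.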

Publish multiple messages in a batch (asynchronously or synchronously):

message_data= [
    {
        "direct_message_to_queue": {
            "queue_name": "queue_rest_consumer",
            "message": "direct_message_to_queue"
        }
    },
    {
        "direct_message_for_topic": {
            "topic_string": "my_topic",
            "message": "direct_message_for_topic"
        }
    },
    {
        "persistent_message_to_queue": {
            "queue_name": "queue_rest_consumer",
            "message": "persistent_message_to_queue",
            "request_reply": False
        }
    },
    {
        "persistent_message_to_queue": {
            "queue_name": "queue_rest_consumer",
            "message": "persistent_message_to_queue",
            "request_reply": True
        }
    },
    {
        "persistent_message_for_topic": {
            "topic_string": "my_topic",
            "message": "persistent_message_for_topic",
            "request_reply": False
        }
    },
    {
        "persistent_message_for_topic": {
            "topic_string": "my_topic",
            "message": "persistent_message_for_topic",
            "request_reply": True
        }
    }
]

response= publish.send_messages(data= message_data, async_mode= True)
print(response)
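A batch list like the one above can also be built in code rather than typed out by hand. In this minimal sketch, `make_entry` is a hypothetical helper (not part of this library) that wraps keyword arguments under the name of the publishing method:

```python
import json

def make_entry(method_name: str, **kwargs) -> dict:
    """Hypothetical helper: wrap keyword arguments under the method name."""
    return {method_name: kwargs}

# Three direct messages to the same queue, generated in a loop.
message_data = [
    make_entry("direct_message_to_queue",
               queue_name="my_queue", message=f"event {i}")
    for i in range(3)
]

# One persistent message for a topic, without requesting a reply.
message_data.append(
    make_entry("persistent_message_for_topic",
               topic_string="my_topic", message="done", request_reply=False)
)

# Every value is JSON-compatible, so the batch can be inspected or stored as JSON.
print(json.dumps(message_data, indent=2))
```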

Receiving messages and sending back a response:

(You can use your own REST server too. The one included with this library is only for simple uses and testing)
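If you do bring your own server, a rough sketch with the standard library could look like the following. It assumes the broker delivers each message as an HTTP POST to `/` and uses the response body as the reply. The handler name `UppercaseHandler` is hypothetical, and a local POST stands in for the broker so that the example runs on its own:

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class UppercaseHandler(BaseHTTPRequestHandler):
    """Replies to each POST with the request body in upper case."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8")
        reply = body.upper().encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(reply)))
        self.end_headers()
        self.wfile.write(reply)

    def log_message(self, format, *args):
        pass  # keep the example quiet

# Port 0 lets the OS pick a free port; use your consumer port in a real setup.
server = HTTPServer(("127.0.0.1", 0), UppercaseHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Simulate the broker delivering a message with a local POST.
connection = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
connection.request("POST", "/", body=b"hello world!!")
reply_text = connection.getresponse().read().decode("utf-8")
connection.close()
print(reply_text)

# Stop the background server and release the socket.
server.shutdown()
server.server_close()
```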

Receive a single message and get the value returned to you:

from rest_solace import Consumer

consumer_obj = Consumer()

#Receive a single message and get the value returned to you.
incoming_message = consumer_obj.startConsumer(host= CONSUMER_HOST,
                                              port= CONSUMER_PORT,
                                              auto_stop= True #Required for single message mode
                                              )
print(incoming_message)

Keep receiving messages and handle them through a callback function:

from rest_solace import Consumer

consumer_obj = Consumer()

def return_uppercase(event:dict, kill_function):
    """Convert request message string to upper case to return as response.
    Stops the consumer server if message is "kill".

    Args:
        event (dict): contains info about the received request.
        kill_function (function): stops the consumer server if you run it.
    Returns:
        str: Returns the incoming message to the publisher in uppercase
    """
    byte_string_content= event["content"][1:-1]
    regular_string_content= byte_string_content.decode("utf-8")
    uppercase_response= str.upper( regular_string_content )

    if regular_string_content == "kill":
        kill_function()

    return uppercase_response

#You can run this function on a separate thread too if you want.
consumer_obj.startConsumer(host= CONSUMER_HOST,
                           port= CONSUMER_PORT,
                           callback_function= return_uppercase,
                           log= True)
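A callback like this can be tried out without a broker by calling it directly with a hand-built event. The sketch below assumes that `event["content"]` holds the raw body as bytes with one extra character on each side (double quotes here), which is what the `[1:-1]` slice suggests. `fake_kill` is a hypothetical stand-in for the kill function the consumer passes in:

```python
def return_uppercase(event: dict, kill_function):
    # Same logic as the callback above, condensed.
    text = event["content"][1:-1].decode("utf-8")
    if text == "kill":
        kill_function()
    return text.upper()

kill_calls = []

def fake_kill():
    """Hypothetical stand-in that records each request to stop the server."""
    kill_calls.append(True)

# Assumed event shape: only the "content" key is used by this callback.
first_reply = return_uppercase({"content": b'"hello"'}, fake_kill)
second_reply = return_uppercase({"content": b'"kill"'}, fake_kill)

print(first_reply, second_reply, len(kill_calls))
```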

Setting up a message VPN for message broking (in messaging mode):

(This is a bit advanced, but the library includes lots of utility functions to make initial setup easy)

from rest_solace import Manager

manager = Manager(user_name= "admin",
                  password= "admin",
                  host= BROKER_IP,
                  semp_port= SEMP_PORT) #Default rest management port is 8080


#Creating a custom message VPN
#(can automatically apply required VPN configuration for rest based communication).
manager.create_message_vpn(
    msgVpnName= NEW_VPN_NAME,
    serviceRestIncomingPlainTextListenPort= VPN_PORT, #Assign it an unused port
    serviceRestMode= "messaging" #auto configuration will be influenced by this parameter
)


#Automatically setting up your Message VPN for rest based communication
manager.auto_rest_messaging_setup_utility(
    msgVpnName= NEW_VPN_NAME,                   #Existing message VPN
    queueName= 'my_queue',                      #Creates a new queue
    subscriptionTopic="test_topic",             #The topic the queue should subscribe to
    restDeliveryPointName='myRDP',              #New RDP to handle incoming messages
    restConsumerName= 'myConsumer',             #A name for your consumer
    remoteHost= CONSUMER_HOST,
    remotePort= CONSUMER_PORT
)


#Doing the same setup manually (Shown for comparison)
manager.update_client_profile(msgVpnName= NEW_VPN_NAME,
                              clientProfileName= "default",
                              allowGuaranteedMsgReceiveEnabled= True,
                              allowGuaranteedMsgSendEnabled= True)
manager.update_client_username(msgVpnName= NEW_VPN_NAME,
                               clientUsername= "default",
                               enabled= True)
manager.create_queue_endpoint(queueName='my_queue', msgVpnName=NEW_VPN_NAME)
manager.subscribe_to_topic_on_queue(msgVpnName= NEW_VPN_NAME,
                                    subscriptionTopic= "test_topic",
                                    queueName= 'my_queue')
manager.create_rest_delivery_point(msgVpnName= NEW_VPN_NAME,
                                   restDeliveryPointName= 'myRDP',
                                   clientProfileName= "default")
manager.specify_rest_consumer(msgVpnName= NEW_VPN_NAME,
                              restDeliveryPointName= 'myRDP',
                              restConsumerName= 'myConsumer',
                              remoteHost= CONSUMER_HOST,
                              remotePort= CONSUMER_PORT)
manager.create_queue_binding(msgVpnName= NEW_VPN_NAME,
                             restDeliveryPointName= 'myRDP',
                             queueBindingName= 'my_queue',
                             postRequestTarget= '/')


#Turning your RDP off and on again (Useful if solace has trouble connecting to your consumer)
manager.restart_rest_delivery_point(msgVpnName= NEW_VPN_NAME, restDeliveryPointName= 'myRDP')

Future plans:

  • Add ability to specify host details separately for each message sending call.
  • Adding a FastAPI + Uvicorn based consumer server option (since FastAPI has more stability and better performance, even if it has fewer features).
  • Adding support for more management APIs and adding the relevant docs.