rest-solace is a rest based python library for Solace Message Broker that allows you to Publish, Consume, & Manage!!
It is written with the intent to be easy to understand, functional, and pythonic. The input and output parameters of almost every function are always one of int, float, str, bool, list, dict and None, making them directly compatible with JSON data types.
- Note:
-
Right now the focus of this library is on the 'messaging' mode for Solace message VPNs. In the future I plan to add better support for 'gateway' mode as well. This library currently uses SEMPv2 for management and only supports basic/common management functions. You are encouraged to contribute to the repo if you don't find something you wanted.
If you like my work, please give a star to the official repository to show support!!
You can find the docs for this library on this page.
If you are new to solace and confused about the terminology and workflows around it, it is highly recommended that you read this document first. It gives a brief explanation on the different components of solace; and that too within the context of this library.
from rest_solace import MessagingPublisher

# BROKER_IP and VPN_PORT stand in for your broker's address and the REST port
# of the message VPN you publish to; define them before running this snippet.
publish = MessagingPublisher(
    user_name="admin",
    password="admin",
    host=BROKER_IP,
    rest_vpn_port=VPN_PORT,  # For the 'default' VPN it is 9000
)
# Synchronous method
publish.direct_message_to_queue(
    queue_name="my_queue",
    message="hello world!!",
)

# Asynchronous method: the async_* variant is a method of the same publisher
# object; its result is handed to asyncio.run() to execute it.
import asyncio

coroutine_obj = publish.async_direct_message_to_queue(
    queue_name="my_queue",
    message="hello world!!",
)
asyncio.run(coroutine_obj)
# Synchronous method
publish.direct_message_for_topic(
    topic_string="test_topic",
    message="hello world!!",
)

# Asynchronous method: build the coroutine, then run it with asyncio.run()
import asyncio

coroutine_obj = publish.async_direct_message_for_topic(
    topic_string="test_topic",
    message="hello world!!",
)
asyncio.run(coroutine_obj)
Publish to a queue and confirm if the message was received by the broker and spooled into the queue:
# Synchronous method (request_reply=False: no reply is requested)
publish.persistent_message_to_queue(
    queue_name="my_queue",
    message="hello world!!",
    request_reply=False,
)

# Asynchronous method: build the coroutine, then run it with asyncio.run()
import asyncio

coroutine_obj = publish.async_persistent_message_to_queue(
    queue_name="my_queue",
    message="hello world!!",
    request_reply=False,
)
asyncio.run(coroutine_obj)
Publish for a topic string and confirm if the message was received by the broker and spooled into a queue:
# Synchronous method (request_reply=False: no reply is requested)
publish.persistent_message_for_topic(
    topic_string="test_topic",
    message="hello world!!",
    request_reply=False,
)

# Asynchronous method: build the coroutine, then run it with asyncio.run()
import asyncio

coroutine_obj = publish.async_persistent_message_for_topic(
    topic_string="test_topic",
    message="hello world!!",
    request_reply=False,
)
asyncio.run(coroutine_obj)
# Synchronous method (request_reply=True: the returned value is kept and printed)
response = publish.persistent_message_to_queue(
    queue_name="my_queue",
    message="hello world!!",
    request_reply=True,
)
print(response)

# Asynchronous method: asyncio.run() hands back the coroutine's result
import asyncio

coroutine_obj = publish.async_persistent_message_to_queue(
    queue_name="my_queue",
    message="hello world!!",
    request_reply=True,
)
response = asyncio.run(coroutine_obj)
print(response)
Publish for a topic string and confirm if the message was received by a consumer by requesting a reply:
# Synchronous method (request_reply=True: the returned value is kept and printed)
response = publish.persistent_message_for_topic(
    topic_string="test_topic",
    message="hello world!!",
    request_reply=True,
)
print(response)

# Asynchronous method: asyncio.run() hands back the coroutine's result
import asyncio

coroutine_obj = publish.async_persistent_message_for_topic(
    topic_string="test_topic",
    message="hello world!!",
    request_reply=True,
)
response = asyncio.run(coroutine_obj)
print(response)
# Batch of messages to publish. Each entry is a single-key dict: the key names
# the publisher method and the value holds that method's keyword arguments.
# Booleans are Python literals (True/False), not JSON (true/false).
message_data = [
    {
        "direct_message_to_queue": {
            "queue_name": "queue_rest_consumer",
            "message": "direct_message_to_queue",
        }
    },
    {
        "direct_message_for_topic": {
            "topic_string": "my_topic",
            "message": "direct_message_for_topic",
        }
    },
    {
        "persistent_message_to_queue": {
            "queue_name": "queue_rest_consumer",
            "message": "persistent_message_to_queue",
            "request_reply": False,
        }
    },
    {
        "persistent_message_to_queue": {
            "queue_name": "queue_rest_consumer",
            "message": "persistent_message_to_queue",
            "request_reply": True,
        }
    },
    {
        "persistent_message_for_topic": {
            "topic_string": "my_topic",
            "message": "persistent_message_for_topic",
            "request_reply": False,
        }
    },
    {
        "persistent_message_for_topic": {
            "topic_string": "my_topic",
            "message": "persistent_message_for_topic",
            "request_reply": True,
        }
    },
]
# Publish the whole batch in one call and show what comes back.
response = publish.send_messages(
    data=message_data,
    async_mode=True,
)
print(response)
(You can use your own REST server too. The one included with this library is only for simple uses and testing)
from rest_solace import Consumer

consumer_obj = Consumer()

# Receive a single message and get the value returned to you.
incoming_message = consumer_obj.startConsumer(
    host=CONSUMER_HOST,
    port=CONSUMER_PORT,
    auto_stop=True,  # Required for single message mode
)
print(incoming_message)
from rest_solace import Consumer
# Consumer instance on which the callback-driven consumer is started.
consumer_obj = Consumer()
def return_uppercase(event: dict, kill_function):
    """Convert the request message to upper case and return it as the response.

    Stops the consumer server if the message is "kill".

    Args:
        event (dict): contains info about the received request. Its
            "content" entry is read as a byte string.
        kill_function (function): stops the consumer server if you run it.

    Returns:
        str: the incoming message in uppercase, returned to the publisher.
    """
    # Drop the first and last byte before decoding -- presumably the quotes
    # wrapping the message body; confirm against what the publisher sends.
    byte_string_content = event["content"][1:-1]
    regular_string_content = byte_string_content.decode("utf-8")

    # The comparison is case-sensitive and made on the decoded, unmodified text.
    if regular_string_content == "kill":
        kill_function()

    return regular_string_content.upper()
# You can run this function on a separate thread too if you want.
consumer_obj.startConsumer(
    host=CONSUMER_HOST,
    port=CONSUMER_PORT,
    callback_function=return_uppercase,
    log=True,
)
(This is a bit advanced, but the library includes lots of utility functions to make initial setup easy)
from rest_solace import Manager

# Credentials are string literals; BROKER_IP and SEMP_PORT stand in for your
# broker's address and management port and must be defined beforehand.
manager = Manager(
    user_name="admin",
    password="admin",
    host=BROKER_IP,
    semp_port=SEMP_PORT,  # Default rest management port is 8080
)
# Creating a custom message VPN
# (can automatically apply required VPN configuration for rest based communication).
manager.create_message_vpn(
    msgVpnName=NEW_VPN_NAME,
    serviceRestIncomingPlainTextListenPort=VPN_PORT,  # Assign it an unused port
    serviceRestMode="messaging",  # auto configuration will be influenced by this parameter
)
# Automatically setting up your Message VPN for rest based communication
manager.auto_rest_messaging_setup_utility(
    msgVpnName=NEW_VPN_NAME,  # Existing message VPN
    queueName="my_queue",  # Creates a new queue
    subscriptionTopic="test_topic",  # The topic the queue should subscribe to
    restDeliveryPointName="myRDP",  # New RDP to handle incoming messages
    restConsumerName="myConsumer",  # A name for your consumer
    remoteHost=CONSUMER_HOST,
    remotePort=CONSUMER_PORT,
)
# Doing the same setup manually (shown for comparison)

# Allow guaranteed messaging on the "default" client profile.
manager.update_client_profile(
    msgVpnName=NEW_VPN_NAME,
    clientProfileName="default",
    allowGuaranteedMsgReceiveEnabled=True,
    allowGuaranteedMsgSendEnabled=True,
)

# Enable the "default" client username.
manager.update_client_username(
    msgVpnName=NEW_VPN_NAME,
    clientUsername="default",
    enabled=True,
)

# Create the queue and subscribe it to the topic.
manager.create_queue_endpoint(
    queueName="my_queue",
    msgVpnName=NEW_VPN_NAME,
)
manager.subscribe_to_topic_on_queue(
    msgVpnName=NEW_VPN_NAME,
    subscriptionTopic="test_topic",
    queueName="my_queue",
)

# Create the REST delivery point and register the consumer on it.
manager.create_rest_delivery_point(
    msgVpnName=NEW_VPN_NAME,
    restDeliveryPointName="myRDP",
    clientProfileName="default",
)
manager.specify_rest_consumer(
    msgVpnName=NEW_VPN_NAME,
    restDeliveryPointName="myRDP",
    restConsumerName="myConsumer",
    remoteHost=CONSUMER_HOST,
    remotePort=CONSUMER_PORT,
)

# Bind the queue to the REST delivery point.
manager.create_queue_binding(
    msgVpnName=NEW_VPN_NAME,
    restDeliveryPointName="myRDP",
    queueBindingName="my_queue",
    postRequestTarget="/",
)
# Turning your RDP off and on again (useful if Solace has trouble connecting to your consumer)
manager.restart_rest_delivery_point(
    msgVpnName=NEW_VPN_NAME,
    restDeliveryPointName="myRDP",
)
- Add ability to specify host details separately for each message sending call.
- Adding a FastAPI + uvicorn based consumer server option (since FastAPI has more stability and better performance, even if it has fewer features).
- Adding support for more management APIs and adding the relevant docs.