TODO:
- Tests
Usage
server.py
from ws_amqp.server import Server
def hello(*args, **kwargs):
    """Example endpoint: return 'hello' followed by the first positional argument."""
    return 'hello' + str(args[0])
server = Server()
# Expose the `hello` function under the endpoint name 'hello'.
server.register_endpoint('hello', hello)

if __name__ == '__main__':
    try:
        server.start()
    except KeyboardInterrupt:
        # Ctrl-C: close the channel and the connection before exiting.
        server.channel.close()
        server.connection.close()
client.py
from ws_amqp.client import Client
client = Client()
# Execute the 'hello' endpoint on the server asynchronously.
client.publish('hello', ' world')
# Call the 'hello' endpoint on the server and get the result.
result = client.call('hello', ' world')
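Pickle as a dumper
Pass the same dumper (here `pickle`) to both `Server` and `Client`. In the example below the endpoint raises an exception; the client checks whether the result it gets back is an exception and, if so, re-raises it.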
server.py
import pickle
from ws_amqp.server import Server
def hello(*args, **kwargs):
    """Example endpoint that always fails by raising Exception('Some error')."""
    raise Exception('Some error')
# NOTE(review): unpickling data from an untrusted source can execute arbitrary
# code. Presumably the dumper is also used to load incoming messages, so only
# use pickle between trusted peers — confirm against the library.
server = Server(dumper=pickle)
# Expose the `hello` function under the endpoint name 'hello'.
server.register_endpoint('hello', hello)

if __name__ == '__main__':
    try:
        server.start()
    except KeyboardInterrupt:
        # Ctrl-C: close the channel and the connection before exiting.
        server.channel.close()
        server.connection.close()
client.py
import pickle
from ws_amqp.client import Client
client = Client(dumper=pickle)
result = client.call('hello', ' world')
# The value returned by call() may itself be an exception object;
# re-raise it locally so the failure is not silently ignored.
if isinstance(result, Exception):
    raise result