A protocol agnostic RPC client stack for python.
- Built in support for HTTP, Thrift, ThriftMux, Kafka, and Redis (experimental).
- Extensible stack for easy support of other protocols.
- Fully asynchronous API
- Robust load balancing and error detection / recovery.
- Service discovery via ZooKeeper
pip install scales-rpc
Getting started with scales is very simple. For example, lets use it to do an HTTP GET of www.google.com
from scales.http import Http client = Http.NewClient('tcp://www.google.com:80') response = client.Get('/') print(response.text)
The HTTP client is the simplest type, you give it a URI (see service discovery below), and it returns a client with
Post(uri, data) methods. The response is a
requests response object.
Out of the box, scales uses the
ScalesUriParser to parse the URIs passed to NewClient. The
ScalesUriParser supports two protocols,
tcp:// to create a static serverset of host:port pairs (for example
zk:// to create a dynamic serverset based on a ZooKeeper node. ZooKeeper URIs should be in the form of
Monitoring / Metrics
Scales provides an internal metrics tracking system called Varz. A component in scales.varz called the VarzReceiver handles tracking and aggregating metrics. This component can be used as-is, or replaced at runtime via monkey patching to integrate with a custom metrics system.
In addition, a helper class, VarzAggregator, can be used to generate varz aggregations. By default metrics are aggregated to the service level, however this can be customized by passing in a custom key selector to Aggregate.
aggregated_varz = VarzAggregator.Aggregate( VarzReceiver.VARZ_DATA, VarzReceiver.VARZ_METRICS)
The scales core is composed of 4 modules
- Load Balancers
A message is an envelope to carry some data. In scales, there are two main messages,
MethodReturnMessage, representing a request and response.
Sinks are the core message processing unit of scales. In scales, every layer of the RPC stack is a sink. Some examples of sinks are:
Serializer sinks handle serializing a
Messageobject into a stream.
- Transport sinks handle sending and receiving data over a transport (socket, HTTP, etc)
- Dispatch sinks handles initiating a method call, and is called by the tranparent client proxy.
Load balancers are sinks as well, however, they (as well as pools) are important enough to have their own category. Scales provides two load balancers out of the box, the
HeapBalancerSink, and the
HeapBalancerSink maintains a min-heap of all nodes in the serverset, and dispatches requests to the least-loaded node at the time. Nodes detected as down are not dispatched to unless all nodes have failed.
ApertureBalancerSink is a specialization of the
HeapBalancerSink which attempts to maintain the smallest possible subset of the serverset to maintain load within a certain load band. Nodes are added to the aperture when the load average of the serverset reaches a certain amount (2 by default), and are removed when the load average goes below a certain amount (.5 by default). This method is useful for situations where the load the client is generating is small in relation to the size of the serverset. The aperture balancer is the default for all scales clients.
Pools maintain one or more transport sinks to an underlying endpoint and handle request concurrency to that sink. Scales comes with two pool types, the
SingletonPool and the
SingletonPool maintains at most one transport sink, and allows unlimited concurrency to it. This sink is used for transports that allow multiplexing requests over a single connection, such a ThriftMux.
WatermarkPool maintains a sink pool, sized by a low watermark, and a high watermark. The pool grows until it hits the low watermark, and maintains up to that many sinks forever. Once the number of concurrently open sinks reaches the low watermark, new sinks are created for each request, until the number of concurrently open sinks reaches a high watermark. At this point, incomming requests are queued until the number of concurrently open sinks goes below the high watermark.
Out of the box, scales supports five protocols,
kafka (producer only), and and
Thrift (and ThriftMux)
Scales supports calling thrift services via autogenerated python thrift code generated by the Apache Thrift compiler. Serialization and deserialization are handled by the thrift library using the
Scales proxies are created for Thrift and ThriftMux using their respective builders (
scales.thriftmux.ThriftMux), and passing the generated thrift interface.
from my_project.gen_py.example_rpc_service import (ExampleService, ttypes) from scales.thriftmux import ThriftMux client = ThriftMux.NewClient(ExampleService.Iface, 'tcp://localhost:8080') ret = client.passMessage(ttypes.Message('hi!'))
Scales provides a robust, high performance Kafka producer client. It supports discovering the kafka brokers either directly or via ZooKeeper. Sending messages is very simple:
from scales.kafka import Kafka client = Kafka.NewClient("tcp://broker1:9092,broker2:9092") client.Put("my-topic", ["payload 1", "payload 2"])
- Currently messages are distributed across partitions using a least-loaded strategy (via a HeapBalancer). Partition selection via a hash function is unsupported.
- Only the producer API is implemented.
The redis client is a highly experimental wrapper of the python redis client. It is not recommended for production use.
A primary goal of scales was to build a fully asychronous system. All methods should be non-blocking, instead opting to either return an AsyncResult representing the state of the operation, or operate in a continuation passing style.
Transparent Proxy / Dispatcher
The entry point to scales is a transparent proxy, which is generated by the Scales builder, or through helper methods such as
ThriftMux.NewClient. A proxy's job is to intercept all methods defined by a type (an interface) and route it to a dispatcher. Scales provides one dispatcher, the
MessageDispatcher is a special type of
ClientMessageSink that initiates the sink chain. It takes a method call, packages it into a
MethodCallMessage, then fowards it to the next sink. On the response side, it terminates the chain by taking a response method and applying it to a gevent
AsyncResult. The proxy uses this async result to either wait on (in the synchronous case) or return to the caller to use (in the asynchronous case).
Messages (request or response) flow through scales in a cooperative chain. Each sink takes a message from the previous, does something to it, and passes it to the next. If a sink wants to also handle the response message, it installs itself in the response
sink_stack.Push(self, ...). Response messages traverse the stack in a similar cooperative way, with each sink calling the next sink on the stack via
Serializer sinks translate a
MethodCallMessage into a serialized stream. Serializer sinks perform the switch from
stream (and the reverse) during sink stack processing.
Transport sinks are the final sink in the request sink chain, and initiate the response sink chain. As the name implies, transport sinks are
ClientMessageSinks that take message and stream, and send it across some form of transport (for example, Thrift and ThriftMux both use a TCP transport). They also handle reading from the transport and dispatching the response data back up the sink chain for the message that the response belongs to. This may range from simple, in the thrift case, a transport sink can only handle one request concurrently, or more complicated. For example, the ThriftMux transport sink maintains a hash table of tags (read from the response stream) to response sink stacks.
Transport sinks are also responsible for handling timeouts. They can either read the deadline from the message passed in and calculate a timeout from it to be used by the transport, or use the timeout even also on the message to asynchronously trigger a timeout. They are not however, required to notify the client of the timeout. The timeout sink will accomplish that.
In addition, transport sinks must support having multiple concurrent
Open() calls pending. Some upstream sinks may call
Open() more than once on a transport, and rely on this being a safe operation.
Finally, transport sinks must detect when they're underlying connection is no longer usable, and report this up the stack by setting their on_faulted observable. Upstream sinks use this event to trigger reconnection logic, load balancer adjustment, etc.
There are a few common extension points that one would want to use when implementing new features in scales.
The simplest is implementing a sink that simply inspects (and possibly modifies) messages as they pass through the system.