Host your deep learning models easily.

deep-learning, inference, model-serving, python
pip install ventu==0.4.5



pypi versions Python Test Python document Language grade: Python

Serving the deep learning models easily.


pip install ventu


  • only need to implement Model(preprocess, postprocess, inference or batch_inference)
  • request & response data validation using pydantic
  • API document using SpecTree (when run with run_http)
  • backend service using falcon supports both JSON and msgpack
  • dynamic batching with batching using Unix domain socket or TCP
    • errors in one request won't affect others in the same batch
    • load balancing
  • support all the runtime
  • health check
  • monitoring metrics (Prometheus)
    • if you have multiple workers, remember to setup prometheus_multiproc_dir environment variable to a directory
  • inference warm-up

How to use

  • define your request data schema and response data schema with pydantic
    • add examples to schema.Config.schema_extra[examples] for warm-up and health check (optional)
  • inherit ventu.Ventu, implement the preprocess and postprocess methods
  • for standalone HTTP service, implement the inference method, run with run_http
  • for the worker behind dynamic batching service, implement the batch_inference method, run with run_socket

check the document for API details


The demo code can be found in examples.


Install requirements pip install numpy torch transformers httpx

import argparse
import logging

import numpy as np
import torch
from pydantic import BaseModel, confloat, constr
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

from ventu import Ventu

# request schema used for validation
class Req(BaseModel):
    # the input sentence should be at least 2 characters
    text: constr(min_length=2)

    class Config:
        # examples used for health check and warm-up
        schema_extra = {
            'example': {'text': 'my cat is very cut'},
            'batch_size': 16,

# response schema used for validation
class Resp(BaseModel):
    positive: confloat(ge=0, le=1)
    negative: confloat(ge=0, le=1)

class ModelInference(Ventu):
    def __init__(self, *args, **kwargs):
        # initialize super class with request & response schema, configs
        super().__init__(*args, **kwargs)
        # initialize model and other tools
        self.tokenizer = DistilBertTokenizer.from_pretrained(
        self.model = DistilBertForSequenceClassification.from_pretrained(

    def preprocess(self, data: Req):
        # preprocess a request data (as defined in the request schema)
        tokens = self.tokenizer.encode(data.text, add_special_tokens=True)
        return tokens

    def batch_inference(self, data):
        # batch inference is used in `socket` mode
        data = [torch.tensor(token) for token in data]
        with torch.no_grad():
            result = self.model(torch.nn.utils.rnn.pad_sequence(data, batch_first=True))[0]
        return result.numpy()

    def inference(self, data):
        # inference is used in `http` mode
        with torch.no_grad():
            result = self.model(torch.tensor(data).unsqueeze(0))[0]
        return result.numpy()[0]

    def postprocess(self, data):
        # postprocess a response data (returned data as defined in the response schema)
        scores = (np.exp(data) / np.exp(data).sum(-1, keepdims=True)).tolist()
        return {'negative': scores[0], 'positive': scores[1]}

def create_model():
    logger = logging.getLogger()
    formatter = logging.Formatter(
        fmt='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
    handler = logging.StreamHandler()

    model = ModelInference(Req, Resp, use_msgpack=True)
    return model

def create_app():
    """for gunicorn"""
    return create_model().app

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Ventu service')
    parser.add_argument('--mode', '-m', default='http', choices=('http', 'unix', 'tcp'))
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', '-p', default=8080, type=int)
    parser.add_argument('--socket', '-s', default='batching.socket')
    args = parser.parse_args()

    model = create_model()
    if args.mode == 'unix':
    elif args.mode == 'tcp':
        model.run_tcp(, args.port)
        model.run_http(, args.port)

You can run this script as:

  • a single thread HTTP service: python examples/
  • a HTTP service with multiple workers: gunicorn -w 2 -b localhost:8080 ''
    • when run as a HTTP service, can check the follow links:
      • /metrics Prometheus metrics
      • /health health check
      • /inference inference
      • /apidoc/redoc or /apidoc/swagger OpenAPI document
  • an inference worker behind the batching service: python examples/ -m socket (Unix domain socket) or python examples/ -m tcp --host localhost --port 8888 (TCP) (need to run the batching service first)


from concurrent import futures

import httpx
import msgpack

URL = 'http://localhost:8080/inference'
HEADER = {'Content-Type': 'application/msgpack'}
packer = msgpack.Packer(

def request(text):
    return, data=packer.pack({'text': text}), headers=HEADER)

if __name__ == "__main__":
    with futures.ThreadPoolExecutor() as executor:
        text = [
            'They are smart',
            'what is your problem?',
            'I hate that!',
        results =, text)
        for i, resp in enumerate(results):
                f'>> {text[i]} -> [{resp.status_code}]\n'