sse-starlette

SSE plugin for Starlette


License
BSD-3-Clause
Install
pip install sse-starlette==2.1.0

Documentation

Server Sent Events for Starlette and FastAPI

Implements the Server-Sent Events specification.
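
For orientation, the wire format defined by the specification is plain text: each event is a group of "field: value" lines terminated by a blank line. The following is a minimal sketch of that framing in plain Python. format_sse is a hypothetical helper written for illustration only; it is not part of sse-starlette, which does the encoding for you.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Frame one event in the SSE text format (hypothetical helper)."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Multi-line payloads are sent as one "data:" line per line of text.
    for chunk in data.splitlines() or [""]:
        lines.append(f"data: {chunk}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


print(format_sse("1"), end="")
print(format_sse("hello\nworld", event="greeting"), end="")
```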

Background: https://sysid.github.io/server-sent-events/

Installation:

pip install sse-starlette

Usage:

import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield dict(data=i)

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/", endpoint=sse)
]

app = Starlette(debug=True, routes=routes)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')

Caveat: SSE streaming does not work in combination with GZipMiddleware.

For a clean server shutdown, your application must stop all running tasks (generators). Otherwise you may see the following warning at shutdown: Waiting for background tasks to complete. (CTRL+C to force quit).
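
One way to meet this requirement is to have each generator watch a shared shutdown signal and return once it is set. The sketch below shows the pattern with plain asyncio. stop_event and ticker are hypothetical names, and calling stop_event.set() from your application's shutdown hook is an assumption about your setup, not something the library does for you.

```python
import asyncio


async def ticker(stop_event: asyncio.Event, interval: float = 0.01):
    """Yield events until stop_event is set, then finish cleanly."""
    i = 0
    while not stop_event.is_set():
        i += 1
        yield dict(data=i)
        try:
            # Sleep for at most `interval`, but wake up early on shutdown.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # normal case: no shutdown requested yet


async def main() -> list:
    stop_event = asyncio.Event()
    received = []
    async for event in ticker(stop_event):
        received.append(event)
        if len(received) == 3:
            stop_event.set()  # in a real app: call this from the shutdown hook
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```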

Client disconnects need to be handled in your Request handler (see example.py):

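import asyncio
import logging

from starlette.requests import Request
from sse_starlette.sse import EventSourceResponse

_log = logging.getLogger(__name__)

async def endless(req: Request):
    async def event_publisher():
        i = 0
        try:
            while True:
                i += 1
                yield dict(data=i)
                await asyncio.sleep(0.2)
        except asyncio.CancelledError:
            _log.info(f"Disconnected from client (via refresh/close) {req.client}")
            # Do any other cleanup, if any
            # Re-raise so that the cancellation propagates
            raise
    return EventSourceResponse(event_publisher())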
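
To see why the except asyncio.CancelledError branch runs, it helps to know that cancelling the task that consumes an async generator raises the exception at the generator's current await. The following self-contained sketch uses only asyncio. No HTTP is involved, and consume and log are hypothetical stand-ins for the response task and the logger.

```python
import asyncio

log = []


async def event_publisher():
    i = 0
    try:
        while True:
            i += 1
            yield dict(data=i)
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cleanup")  # stand-in for real cleanup work
        raise  # let the cancellation propagate


async def consume() -> None:
    # Stand-in for the task that streams events to the client.
    async for _ in event_publisher():
        pass


async def main() -> None:
    task = asyncio.create_task(consume())
    await asyncio.sleep(0.05)  # let a few events flow
    task.cancel()  # simulates the client going away
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")


if __name__ == "__main__":
    asyncio.run(main())
    print(log)
```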

Special use cases

Customize Ping

By default, the server sends a ping every 15 seconds. You can customize this by:

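  1. setting the ping parameter (the interval in seconds)
  2. changing the ping event to a comment event so that it is not visible to the client

from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse, ServerSentEvent

router = APIRouter()  # assumes a FastAPI router

@router.get("")
async def handle():
    generator = numbers(1, 100)  # async generator from the usage example
    return EventSourceResponse(
        generator,
        headers={"Server": "nini"},
        ping=5,
        ping_message_factory=lambda: ServerSentEvent(comment="You can't see\r\nthis ping"),
    )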
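
A comment ping stays invisible because the SSE format treats any line starting with a colon as a comment, which clients discard. A minimal sketch of that client-side rule follows. parse_events is a hypothetical, simplified parser that handles only data fields; it is not something sse-starlette provides, and the sample stream is written by hand for illustration.

```python
def parse_events(stream: str) -> list:
    """Return the data payload of each event in an SSE text stream."""
    events, data_lines = [], []
    for line in stream.splitlines():
        if line.startswith(":"):
            continue  # comment line, e.g. a ping: ignored
        if line == "":
            # Blank line: dispatch the event if it carried any data.
            if data_lines:
                events.append("\n".join(data_lines))
                data_lines = []
            continue
        field, _, value = line.partition(":")
        if field == "data":
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    return events


# Hand-written sample: two data events with a two-line comment ping in between.
stream = "data: 1\r\n\r\n: You can't see\r\n: this ping\r\n\r\ndata: 2\r\n\r\n"
print(parse_events(stream))
```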

SSE Send Timeout

To avoid hanging connections when a client keeps the HTTP connection open but stops reading from it, you can specify a send timeout (see #89).

EventSourceResponse(..., send_timeout=5)  # terminate hanging send call after 5s
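
Conceptually, a send timeout bounds how long a single send may block. The sketch below illustrates the idea with asyncio.wait_for. It is not the library's implementation, and slow_send and send_with_timeout are hypothetical names standing in for a write to a client that no longer reads.

```python
import asyncio


async def slow_send(message: str) -> None:
    """Stand-in for a send that blocks because the client stopped reading."""
    await asyncio.sleep(3600)


async def send_with_timeout(message: str, send_timeout: float) -> bool:
    """Return True if the send finished in time, False if it was abandoned."""
    try:
        await asyncio.wait_for(slow_send(message), timeout=send_timeout)
        return True
    except asyncio.TimeoutError:
        return False  # give up on the hanging connection


if __name__ == "__main__":
    print(asyncio.run(send_with_timeout("data: 1\n\n", send_timeout=0.05)))
```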

Fan out Proxies

Fan-out proxies usually rely on the response being cacheable. To support that, you can set the value of the Cache-Control header. For example:

return EventSourceResponse(
    generator(), headers={"Cache-Control": "public, max-age=29"}
)

Error Handling

See example: examples/error_handling.py

Sending Responses without Async Generators

Async generators can exhibit tricky error and cleanup behavior, especially when they are interrupted.

Background: Cleanup in async generators.

The example examples/no_async_generators.py shows an alternative implementation that does not rely on async generators but uses memory channels instead.
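
The general shape of that approach is a plain producer coroutine that writes into a channel while the response reads from the other end. The sketch below shows the idea with an asyncio.Queue swapped in for the memory channels, so it is not the code from the referenced example. producer, QueueReader and the _DONE sentinel are hypothetical names; the referenced example remains the authoritative version.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream


async def producer(queue: asyncio.Queue, count: int) -> None:
    """Plain coroutine that pushes events into the queue, then closes it."""
    try:
        for i in range(1, count + 1):
            await queue.put(dict(data=i))
    finally:
        # Cleanup lives in an ordinary finally block, not in a generator.
        queue.put_nowait(_DONE)


class QueueReader:
    """Async iterator over a queue; stops at the sentinel."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def main() -> list:
    queue = asyncio.Queue()  # unbounded, so put_nowait cannot fail
    task = asyncio.create_task(producer(queue, 3))
    received = [event async for event in QueueReader(queue)]
    await task
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```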

Development, Contributing

  1. install pdm: pip install pdm
  2. install dependencies using pdm: pdm install -d
  3. run the tests via the Makefile (see below)

Makefile

  • make sure your virtualenv is active
  • check Makefile for available commands and development support, e.g. run the unit tests:

make test
make tox

For integration testing you can use the provided examples in tests and examples.

If you are using Postman, please see: #47 (comment)