Yet another library for server-sent events.
This library works with httpx to support
synchronous as well as asynchronous workflows, but it is also usable with other
HTTP frameworks (see below).
sync
import logging

import ssec


def main() -> None:
    logging.basicConfig(level=logging.INFO)
    for event in ssec.sse(
        "https://stream.wikimedia.org/v2/stream/recentchange"
    ):
        print(event)


main()
async
import asyncio
import logging

import ssec


async def main() -> None:
    logging.basicConfig(level=logging.INFO)
    async for event in ssec.sse_async(
        "https://stream.wikimedia.org/v2/stream/recentchange"
    ):
        print(event)


asyncio.run(main())
Although there are already some libraries on the subject
(aiohttp-sse-client, aiosseclient), these are
unfortunately not entirely correct. For example, both libraries
asynchronously iterate over the stream content via async for line in response.content.
This internally calls aiohttp's readuntil method with the default separator \n,
but the official specification says:
Lines must be separated by either a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, or a single U+000D CARRIAGE RETURN (CR) character.
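To illustrate the rule quoted above, here is a minimal sketch of a line splitter that accepts all three terminators, including a CRLF pair that is split across two chunks. It is not taken from ssec; the function name iter_sse_lines is hypothetical and the sketch only assumes an iterable of byte chunks.

```python
from collections.abc import Iterable, Iterator

CR = 0x0D  # U+000D CARRIAGE RETURN
LF = 0x0A  # U+000A LINE FEED


def iter_sse_lines(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Yield lines from byte chunks, splitting on CRLF, LF, or CR.

    Hypothetical helper for illustration only, not part of ssec.
    """
    buffer = bytearray()
    previous_was_cr = False
    for chunk in chunks:
        for byte in chunk:
            if previous_was_cr:
                previous_was_cr = False
                if byte == LF:
                    # Second half of a CRLF pair: the line was already
                    # emitted when the CR was seen, so skip the LF.
                    continue
            if byte == CR:
                yield bytes(buffer)
                buffer.clear()
                previous_was_cr = True
            elif byte == LF:
                yield bytes(buffer)
                buffer.clear()
            else:
                buffer.append(byte)
    if buffer:
        # Trailing data without a terminator is passed on as a final line.
        yield bytes(buffer)
```

Splitting on \n alone would treat a CR-terminated stream as one endless line, which is the failure mode described above.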
Another issue is error handling, which is often too sparse to diagnose a failure or is skipped entirely.
Although this library works with httpx, it is also possible to use it with
other HTTP frameworks like aiohttp, as long as they provide a method to
iterate over a byte stream. Unfortunately, the library cannot handle
reconnection in that case, so you will have to implement it yourself. An example
could look like this:
import asyncio
import logging

import aiohttp
import ssec


async def main() -> None:
    logging.basicConfig(level=logging.INFO)

    chunk_size = 1024
    connect_attempt = 1
    max_connect_attempts = 5
    config = ssec.SSEConfig(reconnect_timeout=3)

    async with aiohttp.ClientSession() as session:
        while True:
            headers = {
                "Accept": "text/event-stream",
                "Cache-Control": "no-store",
            }
            # Resume from the last received event, if there is one.
            if config.last_event_id:
                headers["Last-Event-ID"] = config.last_event_id
            try:
                async with session.get(
                    "https://stream.wikimedia.org/v2/stream/recentchange",
                    headers=headers,
                ) as response:
                    streamer = response.content.iter_chunked(chunk_size)
                    async for event in ssec.stream_async(streamer, config=config):
                        print(event)
            except aiohttp.ClientError:
                # Give up once the maximum number of attempts is reached.
                if connect_attempt >= max_connect_attempts:
                    logging.exception("Failed to connect!")
                    raise

                waiting_period = config.reconnect_timeout
                message = (
                    f"Failed to connect. "
                    f"Reconnect in {waiting_period} seconds "
                    f"[attempt {connect_attempt}/{max_connect_attempts}]."
                )
                logging.info(message)

                connect_attempt += 1
                await asyncio.sleep(waiting_period)


asyncio.run(main())
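The reconnection logic in the example above can be separated from the HTTP framework. The following is a rough sketch of that idea, not something ssec provides: consume_with_retry and its parameters are hypothetical names, and connect stands for any callable that returns an async iterator. Unlike the example above, this sketch resets the attempt counter once an item arrives and stops when the stream ends cleanly.

```python
import asyncio
from collections.abc import AsyncIterator, Callable


async def consume_with_retry(
    connect: Callable[[], AsyncIterator[str]],
    *,
    max_attempts: int = 5,
    delay: float = 3.0,
    retry_on: tuple[type[Exception], ...] = (ConnectionError,),
) -> AsyncIterator[str]:
    """Yield items from ``connect()``, reconnecting after failures.

    Hypothetical helper for illustration only, not part of ssec.
    """
    attempt = 1
    while True:
        try:
            async for item in connect():
                # A delivered item shows the connection works again,
                # so the attempt counter starts over.
                attempt = 1
                yield item
            return  # the stream ended without an error
        except retry_on:
            if attempt >= max_attempts:
                raise
            attempt += 1
            await asyncio.sleep(delay)
```

With aiohttp, connect could wrap the session.get call and retry_on could be (aiohttp.ClientError,); a server-sent events client that should reconnect after a clean end of stream would loop instead of returning.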
ssec is written in Python and tries to keep up with the newest version available. Currently [3], this is Python 3.12.3. On some operating systems this version is pre-installed, but on many it is not. This guide does not go into detail on the installation process, but plenty of instructions are available online. A good starting point is the beginner's guide.
..via rye:
rye add ssec
..via pip:
pip install ssec
1. Clone this repository to a desired location on your machine using ssh:
git clone git@github.com:sharly-project/ssec.git
2. Change into the project directory:
cd ssec
3. Sync:
rye sync
4. Run an example:
rye run sse_sync_example
- or -
rye run sse_async_example
5. Start coding!
Build the documentation by running the following command in the root directory of the project:
sphinx-build -b html docs/src docs/build
The command requires that the developer edition of ssec is installed and the virtual environment is active.
The documentation is then accessible via docs/build/index.html.
To edit the code base with Visual Studio Code, install the following extensions:
The necessary settings are already included in the .vscode directory and should
be enabled by default.
Contributing to ssec is highly appreciated, but comes with some requirements:
- Type Hints
  Write modern Python code using type annotations to enable static analysis and potential runtime type checking.
- Documentation
  Write quality documentation using numpydoc docstring conventions.
- Linting
- Style
  Format your code using ruff.
- Testing
  Write tests for your code using pytest.
Footnotes
3. 6 May 2024