The simple solution for sharing async data streams in Python.
pip install streamx
from streamx import AsyncStream
stream = AsyncStream[int]()
You can place items in a stream using the put method. This method is a coroutine, so you'll need to await it. All iterators receive each item placed in the stream.
await stream.put(1)
await stream.put(2)
await stream.put(3)
To consume a stream, you can use an async for loop. Many tasks can listen to the stream at the same time, and each task will receive each item put in the stream while it is iterating.
async for item in stream:
print(item)
Once you're done placing data into a stream, you should close it to signal to iterators that there will be no more data. This signals to exit the async for loop.
await stream.close()
import asyncio
from streamx import AsyncStream
async def producer(stream: AsyncStream[int]):
async with stream:
for i in range(5):
await stream.put(i)
await asyncio.sleep(1)
async def consumer(stream: AsyncStream[int]):
async for item in stream:
print(item)
async def main():
stream = AsyncStream[int]()
await asyncio.gather(consumer(stream), consumer(stream), producer(stream))
asyncio.run(main())