A framework on top of asyncio for building LLM-based multi-agent systems in Python, with immutable, Pydantic-based messages and a focus on asynchronous token and message streaming between the agents.
TODO mention that the reason for immutable messages is to shift the framework to the functional programming paradigm
TODO mention the feature of "message sequence flattening" and refer to an example that you will provide later in the README
TODO mention that exceptions that happen in "callee" agents are propagated to the caller agents even though the callee agents were being processed in completely detached "asyncio" tasks; refer to an example that you will provide later in the README
TODO create a Discord server for the project
pip install -U miniagents
Here's a simple example of how to define an agent:
from miniagents import miniagent, InteractionContext, MiniAgents
@miniagent
async def my_agent(ctx: InteractionContext) -> None:
async for msg_promise in ctx.message_promises:
ctx.reply(f"You said: {await msg_promise}")
async def main() -> None:
async for msg_promise in my_agent.inquire(["Hello", "World"]):
print(await msg_promise)
if __name__ == "__main__":
MiniAgents().run(main())
This script will print the following lines to the console:
You said: Hello
You said: World
TODO show how (and when) to create class-based agents; point out that separate instances of such class-based agents are created upon each call to those agents - the purpose of turning agents into classes is not for the sake of maintaining agent state, but rather for the sake of breaking down agent functionality into multiple methods (refer to MarkdownHistoryAgent as an example or give an in-place example here in the README ? probably the latter)
MiniAgents provides built-in support for OpenAI and Anthropic language models with possibility to add other integrations.
pip install -U openai
and set your OpenAI
API key in the OPENAI_API_KEY
environment variable before running the example
below.
from miniagents import MiniAgents
from miniagents.ext.llms import OpenAIAgent
# NOTE: "Forking" an agent is a convenient way of creating a new agent instance
# with the specified configuration. Alternatively, you could pass the `model`
# parameter to `OpenAIAgent.inquire()` directly everytime you talk to the
# agent.
gpt_4o_agent = OpenAIAgent.fork(model="gpt-4o-2024-05-13")
async def main() -> None:
reply_sequence = gpt_4o_agent.inquire(
"Hello, how are you?",
system="You are a helpful assistant.",
max_tokens=50,
temperature=0.7,
)
async for msg_promise in reply_sequence:
async for token in msg_promise:
print(token, end="", flush=True)
# MINOR: Let's separate messages with a double newline (even though in
# this particular case we are actually going to receive only one
# message).
print("\n")
if __name__ == "__main__":
MiniAgents().run(main())
Even though OpenAI models return a single assistant response, the
OpenAIAgent.inquire()
method is still designed to return a sequence of
multiple message promises. This generalizes to arbitrary agents, making agents
in the MiniAgents framework easily interchangeable (agents in this framework
support sending and receiving zero or more messages).
You can read agent responses token-by-token as shown above regardless of whether the agent is streaming token by token or returning full messages. The complete message content will just be returned as a single "token" in the latter case.
The dialog_loop
agent is a pre-packaged agent that implements a dialog loop
between a user agent and an assistant agent. Here is how you can use it to set
up an interaction between a user and your agent (can be bare LLM agent, like
OpenAIAgent
or AnthropicAgent
, can also be a custom agent that you define
yourself - a more complex agent that uses LLM agents under the hood but also
introduces more complex behavior, i.e. Retrieval Augmented Generation etc.):
pip install -U anthropic
and set your
Anthropic API key in the ANTHROPIC_API_KEY
environment variable before running
the example below (or just replace AnthropicAgent
with OpenAIAgent
and
"claude-3-5-sonnet-20240620"
with "gpt-4o-2024-05-13"
if you already set up
the previous example).
from miniagents import MiniAgents
from miniagents.ext import (
dialog_loop,
console_user_agent,
MarkdownHistoryAgent,
)
from miniagents.ext.llms import SystemMessage, AnthropicAgent
async def main() -> None:
dialog_loop.kick_off(
SystemMessage(
"Your job is to improve the styling and grammar of the sentences "
"that the user throws at you. Leave the sentences unchanged if "
"they seem fine."
),
user_agent=console_user_agent.fork(
# Write chat history to a markdown file (`CHAT.md` in the current
# working directory by default, fork `MarkdownHistoryAgent` if
# you want to customize the filepath to write to).
history_agent=MarkdownHistoryAgent
),
assistant_agent=AnthropicAgent.fork(
model="claude-3-5-sonnet-20240620",
max_tokens=1000,
),
)
if __name__ == "__main__":
MiniAgents(
# Log LLM prompts and responses to `llm_logs/` folder in the current
# working directory. These logs will have a form of time-stamped
# markdown files - single file per single prompt-response pair.
llm_logger_agent=True
).run(main())
Here is what the interaction might look like if you run this script:
YOU ARE NOW IN A CHAT WITH AN AI ASSISTANT
Press Enter to send your message.
Press Ctrl+Space to insert a newline.
Press Ctrl+C (or type "exit") to quit the conversation.
USER: hi
ANTHROPIC_AGENT: Hello! The greeting "hi" is a casual and commonly used informal
salutation. It's grammatically correct and doesn't require any changes. If you'd
like to provide a more formal or elaborate greeting, you could consider
alternatives such as "Hello," "Good morning/afternoon/evening," or "Greetings."
USER: got it, thanks!
ANTHROPIC_AGENT: You're welcome! The phrase "Got it, thanks!" is a concise and
informal way to express understanding and appreciation. It's perfectly fine as
is for casual communication. If you wanted a slightly more formal version, you
could say:
"I understand. Thank you!"
TODO replace the output above with a gif showing the interaction in real time
Here is how you can implement a dialog loop between an agent and a user from
ground up yourself (for simplicity there is no history agent in this example -
check out in_memory_history_agent
and how it is used if you want to know how
to implement your own history agent too):
from miniagents import miniagent, InteractionContext, MiniAgents
from miniagents.ext import agent_loop, AWAIT
@miniagent
async def user_agent(ctx: InteractionContext) -> None:
async for msg_promise in ctx.message_promises:
print("ASSISTANT: ", end="", flush=True)
async for token in msg_promise:
print(token, end="", flush=True)
print()
ctx.reply(input("USER: "))
@miniagent
async def assistant_agent(ctx: InteractionContext) -> None:
# Turn a sequence of message promises into a single message promise (if
# there had been multiple messages in the sequence they would have had
# been separated by double newlines - this is how `as_single_promise()`
# works by default).
aggregated_message = await ctx.message_promises.as_single_promise()
ctx.reply(f'You said "{aggregated_message}"')
async def main() -> None:
agent_loop.kick_off(agents=[user_agent, AWAIT, assistant_agent])
if __name__ == "__main__":
MiniAgents().run(main())
Output:
USER: hi
ASSISTANT: You said "hi"
USER: nice!
ASSISTANT: You said "nice!"
USER: bye
ASSISTANT: You said "bye"
TODO explain why the presence of AWAIT
sentinel is important in the
example above
TODO or even better - show how to implement agent_loop from scratch
TODO also, use this as an opportunity to bring up mutable_state
dictionary
that can be passed to either the @miniagent
decorator or the fork()
method
(implement a toy chat history agent to demonstrate that)
-
console_input_agent
: Prompts the user for input via the console. -
console_output_agent
: Echoes messages to the console token by token. -
user_agent
: A user agent that echoes messages from the agent that called it, then reads the user input and returns the user input as its response. This agent is an aggregation of the previous two. -
agent_loop
: TODO explain -
agent_chain
: TODO explain
Feel free to explore the source code in the miniagents.ext
package to see how
various agents are implemented and get inspiration for building your own agents!
Let's consider an example that consists of two dummy agents and an aggregator agent that aggregates the responses from the two dummy agents (and also adds some messages of its own):
import asyncio
from miniagents.miniagents import (
MiniAgents,
miniagent,
InteractionContext,
Message,
)
@miniagent
async def agent1(ctx: InteractionContext) -> None:
print("Agent 1 started")
ctx.reply("*** MESSAGE #1 from Agent 1 ***")
print("Agent 1 still working")
ctx.reply("*** MESSAGE #2 from Agent 1 ***")
print("Agent 1 finished")
@miniagent
async def agent2(ctx: InteractionContext) -> None:
print("Agent 2 started")
ctx.reply("*** MESSAGE from Agent 2 ***")
print("Agent 2 finished")
@miniagent
async def aggregator_agent(ctx: InteractionContext) -> None:
print("Aggregator started")
ctx.reply(
[
"*** AGGREGATOR MESSAGE #1 ***",
agent1.inquire(),
agent2.inquire(),
]
)
print("Aggregator still working")
ctx.reply("*** AGGREGATOR MESSAGE #2 ***")
print("Aggregator finished")
async def main() -> None:
print("INQUIRING ON AGGREGATOR")
msg_promises = aggregator_agent.inquire()
print("INQUIRING DONE\n")
print("SLEEPING FOR ONE SECOND")
# This is when the agents will actually start processing (in fact, any
# other kind of task switch would have had the same effect).
await asyncio.sleep(1)
print("SLEEPING DONE\n")
print("PREPARING TO GET MESSAGES FROM AGGREGATOR")
async for msg_promise in msg_promises:
# MessagePromises always resolve into Message objects (or subclasses),
# even if the agent was replying with bare strings
message: Message = await msg_promise
print(message)
# You can safely `await` again. Concrete messages (and tokens, if there was
# token streaming) are cached inside the promises. Message sequences (as
# well as token sequences) are "replayable".
print("TOTAL NUMBER OF MESSAGES FROM AGGREGATOR:", len(await msg_promises))
if __name__ == "__main__":
MiniAgents().run(main())
This script will print the following lines to the console:
INQUIRING ON AGGREGATOR
INQUIRING DONE
SLEEPING FOR ONE SECOND
Aggregator started
Aggregator still working
Aggregator finished
Agent 1 started
Agent 1 still working
Agent 1 finished
Agent 2 started
Agent 2 finished
SLEEPING DONE
PREPARING TO GET MESSAGES FROM AGGREGATOR
*** AGGREGATOR MESSAGE #1 ***
*** MESSAGE #1 from Agent 1 ***
*** MESSAGE #2 from Agent 1 ***
*** MESSAGE from Agent 2 ***
*** AGGREGATOR MESSAGE #2 ***
TOTAL NUMBER OF MESSAGES FROM AGGREGATOR: 5
None of the agent functions start executing upon any of the calls to the
inquire()
method. Instead, in all cases the inquire()
method immediately
returns with promises to "talk to the agent(s)" (promises of sequences
of promises of response messages, to be super precise - see
MessageSequencePromise
and MessagePromise
classes for details).
As long as the global start_asap
setting is set to True
(which is the
default - see the source code of Promising
, the parent class of MiniAgents
context manager for details), the actual agent functions will start processing
at the earliest task switch (the behaviour of asyncio.create_task()
, which is
used under the hood). In this example it is going to be await asyncio.sleep(1)
inside the main()
function, but if this sleep()
wasn't there, it would have
happened upon the first iteration of the async for
loop which is the next
place where a task switch happens.
πͺ EXERCISE FOR READER: Add another await asyncio.sleep(1)
right before
print("Aggregator finished")
in the aggregator_agent
function and then try
to predict how the output will change. After that, run the modified script and
check if your prediction was correct.
start_asap
to False
for
individual agent calls if for some reason you need to:
some_agent.inquire(request_messages_if_any, start_asap=False)
. However,
setting it to False
for the whole system globally is not recommended because
it can lead to deadlocks.
Here's a simple example demonstrating how to use
agent_call = some_agent.initiate_inquiry()
and then do
agent_call.send_message()
two times before calling
agent_call.reply_sequence()
(instead of all-in-one some_agent.inquire()
):
from miniagents import miniagent, InteractionContext, MiniAgents
@miniagent
async def output_agent(ctx: InteractionContext) -> None:
async for msg_promise in ctx.message_promises:
ctx.reply(f"Echo: {await msg_promise}")
async def main() -> None:
agent_call = output_agent.initiate_inquiry()
agent_call.send_message("Hello")
agent_call.send_message("World")
reply_sequence = agent_call.reply_sequence()
async for msg_promise in reply_sequence:
print(await msg_promise)
if __name__ == "__main__":
MiniAgents().run(main())
This will output:
Echo: Hello
Echo: World
There are three ways to use the MiniAgents()
context:
-
Calling its
run()
method with your main function as a parameter (themain()
function in this example should be defined asasync
):MiniAgents().run(main())
-
Using it as an async context manager:
async with MiniAgents(): ... # your async code that works with agents goes here
-
Directly calling its
activate()
(and, potentially,afinalize()
at the end) methods:mini_agents = MiniAgents() mini_agents.activate() try: ... # your async code that works with agents goes here finally: await mini_agents.afinalize()
The third way might be ideal for web applications and other cases when there is
no single function that you can encapsulate with the MiniAgents()
context
manager (or it is unclear what such function would be). You just do
mini_agents.activate()
somewhere upon the init of the server and forget
about it.
from miniagents.ext.llms import UserMessage, SystemMessage, AssistantMessage
user_message = UserMessage("Hello!")
system_message = SystemMessage("System message")
assistant_message = AssistantMessage("Assistant message")
The difference between these message types is in the default values of
the role
field of the message:
-
UserMessage
hasrole="user"
by default -
SystemMessage
hasrole="system"
by default -
AssistantMessage
hasrole="assistant"
by default
You can create custom message types by subclassing Message
.
from miniagents.messages import Message
class CustomMessage(Message):
custom_field: str
message = CustomMessage("Hello", custom_field="Custom Value")
print(message.content) # Output: Hello
print(message.custom_field) # Output: Custom Value
For more advanced usage, check out the examples directory.
There are three main features of MiniAgents the idea of which motivated the creation of this framework:
- It is built around supporting asynchronous token streaming across chains of interconnected agents, making this the core feature of the framework.
- It is very easy to throw bare strings, messages, message promises,
collections, and sequences of messages and message promises (as well as the
promises of the sequences themselves) all together into an agent reply (see
MessageType
). This entire hierarchical structure will be asynchronously resolved in the background into a flat and uniform sequence of message promises (it will be automatically "flattened" in the background). - By default, agents work in so called
start_asap
mode, which is different from the usual way coroutines work where you need to actively await on them and/or iterate over them (in case of asynchronous generators). Instart_asap
mode, every agent, after it was invoked, actively seeks every opportunity to proceed its processing in the background when async tasks switch.
The third feature combines this start_asap
approach with regular async/await
and async generators by using so called streamed promises (see StreamedPromise
and Promise
classes) which were designed to be "replayable" by nature.
It was chosen for messages to be immutable once they are created (see Message
and Frozen
classes) in order to make all of the above possible (because this
way there are no concerns about the state of the message being changed in the
background).
MiniAgents provides a way to persist messages as they are resolved from promises
using the @MiniAgents().on_persist_message
decorator. This allows you to
implement custom logic for storing or logging messages.
Additionally, messages (as well as any other Pydantic models derived from
Frozen
) have a hash_key
property. This property calculates the sha256 hash
of the content of the message and is used as the id of the Messages
(or any
other Frozen
model), much like there are commit hashes in git.
Here's a simple example of how to use the on_persist_message
decorator:
from miniagents import MiniAgents, Message
mini_agents = MiniAgents()
@mini_agents.on_persist_message
async def persist_message(_, message: Message) -> None:
print(f"Persisting message with hash key: {message.hash_key}")
# Here you could save the message to a database or log it to a file
TODO the structure of this section is outdated - update it
Here's an overview of the module structure and hierarchy in the MiniAgents framework:
-
miniagents
: The core package containing the main classes and functions-
miniagents.py
: Defines theMiniAgents
context manager,MiniAgent
class, andminiagent
decorator -
messages.py
: Defines theMessage
class and related message types -
miniagent_typing.py
: Defines type aliases and protocols used in the framework -
utils.py
: Utility functions used throughout the framework -
promising
: Subpackage for the "promising" functionality (promises, streaming, etc.)-
promising.py
: Defines thePromise
andStreamedPromise
classes -
promise_typing.py
: Defines type aliases and protocols for promises -
sequence.py
: Defines theFlatSequence
class for flattening sequences -
sentinels.py
: Defines sentinel objects used in the framework -
errors.py
: Defines custom exception classes -
ext
: Subpackage for extensions to the promising functionality-
frozen.py
: Defines theFrozen
class for immutable Pydantic models
-
-
-
-
miniagents.ext
: Subpackage for pre-packaged agents and extensions-
agent_aggregators.py
: Agents for aggregating other agents (chains, loops, etc.) -
history_agents.py
: Agents for managing conversation history -
misc_agents.py
: Miscellaneous utility agents -
llm
: Subpackage for LLM integrations-
llm_common.py
: Common classes and functions for LLM agents -
openai.py
: OpenAI LLM agent -
anthropic.py
: Anthropic LLM agent
-
-
Here are some of the core concepts in the MiniAgents framework:
-
MiniAgent: A wrapper around an async function that defines an agent's
behavior. Created using the
@miniagent
decorator. - InteractionContext: Passed to each agent function, provides access to incoming messages and allows sending replies.
- Message: Represents a message exchanged between agents. Can contain content, metadata, and nested messages. Immutable once created.
- MessagePromise: A promise of a message that can be streamed token by token.
- MessageSequencePromise: A promise of a sequence of message promises.
- Promise: Represents a value that may not be available yet, but will be resolved in the future.
- StreamedPromise: A promise that can be resolved piece by piece, allowing for streaming.
- Frozen: An immutable Pydantic model with a git-style hash key calculated from its JSON representation.
MiniAgents is released under the MIT License.
-
Different Promise and StreamedPromise resolvers, piece-by-piece streamers,
appenders, and other Promising components should always catch BaseExceptions
and not just Exceptions. This is because many of these components involve
communications between async tasks via
asyncio.Queue
objects. Interrupting these promises withKeyboardInterrupt
(which extends fromBaseException
) instead of letting it go through the queue can lead to hanging promises (a queue waiting forEND_OF_QUEUE
sentinel forever while the task that should send it is dead).
Happy coding with MiniAgents! π