async models #306
src/strands/agent/agent.py
Outdated
@@ -497,14 +500,16 @@ def _run_loop(
        self.messages.append(new_message)

        # Execute the event loop cycle with retry logic for context limits
        yield from self._execute_event_loop_cycle(callback_handler, kwargs)
`yield from` is not allowed inside an async generator (Python rejects it with a SyntaxError). Consequently, we have to iterate with `async for` and yield each event individually.
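For illustration, a minimal sketch of that conversion. `inner_cycle` and `run_loop` are hypothetical stand-ins, not the actual `_execute_event_loop_cycle` and `_run_loop` methods:

```python
import asyncio
from collections.abc import AsyncGenerator
from typing import Any


async def inner_cycle() -> AsyncGenerator[dict[str, Any], None]:
    # Hypothetical stand-in for the event loop cycle.
    yield {"step": 1}
    yield {"step": 2}


async def run_loop() -> AsyncGenerator[dict[str, Any], None]:
    # `yield from inner_cycle()` would be a SyntaxError in an async function,
    # so each event is re-yielded individually.
    async for event in inner_cycle():
        yield event


async def collect() -> list[dict[str, Any]]:
    return [event async for event in run_loop()]


events = asyncio.run(collect())
```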
@@ -484,7 +521,7 @@ def test_event_loop_cycle_stop(
         tool_execution_handler=tool_execution_handler,
         request_state={"stop_event_loop": True},
     )
-    event = list(stream)[-1]
+    event = [event async for event in stream][-1]
Can we have a helper to do this? Presumably we'll need to collect events a lot (maybe not, though).
Done. I defined this helper in conftest.py and named it `alist`. I renamed my existing `alist` helper to `agenerator`.
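A rough sketch of what the two helpers could look like. The bodies are assumptions based on the names and the discussion above, and they are written as plain functions rather than the pytest fixtures defined in conftest.py:

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, Iterable
from typing import TypeVar

T = TypeVar("T")


async def agenerator(items: Iterable[T]) -> AsyncGenerator[T, None]:
    # Assumed behavior: turn a regular iterable into an async generator.
    for item in items:
        yield item


async def alist(stream: AsyncIterable[T]) -> list[T]:
    # Assumed behavior: drain an async iterable into a list.
    return [item async for item in stream]


last = asyncio.run(alist(agenerator([1, 2, 3])))[-1]
```

With helpers like these, the test above could collect the stream with `alist` and take the final event.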
         events = self._run_loop(callback_handler, prompt, kwargs)
-        for event in events:
+        async for event in events:
Done.
-        stop_reason, message, metrics, state = event["stop"]
-        result = AgentResult(stop_reason, message, metrics, state)

+        result = AgentResult(*event["stop"])
💭 I wonder if the Async Iterator should be emitting something like this anyway (allowing us to unify the `__call__` method even more)
See revision.
tests/strands/agent/test_agent.py
Outdated
@@ -161,6 +161,15 @@ def agent(
     return agent


+@pytest.fixture
+def alist():
+    async def gen(items):
Where does `items` come from here? Is this Python magic?
This fixture returns a function, so `items` is just the argument you pass when you call it. For example:
    async def test_me(alist):
        await alist([1, 2, 3])
Here `[1, 2, 3]` is `items`. Note, though, that this has been renamed to `agenerator` in my revision.
@@ -344,7 +344,7 @@ def format_chunk(self, event: dict[str, Any]) -> StreamEvent:
         raise RuntimeError(f"event_type=<{event['type']} | unknown type")

     @override
-    def stream(self, request: dict[str, Any]) -> Iterable[dict[str, Any]]:
+    async def stream(self, request: dict[str, Any]) -> AsyncGenerator[dict[str, Any], None]:
Is it worth making this a union that allows either a generator or an async generator? That way we don't have to fake it for providers that aren't fully async; the SDK would handle that instead.
I think it would also simplify testing/mocking.
As an added benefit, it would let us more easily identify what gaps we have with providers (those that aren't async would be more obvious).
The downside is that it's more work in the SDK, but I think it might be a valid tradeoff: the SDK accepts both async and non-async streams and transforms them appropriately, but emits only async (with wrappers for non-async use cases).
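To make the proposal concrete, here is one way the SDK-side normalization might look. This is only a sketch under assumptions: `as_async_stream` is a hypothetical helper name, and the two providers are toy stand-ins for a model's `stream` method:

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterable, Iterable
from typing import Any, Union

Event = dict[str, Any]


async def as_async_stream(
    stream: Union[Iterable[Event], AsyncIterable[Event]],
) -> AsyncGenerator[Event, None]:
    # Hypothetical helper: accept either kind of stream, always emit async.
    if isinstance(stream, AsyncIterable):
        async for event in stream:
            yield event
    else:
        # A sync provider still blocks the event loop while it produces each item.
        for event in stream:
            yield event


def sync_provider() -> Iterable[Event]:
    yield {"n": 1}
    yield {"n": 2}


async def async_provider() -> AsyncGenerator[Event, None]:
    yield {"n": 1}
    yield {"n": 2}


async def collect(stream: Union[Iterable[Event], AsyncIterable[Event]]) -> list[Event]:
    return [event async for event in as_async_stream(stream)]


from_sync = asyncio.run(collect(sync_provider()))
from_async = asyncio.run(collect(async_provider()))
```

Both providers produce the same events through the wrapper, and the `isinstance` branch is also the place where non-async providers could be logged or counted.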
…nto async-components
            except Exception as e:
                self._end_agent_trace_span(error=e)
                raise
        return asyncio.run(acall())
Note that there is overhead in running `asyncio.run`. It creates an event loop on every invocation. However, this overhead should be negligible compared to the underlying LLM calls.
One more callout: you cannot call `asyncio.run` from within an async context, so this could technically be a breaking change for some customers. As an example:
    import asyncio

    async def func_a():
        return 1

    async def func_b():
        # Nested call while an event loop is already running
        asyncio.run(func_a())

    asyncio.run(func_b())
The above will result in the following exception:
    RuntimeError: asyncio.run() cannot be called from a running event loop
As a fix, we just ask our async callers to use `stream_async` instead of `__call__`. Added a note about this for #253.

        self._end_agent_trace_span(response=result)
        async def acall() -> AgentResult:
I wanted to avoid having to define a helper function to invoke `stream_async`. Unfortunately, `asyncio.run` accepts only a coroutine, so it cannot drive an async generator, and asyncio has no equivalent method for that either. Consequently, I define `acall` to return (instead of yield) the agent result.
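Roughly, the pattern looks like the sketch below. `SketchAgent` and its event shapes are hypothetical simplifications for illustration, not the actual `Agent` class (tracing and error handling are omitted):

```python
import asyncio
from collections.abc import AsyncGenerator
from typing import Any


class SketchAgent:
    """Hypothetical stand-in for the agent, reduced to the call path."""

    async def stream_async(self, prompt: str) -> AsyncGenerator[dict[str, Any], None]:
        # Toy events; the last one carries the final result under "stop".
        yield {"data": f"echo: {prompt}"}
        yield {"stop": ("end_turn", prompt)}

    def __call__(self, prompt: str) -> tuple[str, str]:
        async def acall() -> tuple[str, str]:
            # A coroutine that drains the async generator and returns the result,
            # which is something asyncio.run can execute.
            result = None
            async for event in self.stream_async(prompt):
                if "stop" in event:
                    result = event["stop"]
            if result is None:
                raise RuntimeError("stream ended without a stop event")
            return result

        return asyncio.run(acall())


result = SketchAgent()("hello")
```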
Description
We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to convert any component that yields model events into an async generator.
Related Issues
#83
Type of Change
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepare
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.