async models #306
src/strands/agent/agent.py
Outdated
@@ -497,14 +500,16 @@ def _run_loop(
        self.messages.append(new_message)

        # Execute the event loop cycle with retry logic for context limits
        yield from self._execute_event_loop_cycle(callback_handler, kwargs)
Python does not allow yield from inside an async generator (it is a SyntaxError in an async def function). Consequently, we have to iterate with async for and yield each event individually.
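To illustrate the conversion, here is a minimal sketch. The names inner_events and run_loop are hypothetical stand-ins for the event loop cycle and the agent's run loop, not the actual strands code.

```python
import asyncio
from typing import Any, AsyncGenerator


async def inner_events() -> AsyncGenerator[dict[str, Any], None]:
    """Hypothetical stand-in for an event loop cycle that yields events."""
    for i in range(3):
        await asyncio.sleep(0)  # give control back to the event loop
        yield {"event": i}


async def run_loop() -> AsyncGenerator[dict[str, Any], None]:
    # `yield from inner_events()` would be a SyntaxError here, so each
    # event is re-yielded explicitly.
    async for event in inner_events():
        yield event


async def collect() -> list[dict[str, Any]]:
    return [event async for event in run_loop()]


events = asyncio.run(collect())
```

Unlike yield from, this loop only forwards yielded values; it does not delegate asend() or athrow() to the inner generator, which should not matter when the events are only consumed.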
@@ -484,7 +521,7 @@ def test_event_loop_cycle_stop(
        tool_execution_handler=tool_execution_handler,
        request_state={"stop_event_loop": True},
    )
    event = list(stream)[-1]
    event = [event async for event in stream][-1]
Can we have a helper to do this? Presumably we'll need to collect events a lot (maybe not, though).
Done. I defined this helper in conftest.py and named it alist. I renamed my existing alist helper to agenerator.
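As a rough sketch of what these two helpers might look like: in the PR they are pytest fixtures in conftest.py, whereas the plain functions below are an assumption made to keep the example self-contained.

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, TypeVar

T = TypeVar("T")


async def agenerator(items: Iterable[T]) -> AsyncIterator[T]:
    """Turn a regular iterable into an async generator (useful for mocks)."""
    for item in items:
        yield item


async def alist(stream: AsyncIterable[T]) -> list[T]:
    """Collect every item of an async iterable into a list."""
    return [item async for item in stream]


collected = asyncio.run(alist(agenerator([1, 2, 3])))
```

With a helper like this, a test can write (await alist(stream))[-1] instead of repeating the async comprehension.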
events = self._run_loop(callback_handler, prompt, kwargs)
for event in events:
async for event in events:
Done.
stop_reason, message, metrics, state = event["stop"]
result = AgentResult(stop_reason, message, metrics, state)

result = AgentResult(*event["stop"])
💭 I wonder if the async iterator should be emitting something like this anyway (allowing us to unify the __call__ method even more).
See revision.
tests/strands/agent/test_agent.py
Outdated
@@ -161,6 +161,15 @@ def agent(
    return agent


@pytest.fixture
def alist():
    async def gen(items):
Where does items come from here? Is this Python magic?
This fixture returns a function, so items is just the argument you pass when you call it. For example:
async def test_me(alist):
    stream = alist([1, 2, 3])  # async generator that yields 1, 2, 3
Here [1, 2, 3] is items. Note, though, that this has been renamed to agenerator in my revision.
…nto async-components
    except Exception as e:
        self._end_agent_trace_span(error=e)
        raise
return asyncio.run(acall())
Note that there is overhead in running asyncio.run: it creates a new event loop on every invocation and closes it when the call finishes. However, this overhead should be negligible compared to the underlying LLM calls.
Additional callout: you cannot call asyncio.run from a running event loop, so this could technically be a breaking change for some customers. As an example:
import asyncio

async def func_a():
    return 1

def func_b():
    asyncio.run(func_a())

async def func_c():
    return func_b()

asyncio.run(func_c())
The above will result in the following exception:
RuntimeError: asyncio.run() cannot be called from a running event loop
As a fix, we just ask our async callers to use stream_async instead of __call__. Added a note about this for #253.
"As a fix, we just ask our async callers to use stream_async instead of __call__. Added a note about this for #253."
This is a big lift, as they do different things. I think we instead need a call_async that does the same thing as __call__.
"Additional call out, you cannot call asyncio.run in an async context and so this could technically be a breaking change for some customers."
I think we should explore threading in this case. Otherwise it breaks Jupyter notebooks, for instance.
Good call out. I just tested the notebook use case and it will break without a thread. I'll add threading and expose a new invoke_call method to replace acall.
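One way the threading approach could look is sketched below. It is only an illustration under stated assumptions: invoke_async and invoke_sync are hypothetical names, and the real implementation in the PR may differ. The coroutine runs on a fresh event loop in a worker thread, so the synchronous entry point also works when the caller already has a running loop (as in a Jupyter notebook).

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def invoke_async() -> str:
    """Hypothetical async entry point standing in for the agent invocation."""
    await asyncio.sleep(0)
    return "result"


def invoke_sync() -> str:
    """Run the coroutine on its own event loop in a worker thread."""

    def runner() -> str:
        # The worker thread has no running loop, so asyncio.run is allowed.
        return asyncio.run(invoke_async())

    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(runner).result()


async def caller() -> str:
    # An event loop is already running here, as in a notebook cell.
    return invoke_sync()


from_sync = invoke_sync()
from_async = asyncio.run(caller())
```

Note that invoke_sync still blocks the calling thread until the result is ready, so a caller with a running loop stalls that loop for the duration of the call; async callers are better served by awaiting the async method directly.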

self._end_agent_trace_span(response=result)
async def acall() -> AgentResult:
I wanted to avoid having to define a helper function to invoke stream_async. Unfortunately, asyncio.run only accepts a coroutine, so it cannot run an async generator directly, and asyncio has no equivalent function that can. Consequently, I define acall to return (instead of yield) the agent result.
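The pattern described here can be sketched as follows. The stream_async below is a hypothetical stand-in that yields plain dicts, and the shape of the "stop" payload is an assumption made for illustration only.

```python
import asyncio
from typing import Any, AsyncGenerator


async def stream_async(prompt: str) -> AsyncGenerator[dict[str, Any], None]:
    """Hypothetical stand-in: yields events and ends with a "stop" event."""
    yield {"data": prompt}
    yield {"stop": ("end_turn", f"echo: {prompt}")}


def call(prompt: str) -> tuple[str, str]:
    async def acall() -> tuple[str, str]:
        # A coroutine that drains the async generator and returns the
        # final result, which is something asyncio.run can execute.
        result = None
        async for event in stream_async(prompt):
            if "stop" in event:
                result = event["stop"]
        if result is None:
            raise RuntimeError("stream ended without a stop event")
        return result

    return asyncio.run(acall())


result = call("hello")
```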
I think ultimately we might want an async version of __call__; should we just expose acall as call_async?
To mirror Bedrock naming, I'll create an invoke_async method. So we will have stream_async and invoke_async.
Let's make sure this does not go out today if we're releasing.
I'd like to bug bash this a bit before it ships.
Description
We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to convert every component that yields model events into an async generator.
Related Issues
#83
Type of Change
Testing
How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli
hatch run prepare
Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.