
async models #306


Open
wants to merge 19 commits into main

Conversation

pgrayy
Member

@pgrayy pgrayy commented Jun 27, 2025

Description

We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to convert any component that yields model events into an async generator.

Related Issues

#83

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Other (please describe):

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@@ -497,14 +500,16 @@ def _run_loop(
self.messages.append(new_message)

# Execute the event loop cycle with retry logic for context limits
yield from self._execute_event_loop_cycle(callback_handler, kwargs)
Member Author

Python does not allow yield from inside an async generator (PEP 525 disallows it as a SyntaxError). Consequently, we have to convert to individual yields.
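The conversion described above can be sketched as follows. This is a minimal illustration, not the real implementation: the method names are taken from the diff, but the event bodies are invented placeholders.

```python
import asyncio

async def _execute_event_loop_cycle():
    # Stand-in for the real cycle; yields model events.
    yield {"event": "start"}
    yield {"event": "stop"}

async def _run_loop():
    # `yield from` is a SyntaxError inside async generators (PEP 525),
    # so each event is re-yielded explicitly instead.
    async for event in _execute_event_loop_cycle():
        yield event

async def collect():
    return [event async for event in _run_loop()]

events = asyncio.run(collect())
```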

@@ -484,7 +521,7 @@ def test_event_loop_cycle_stop(
tool_execution_handler=tool_execution_handler,
request_state={"stop_event_loop": True},
)
event = list(stream)[-1]
event = [event async for event in stream][-1]
Member

Can we have a helper to do this - presumably we'll need to collect events a lot? (maybe not though)

Member Author

Done. I defined this helper in conftest.py and named it alist. I renamed my existing alist helper to agenerator.
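The two helpers described above might look like the following standalone sketch (the real versions are registered as pytest fixtures in conftest.py; the bodies here are assumptions based on the names and the discussion):

```python
import asyncio

async def agenerator(items):
    # Wrap a plain iterable in an async generator, for feeding
    # code under test that expects an async event stream.
    for item in items:
        yield item

async def alist(stream):
    # Collect every event from an async iterable into a list,
    # replacing the `[event async for event in stream]` boilerplate.
    return [event async for event in stream]

events = asyncio.run(alist(agenerator([1, 2, 3])))
```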

events = self._run_loop(callback_handler, prompt, kwargs)
for event in events:
async for event in events:
Member

Is it possible to call stream_async here or unify it so that they're both just plumbing to the same place?

And with #289 we'll now be removing the callback_handler override, so I think this gets simpler overall. Maybe coordinate with @Unshure to see what else simplifies down.

Member Author

Done.

stop_reason, message, metrics, state = event["stop"]
result = AgentResult(stop_reason, message, metrics, state)

result = AgentResult(*event["stop"])
Member

💭 I wonder if the Async Iterator should be emitting something like this anyway (allowing us to unify the __call__ method even more)

Member Author

See revision.
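The unpacking pattern from the diff above can be sketched as follows. The AgentResult fields and the event payload are simplified stand-ins for the real types:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class AgentResult:
    # Simplified stand-in for the real AgentResult class.
    stop_reason: str
    message: dict
    metrics: Any
    state: Any

# The final event yielded by the loop carries the stop tuple, so the
# four temporaries collapse into a single unpacking expression.
event = {"stop": ("end_turn", {"role": "assistant"}, None, {})}
result = AgentResult(*event["stop"])
```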

@@ -161,6 +161,15 @@ def agent(
return agent


@pytest.fixture
def alist():
async def gen(items):
Member

where does items come from here? is this python magic?

Member Author

This fixture is returning a function and so items is just the argument you pass to the call. For example:

async def test_me(alist):
    await alist([1, 2, 3])

[1, 2, 3] is items. Note though, this has been renamed to agenerator in my revision.

@ryanycoleman ryanycoleman moved this to We're Working On It in Strands Agents Roadmap Jun 27, 2025
@pgrayy pgrayy marked this pull request as draft June 30, 2025 13:35
except Exception as e:
self._end_agent_trace_span(error=e)
raise
return asyncio.run(acall())
Member Author

Note that there is overhead in asyncio.run: it creates a new event loop on every invocation. However, this overhead should be negligible compared to the underlying LLM calls.

Member Author

@pgrayy pgrayy Jun 30, 2025

Additional callout: you cannot call asyncio.run from a running event loop, so this could technically be a breaking change for some customers. As an example:

async def func_a():
    return 1

def func_b():
    asyncio.run(func_a())

async def func_c():
    return func_b()
    
asyncio.run(func_c())

The above will result in the following exception:

RuntimeError: asyncio.run() cannot be called from a running event loop

As a fix, we just ask our async callers to use stream_async instead of __call__. Added a note about this for #253.

Member

> As a fix, we just ask our async callers to use stream_async instead of __call__. Added a note about this for #253.

This is a big lift as they do different things. I think instead we need a call_async which does the same thing.

Member

> Additional call out, you cannot call asyncio.run in an async context and so this could technically be a breaking change for some customers.

I think we should explore threading in this case. Otherwise it breaks Jupyter notebooks, for instance.

Member Author

@pgrayy pgrayy Jul 1, 2025

Good call out. I just tested the notebook use case and it will break without a thread. I'll add threading and expose a new invoke_call method to replace acall.
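The threading approach discussed above can be sketched as follows. This is an illustrative pattern, not the PR's actual code: `acall` and `invoke` are placeholder names, and the worker thread's fresh event loop is what sidesteps the RuntimeError from nested asyncio.run calls:

```python
import asyncio
import concurrent.futures

async def acall():
    # Stand-in for the agent's async implementation.
    return "result"

def invoke():
    # Running the coroutine in a separate thread, with its own event
    # loop, keeps this sync entry point usable from inside an already
    # running loop (e.g. a Jupyter cell), where calling asyncio.run
    # directly would raise RuntimeError.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, acall()).result()

value = invoke()
```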


self._end_agent_trace_span(response=result)
async def acall() -> AgentResult:
Member Author

I wanted to avoid having to define a helper function to invoke stream_async. Unfortunately, asyncio.run cannot drive async generators, and asyncio offers no equivalent method for doing so. Consequently, I define acall to return (instead of yield) the agent result.
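The wrapping described above might look like this minimal sketch, assuming a simplified event stream (the event payloads are invented for illustration):

```python
import asyncio

async def stream_async():
    # Stand-in for the agent's async event stream.
    yield {"data": "thinking..."}
    yield {"result": "final answer"}

async def acall():
    # asyncio.run only accepts coroutines, not async generators, so
    # acall drains the stream and returns just the final result.
    async for event in stream_async():
        last_event = event
    return last_event["result"]

answer = asyncio.run(acall())
```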

Member

I think ultimately we might want an async version of __call__; should we just expose acall as call_async?

Member Author

To mirror bedrock naming, I'll create an invoke_async method. So we will have stream_async and invoke_async.

@pgrayy pgrayy temporarily deployed to auto-approve July 1, 2025 19:23 — with GitHub Actions Inactive
Member

@zastrowm zastrowm left a comment

Let's make sure this does not go out today if we're releasing.

I'd like to bug bash on this a bit before it ships.

