async models #306

Open · wants to merge 19 commits into main

Conversation

@pgrayy (Member) commented Jun 27, 2025

Description

We are currently working on support for an iterative async stream method on the agent class (#83). As part of this work, we need to convert any component that yields model events into an async generator.

Related Issues

#83

Type of Change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation update
  • Other (please describe):

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@@ -497,14 +500,16 @@ def _run_loop(
self.messages.append(new_message)

# Execute the event loop cycle with retry logic for context limits
yield from self._execute_event_loop_cycle(callback_handler, kwargs)
Member Author:

yield from is not supported inside async generators (it is a SyntaxError), so we have to convert to individual yields.
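The restriction described above can be demonstrated in a few lines. This is an illustrative sketch, not the PR's actual code; the method names are borrowed from the diff for readability.

```python
import asyncio

# `yield from` is a SyntaxError inside an `async def` generator, so each
# event must be re-yielded individually with `async for`.
async def _execute_event_loop_cycle():
    for i in range(3):
        yield i

async def _run_loop():
    # yield from _execute_event_loop_cycle()  # SyntaxError in an async generator
    async for event in _execute_event_loop_cycle():
        yield event

async def main():
    return [event async for event in _run_loop()]

print(asyncio.run(main()))  # [0, 1, 2]
```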

@@ -484,7 +521,7 @@ def test_event_loop_cycle_stop(
tool_execution_handler=tool_execution_handler,
request_state={"stop_event_loop": True},
)
event = list(stream)[-1]
event = [event async for event in stream][-1]
Member:

Can we have a helper to do this - presumably we'll need to collect events a lot? (maybe not though)

Member Author:

Done. I defined this helper in conftest.py and named it alist. I renamed my existing alist helper to agenerator.
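The helpers described above might look roughly like the following. This is a sketch based on the names mentioned in the comment (the actual conftest.py implementation is not shown in this excerpt), written as plain functions rather than pytest fixtures:

```python
import asyncio

# Sketch of the `alist` helper: drain an async iterable into a list.
async def alist(aiterable):
    return [item async for item in aiterable]

# Sketch of the companion `agenerator` helper: wrap a plain iterable
# as an async generator, useful for mocking async model streams in tests.
async def agenerator(items):
    for item in items:
        yield item

async def demo():
    return await alist(agenerator([1, 2, 3]))

print(asyncio.run(demo()))  # [1, 2, 3]
```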

events = self._run_loop(callback_handler, prompt, kwargs)
for event in events:
async for event in events:
Member:

Is it possible to call stream_async here or unify it so that they're both just plumbing to the same place?

And with #289 we're now removing the callback_handler override, so I think this gets simpler overall. Maybe coordinate with @Unshure to see what else simplifies down

Member Author:

Done.

stop_reason, message, metrics, state = event["stop"]
result = AgentResult(stop_reason, message, metrics, state)

result = AgentResult(*event["stop"])
Member:

💭 I wonder if the Async Iterator should be emitting something like this anyway (allowing us to unify the __call__ method even more)

Member Author:

See revision.
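The refactor in the diff above replaces field-by-field unpacking with star-unpacking the tuple straight into the constructor. A minimal sketch, using a hypothetical stand-in for the SDK's AgentResult with field names taken from the diff:

```python
from dataclasses import dataclass
from typing import Any

# Hypothetical stand-in for the SDK's AgentResult; the real class lives in
# the strands SDK and may differ.
@dataclass
class AgentResult:
    stop_reason: str
    message: dict
    metrics: Any
    state: Any

event = {"stop": ("end_turn", {"role": "assistant"}, {}, {})}

# Before: unpack into locals, then construct.
stop_reason, message, metrics, state = event["stop"]
result_a = AgentResult(stop_reason, message, metrics, state)

# After: star-unpack the tuple directly into the constructor.
result_b = AgentResult(*event["stop"])

assert result_a == result_b
```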

@@ -161,6 +161,15 @@ def agent(
return agent


@pytest.fixture
def alist():
async def gen(items):
Member:

where does items come from here? is this python magic?

Member Author:

This fixture returns a function, so items is just the argument you pass when you call it. For example:

async def test_me(alist):
   await alist([1, 2, 3])

[1, 2, 3] is items. Note though, this has been renamed to agenerator in my revision.

@@ -344,7 +344,7 @@ def format_chunk(self, event: dict[str, Any]) -> StreamEvent:
raise RuntimeError(f"event_type=<{event['type']} | unknown type")

@override
def stream(self, request: dict[str, Any]) -> Iterable[dict[str, Any]]:
async def stream(self, request: dict[str, Any]) -> AsyncGenerator[dict[str, Any], None]:
Member:

Is it worth making this a union and allowing a generator or an async generator? That way we don't have to fake it for providers that aren't fully async - the SDK would handle that instead?

It would also simplify testing/mocking I think?

Member:

As an added benefit, it would more easily let us identify what gaps we have with providers (those that aren't async would be more obvious)

Downside is that it's more work in the SDK, but IMHO I think it might be a valid tradeoff. The SDK accepts both async/non-async and transforms it appropriately. But it emits only async (and wrappers for non-async use cases)
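One way the SDK could accept both forms, as suggested above, is a normalizing adapter: take either a plain iterable or an async generator from the provider and always emit an async generator downstream. A sketch of the idea only, not the PR's implementation:

```python
import asyncio
import inspect

# Accept a sync iterable or an async generator from a model provider and
# always yield events asynchronously. Illustrative names throughout.
async def ensure_async(stream):
    if inspect.isasyncgen(stream):
        async for event in stream:
            yield event
    else:
        for event in stream:
            yield event

def sync_provider():
    yield {"type": "chunk", "data": 1}

async def async_provider():
    yield {"type": "chunk", "data": 2}

async def demo():
    a = [e async for e in ensure_async(sync_provider())]
    b = [e async for e in ensure_async(async_provider())]
    return a, b

print(asyncio.run(demo()))
```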

@ryanycoleman ryanycoleman moved this to We're Working On It in Strands Agents Roadmap Jun 27, 2025
@pgrayy pgrayy marked this pull request as draft June 30, 2025 13:35
except Exception as e:
self._end_agent_trace_span(error=e)
raise
return asyncio.run(acall())
Member Author:

Note that there is overhead in running asyncio.run. It creates an event loop on every invocation. However, this overhead should be negligible compared to the underlying LLM calls.

Member Author (@pgrayy, Jun 30, 2025):

An additional callout: you cannot call asyncio.run from within an async context, so this could technically be a breaking change for some customers. For example:

async def func_a():
    return 1

async def func_b():
    return asyncio.run(func_a())

asyncio.run(func_b())

The above will result in the following exception:

RuntimeError: asyncio.run() cannot be called from a running event loop

As a fix, we just ask our async callers to use stream_async instead of __call__. Added a note about this for #253.


self._end_agent_trace_span(response=result)
async def acall() -> AgentResult:
Member Author:

I wanted to avoid having to define a helper function to invoke stream_async. Unfortunately, asyncio.run cannot consume async generators, and asyncio offers no equivalent entry point for them. Consequently, I define acall to return (instead of yield) the agent result.
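The pattern described above, a sync entry point wrapping an async generator in an inner coroutine that drains it and returns the final result, can be sketched as follows. Names are illustrative, not the PR's exact code:

```python
import asyncio

# asyncio.run accepts a coroutine, not an async generator, so the sync
# __call__ path needs an inner coroutine that drains the stream and
# returns (rather than yields) the final result.
async def stream_async():
    yield {"delta": "partial"}
    yield {"result": "final"}

def call() -> dict:
    async def acall() -> dict:
        result = None
        async for event in stream_async():
            result = event
        return result
    return asyncio.run(acall())

print(call())  # {'result': 'final'}
```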

@pgrayy pgrayy temporarily deployed to auto-approve July 1, 2025 19:23 — with GitHub Actions Inactive