Skip to content

[FEATURE] Implement native async iterator support  #83

@awsarron

Description

@awsarron

Problem Statement

The Agent class provides the stream_async function that returns an AsyncIterator:

async def stream_async(self, prompt: str, **kwargs: Any) -> AsyncIterator[Any]:

The current stream_async implementation calls _run_loop and uses threads with a queue to yield streamed data events from the Strands Agents event loop:

def _run_loop(

thread = Thread(target=target_callback, daemon=True)
thread.start()
try:
while True:
item = await queue.get()
if item == _stop_event:
break
if isinstance(item, BaseException):
raise item
yield item
finally:
thread.join()

The problem is that this threading implementation doesn't work with Python features like thread local contextvars. Certain Python libraries (e.g. https://github.com/UKGovernmentBEIS/inspect_ai) use thread local context vars and require them to function correctly. The only reasonable solution for correctly supporting concurrency within a single thread is async.

This issue is to propose, discuss, and refine a true async implementation of the Agent.stream_async function and Strands Agent event loop.

Proposed Solution

No response

Use Case

  • Thread local contextvars
  • A more maintainable implementation of async agent requests
  • Allows for future enhancements that enable streamed tool results

Alternatives Solutions

No response

Additional Context

No response

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions