Streaming Response Usage

This guide covers agent-level streaming: receiving tokens, tool calls, and results as they're generated, so users see progress during long-running operations instead of waiting for the full response.

Table of Contents

  1. Overview

  2. Basic Streaming

  3. Streaming Task Execution

  4. Streaming Message Processing

  5. Event Types

  6. Error Handling

  7. Best Practices

Overview

Agent-level streaming provides:

  • Real-Time Feedback: See tokens and tool calls as they happen

  • Better UX: Users see progress instead of waiting for a complete response

  • Tool Call Visibility: See which tools are being called in real-time

  • Result Streaming: Receive tool results as they're generated

  • Status Updates: Get status updates throughout execution

When to Use Streaming

  • ✅ Long-running tasks

  • ✅ Interactive applications

  • ✅ Real-time user feedback needed

  • ✅ Tool call visibility important

  • ✅ Progressive result display

When NOT to Use Streaming

  • ❌ Simple, fast operations

  • ❌ Batch processing

  • ❌ Complete result needed before processing

  • ❌ Non-interactive applications

Basic Streaming

Pattern 1: Stream Task Execution

Stream task execution with tokens and tool calls.

from aiecs.domain.agent import HybridAgent, AgentConfiguration
from aiecs.llm import OpenAIClient

agent = HybridAgent(
    agent_id="agent-1",
    name="My Agent",
    llm_client=OpenAIClient(),
    tools=["search", "calculator"],
    config=AgentConfiguration()
)

await agent.initialize()

# Stream task execution
async for event in agent.execute_task_streaming(
    {"description": "Research Python and calculate 2+2"},
    {}
):
    if event['type'] == 'token':
        # Stream tokens as they're generated
        print(event['content'], end='', flush=True)
    elif event['type'] == 'tool_call':
        # Tool call started
        print(f"\nCalling {event['tool_name']}...")
    elif event['type'] == 'tool_result':
        # Tool result received
        print(f"\nResult: {event['result']}")
    elif event['type'] == 'started':
        # Lifecycle: task execution started
        print("\n[Started]")
    elif event['type'] == 'completed':
        # Lifecycle: task execution completed (HybridAgent only)
        print(f"\n[Completed in {event['execution_time']:.2f}s]")
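The snippets in this guide use `await` and `async for` at the top level, which works in an async REPL or notebook but not in a plain script. The following is a minimal sketch of wrapping the same event loop in a coroutine and running it with `asyncio.run`. `fake_stream`, `consume`, and `main` are hypothetical names introduced for illustration; `fake_stream` merely imitates the event shapes shown above so the example runs without an agent.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


async def fake_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for agent.execute_task_streaming(task, context)."""
    yield {"type": "started"}
    yield {"type": "token", "content": "2 + 2 "}
    yield {"type": "tool_call", "tool_name": "calculator"}
    yield {"type": "tool_result", "tool_name": "calculator", "result": 4}
    yield {"type": "token", "content": "= 4"}


async def consume(stream: AsyncIterator[Dict[str, Any]]) -> str:
    """Print events as they arrive and return the concatenated token text."""
    parts: List[str] = []
    async for event in stream:
        if event["type"] == "token":
            parts.append(event["content"])
            print(event["content"], end="", flush=True)
        elif event["type"] == "tool_call":
            print(f"\nCalling {event['tool_name']}...")
        elif event["type"] == "tool_result":
            print(f"Result: {event['result']}")
    print()
    return "".join(parts)


async def main() -> str:
    # In a real program, create and initialize the agent here, then pass
    # agent.execute_task_streaming(task, context) instead of fake_stream().
    return await consume(fake_stream())


if __name__ == "__main__":
    asyncio.run(main())
```

Keeping the consumer separate from the event source makes it possible to exercise the display logic with a stand-in stream before connecting a real agent.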

Pattern 2: Stream Message Processing

Stream message processing for conversational agents.

# Stream message processing
async for token in agent.process_message_streaming("Hello, how are you?"):
    print(token, end='', flush=True)

Pattern 3: Collect Streamed Results

Collect streamed results for processing.

tokens = []
tool_calls = []
results = []

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        tokens.append(event['content'])
    elif event['type'] == 'tool_call':
        tool_calls.append(event)
    elif event['type'] == 'tool_result':
        results.append(event)

# Process collected results
full_response = ''.join(tokens)
print(f"Full response: {full_response}")
print(f"Tool calls: {len(tool_calls)}")
print(f"Results: {len(results)}")

Streaming Task Execution

Pattern 1: Basic Task Streaming

Stream basic task execution.

async for event in agent.execute_task_streaming(
    {"description": "Answer a question"},
    {}
):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] == 'result':
        print(f"\nFinal result: {event['output']}")

Pattern 2: ReAct Loop Streaming

Stream ReAct loop execution (HybridAgent). HybridAgent emits dedicated thought / action / observation events for each ReAct phase, plus iteration_start events that mark the boundary between iterations.

async for event in agent.execute_task_streaming(
    {"description": "Research and analyze"},
    {}
):
    if event['type'] == 'token':
        # Stream reasoning tokens
        print(event['content'], end='', flush=True)
    elif event['type'] == 'iteration_start':
        # New ReAct iteration begins
        print(f"\n[Iteration {event['iteration']}]")
    elif event['type'] == 'thought':
        # Reasoning text accumulated for this iteration
        print(f"\n[Thought] {event['content']}")
    elif event['type'] == 'action':
        # Tool action selected by the model
        print(f"\n[Action] {event['tool_name']}")
    elif event['type'] == 'tool_result':
        # Raw tool result
        print(f"\n[Result] {event['result']}")
    elif event['type'] == 'observation':
        # Observation derived from the tool result
        print(f"\n[Observation] {event['content']}")

Pattern 3: Lifecycle Tracking

Track the execution lifecycle through streaming events. ToolAgent and HybridAgent emit started when execution begins; HybridAgent additionally emits iteration_start at each ReAct iteration and completed when execution ends.

async for event in agent.execute_task_streaming(task, context):
    etype = event['type']

    if etype == 'started':
        print("Agent started")
    elif etype == 'iteration_start':
        # HybridAgent only: ReAct iteration boundary
        print(f"Iteration {event['iteration']} starting")
    elif etype == 'completed':
        # HybridAgent only: symmetric to 'started'
        print(f"Agent completed (success={event['success']})")
    elif etype == 'result':
        # Final result payload (all agents)
        print(f"Final output: {event['output']}")

Streaming Message Processing

Pattern 1: Conversational Streaming

Stream conversational responses.

async for token in agent.process_message_streaming("Tell me about Python"):
    print(token, end='', flush=True)

Pattern 2: Multi-Turn Streaming

Stream multi-turn conversations.

# First message
async for token in agent.process_message_streaming("Hello"):
    print(token, end='', flush=True)

# Second message
async for token in agent.process_message_streaming("What can you do?"):
    print(token, end='', flush=True)

Pattern 3: Streaming with Context

Stream with session context.

async for token in agent.process_message_streaming(
    "Continue our conversation",
    sender_id="user-123"
):
    print(token, end='', flush=True)

Event Types

Token Events

Token events contain generated text tokens.

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        content = event['content']  # Token text
        timestamp = event.get('timestamp')  # Optional timestamp
        print(content, end='', flush=True)

Tool Call Events

Tool call events indicate when tools are being called.

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'tool_call':
        tool_name = event['tool_name']
        parameters = event.get('parameters', {})
        timestamp = event.get('timestamp')
        
        print(f"Calling {tool_name} with {parameters}")

Tool Result Events

Tool result events contain tool execution results.

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'tool_result':
        tool_name = event['tool_name']
        result = event['result']
        success = event.get('success', True)
        timestamp = event.get('timestamp')
        
        if success:
            print(f"{tool_name} returned: {result}")
        else:
            print(f"{tool_name} failed: {result}")

Status Events

Status events report execution status. They are emitted by BaseAgent (the default implementation) and LLMAgent; in the current implementation, status is always "started".

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'status':
        status = event['status']  # currently always "started"
        timestamp = event.get('timestamp')

        print(f"Status: {status}")

Lifecycle Events

ToolAgent and HybridAgent emit dedicated lifecycle events instead of status. started is emitted at the beginning of streaming execution; completed is emitted by HybridAgent only, as the symmetric counterpart to started.

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'started':
        timestamp = event.get('timestamp')
        print("Execution started")
    elif event['type'] == 'completed':
        success = event['success']
        execution_time = event['execution_time']
        print(f"Execution completed (success={success}, {execution_time:.2f}s)")
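Because the start signal differs by agent type (a status event with status "started" for BaseAgent and LLMAgent, a started event for ToolAgent and HybridAgent), UI code that must work with any agent may want to normalize the two. A minimal sketch, where `is_start_event` is a hypothetical helper and not part of the library:

```python
from typing import Any, Dict


def is_start_event(event: Dict[str, Any]) -> bool:
    """Return True for either start signal described in this guide."""
    if event.get("type") == "started":
        # Lifecycle event (ToolAgent / HybridAgent)
        return True
    # Status event (BaseAgent / LLMAgent)
    return event.get("type") == "status" and event.get("status") == "started"
```

A consumer can then call `is_start_event(event)` once instead of branching on the agent class.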

ReAct Events (HybridAgent)

HybridAgent exposes the ReAct loop as discrete events. iteration_start marks the boundary between iterations; thought, action, and observation correspond to the three phases of each iteration.

async for event in agent.execute_task_streaming(task, context):
    etype = event['type']
    if etype == 'iteration_start':
        print(f"Iteration {event['iteration']} starting")
    elif etype == 'thought':
        print(f"Thought: {event['content']}")
    elif etype == 'action':
        print(f"Action: {event['tool_name']}({event.get('parameters', {})})")
    elif etype == 'observation':
        print(f"Observation: {event['content']}")
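Since iteration_start marks each boundary, a flat list of collected events can be folded into one record per iteration, for example to render a collapsible trace. The sketch below assumes only the fields used in the snippets above (`iteration`, `content`, `tool_name`); `group_by_iteration` and `sample_events` are hypothetical names, and the sample data is invented for illustration.

```python
from typing import Any, Dict, Iterable, List


def group_by_iteration(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Fold a flat sequence of ReAct events into one record per iteration."""
    iterations: List[Dict[str, Any]] = []
    for event in events:
        etype = event["type"]
        if etype == "iteration_start":
            iterations.append(
                {
                    "iteration": event["iteration"],
                    "thought": None,
                    "actions": [],
                    "observations": [],
                }
            )
        elif not iterations:
            # Events before the first iteration_start (e.g. 'started') are skipped
            continue
        elif etype == "thought":
            iterations[-1]["thought"] = event["content"]
        elif etype == "action":
            iterations[-1]["actions"].append(event["tool_name"])
        elif etype == "observation":
            iterations[-1]["observations"].append(event["content"])
    return iterations


# Invented sample data in the event shapes shown above
sample_events = [
    {"type": "started"},
    {"type": "iteration_start", "iteration": 1},
    {"type": "thought", "content": "Need to search"},
    {"type": "action", "tool_name": "search"},
    {"type": "observation", "content": "Found 3 results"},
    {"type": "iteration_start", "iteration": 2},
    {"type": "thought", "content": "Done"},
]
trace = group_by_iteration(sample_events)
```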

Tool Streaming Events (ToolAgent / HybridAgent)

In addition to tool_call and tool_result, the Function Calling streaming path emits two finer-grained events:

  • tool_call_delta: incremental tool-call fragment received from the provider stream (arguments may still be partial).

  • tool_calls_ready: complete batch of tool calls assembled from the stream, ready to be dispatched.

ToolAgent also emits tool_error when a tool invocation fails.
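This guide does not document the payload fields of these three events, so the following sketch dispatches on the event type only. It treats tool_call_delta as progress information (arguments may be partial and should not be parsed yet), waits for tool_calls_ready before considering the batch complete, and records tool_error. The `error` key read from tool_error is an assumption modeled on the error event, and `track_tool_stream` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable


def track_tool_stream(events: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize the fine-grained tool events in a collected event sequence."""
    state: Dict[str, Any] = {"deltas": 0, "ready": False, "errors": []}
    for event in events:
        etype = event["type"]
        if etype == "tool_call_delta":
            # Partial fragment: count it for a progress indicator only
            state["deltas"] += 1
        elif etype == "tool_calls_ready":
            # The full batch has been assembled from the stream
            state["ready"] = True
        elif etype == "tool_error":
            # ASSUMPTION: the failure message is stored under 'error'
            state["errors"].append(event.get("error", "unknown error"))
    return state
```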

Result Events

Result events contain final task results.

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'result':
        output = event['output']
        execution_time = event.get('execution_time')
        success = event.get('success', True)
        
        print(f"Final result: {output}")
        if execution_time is not None:
            # execution_time is read with .get() above and may be absent
            print(f"Execution time: {execution_time}s")

DAWP Boundary Events (HybridAgent + DawpPlugin)

When DawpPlugin is enabled with stream_boundary_events: true, the stream includes optional run-level boundary events for SDK/UI panel management. Step content still uses homomorphic token / tool_result events with loop_scope.kind=dawp (see design §8.1).

| Event type | When | Key fields |
|---|---|---|
| dawp_run_started | Before first DAWP step | run_id, workflow_id, placement, trigger, loop_scope |
| dawp_run_completed | After DAWP run ends | run_id, success, step_summaries, loop_scope |

Pattern: SDK consumer (recommended)

from aiecs.domain.agent.plugins.dawp import DawpStreamConsumer, effective_loop_scope

consumer = DawpStreamConsumer(
    on_run_started=lambda panel: ui.open_dawp_panel(panel.run_id),
    on_run_completed=lambda panel: ui.close_dawp_panel(panel.run_id),
)

async for event in agent.execute_task_streaming(task, context):
    consumer.consume(event)  # handles dawp_run_started / dawp_run_completed
    scope = effective_loop_scope(event)
    render_stream_event(event, scope=scope)  # same renderer; style by scope.kind

Pattern: Manual subscription

async for event in agent.execute_task_streaming(task, context):
    scope = event.get("loop_scope") or {"kind": "main"}
    if event["type"] == "dawp_run_started":
        ui.open_dawp_panel(event["run_id"])
    elif event["type"] == "dawp_run_completed":
        ui.close_dawp_panel(event["run_id"], success=event["success"])
    elif event["type"] == "token":
        print(event["content"], end="", flush=True)

Enable boundary events in plugin config:

PluginConfig(
    name="dawp",
    enabled=True,
    options={
        "document_path": "workflows/my-flow.dawp.md",
        "stream_boundary_events": True,
    },
)
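Because DAWP step content arrives as ordinary token events distinguished only by loop_scope, a consumer can route text into separate panes by scope kind. The sketch below reuses the fallback from the manual subscription pattern (events without loop_scope are treated as kind "main"); `split_tokens_by_scope` is a hypothetical helper, and it assumes loop_scope is a dict with a `kind` key as that pattern implies.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable


def split_tokens_by_scope(events: Iterable[Dict[str, Any]]) -> Dict[str, str]:
    """Concatenate token text separately for each loop_scope kind."""
    text: Dict[str, str] = defaultdict(str)
    for event in events:
        if event.get("type") != "token":
            continue
        # Same fallback as the manual subscription pattern above
        scope = event.get("loop_scope") or {"kind": "main"}
        text[scope["kind"]] += event["content"]
    return dict(text)
```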

Error Events

Error events indicate errors during execution.

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'error':
        error = event['error']
        timestamp = event.get('timestamp')
        
        print(f"Error: {error}")

Error Handling

Pattern 1: Handle Streaming Errors

Handle errors during streaming.

try:
    async for event in agent.execute_task_streaming(task, context):
        if event['type'] == 'error':
            print(f"Error: {event['error']}")
            break
        # Process other events
except Exception as e:
    print(f"Streaming failed: {e}")

Pattern 2: Continue on Errors

Continue processing despite errors.

import logging

logger = logging.getLogger(__name__)

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'error':
        logger.error("Error: %s", event['error'])
        # Continue processing other events
        continue
    # Process event

Pattern 3: Retry on Errors

Retry streaming on errors.

import asyncio

max_retries = 3
retry_count = 0

while retry_count < max_retries:
    try:
        async for event in agent.execute_task_streaming(task, context):
            # Process events
            pass
        break  # Success
    except Exception:
        retry_count += 1
        if retry_count >= max_retries:
            raise
        await asyncio.sleep(1)  # Wait before restarting the stream
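A retry restarts the stream from the beginning, so any tokens already displayed by a failed attempt would be displayed again. One way to avoid duplicates, at the cost of incremental display, is to collect events per attempt and discard partial output when an attempt fails. The sketch below also replaces the fixed one-second wait with exponential backoff. `collect_with_retry` and `make_stream` are hypothetical names; `make_stream` is any zero-argument callable that returns a fresh stream.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List


async def collect_with_retry(
    make_stream: Callable[[], AsyncIterator[Dict[str, Any]]],
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> List[Dict[str, Any]]:
    """Collect every event from a fresh stream, retrying the whole stream on failure."""
    for attempt in range(1, max_retries + 1):
        events: List[Dict[str, Any]] = []  # partial output of a failed attempt is dropped
        try:
            async for event in make_stream():
                events.append(event)
            return events
        except Exception:
            if attempt == max_retries:
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    return []  # only reached when max_retries < 1
```

With an agent, the call might look like `events = await collect_with_retry(lambda: agent.execute_task_streaming(task, context))`.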

Best Practices

1. Use Streaming for Long Operations

Use streaming for operations that take time:

# Good: Long-running task
async for event in agent.execute_task_streaming(
    {"description": "Research and analyze complex topic"},
    {}
):
    # Stream progress
    pass

# Less useful: Fast operation
result = await agent.execute_task(
    {"description": "Simple calculation"},
    {}
)

2. Provide User Feedback

Provide feedback to users during streaming. Use the lifecycle and ReAct events emitted by ToolAgent / HybridAgent (or status="started" for LLMAgent) to drive UI affordances:

async for event in agent.execute_task_streaming(task, context):
    etype = event['type']
    if etype in ('started', 'status'):
        show_spinner("Working...")
    elif etype == 'thought':
        show_spinner("Thinking...")
    elif etype in ('action', 'tool_call'):
        show_spinner(f"Executing {event.get('tool_name', 'tool')}...")
    elif etype in ('completed', 'result', 'error'):
        hide_spinner()

3. Handle Partial Results

Handle partial results gracefully:

tokens = []
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        tokens.append(event['content'])
        # Display partial result
        display_partial(''.join(tokens))
    elif event['type'] == 'result':
        # Final result
        display_final(event['output'])

4. Monitor Streaming Performance

Monitor streaming performance:

import time

start = time.perf_counter()  # Monotonic clock, suited to measuring intervals
token_count = 0

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        token_count += 1
    elif event['type'] == 'result':
        duration = time.perf_counter() - start
        # Guard against a zero-length interval on very fast streams
        tokens_per_second = token_count / duration if duration > 0 else 0.0
        print(f"Streaming rate: {tokens_per_second:.1f} tokens/s")
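Throughput alone can hide the delay users notice most: the wait before the first token appears. The following sketch measures time to first token alongside the overall rate. `measure` and `demo_stream` are hypothetical names; `demo_stream` simulates latency with `asyncio.sleep` so the example runs without an agent.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Dict, Optional


async def measure(stream: AsyncIterator[Dict[str, Any]]) -> Dict[str, Any]:
    """Consume a stream and report token count, time to first token, and rate."""
    start = time.perf_counter()
    first_token_at: Optional[float] = None
    token_count = 0
    async for event in stream:
        if event["type"] == "token":
            token_count += 1
            if first_token_at is None:
                first_token_at = time.perf_counter()
    total = time.perf_counter() - start
    return {
        "token_count": token_count,
        "time_to_first_token": None if first_token_at is None else first_token_at - start,
        "total_seconds": total,
        "tokens_per_second": token_count / total if total > 0 else 0.0,
    }


async def demo_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for agent.execute_task_streaming(task, context)."""
    yield {"type": "started"}
    await asyncio.sleep(0.01)  # simulated latency before the first token
    for word in ("alpha ", "beta ", "gamma"):
        yield {"type": "token", "content": word}
    yield {"type": "result", "output": "alpha beta gamma"}


if __name__ == "__main__":
    print(asyncio.run(measure(demo_stream())))
```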

5. Buffer for Smooth Display

Buffer tokens for smooth display:

buffer = []
buffer_size = 10

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        buffer.append(event['content'])
        if len(buffer) >= buffer_size:
            # Display buffered tokens
            print(''.join(buffer), end='', flush=True)
            buffer.clear()
    
# Display remaining tokens
if buffer:
    print(''.join(buffer), end='', flush=True)
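When tokens are buffered and other events (such as tool_call) are displayed immediately, a tool message can appear before text that was generated earlier but is still sitting in the buffer. A minimal sketch of one way to preserve ordering is to flush the buffer whenever a non-token event arrives. `chunk_tokens` is a hypothetical helper; it is written over a plain iterable for brevity, and the same logic applies inside an `async for` loop.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def chunk_tokens(
    events: Iterable[Dict[str, Any]], buffer_size: int = 10
) -> Iterator[Tuple[str, Any]]:
    """Yield ('text', str) chunks and ('event', dict) items in arrival order."""
    buffer: List[str] = []
    for event in events:
        if event["type"] == "token":
            buffer.append(event["content"])
            if len(buffer) >= buffer_size:
                yield ("text", "".join(buffer))
                buffer.clear()
        else:
            # Flush pending text first so it is shown before this event
            if buffer:
                yield ("text", "".join(buffer))
                buffer.clear()
            yield ("event", event)
    # Flush whatever remains when the stream ends
    if buffer:
        yield ("text", "".join(buffer))
```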

Summary

Streaming provides:

  • ✅ Real-time feedback

  • ✅ Better user experience

  • ✅ Tool call visibility

  • ✅ Progressive result display

  • ✅ Status updates

Key Takeaways:

  • Use for long-running operations

  • Provide user feedback

  • Handle partial results

  • Monitor performance

  • Buffer for smooth display

For more details, see: