Streaming Response Usage
This guide covers how to use agent-level streaming to receive tokens, tool calls, and results as they're generated, providing a better user experience for long-running operations.
Overview
Agent-level streaming provides:
Real-Time Feedback: See tokens and tool calls as they happen
Better UX: Users see progress instead of waiting for the complete response
Tool Call Visibility: See which tools are being called in real time
Result Streaming: Receive tool results as they're generated
Status Updates: Get status updates throughout execution
When to Use Streaming
✅ Long-running tasks
✅ Interactive applications
✅ Real-time user feedback needed
✅ Tool call visibility important
✅ Progressive result display
When NOT to Use Streaming
❌ Simple, fast operations
❌ Batch processing
❌ Complete result needed before processing
❌ Non-interactive applications
Basic Streaming
Pattern 1: Stream Task Execution
Stream task execution with tokens and tool calls.
import asyncio

from aiecs.domain.agent import HybridAgent, AgentConfiguration
from aiecs.llm import OpenAIClient


async def main():
    agent = HybridAgent(
        agent_id="agent-1",
        name="My Agent",
        llm_client=OpenAIClient(),
        tools=["search", "calculator"],
        config=AgentConfiguration(),
    )
    await agent.initialize()

    # Stream task execution
    async for event in agent.execute_task_streaming(
        {"description": "Research Python and calculate 2+2"},
        {},
    ):
        if event['type'] == 'token':
            # Stream tokens as they're generated
            print(event['content'], end='', flush=True)
        elif event['type'] == 'tool_call':
            # Tool call started
            print(f"\nCalling {event['tool_name']}...")
        elif event['type'] == 'tool_result':
            # Tool result received
            print(f"\nResult: {event['result']}")
        elif event['type'] == 'started':
            # Lifecycle: task execution started
            print("\n[Started]")
        elif event['type'] == 'completed':
            # Lifecycle: task execution completed (HybridAgent only)
            print(f"\n[Completed in {event['execution_time']:.2f}s]")


# 'await' and 'async for' must run inside a coroutine
asyncio.run(main())
Pattern 2: Stream Message Processing
Stream message processing for conversational agents.
# Stream message processing
async for token in agent.process_message_streaming("Hello, how are you?"):
    print(token, end='', flush=True)
Pattern 3: Collect Streamed Results
Collect streamed results for processing.
tokens = []
tool_calls = []
results = []
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        tokens.append(event['content'])
    elif event['type'] == 'tool_call':
        tool_calls.append(event)
    elif event['type'] == 'tool_result':
        results.append(event)
# Process collected results
full_response = ''.join(tokens)
print(f"Full response: {full_response}")
print(f"Tool calls: {len(tool_calls)}")
print(f"Results: {len(results)}")
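The collection loop above can be wrapped in a reusable helper. The following is a minimal sketch, not part of the library: `StreamSummary`, `collect_stream`, and `fake_stream` are hypothetical names introduced here, and `fake_stream` stands in for `agent.execute_task_streaming(task, context)` so the block runs without an agent.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List

Event = Dict[str, Any]


@dataclass
class StreamSummary:
    """Aggregated view of one streamed task (hypothetical helper type)."""
    tokens: List[str] = field(default_factory=list)
    tool_calls: List[Event] = field(default_factory=list)
    tool_results: List[Event] = field(default_factory=list)

    @property
    def text(self) -> str:
        # Join token fragments into the full response text
        return "".join(self.tokens)


async def collect_stream(events: AsyncIterator[Event]) -> StreamSummary:
    """Drain an event stream and bucket events by their 'type' field."""
    summary = StreamSummary()
    async for event in events:
        etype = event.get("type")
        if etype == "token":
            summary.tokens.append(event["content"])
        elif etype == "tool_call":
            summary.tool_calls.append(event)
        elif etype == "tool_result":
            summary.tool_results.append(event)
        # Any other event type is ignored by this sketch.
    return summary


async def fake_stream() -> AsyncIterator[Event]:
    """Assumed stand-in for agent.execute_task_streaming(task, context)."""
    yield {"type": "token", "content": "2+2 "}
    yield {"type": "tool_call", "tool_name": "calculator"}
    yield {"type": "tool_result", "tool_name": "calculator", "result": 4}
    yield {"type": "token", "content": "is 4"}


if __name__ == "__main__":
    summary = asyncio.run(collect_stream(fake_stream()))
    print(summary.text)
```

With a real agent, the same helper would presumably be called as `await collect_stream(agent.execute_task_streaming(task, context))`.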
Streaming Task Execution
Pattern 1: Basic Task Streaming
Stream basic task execution.
async for event in agent.execute_task_streaming(
    {"description": "Answer a question"},
    {},
):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] == 'result':
        print(f"\nFinal result: {event['output']}")
Pattern 2: ReAct Loop Streaming
Stream ReAct loop execution (HybridAgent). HybridAgent emits dedicated
thought / action / observation events for each ReAct phase, plus
iteration_start events that mark the boundary between iterations.
async for event in agent.execute_task_streaming(
    {"description": "Research and analyze"},
    {},
):
    if event['type'] == 'token':
        # Stream reasoning tokens
        print(event['content'], end='', flush=True)
    elif event['type'] == 'iteration_start':
        # New ReAct iteration begins
        print(f"\n[Iteration {event['iteration']}]")
    elif event['type'] == 'thought':
        # Reasoning text accumulated for this iteration
        print(f"\n[Thought] {event['content']}")
    elif event['type'] == 'action':
        # Tool action selected by the model
        print(f"\n[Action] {event['tool_name']}")
    elif event['type'] == 'tool_result':
        # Raw tool result
        print(f"\n[Result] {event['result']}")
    elif event['type'] == 'observation':
        # Observation derived from the tool result
        print(f"\n[Observation] {event['content']}")
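Because iteration_start marks the boundary between iterations, the ReAct events can be folded into one record per iteration, which is convenient for logging or for rendering a trace. The sketch below assumes only the event fields used in the loop above; `group_by_iteration` and `fake_react_stream` are hypothetical names, and the fake stream (including its 1-based iteration numbers) is an illustration rather than recorded agent output.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

Event = Dict[str, Any]


async def group_by_iteration(events: AsyncIterator[Event]) -> List[Dict[str, Any]]:
    """Fold thought/action/observation events into one record per iteration."""
    trace: List[Dict[str, Any]] = []
    current: Dict[str, Any] = {}
    async for event in events:
        etype = event.get("type")
        if etype == "iteration_start":
            # Open a new record; later events fill in its fields
            current = {"iteration": event["iteration"], "thought": None,
                       "action": None, "observation": None}
            trace.append(current)
        elif etype in ("thought", "observation") and current:
            current[etype] = event["content"]
        elif etype == "action" and current:
            current["action"] = event["tool_name"]
        # Events seen before the first iteration_start are skipped.
    return trace


async def fake_react_stream() -> AsyncIterator[Event]:
    """Assumed stand-in for a HybridAgent event stream."""
    yield {"type": "started"}
    yield {"type": "iteration_start", "iteration": 1}
    yield {"type": "thought", "content": "I should search."}
    yield {"type": "action", "tool_name": "search"}
    yield {"type": "observation", "content": "Found 3 results."}
    yield {"type": "iteration_start", "iteration": 2}
    yield {"type": "thought", "content": "I can answer now."}


if __name__ == "__main__":
    for record in asyncio.run(group_by_iteration(fake_react_stream())):
        print(record)
```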
Pattern 3: Lifecycle Tracking
Track execution lifecycle through streaming. ToolAgent and HybridAgent
emit started at the beginning and (in HybridAgent) completed at the
end; HybridAgent additionally emits iteration_start per ReAct loop.
async for event in agent.execute_task_streaming(task, context):
    etype = event['type']
    if etype == 'started':
        print("Agent started")
    elif etype == 'iteration_start':
        # HybridAgent only: ReAct iteration boundary
        print(f"Iteration {event['iteration']} starting")
    elif etype == 'completed':
        # HybridAgent only: symmetric to 'started'
        print(f"Agent completed (success={event['success']})")
    elif etype == 'result':
        # Final result payload (all agents)
        print(f"Final output: {event['output']}")
Streaming Message Processing
Pattern 1: Conversational Streaming
Stream conversational responses.
async for token in agent.process_message_streaming("Tell me about Python"):
    print(token, end='', flush=True)
Pattern 2: Multi-Turn Streaming
Stream multi-turn conversations.
# First message
async for token in agent.process_message_streaming("Hello"):
    print(token, end='', flush=True)
# Second message
async for token in agent.process_message_streaming("What can you do?"):
    print(token, end='', flush=True)
Pattern 3: Streaming with Context
Stream with session context.
async for token in agent.process_message_streaming(
    "Continue our conversation",
    sender_id="user-123",
):
    print(token, end='', flush=True)
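In a multi-turn application it is often useful to keep the full reply as well as printing it token by token. The sketch below shows one way to do both; `stream_reply`, `chat`, and `fake_message_stream` are hypothetical names, and `fake_message_stream` is an assumed stand-in for `agent.process_message_streaming(message)`.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def stream_reply(tokens: AsyncIterator[str]) -> str:
    """Print tokens as they arrive and return the assembled reply."""
    parts: List[str] = []
    async for token in tokens:
        print(token, end="", flush=True)
        parts.append(token)
    print()  # end the line once the reply is complete
    return "".join(parts)


async def fake_message_stream(message: str) -> AsyncIterator[str]:
    """Assumed stand-in for agent.process_message_streaming(message)."""
    for piece in ("You ", "said: ", message):
        yield piece


async def chat(messages: List[str]) -> List[Tuple[str, str]]:
    """Send messages one turn at a time and keep a (message, reply) transcript."""
    transcript: List[Tuple[str, str]] = []
    for message in messages:
        reply = await stream_reply(fake_message_stream(message))
        transcript.append((message, reply))
    return transcript


if __name__ == "__main__":
    asyncio.run(chat(["Hello", "What can you do?"]))
```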
Event Types
Token Events
Token events contain generated text tokens.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        content = event['content']  # Token text
        timestamp = event.get('timestamp')  # Optional timestamp
        print(content, end='', flush=True)
Tool Call Events
Tool call events indicate when tools are being called.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'tool_call':
        tool_name = event['tool_name']
        parameters = event.get('parameters', {})
        timestamp = event.get('timestamp')
        print(f"Calling {tool_name} with {parameters}")
Tool Result Events
Tool result events contain tool execution results.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'tool_result':
        tool_name = event['tool_name']
        result = event['result']
        success = event.get('success', True)
        timestamp = event.get('timestamp')
        if success:
            print(f"{tool_name} returned: {result}")
        else:
            print(f"{tool_name} failed: {result}")
Status Events
Status events indicate execution status. Emitted by BaseAgent (default
implementation) and LLMAgent; status is always "started" in the
current implementation.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'status':
        status = event['status']  # currently always "started"
        timestamp = event.get('timestamp')
        print(f"Status: {status}")
Lifecycle Events
ToolAgent and HybridAgent emit dedicated lifecycle events instead of
status. started is emitted at the beginning of streaming execution;
completed is emitted by HybridAgent only, as the symmetric counterpart
to started.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'started':
        timestamp = event.get('timestamp')
        print("Execution started")
    elif event['type'] == 'completed':
        success = event['success']
        execution_time = event['execution_time']
        print(f"Execution completed (success={success}, {execution_time:.2f}s)")
ReAct Events (HybridAgent)
HybridAgent exposes the ReAct loop as discrete events. iteration_start
marks the boundary between iterations; thought, action, and
observation correspond to the three phases of each iteration.
async for event in agent.execute_task_streaming(task, context):
    etype = event['type']
    if etype == 'iteration_start':
        print(f"Iteration {event['iteration']} starting")
    elif etype == 'thought':
        print(f"Thought: {event['content']}")
    elif etype == 'action':
        print(f"Action: {event['tool_name']}({event.get('parameters', {})})")
    elif etype == 'observation':
        print(f"Observation: {event['content']}")
Tool Streaming Events (ToolAgent / HybridAgent)
In addition to tool_call and tool_result, the Function Calling
streaming path emits two finer-grained events:
tool_call_delta: incremental tool-call fragment received from the provider stream (arguments may still be partial).
tool_calls_ready: complete batch of tool calls assembled from the stream, ready to be dispatched.
ToolAgent also emits tool_error when a tool invocation fails.
Result Events
Result events contain final task results.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'result':
        output = event['output']
        execution_time = event.get('execution_time')
        success = event.get('success', True)
        print(f"Final result: {output}")
        print(f"Execution time: {execution_time}s")
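DAWP Boundary Events (HybridAgent + DawpPlugin)
When DawpPlugin is enabled with stream_boundary_events: true, the stream
includes optional run-level boundary events for SDK/UI panel management. Step
content still uses the same token / tool_result event shapes as the main loop,
tagged with loop_scope.kind=dawp (see design §8.1).
| Event type | When | Key fields |
|---|---|---|
| dawp_run_started | Before first DAWP step | run_id |
| dawp_run_completed | After DAWP run ends | run_id, success |
The key fields listed are those read by the manual subscription example in this section; the events may carry additional fields.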
Pattern: SDK consumer (recommended)
from aiecs.domain.agent.plugins.dawp import DawpStreamConsumer, effective_loop_scope

# ui and render_stream_event are supplied by the application
consumer = DawpStreamConsumer(
    on_run_started=lambda panel: ui.open_dawp_panel(panel.run_id),
    on_run_completed=lambda panel: ui.close_dawp_panel(panel.run_id),
)
async for event in agent.execute_task_streaming(task, context):
    consumer.consume(event)  # handles dawp_run_started / dawp_run_completed
    scope = effective_loop_scope(event)
    render_stream_event(event, scope=scope)  # same renderer; style by scope.kind
Pattern: Manual subscription
async for event in agent.execute_task_streaming(task, context):
    scope = event.get("loop_scope") or {"kind": "main"}
    if event["type"] == "dawp_run_started":
        ui.open_dawp_panel(event["run_id"])
    elif event["type"] == "dawp_run_completed":
        ui.close_dawp_panel(event["run_id"], success=event["success"])
    elif event["type"] == "token":
        print(event["content"], end="", flush=True)
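The manual loop above computes scope but does not yet use it. One possible use, sketched here, is to route token text into separate buffers for the main loop and the DAWP run. The sketch relies only on the loop_scope fallback shown above; `split_tokens_by_scope` and `fake_dawp_stream` are hypothetical names, and the fake events are an assumed illustration of the stream shape.

```python
import asyncio
from collections import defaultdict
from typing import Any, AsyncIterator, Dict, List

Event = Dict[str, Any]


async def split_tokens_by_scope(events: AsyncIterator[Event]) -> Dict[str, str]:
    """Concatenate token text separately for each loop_scope kind."""
    buckets: Dict[str, List[str]] = defaultdict(list)
    async for event in events:
        if event.get("type") != "token":
            continue
        # Events without loop_scope are treated as main-loop events
        scope = event.get("loop_scope") or {"kind": "main"}
        buckets[scope.get("kind", "main")].append(event["content"])
    return {kind: "".join(parts) for kind, parts in buckets.items()}


async def fake_dawp_stream() -> AsyncIterator[Event]:
    """Assumed stand-in for a HybridAgent stream with DawpPlugin enabled."""
    yield {"type": "token", "content": "Planning. "}
    yield {"type": "dawp_run_started", "run_id": "run-1"}
    yield {"type": "token", "content": "Step 1 output.",
           "loop_scope": {"kind": "dawp"}}
    yield {"type": "dawp_run_completed", "run_id": "run-1", "success": True}
    yield {"type": "token", "content": "Done."}


if __name__ == "__main__":
    print(asyncio.run(split_tokens_by_scope(fake_dawp_stream())))
```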
Enable boundary events in plugin config:
PluginConfig(
    name="dawp",
    enabled=True,
    options={
        "document_path": "workflows/my-flow.dawp.md",
        "stream_boundary_events": True,
    },
)
Error Events
Error events indicate errors during execution.
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'error':
        error = event['error']
        timestamp = event.get('timestamp')
        print(f"Error: {error}")
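With this many event types, a long if/elif chain can become hard to maintain. One alternative, sketched below, is a small dispatcher that maps each event type to a handler and records the types nobody handles. `EventDispatcher` and `fake_event_stream` are hypothetical names introduced for illustration and are not part of the library.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Optional

Event = Dict[str, Any]
Handler = Callable[[Event], None]


class EventDispatcher:
    """Route streamed events to handlers keyed by the event 'type' field."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self.unhandled: List[Optional[str]] = []

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type."""
        def register(handler: Handler) -> Handler:
            self._handlers[event_type] = handler
            return handler
        return register

    async def run(self, events: AsyncIterator[Event]) -> None:
        async for event in events:
            etype = event.get("type")
            handler = self._handlers.get(etype)
            if handler is None:
                # Remember unknown types instead of failing on them
                self.unhandled.append(etype)
            else:
                handler(event)


async def fake_event_stream() -> AsyncIterator[Event]:
    """Assumed stand-in for agent.execute_task_streaming(task, context)."""
    yield {"type": "started"}
    yield {"type": "token", "content": "Hi"}
    yield {"type": "error", "error": "tool timed out"}


if __name__ == "__main__":
    dispatcher = EventDispatcher()

    @dispatcher.on("token")
    def show_token(event: Event) -> None:
        print(event["content"], end="", flush=True)

    @dispatcher.on("error")
    def show_error(event: Event) -> None:
        print(f"\nError: {event['error']}")

    asyncio.run(dispatcher.run(fake_event_stream()))
```

Recording unhandled types rather than raising keeps a consumer working if an agent emits event types that were not anticipated.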
Error Handling
Pattern 1: Handle Streaming Errors
Handle errors during streaming.
try:
    async for event in agent.execute_task_streaming(task, context):
        if event['type'] == 'error':
            print(f"Error: {event['error']}")
            break
        # Process other events
except Exception as e:
    print(f"Streaming failed: {e}")
Pattern 2: Continue on Errors
Continue processing despite errors.
import logging

logger = logging.getLogger(__name__)

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'error':
        logger.error(f"Error: {event['error']}")
        # Continue processing other events
        continue
    # Process event
Pattern 3: Retry on Errors
Retry streaming on errors.
import asyncio

max_retries = 3
retry_count = 0
while retry_count < max_retries:
    try:
        async for event in agent.execute_task_streaming(task, context):
            # Process events
            pass
        break  # Success
    except Exception:
        retry_count += 1
        if retry_count >= max_retries:
            raise
        await asyncio.sleep(1)  # Wait before retrying
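A retry restarts the stream from the beginning, so output gathered during a failed attempt should be discarded rather than appended to. The sketch below wraps the retry loop in a helper that does this and backs off exponentially between attempts. `stream_with_retry` and `make_flaky_stream` are hypothetical names; the flaky stream simulates a dropped connection and is not real agent behavior.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Tuple

Event = Dict[str, Any]
StreamFactory = Callable[[], AsyncIterator[Event]]


async def stream_with_retry(
    make_stream: StreamFactory,
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> List[Event]:
    """Run a stream to completion, restarting it from scratch on failure."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        events: List[Event] = []  # drop partial output from earlier attempts
        try:
            async for event in make_stream():
                events.append(event)
            return events
        except Exception:
            if attempt >= max_retries:
                raise
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


def make_flaky_stream(fail_times: int) -> Tuple[StreamFactory, Dict[str, int]]:
    """Build a stream factory that fails on its first `fail_times` attempts."""
    attempts = {"count": 0}

    async def stream() -> AsyncIterator[Event]:
        attempts["count"] += 1
        yield {"type": "token", "content": "partial"}
        if attempts["count"] <= fail_times:
            raise ConnectionError("simulated drop")
        yield {"type": "result", "output": "done"}

    return stream, attempts


if __name__ == "__main__":
    factory, attempts = make_flaky_stream(fail_times=2)
    events = asyncio.run(stream_with_retry(factory, base_delay=0.01))
    print(attempts["count"], [e["type"] for e in events])
```

With a real agent, the factory would presumably be `lambda: agent.execute_task_streaming(task, context)`. Retrying is only safe when the tools the task calls can tolerate being invoked again.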
Best Practices
1. Use Streaming for Long Operations
Use streaming for operations that take time:
# Good: Long-running task
async for event in agent.execute_task_streaming(
    {"description": "Research and analyze complex topic"},
    {},
):
    # Stream progress
    pass
# Less useful: Fast operation
result = await agent.execute_task(
    {"description": "Simple calculation"},
    {},
)
2. Provide User Feedback
Provide feedback to users during streaming. Use the lifecycle and ReAct
events emitted by ToolAgent / HybridAgent (or status="started" for
LLMAgent) to drive UI affordances:
async for event in agent.execute_task_streaming(task, context):
    etype = event['type']
    if etype in ('started', 'status'):
        show_spinner("Working...")
    elif etype == 'thought':
        show_spinner("Thinking...")
    elif etype in ('action', 'tool_call'):
        show_spinner(f"Executing {event.get('tool_name', 'tool')}...")
    elif etype in ('completed', 'result', 'error'):
        hide_spinner()
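If the stream raises before a completed, result, or error event arrives, the loop above never hides the spinner. A try/finally block is one way to guarantee cleanup, as in the following sketch. `RecordingUI`, `run_with_spinner`, and `broken_stream` are hypothetical names; `RecordingUI` records calls in place of a real UI.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple

Event = Dict[str, Any]


class RecordingUI:
    """Test double that records spinner calls instead of drawing anything."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[str]]] = []

    def show_spinner(self, label: str) -> None:
        self.calls.append(("show", label))

    def hide_spinner(self) -> None:
        self.calls.append(("hide", None))


async def run_with_spinner(events: AsyncIterator[Event], ui: RecordingUI) -> None:
    """Drive the spinner from events and always hide it when the stream ends."""
    try:
        async for event in events:
            etype = event.get("type")
            if etype in ("started", "status"):
                ui.show_spinner("Working...")
            elif etype == "thought":
                ui.show_spinner("Thinking...")
            elif etype in ("action", "tool_call"):
                ui.show_spinner(f"Executing {event.get('tool_name', 'tool')}...")
    finally:
        # Runs on normal completion and when the stream raises
        ui.hide_spinner()


async def broken_stream() -> AsyncIterator[Event]:
    """Assumed stand-in for a stream that fails partway through."""
    yield {"type": "started"}
    yield {"type": "tool_call", "tool_name": "search"}
    raise RuntimeError("simulated failure")


if __name__ == "__main__":
    ui = RecordingUI()
    try:
        asyncio.run(run_with_spinner(broken_stream(), ui))
    except RuntimeError as exc:
        print(f"Stream failed: {exc}")
    print(ui.calls)
```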
3. Handle Partial Results
Handle partial results gracefully:
tokens = []
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        tokens.append(event['content'])
        # Display partial result
        display_partial(''.join(tokens))
    elif event['type'] == 'result':
        # Final result
        display_final(event['output'])
4. Monitor Streaming Performance
Monitor streaming performance:
import time

start = time.perf_counter()  # monotonic clock, suited to measuring intervals
token_count = 0
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        token_count += 1
    elif event['type'] == 'result':
        duration = time.perf_counter() - start
        # Guard against a zero duration on very fast runs
        tokens_per_second = token_count / duration if duration > 0 else 0.0
        print(f"Streaming rate: {tokens_per_second:.1f} tokens/s")
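Throughput alone can hide a slow start, so it may help to record time to first token as well. The sketch below measures both; `StreamMetrics`, `measure_stream`, and `fake_slow_stream` are hypothetical names, and the counts refer to token events, which may not correspond one-to-one to model tokens.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, Optional

Event = Dict[str, Any]


@dataclass
class StreamMetrics:
    """Timing summary for one streamed run (hypothetical helper type)."""
    token_count: int = 0
    time_to_first_token: Optional[float] = None
    total_time: float = 0.0

    @property
    def tokens_per_second(self) -> float:
        # Avoid dividing by zero for an empty or instantaneous stream
        return self.token_count / self.total_time if self.total_time > 0 else 0.0


async def measure_stream(events: AsyncIterator[Event]) -> StreamMetrics:
    """Consume a stream and record latency and throughput figures."""
    metrics = StreamMetrics()
    start = time.perf_counter()
    async for event in events:
        if event.get("type") == "token":
            if metrics.time_to_first_token is None:
                metrics.time_to_first_token = time.perf_counter() - start
            metrics.token_count += 1
    metrics.total_time = time.perf_counter() - start
    return metrics


async def fake_slow_stream() -> AsyncIterator[Event]:
    """Assumed stand-in for a stream with a small delay per token."""
    for word in ("one ", "two ", "three"):
        await asyncio.sleep(0.01)
        yield {"type": "token", "content": word}
    yield {"type": "result", "output": "one two three"}


if __name__ == "__main__":
    m = asyncio.run(measure_stream(fake_slow_stream()))
    print(f"first token after {m.time_to_first_token:.3f}s, "
          f"{m.tokens_per_second:.1f} token events/s")
```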
5. Buffer for Smooth Display
Buffer tokens for smooth display:
buffer = []
buffer_size = 10
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        buffer.append(event['content'])
        if len(buffer) >= buffer_size:
            # Display buffered tokens
            print(''.join(buffer), end='', flush=True)
            buffer.clear()
# Display remaining tokens
if buffer:
    print(''.join(buffer), end='', flush=True)
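A purely size-based buffer can hold back text when tokens arrive slowly. One variation, sketched below, also flushes when a maximum delay has passed since the last flush. `buffered_tokens` and `fake_token_stream` are hypothetical names. Note that the delay is only checked when a token arrives, so this sketch does not flush during a complete stall.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Dict, List

Event = Dict[str, Any]


async def buffered_tokens(
    events: AsyncIterator[Event],
    max_tokens: int = 10,
    max_delay: float = 0.05,
) -> AsyncIterator[str]:
    """Yield joined token chunks when the buffer fills or max_delay elapses."""
    buffer: List[str] = []
    last_flush = time.monotonic()
    async for event in events:
        if event.get("type") != "token":
            continue
        buffer.append(event["content"])
        now = time.monotonic()
        if len(buffer) >= max_tokens or now - last_flush >= max_delay:
            yield "".join(buffer)
            buffer.clear()
            last_flush = now
    if buffer:
        # Flush whatever is left once the stream ends
        yield "".join(buffer)


async def fake_token_stream(text: str) -> AsyncIterator[Event]:
    """Assumed stand-in that emits one token event per character."""
    for char in text:
        yield {"type": "token", "content": char}


async def demo() -> List[str]:
    chunks: List[str] = []
    async for chunk in buffered_tokens(
        fake_token_stream("abcdefg"), max_tokens=3, max_delay=60.0
    ):
        chunks.append(chunk)
    return chunks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```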
Summary
Streaming provides:
✅ Real-time feedback
✅ Better user experience
✅ Tool call visibility
✅ Progressive result display
✅ Status updates
Key Takeaways:
Use for long-running operations
Provide user feedback
Handle partial results
Monitor performance
Buffer for smooth display