Runnable Pattern Guide
Overview
The Runnable Pattern provides a formal base class for async task components with standardized lifecycle management, configuration, error handling, and retry logic. It’s designed to make building robust, production-ready async components easier and more consistent.
Why Use Runnable?
Building async components often requires implementing the same patterns repeatedly:
Setup and teardown of resources
Error handling and retry logic
Configuration management
Metrics collection
Circuit breaker for fault tolerance
The Runnable pattern provides all of this out of the box, letting you focus on your component’s core logic.
Core Features
✅ Lifecycle Management: Standardized setup → execute → teardown pattern
✅ Configuration: Type-safe configuration with validation
✅ Retry Logic: Exponential backoff with configurable retries
✅ Circuit Breaker: Prevent cascading failures
✅ Timeout Support: Automatic timeout handling
✅ Metrics: Built-in execution metrics collection
✅ Async Context Manager: Clean resource management with async with
Quick Start
Basic Example
```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Any
from aiecs.common.knowledge_graph import Runnable, RunnableConfig

# 1. Define your configuration
@dataclass
class MyConfig(RunnableConfig):
    api_key: str = ""
    max_items: int = 100

# 2. Implement your component
class MyComponent(Runnable[MyConfig, Dict[str, Any]]):
    async def _setup(self) -> None:
        """Initialize resources"""
        # APIClient is a placeholder for your own async client
        self.client = APIClient(self.config.api_key)
        print("Component initialized")

    async def _execute(self, **kwargs) -> Dict[str, Any]:
        """Main execution logic"""
        query = kwargs.get("query", "")
        results = await self.client.search(query, limit=self.config.max_items)
        return {"results": results, "count": len(results)}

    async def _teardown(self) -> None:
        """Cleanup resources"""
        await self.client.close()
        print("Component cleaned up")

# 3. Use your component
async def main():
    config = MyConfig(api_key="your-key", max_items=50)

    # Option 1: Manual lifecycle (try/finally guarantees teardown)
    component = MyComponent(config)
    await component.setup()
    try:
        result = await component.run(query="knowledge graph")
    finally:
        await component.teardown()

    # Option 2: Context manager (recommended)
    async with MyComponent(config) as component:
        result = await component.run(query="knowledge graph")
        print(f"Found {result['count']} results")

if __name__ == "__main__":
    asyncio.run(main())
```
Lifecycle Management
The Runnable pattern enforces a clear lifecycle:
CREATED → INITIALIZING → READY → RUNNING → COMPLETED/FAILED → STOPPED
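The diagram can be read as a small state machine. The sketch below encodes it with a hypothetical `State` enum and transition table. The names mirror the diagram, and the lowercase values match the `"state": "completed"` entry in the metrics example.

Two transitions are assumptions rather than documented behavior of `RunnableState`:
- A COMPLETED or FAILED component may run again. This is inferred from the Multiple Executions section.
- A failed setup moves to FAILED.

```python
from enum import Enum


class State(Enum):
    # Hypothetical mirror of the lifecycle diagram; not Runnable's own RunnableState
    CREATED = "created"
    INITIALIZING = "initializing"
    READY = "ready"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    STOPPED = "stopped"


# Allowed transitions (assumed): a finished run may be followed by another run
TRANSITIONS = {
    State.CREATED: {State.INITIALIZING},
    State.INITIALIZING: {State.READY, State.FAILED},
    State.READY: {State.RUNNING, State.STOPPED},
    State.RUNNING: {State.COMPLETED, State.FAILED},
    State.COMPLETED: {State.RUNNING, State.STOPPED},
    State.FAILED: {State.RUNNING, State.STOPPED},
    State.STOPPED: set(),
}


def transition(current: State, target: State) -> State:
    """Return target if the move is allowed, otherwise raise ValueError."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Illegal transition {current.name} -> {target.name}")
    return target


# Walk the happy path from the diagram
state = State.CREATED
for nxt in (State.INITIALIZING, State.READY, State.RUNNING, State.COMPLETED, State.STOPPED):
    state = transition(state, nxt)
print(state.value)  # stopped
```

Skipping a step, for example `transition(State.CREATED, State.RUNNING)`, raises `ValueError`.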
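Lifecycle Methods
Callers use the public methods setup(), execute(), run(), and teardown(). Subclasses override the protected hooks those methods call: _setup(), _execute(), and _teardown().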
setup() - Initialize Resources
Called once before execution. Use this to:
Initialize connections (database, API clients, etc.)
Load models or data
Validate configuration
Allocate resources
```python
async def _setup(self) -> None:
    # connect_to_database and load_model stand in for your own helpers
    self.db = await connect_to_database(self.config.db_url)
    self.model = load_model(self.config.model_path)
```
execute() - Core Logic
The main execution method. Implement your component’s logic here:
```python
async def _execute(self, **kwargs) -> ResultType:
    # Your logic here
    data = await self.db.query(kwargs["query"])
    predictions = self.model.predict(data)
    return {"predictions": predictions}
```
teardown() - Cleanup
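Called once when the component is no longer needed. You can call teardown() explicitly, or it runs automatically when an async with block exits. Use this to:
Close connections
Release resources
Save state
Clean up temporary files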
```python
async def _teardown(self) -> None:
    await self.db.close()
    self.model.cleanup()
```
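The public methods and the underscore hooks follow the template-method pattern: the base class owns the bookkeeping and calls your hooks at fixed points. `MiniRunnable` below is a hypothetical, heavily simplified stand-in meant only to show that division of labor. It has no retries, timeouts, metrics, or setup error handling, and it is not Runnable's implementation.

```python
import asyncio
from typing import Any, List


class MiniRunnable:
    """Toy base class: public methods track state, subclasses override hooks."""

    def __init__(self) -> None:
        self.state = "created"

    async def setup(self) -> None:
        self.state = "initializing"
        await self._setup()
        self.state = "ready"

    async def execute(self, **kwargs: Any) -> Any:
        self.state = "running"
        try:
            result = await self._execute(**kwargs)
        except Exception:
            self.state = "failed"
            raise
        self.state = "completed"
        return result

    async def teardown(self) -> None:
        await self._teardown()
        self.state = "stopped"

    # Hooks: no-op defaults except _execute, which subclasses must provide
    async def _setup(self) -> None:
        pass

    async def _execute(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    async def _teardown(self) -> None:
        pass


class Echo(MiniRunnable):
    async def _execute(self, **kwargs: Any) -> Any:
        return kwargs.get("query", "")


async def demo() -> List[str]:
    echo = Echo()
    states = [echo.state]
    await echo.setup()
    states.append(echo.state)
    value = await echo.execute(query="hi")
    states.append(echo.state)
    await echo.teardown()
    states.append(echo.state)
    return [value] + states


print(asyncio.run(demo()))  # ['hi', 'created', 'ready', 'completed', 'stopped']
```

The subclass only supplies `_execute`. All state tracking stays in the base class, which is what keeps components consistent.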
State Transitions
```python
component = MyComponent(config)
print(component.state)  # RunnableState.CREATED

await component.setup()
print(component.state)  # RunnableState.READY

result = await component.execute()
print(component.state)  # RunnableState.COMPLETED

await component.teardown()
print(component.state)  # RunnableState.STOPPED
```
Configuration
Basic Configuration
All Runnable components use RunnableConfig as the base configuration class:
```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class RunnableConfig:
    # Retry configuration
    max_retries: int = 3
    retry_delay: float = 1.0
    retry_backoff: float = 2.0
    max_retry_delay: float = 30.0

    # Timeout configuration
    timeout: Optional[float] = None

    # Circuit breaker configuration
    enable_circuit_breaker: bool = False
    circuit_breaker_threshold: int = 5
    circuit_breaker_timeout: float = 60.0

    # Custom configuration
    custom_config: Dict[str, Any] = field(default_factory=dict)
```
Custom Configuration
Extend RunnableConfig to add your own configuration:
```python
@dataclass
class DataProcessorConfig(RunnableConfig):
    # Your custom fields
    input_path: str = ""
    output_path: str = ""
    batch_size: int = 100

    # Override defaults
    max_retries: int = 5
    timeout: float = 300.0
```
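Because the configs are plain dataclasses, a subclass default replaces the base default, and every field you don't redeclare keeps the base value. The self-contained check below uses `BaseConfig`, a trimmed stand-in for RunnableConfig with only a few of its fields, so it runs without aiecs installed.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class BaseConfig:
    # Trimmed stand-in for RunnableConfig, same defaults as shown above
    max_retries: int = 3
    retry_delay: float = 1.0
    timeout: Optional[float] = None
    custom_config: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DataProcessorConfig(BaseConfig):
    input_path: str = ""
    batch_size: int = 100
    # Redeclaring a base field overrides its default
    max_retries: int = 5
    timeout: float = 300.0


cfg = DataProcessorConfig(input_path="data.csv")
print(cfg.max_retries, cfg.retry_delay, cfg.timeout)  # 5 1.0 300.0
```

`field(default_factory=dict)` gives every instance its own `custom_config` dictionary, so mutating one config never leaks into another.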
Configuration Validation
Override _validate_config() to add custom validation:
```python
class MyComponent(Runnable[MyConfig, ResultType]):
    def _validate_config(self) -> None:
        # Call parent validation
        super()._validate_config()

        # Add custom validation (fields from MyConfig in the Quick Start)
        if not self.config.api_key:
            raise ValueError("api_key is required")
        if self.config.max_items <= 0:
            raise ValueError("max_items must be positive")
```
Error Handling & Retry Logic
Automatic Retries
The run() method automatically retries on failure with exponential backoff:
```python
config = MyConfig(
    max_retries=3,        # Retry up to 3 times
    retry_delay=1.0,      # Initial delay: 1 second
    retry_backoff=2.0,    # Double delay each retry
    max_retry_delay=30.0  # Cap delay at 30 seconds
)

component = MyComponent(config)
await component.setup()

# Will retry automatically on failure
# Delays: 1s, 2s, 4s (then fail)
result = await component.run(query="test")
```
Retry Behavior
Attempt 1: Execute immediately
↓ (fails)
Wait 1.0 seconds
↓
Attempt 2: Retry
↓ (fails)
Wait 2.0 seconds (1.0 * 2.0)
↓
Attempt 3: Retry
↓ (fails)
Wait 4.0 seconds (2.0 * 2.0)
↓
Attempt 4: Final retry
↓ (fails)
Raise exception
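The waits in the diagram follow `delay_k = min(retry_delay * retry_backoff**k, max_retry_delay)` for the k-th retry (k = 0, 1, ...). The hypothetical `backoff_delays` function below reproduces that schedule.

Two caveats apply:
- Applying the `max_retry_delay` cap to each individual wait is an assumption, based on its description "Cap delay at 30 seconds".
- Many retry implementations also add random jitter, which is left out here.

```python
from typing import List


def backoff_delays(
    max_retries: int,
    retry_delay: float,
    retry_backoff: float,
    max_retry_delay: float,
) -> List[float]:
    """Seconds to wait before each retry: exponential growth, capped per wait."""
    return [
        min(retry_delay * retry_backoff ** k, max_retry_delay)
        for k in range(max_retries)
    ]


# The schedule from the diagram above: 7 seconds of waiting in total
print(backoff_delays(3, 1.0, 2.0, 30.0))  # [1.0, 2.0, 4.0]

# The "critical operations" settings from Best Practices (default backoff and cap)
print(backoff_delays(10, 2.0, 2.0, 30.0))
```

With `retry_delay=2.0`, the fifth wait would be 32 s, so the 30 s cap takes over from that point on. Ten retries therefore add up to several minutes of waiting before the final failure.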
Execute vs Run
execute(): Low-level execution without retry logic
run(): Production-ready execution with retries and circuit breaker
```python
# For testing or when you don't want retries
result = await component.execute(query="test")

# For production (recommended)
result = await component.run(query="test")
```
Custom Error Handling
```python
import logging

logger = logging.getLogger(__name__)

class MyComponent(Runnable[MyConfig, ResultType]):
    async def _execute(self, **kwargs) -> ResultType:
        # TemporaryError and PermanentError are placeholders for your own exception types
        try:
            return await self.risky_operation()
        except TemporaryError:
            # Let retry logic handle it
            raise
        except PermanentError as e:
            # Don't retry permanent errors
            logger.error(f"Permanent error: {e}")
            raise RuntimeError(f"Cannot recover from: {e}") from e
```
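The split between transient and permanent failures is easiest to see in a generic retry loop. `retry_async` below is a hypothetical helper, not part of Runnable:
- It retries only the exception types passed as `retryable`.
- It waits using the same capped exponential backoff described in the retry section.
- Any other exception propagates on the first failure.

How `Runnable.run()` itself decides which exceptions to retry is not specified in this guide.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    fn: Callable[[], Awaitable[T]],
    retryable: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    retry_backoff: float = 2.0,
    max_retry_delay: float = 30.0,
) -> T:
    """Call fn, retrying only on `retryable` errors with capped exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except retryable:
            if attempt == max_retries:
                raise  # Out of retries: surface the last transient error
            await asyncio.sleep(min(retry_delay * retry_backoff ** attempt, max_retry_delay))
    raise ValueError("max_retries must be >= 0")


class TemporaryError(Exception):
    """Hypothetical transient failure (e.g. a timeout or HTTP 503)."""


class PermanentError(Exception):
    """Hypothetical non-recoverable failure (e.g. bad credentials)."""


async def demo() -> Tuple[str, int]:
    calls = 0

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise TemporaryError("try again")
        return "ok"

    result = await retry_async(flaky, retryable=(TemporaryError,), retry_delay=0.01)
    return result, calls


print(asyncio.run(demo()))  # ('ok', 3)
```

A `PermanentError` raised by `fn` is not in `retryable`, so it escapes after a single call instead of consuming the retry budget.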
Circuit Breaker Pattern
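The circuit breaker prevents cascading failures. After circuit_breaker_threshold failures it rejects calls immediately instead of executing them, and after circuit_breaker_timeout seconds it lets a trial call through.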
How It Works
CLOSED (normal operation)
↓ (threshold failures)
OPEN (reject all requests)
↓ (timeout expires)
HALF-OPEN (try one request)
↓ (success)
CLOSED
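The state diagram maps onto a few lines of bookkeeping. `CircuitBreaker` below is a hypothetical sketch of the standard pattern, not Runnable's internal class. It behaves as follows:
- It opens after `threshold` consecutive failures (a success resets the count).
- While OPEN, it rejects calls until `timeout` seconds have passed.
- It then admits a single trial call (HALF-OPEN).

The diagram only shows the success path out of HALF-OPEN. Sending a failed trial back to OPEN is the conventional choice and an assumption here. The clock is injectable so the behavior can be checked without sleeping.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Sketch of a CLOSED -> OPEN -> HALF-OPEN -> CLOSED breaker."""

    def __init__(self, threshold: int = 5, timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        """Return True if a call may proceed right now."""
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN" and self.clock() - self.opened_at >= self.timeout:
            self.state = "HALF-OPEN"
            return True  # The single trial call
        return False  # Still OPEN, or a HALF-OPEN trial is already in flight

    def record_success(self) -> None:
        self.failures = 0
        self.state = "CLOSED"

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "HALF-OPEN" or self.failures >= self.threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()


# Drive the breaker with a fake clock
now = [0.0]
breaker = CircuitBreaker(threshold=3, timeout=60.0, clock=lambda: now[0])
for _ in range(3):
    breaker.allow_request()
    breaker.record_failure()
print(breaker.state, breaker.allow_request())  # OPEN False
now[0] = 61.0
print(breaker.allow_request(), breaker.state)  # True HALF-OPEN
breaker.record_success()
print(breaker.state)  # CLOSED
```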
Configuration
```python
import asyncio

config = MyConfig(
    enable_circuit_breaker=True,
    circuit_breaker_threshold=5,  # Open after 5 failures
    circuit_breaker_timeout=60.0  # Try again after 60 seconds
)

component = MyComponent(config)
await component.setup()

# After 5 failures, circuit opens
for i in range(10):
    try:
        result = await component.run()
    except RuntimeError as e:
        if "Circuit breaker is open" in str(e):
            print("Circuit breaker protecting system")
            await asyncio.sleep(60)  # Wait for timeout
```
Use Cases
API Rate Limiting: Stop calling an API that’s returning errors
Database Failures: Prevent overwhelming a struggling database
Downstream Service Issues: Fail fast when a dependency is down
Timeout Support
Execution Timeout
```python
import asyncio

config = MyConfig(
    timeout=30.0  # Timeout after 30 seconds
)

component = MyComponent(config)
await component.setup()

try:
    result = await component.run(query="long-running")
except asyncio.TimeoutError:
    print("Execution timed out")
```
No Timeout
```python
config = MyConfig(
    timeout=None  # No timeout (default)
)
```
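An execution timeout like `timeout=30.0` is commonly implemented with `asyncio.wait_for`. That function cancels the awaited coroutine and raises `asyncio.TimeoutError` when the deadline passes.

This guide does not state whether Runnable uses `wait_for` internally, or whether a timed-out attempt counts as a failure that `run()` retries. The hypothetical `run_with_timeout` helper below only illustrates the mechanism; `None` means no limit, as in the config.

```python
import asyncio
from typing import Any, Awaitable, Optional


async def run_with_timeout(coro: Awaitable[Any], timeout: Optional[float]) -> Any:
    """Await coro, cancelling it after `timeout` seconds (None disables the limit)."""
    if timeout is None:
        return await coro
    return await asyncio.wait_for(coro, timeout=timeout)


async def slow_task(seconds: float) -> str:
    await asyncio.sleep(seconds)
    return "done"


async def demo() -> str:
    try:
        return await run_with_timeout(slow_task(1.0), timeout=0.05)
    except asyncio.TimeoutError:
        return "Execution timed out"


print(asyncio.run(demo()))  # Execution timed out
```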
Metrics & Monitoring
Built-in Metrics
Every execution collects metrics automatically:
```python
component = MyComponent(config)
await component.setup()
result = await component.run(query="test")

# Access metrics
metrics = component.metrics
print(f"Duration: {metrics.duration_seconds}s")
print(f"Retries: {metrics.retry_count}")
print(f"Success: {metrics.success}")
print(f"Error: {metrics.error}")

# Export as dictionary
metrics_dict = component.get_metrics_dict()
# {
#     "start_time": "2025-11-15T10:30:00",
#     "end_time": "2025-11-15T10:30:05",
#     "duration_seconds": 5.2,
#     "retry_count": 1,
#     "success": True,
#     "error": None,
#     "state": "completed"
# }
```
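If you want the same shape in your own tooling, the exported dictionary is easy to mirror with a dataclass. `ExecutionMetrics` below is a hypothetical stand-in that reuses the field names from the dictionary above and may not match Runnable's internals:
- It derives `duration_seconds` from the two timestamps.
- It serializes timestamps with `datetime.isoformat()`, which yields the same ISO 8601 style as the example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class ExecutionMetrics:
    # Hypothetical mirror of the metrics dictionary shown above
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    retry_count: int = 0
    success: bool = False
    error: Optional[str] = None

    @property
    def duration_seconds(self) -> Optional[float]:
        """Elapsed time, or None until both timestamps are set."""
        if self.start_time is None or self.end_time is None:
            return None
        return (self.end_time - self.start_time).total_seconds()

    def to_dict(self, state: str) -> Dict[str, Any]:
        return {
            "start_time": self.start_time.isoformat() if self.start_time else None,
            "end_time": self.end_time.isoformat() if self.end_time else None,
            "duration_seconds": self.duration_seconds,
            "retry_count": self.retry_count,
            "success": self.success,
            "error": self.error,
            "state": state,
        }


m = ExecutionMetrics(
    start_time=datetime(2025, 11, 15, 10, 30, 0),
    end_time=datetime(2025, 11, 15, 10, 30, 5),
    retry_count=1,
    success=True,
)
print(m.to_dict(state="completed"))
```

Computing the duration instead of storing it keeps it consistent with the timestamps. Here it comes out as `5.0`.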
Reset Metrics
```python
# Reset metrics between runs
component.reset_metrics()
```
Integration with Monitoring Systems
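```python
class MonitoredComponent(Runnable[MyConfig, ResultType]):
    async def _execute(self, **kwargs) -> ResultType:
        return await self.do_work()

    async def run(self, **kwargs) -> ResultType:
        # Metrics are only final once run() has finished (or failed),
        # so report them after the call rather than from inside _execute()
        try:
            return await super().run(**kwargs)
        finally:
            # monitoring_system is a placeholder for your metrics client
            await monitoring_system.send(self.get_metrics_dict())
```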
Advanced Usage
Async Context Manager
The recommended way to use Runnable components:
```python
async with MyComponent(config) as component:
    result1 = await component.run(query="first")
    result2 = await component.run(query="second")
# Automatic teardown on exit
```
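An `async with` block guarantees teardown because Python calls `__aexit__` whether the body finishes normally or raises. The toy `Managed` class below is a hypothetical sketch of that protocol, not Runnable's code: it runs setup in `__aenter__` and teardown in `__aexit__`. Returning `False` from `__aexit__` lets the original exception propagate after cleanup.

```python
import asyncio
from typing import Any, List, Optional


class Managed:
    """Toy component whose context manager wraps setup/teardown."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    async def setup(self) -> None:
        self.log.append("setup")

    async def teardown(self) -> None:
        self.log.append("teardown")

    async def __aenter__(self) -> "Managed":
        await self.setup()
        return self

    async def __aexit__(self, exc_type: Optional[type],
                        exc: Optional[BaseException], tb: Any) -> bool:
        await self.teardown()
        return False  # Don't swallow exceptions raised inside the block


async def demo() -> List[str]:
    log: List[str] = []
    try:
        async with Managed(log):
            log.append("body")
            raise RuntimeError("boom")
    except RuntimeError:
        log.append("caught")
    return log


print(asyncio.run(demo()))  # ['setup', 'body', 'teardown', 'caught']
```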
Multiple Executions
```python
queries = ["first", "second", "third"]  # Example inputs

component = MyComponent(config)
await component.setup()
try:
    # Execute multiple times
    for query in queries:
        component.reset_metrics()  # Reset metrics for each run
        result = await component.run(query=query)
        print(f"Query: {query}, Duration: {component.metrics.duration_seconds}s")
finally:
    await component.teardown()
```
Composition
```python
# PipelineConfig, ExtractorComponent and ProcessorComponent are placeholders
class PipelineComponent(Runnable[PipelineConfig, ResultType]):
    async def _setup(self) -> None:
        # Setup sub-components
        self.extractor = ExtractorComponent(self.config.extractor_config)
        self.processor = ProcessorComponent(self.config.processor_config)
        await self.extractor.setup()
        await self.processor.setup()

    async def _execute(self, **kwargs) -> ResultType:
        # Compose components
        extracted = await self.extractor.run(**kwargs)
        processed = await self.processor.run(data=extracted)
        return processed

    async def _teardown(self) -> None:
        # Tear down in reverse order of setup
        await self.processor.teardown()
        await self.extractor.teardown()
```
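The hand-written `_setup` above has a gap. If `processor.setup()` raises, the extractor has already been set up, and nothing in that method tears it down.

One way to close the gap is `contextlib.AsyncExitStack`. This is a standard-library technique swapped in here, not something Runnable is documented to provide. The stack enters async context managers in order and exits them in reverse, and it can unwind a partially completed setup.

Runnable components support `async with`, so they could be entered the same way. In the sketch, the toy `Part` and `Pipeline` classes are hypothetical stand-ins so it runs on its own.

```python
import asyncio
from contextlib import AsyncExitStack
from typing import List


class Part:
    """Toy sub-component; fail=True makes its setup raise."""

    def __init__(self, name: str, log: List[str], fail: bool = False) -> None:
        self.name, self.log, self.fail = name, log, fail

    async def __aenter__(self) -> "Part":
        if self.fail:
            raise RuntimeError(f"{self.name} setup failed")
        self.log.append(f"setup {self.name}")
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.log.append(f"teardown {self.name}")


class Pipeline:
    """Holds sub-components on an AsyncExitStack, as a _setup/_teardown pair might."""

    def __init__(self, log: List[str], fail_processor: bool = False) -> None:
        self.log = log
        self.fail_processor = fail_processor
        self._stack = AsyncExitStack()

    async def setup(self) -> None:
        try:
            self.extractor = await self._stack.enter_async_context(
                Part("extractor", self.log))
            self.processor = await self._stack.enter_async_context(
                Part("processor", self.log, fail=self.fail_processor))
        except BaseException:
            await self._stack.aclose()  # Unwind whatever was already set up
            raise

    async def teardown(self) -> None:
        await self._stack.aclose()  # Exits in reverse order of entry


async def demo(fail: bool) -> List[str]:
    log: List[str] = []
    pipeline = Pipeline(log, fail_processor=fail)
    try:
        await pipeline.setup()
        await pipeline.teardown()
    except RuntimeError:
        log.append("setup error")
    return log


print(asyncio.run(demo(False)))
print(asyncio.run(demo(True)))
```

The first run logs both setups followed by both teardowns in reverse order. In the second run, the processor fails, so only the extractor is set up, and it is torn down before the error propagates.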
Best Practices
1. Use Context Managers
Always prefer async context managers for automatic cleanup:
```python
# ✅ Good
async with MyComponent(config) as component:
    result = await component.run()

# ❌ Avoid
component = MyComponent(config)
await component.setup()
result = await component.run()
await component.teardown()  # Easy to forget!
```
2. Use run() in Production
Use run() instead of execute() to get retry and circuit breaker support:
```python
# ✅ Production
result = await component.run(query="test")

# ⚠️ Testing only
result = await component.execute(query="test")
```
3. Configure Retries Appropriately
```python
# For idempotent operations (safe to retry)
config = MyConfig(max_retries=5, retry_delay=1.0)

# For non-idempotent operations (be careful)
config = MyConfig(max_retries=0)  # No retries

# For critical operations
config = MyConfig(
    max_retries=10,
    retry_delay=2.0,
    enable_circuit_breaker=True
)
```
4. Validate Configuration Early
```python
class MyComponent(Runnable[MyConfig, ResultType]):
    def _validate_config(self) -> None:
        super()._validate_config()
        # Fail fast on invalid config
        if not self.config.required_field:
            raise ValueError("required_field must be set")
```
5. Handle Resources Properly
```python
class MyComponent(Runnable[MyConfig, ResultType]):
    async def _setup(self) -> None:
        # Acquire resources (create_connection is a placeholder)
        self.connection = await create_connection()

    async def _teardown(self) -> None:
        # Always clean up, even on error; hasattr guards against
        # a setup that failed before the connection was created
        if hasattr(self, 'connection'):
            await self.connection.close()
```
6. Log Appropriately
The Runnable pattern logs automatically, but you can add custom logging:
```python
import logging

logger = logging.getLogger(__name__)

class MyComponent(Runnable[MyConfig, ResultType]):
    async def _execute(self, **kwargs) -> ResultType:
        logger.info(f"Processing query: {kwargs.get('query')}")
        result = await self.process()
        logger.info(f"Processed {len(result)} items")
        return result
```
Real-World Examples
Example 1: API Client Component
```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Any
import httpx
from aiecs.common.knowledge_graph import Runnable, RunnableConfig

@dataclass
class APIClientConfig(RunnableConfig):
    base_url: str = ""
    api_key: str = ""
    # Overrides RunnableConfig.timeout; also reused below as the HTTP timeout
    timeout: float = 30.0
    max_retries: int = 3
    retry_delay: float = 1.0

class APIClient(Runnable[APIClientConfig, Dict[str, Any]]):
    """Robust API client with retry and circuit breaker"""

    async def _setup(self) -> None:
        # One shared HTTP client for all runs
        self.client = httpx.AsyncClient(
            base_url=self.config.base_url,
            headers={"Authorization": f"Bearer {self.config.api_key}"},
            timeout=self.config.timeout
        )

    async def _execute(self, endpoint: str, **params) -> Dict[str, Any]:
        response = await self.client.get(endpoint, params=params)
        response.raise_for_status()  # Raise on 4xx/5xx so the failure reaches run()
        return response.json()

    async def _teardown(self) -> None:
        await self.client.aclose()

# Usage
async def main():
    config = APIClientConfig(
        base_url="https://api.example.com",
        api_key="your-key",
        max_retries=5,
        enable_circuit_breaker=True
    )

    async with APIClient(config) as client:
        data = await client.run(endpoint="/users", limit=100)
        print(f"Fetched {len(data['users'])} users")

if __name__ == "__main__":
    asyncio.run(main())
```
Example 2: Data Processing Pipeline
```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional
import pandas as pd
from aiecs.common.knowledge_graph import Runnable, RunnableConfig

@dataclass
class DataProcessorConfig(RunnableConfig):
    input_path: str = ""
    output_path: str = ""
    batch_size: int = 1000
    max_retries: int = 3

class DataProcessor(Runnable[DataProcessorConfig, int]):
    """Process large datasets in batches"""

    async def _setup(self) -> None:
        # Load data once; it is reused across runs and retries
        self.df = pd.read_csv(self.config.input_path)
        self.result_df: Optional[pd.DataFrame] = None

    async def _execute(
        self, transform_fn: Optional[Callable[[pd.DataFrame], pd.DataFrame]] = None
    ) -> int:
        # Collect batches locally so a retried attempt starts from scratch
        batches = []
        for i in range(0, len(self.df), self.config.batch_size):
            batch = self.df.iloc[i:i + self.config.batch_size]
            if transform_fn:
                batch = transform_fn(batch)
            batches.append(batch)
        # Keep the transformed rows so teardown saves them (not the raw input)
        self.result_df = pd.concat(batches, ignore_index=True) if batches else self.df.iloc[0:0]
        return len(self.result_df)

    async def _teardown(self) -> None:
        # Save results only if an execution produced them
        if getattr(self, "result_df", None) is not None:
            self.result_df.to_csv(self.config.output_path, index=False)

# Usage
async def main():
    config = DataProcessorConfig(
        input_path="data.csv",
        output_path="processed.csv",
        batch_size=5000
    )

    def clean_data(df: pd.DataFrame) -> pd.DataFrame:
        # Applied per batch, so duplicates across batches are not removed
        return df.dropna().drop_duplicates()

    async with DataProcessor(config) as processor:
        count = await processor.run(transform_fn=clean_data)
        print(f"Processed {count} records")

if __name__ == "__main__":
    asyncio.run(main())
```
Example 3: Knowledge Graph Builder Component
```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Any
from aiecs.common.knowledge_graph import Runnable, RunnableConfig
from aiecs.infrastructure.graph_storage import InMemoryGraphStore
from aiecs.application.knowledge_graph.builder import GraphBuilder

@dataclass
class KGBuilderConfig(RunnableConfig):
    enable_deduplication: bool = True
    enable_linking: bool = True
    max_retries: int = 3
    timeout: float = 300.0

class KGBuilderComponent(Runnable[KGBuilderConfig, Dict[str, Any]]):
    """Knowledge graph builder with robust error handling"""

    async def _setup(self) -> None:
        # Initialize graph store
        self.graph_store = InMemoryGraphStore()
        await self.graph_store.initialize()

        # Initialize builder
        self.builder = GraphBuilder(
            graph_store=self.graph_store,
            enable_deduplication=self.config.enable_deduplication,
            enable_linking=self.config.enable_linking
        )

    async def _execute(self, text: str, source: str = "unknown") -> Dict[str, Any]:
        # Build graph from text
        result = await self.builder.build_from_text(text, source)
        return {
            "entities_added": result.entities_added,
            "relations_added": result.relations_added,
            "success": result.success,
            "errors": result.errors
        }

    async def _teardown(self) -> None:
        # Cleanup graph store
        if hasattr(self, 'graph_store'):
            # Save state if needed
            pass

# Usage
async def main():
    config = KGBuilderConfig(
        enable_deduplication=True,
        max_retries=5,
        enable_circuit_breaker=True,
        circuit_breaker_threshold=3
    )

    texts = [
        "Alice works at Tech Corp in San Francisco.",
        "Bob is a colleague of Alice at Tech Corp.",
        "Tech Corp is a technology company."
    ]

    async with KGBuilderComponent(config) as builder:
        for i, text in enumerate(texts):
            result = await builder.run(text=text, source=f"doc_{i}")
            print(f"Added {result['entities_added']} entities, "
                  f"{result['relations_added']} relations")

        # Check metrics
        metrics = builder.get_metrics_dict()
        print(f"Duration: {metrics['duration_seconds']:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())
```
Summary
The Runnable pattern provides:
✅ Standardized lifecycle - Setup, execute, teardown
✅ Automatic retries - Exponential backoff
✅ Circuit breaker - Fault tolerance
✅ Configuration - Type-safe and validated
✅ Metrics - Built-in monitoring
✅ Clean code - Less boilerplate
Use it to build robust, production-ready async components with minimal effort!