Domain API
This section documents the domain layer components.
Agent
Base Agent
Base AI Agent
Abstract base class for all AI agents in the AIECS system.
- class aiecs.domain.agent.base_agent.OperationTimer[source]
Bases: object
Context manager for timing operations and tracking metrics.
Automatically records operation duration and can be used to track operation-level performance metrics.
Example
with agent.track_operation_time("llm_call") as timer:
    result = llm.generate(prompt)
# timer.duration contains the elapsed time in seconds
- __init__(operation_name, agent=None)[source]
Initialize operation timer.
- Parameters:
operation_name (str) – Name of the operation being timed
agent (BaseAIAgent | None) – Optional agent instance for automatic metrics recording
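The timing behaviour described above can be sketched with a small context manager. This is a minimal illustration only, not the AIECS implementation: `SimpleOperationTimer` and its `recorder` callback are hypothetical names introduced here, standing in for the optional agent that receives the recorded metrics.

```python
import time


class SimpleOperationTimer:
    """Hypothetical stand-in for OperationTimer; not the AIECS implementation."""

    def __init__(self, operation_name, recorder=None):
        self.operation_name = operation_name
        self.recorder = recorder  # assumed callable(name, seconds)
        self.duration = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Record the elapsed time even if the timed block raised.
        self.duration = time.perf_counter() - self._start
        if self.recorder is not None:
            self.recorder(self.operation_name, self.duration)
        return False  # never swallow exceptions from the timed block


recorded = {}
with SimpleOperationTimer("llm_call", recorder=recorded.__setitem__) as timer:
    sum(range(1000))  # placeholder for the timed work
```

Returning `False` from `__exit__` keeps exceptions visible to the caller while still recording the duration of the failed operation.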
- class aiecs.domain.agent.base_agent.CacheConfig[source]
Bases: object
Configuration for tool result caching.
Provides control over caching behavior to improve performance and reduce costs by avoiding redundant tool executions. Supports TTL-based expiration, size limits, and automatic cleanup.
Key Features:
- TTL-based cache expiration (default and per-tool)
- Size limits to prevent memory exhaustion
- Automatic cleanup when the capacity threshold is reached
- Configurable cache key generation
- Input hashing for large parameters
- default_ttl
Default time-to-live in seconds for cached entries (default: 300 = 5 minutes)
- Type:
- tool_specific_ttl
Dictionary mapping tool names to custom TTL values (overrides default_ttl)
Examples
# Example 1: Basic caching configuration
config = CacheConfig(
    enabled=True,
    default_ttl=300,  # 5 minutes
    max_cache_size=1000
)

# Example 2: Per-tool TTL overrides
config = CacheConfig(
    enabled=True,
    default_ttl=300,
    tool_specific_ttl={
        "search": 600,       # Search results cached for 10 minutes
        "calculator": 3600,  # Calculator results cached for 1 hour
        "weather": 1800      # Weather data cached for 30 minutes
    }
)

# Example 3: Aggressive caching for expensive operations
config = CacheConfig(
    enabled=True,
    default_ttl=3600,  # 1 hour default
    max_cache_size=5000,
    max_memory_mb=500,
    cleanup_threshold=0.95  # Cleanup at 95% capacity
)

# Example 4: Disable caching for time-sensitive tools
config = CacheConfig(
    enabled=False  # Disable caching entirely
)

# Example 5: Cache with timestamp-aware keys
config = CacheConfig(
    enabled=True,
    default_ttl=300,
    include_timestamp_in_key=True  # Include timestamp for time-sensitive caching
)
- __init__(enabled=True, default_ttl=300, tool_specific_ttl=None, max_cache_size=1000, max_memory_mb=100, cleanup_interval=60, cleanup_threshold=0.9, include_timestamp_in_key=False, hash_large_inputs=True)
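To make the TTL and key-hashing behaviour concrete, the following is a rough sketch of a cache that honours a default TTL, per-tool overrides, and a size limit. It is an assumption-laden illustration rather than the AIECS cache: `TTLToolCache`, `make_key`, the injected `clock`, and the eviction policy are all hypothetical.

```python
import hashlib
import json
import time


class TTLToolCache:
    """Hypothetical TTL cache keyed by tool name and parameters."""

    def __init__(self, default_ttl=300, tool_specific_ttl=None,
                 max_cache_size=1000, clock=time.monotonic):
        self.default_ttl = default_ttl
        self.tool_specific_ttl = tool_specific_ttl or {}
        self.max_cache_size = max_cache_size
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)

    @staticmethod
    def make_key(tool_name, parameters):
        # Hash a canonical JSON dump so large inputs yield short, stable keys.
        payload = json.dumps(parameters, sort_keys=True, default=str)
        digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return f"{tool_name}:{digest}"

    def get(self, tool_name, parameters):
        key = self.make_key(tool_name, parameters)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: drop and report a miss
            return None
        return value

    def put(self, tool_name, parameters, value):
        if len(self._entries) >= self.max_cache_size:
            # Assumed policy: evict the entry closest to expiry.
            soonest = min(self._entries, key=lambda k: self._entries[k][0])
            del self._entries[soonest]
        ttl = self.tool_specific_ttl.get(tool_name, self.default_ttl)
        key = self.make_key(tool_name, parameters)
        self._entries[key] = (self._clock() + ttl, value)


now = [0.0]  # fake clock so the example does not need to sleep
cache = TTLToolCache(default_ttl=300, tool_specific_ttl={"search": 600},
                     clock=lambda: now[0])
cache.put("search", {"q": "AI"}, "results")
cache.put("weather", {"city": "Oslo"}, "sunny")
now[0] = 301.0  # past the default TTL, still within the "search" override
```

After the clock advances, the "weather" entry has expired under the default TTL while the "search" entry is still served because of its longer override.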
- class aiecs.domain.agent.base_agent.BaseAIAgent[source]
Bases: SkillCapableMixin, ABC
Abstract base class for AI agents.
Provides common functionality for agent lifecycle management, state management, memory, goals, and metrics tracking.
This base class supports extensive flexibility and advanced features:
Tool Flexibility:
- Accept tool names (List[str]) for backward compatibility
- Accept pre-configured tool instances (Dict[str, BaseTool]) with preserved state
- Automatic tool loading and validation

LLM Client Flexibility:
- Accept any object implementing LLMClientProtocol (duck typing)
- No requirement for BaseLLMClient inheritance
- Custom LLM client wrappers fully supported

Advanced Features:
- ContextEngine integration for persistent conversation history
- Custom config managers for dynamic configuration
- Checkpointers for state persistence (LangGraph compatible)
- Agent collaboration (delegation, peer review, consensus)
- Agent learning from experiences
- Resource management (rate limiting, quotas)
- Performance tracking and health monitoring
- Tool result caching
- Parallel tool execution
- Streaming responses
- Error recovery strategies
Examples
# Example 1: Basic agent with tool names (backward compatible)
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search", "calculator"]  # Tool names loaded by subclass
)

# Example 2: Agent with tool instances (preserves tool state)
from aiecs.tools import BaseTool

class StatefulSearchTool(BaseTool):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.call_count = 0  # State preserved

    async def run_async(self, query: str):
        self.call_count += 1
        return f"Search results for: {query}"

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools={
        "search": StatefulSearchTool(api_key="..."),
        "calculator": CalculatorTool()
    },
    llm_client=OpenAIClient()
)
# Tool state (call_count) is preserved across agent operations

# Example 3: Agent with custom LLM client (no BaseLLMClient inheritance)
class CustomLLMClient:
    ...  # methods required by LLMClientProtocol go here

agent = LLMAgent(
    agent_id="agent1",
    name="My LLM Agent",
    agent_type=AgentType.CONVERSATIONAL,
    config=config,
    llm_client=CustomLLMClient()  # Works without BaseLLMClient!
)

# Example 4: Agent with ContextEngine for persistent storage
from aiecs.domain.context import ContextEngine

context_engine = ContextEngine()
await context_engine.initialize()

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    context_engine=context_engine  # Enables persistent conversation history
)
# Conversation history persists across agent restarts

# Example 5: Agent with custom config manager
from typing import Any

class DatabaseConfigManager:
    async def get_config(self, key: str):
        # Load from database
        return await db.get_config(key)

    async def update_config(self, key: str, value: Any):
        # Update in database
        await db.update_config(key, value)

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    config_manager=DatabaseConfigManager()  # Dynamic config loading
)

# Example 6: Agent with checkpointer for LangGraph integration
import json
from typing import Any, Dict, Optional

class RedisCheckpointer:
    async def save(self, agent_id: str, state: Dict[str, Any]):
        await redis.set(f"checkpoint:{agent_id}", json.dumps(state))

    async def load(self, agent_id: str) -> Optional[Dict[str, Any]]:
        data = await redis.get(f"checkpoint:{agent_id}")
        return json.loads(data) if data else None

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    checkpointer=RedisCheckpointer()  # LangGraph-compatible checkpointing
)

# Example 7: Agent with collaboration features
agent_registry = {
    "agent2": other_agent_instance,
    "agent3": another_agent_instance
}

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    collaboration_enabled=True,
    agent_registry=agent_registry  # Enable delegation and peer review
)

# Delegate task to another agent
result = await agent.delegate_task(
    task={"description": "Analyze this data"},
    target_agent_id="agent2"
)

# Example 8: Agent with learning enabled
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    learning_enabled=True  # Learn from past experiences
)

# Record experience
await agent.record_experience(
    task_type="data_analysis",
    approach="parallel_tools",
    success=True,
    execution_time=2.5
)

# Get recommended approach based on history
approach = await agent.get_recommended_approach("data_analysis")
print(f"Recommended: {approach}")

# Example 9: Agent with resource limits
from aiecs.domain.agent.models import ResourceLimits

resource_limits = ResourceLimits(
    max_concurrent_tasks=5,
    max_tokens_per_minute=10000,
    max_tool_calls_per_minute=100
)

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    resource_limits=resource_limits  # Rate limiting and quotas
)

# Check resource availability before executing
if await agent.check_resource_availability():
    result = await agent.execute_task(task, context)
else:
    await agent.wait_for_resources(timeout=30.0)

# Example 10: Agent with performance tracking
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient()
)

# Track operation performance
with agent.track_operation_time("data_processing"):
    result = await agent.execute_task(task, context)

# Get performance metrics
metrics = agent.get_performance_metrics()
print(f"Average response time: {metrics['avg_response_time']}s")
print(f"P95 response time: {metrics['p95_response_time']}s")

# Get health status
health = agent.get_health_status()
print(f"Health score: {health['health_score']}")
print(f"Status: {health['status']}")

# Example 11: Agent with tool caching
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient()
)

# Execute tool with caching (30 second TTL)
result1 = await agent.execute_tool_with_cache(
    tool_name="search",
    operation="query",
    parameters={"q": "AI"},
    cache_ttl=30
)

# Second call uses cache (no API call)
result2 = await agent.execute_tool_with_cache(
    tool_name="search",
    operation="query",
    parameters={"q": "AI"},
    cache_ttl=30
)

# Get cache statistics
stats = agent.get_cache_stats()
print(f"Cache hit rate: {stats['hit_rate']:.1%}")

# Example 12: Agent with parallel tool execution
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search", "calculator", "translator"],
    llm_client=OpenAIClient()
)

# Execute multiple independent tools in parallel (3-5x faster)
results = await agent.execute_tools_parallel([
    {"tool": "search", "operation": "query", "parameters": {"q": "AI"}},
    {"tool": "calculator", "operation": "add", "parameters": {"a": 1, "b": 2}},
    {"tool": "translator", "operation": "translate", "parameters": {"text": "Hello"}}
], max_concurrency=3)

# Example 13: Agent with streaming responses
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient()
)

# Stream task execution (tokens + tool calls)
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] == 'tool_call':
        print(f"\nCalling {event['tool_name']}...")
    elif event['type'] == 'result':
        print(f"\nFinal result: {event['output']}")

# Example 14: Agent with error recovery
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient()
)

# Execute with automatic recovery strategies
result = await agent.execute_with_recovery(
    task=task,
    context=context,
    strategies=["retry", "simplify", "fallback", "delegate"]
)
# Automatically tries retry → simplify → fallback → delegate if errors occur
- __init__(agent_id, name, agent_type, config, description=None, version='1.0.0', tools=None, llm_client=None, config_manager=None, checkpointer=None, context_engine=None, collaboration_enabled=False, agent_registry=None, learning_enabled=False, resource_limits=None, skill_script_registry=None, skill_registry=None, plugin_registry=None)[source]
Initialize the base agent.
- Parameters:
agent_id (str) – Unique identifier for the agent
name (str) – Agent name
agent_type (AgentType) – Type of agent
config (AgentConfiguration) – Agent configuration
description (str | None) – Optional agent description
version (str) – Agent version
tools (List[str] | Dict[str, BaseTool] | None) – Optional tools - either list of tool names or dict of tool instances. List[str]: Tool names to be loaded by subclass Dict[str, BaseTool]: Pre-configured tool instances with state
llm_client (LLMClientProtocol | None) – Optional LLM client (any object implementing LLMClientProtocol). Supports custom LLM clients without BaseLLMClient inheritance.
config_manager (ConfigManagerProtocol | None) – Optional configuration manager for dynamic config loading
checkpointer (CheckpointerProtocol | None) – Optional checkpointer for state persistence (LangGraph compatible)
context_engine (ContextEngine | None) – Optional ContextEngine instance for persistent conversation history
collaboration_enabled (bool) – Enable agent collaboration features (delegation, peer review)
agent_registry (Dict[str, Any] | None) – Registry of other agents for collaboration (agent_id -> agent instance)
learning_enabled (bool) – Enable agent learning from experiences
resource_limits (Any | None) – Optional resource limits configuration and session management. If provided, enables persistent storage across agent restarts.
skill_script_registry (SkillScriptRegistry | None) – Optional SkillScriptRegistry for managing tools from skill scripts. If provided, enables dynamic tool registration via add_tool(), remove_tool(), etc.
skill_registry (SkillRegistry | None) – Optional SkillRegistry for loading skills by name. If provided along with config.skills_enabled=True, enables skill support.
plugin_registry (PluginRegistry | None) – Optional PluginRegistry for builtin and extension plugins. Defaults to PluginRegistry.default() when omitted.
Example
# With tool instances and ContextEngine
from aiecs.domain.context import ContextEngine

context_engine = ContextEngine()
await context_engine.initialize()

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools={
        "search": SearchTool(api_key="..."),
        "calculator": CalculatorTool()
    },
    llm_client=CustomLLMClient(),  # Custom client, no inheritance needed
    config_manager=DatabaseConfigManager(),
    checkpointer=RedisCheckpointer(),
    context_engine=context_engine  # Enables persistent storage
)

# With tool names (backward compatible)
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search", "calculator"]  # Loaded by subclass
)
- property state: AgentState
Get current agent state.
- async initialize()[source]
Initialize the agent.
This method should be called before the agent can be used. Override in subclasses to add initialization logic.
- Raises:
AgentInitializationError – If initialization fails
- Return type:
None
- async reload_plugins()[source]
Explicit plugin reload: shutdown → re-derive config → new manager → initialize (§9.7).
Rejected while the agent is BUSY or _current_task_id is set. Does not run automatically on configuration changes mid-task.
- Return type:
PluginLoadResult
- async shutdown()[source]
Shutdown the agent.
Override in subclasses to add cleanup logic.
- Return type:
None
- add_tool(tool, replace=False)[source]
Add a tool from a skill script to the agent’s registry.
This method registers a lightweight Tool instance (from skill scripts) to the agent’s SkillScriptRegistry. This is separate from BaseTool instances managed via _tool_instances.
- Parameters:
tool (Tool) – Tool instance to register
replace (bool) – If True, replace existing tool with same name
- Raises:
RuntimeError – If no SkillScriptRegistry is configured
SkillScriptRegistryError – If tool already exists and replace=False
- Return type:
None
Example
tool = Tool(
    name="my-tool",
    description="A custom tool",
    execute=my_async_function
)
agent.add_tool(tool)
- remove_tool(tool_name)[source]
Remove a tool from the skill script registry.
- Parameters:
tool_name (str) – Name of the tool to remove
- Returns:
True if tool was removed, False if not found
- Raises:
RuntimeError – If no SkillScriptRegistry is configured
- Return type:
bool
- get_tool(tool_name)[source]
Get a tool from the skill script registry by name.
- Parameters:
tool_name (str) – Name of the tool to retrieve
- Returns:
Tool instance if found, None otherwise
- Return type:
Tool | None
- property skill_script_registry: SkillScriptRegistry | None
Get the skill script registry, if configured.
- get_config_manager()[source]
Get the configuration manager.
- Returns:
Configuration manager instance, or None if not configured
- Return type:
ConfigManagerProtocol | None
- abstract async execute_task(task, context)[source]
Execute a task.
- Parameters:
- Returns:
Task execution result
- Raises:
TaskExecutionError – If task execution fails
- Return type:
Note
Subclasses can use _execute_with_retry() to wrap task execution with automatic retry logic based on agent configuration.
- abstract async process_message(message, sender_id=None)[source]
Process an incoming message.
- Parameters:
- Returns:
Response dictionary
- Return type:
Note
Subclasses can use _execute_with_retry() to wrap message processing with automatic retry logic based on agent configuration.
- async add_to_memory(key, value, memory_type=MemoryType.SHORT_TERM, metadata=None)[source]
Add an item to agent memory.
- async clear_memory(memory_type=None)[source]
Clear agent memory.
- Parameters:
memory_type (MemoryType | None) – If specified, clear only this type of memory
- Return type:
None
- set_goal(description, priority=GoalPriority.MEDIUM, success_criteria=None, deadline=None)[source]
Set a new goal for the agent.
- get_goals(status=None)[source]
Get agent goals.
- Parameters:
status (GoalStatus | None) – Filter by status (optional)
- Returns:
List of goals
- Return type:
List[AgentGoal]
- get_goal(goal_id)[source]
Get a specific goal by ID.
- Parameters:
goal_id (str)
- Return type:
AgentGoal | None
- declare_capability(capability_type, level, description=None, constraints=None)[source]
Declare an agent capability.
- get_capabilities()[source]
Get all agent capabilities.
- Return type:
List[AgentCapabilityDeclaration]
- update_metrics(execution_time=None, success=True, quality_score=None, tokens_used=None, tool_calls=None)[source]
Update agent metrics.
- update_cache_metrics(cache_read_tokens=None, cache_creation_tokens=None, cache_hit=None)[source]
Update prompt cache metrics from LLM response.
This method tracks provider-level prompt caching statistics to monitor cache hit rates and token savings.
- Parameters:
- Return type:
None
Example
# After receiving LLM response
agent.update_cache_metrics(
    cache_read_tokens=response.cache_read_tokens,
    cache_creation_tokens=response.cache_creation_tokens,
    cache_hit=response.cache_hit
)
- update_session_metrics(session_status, session_duration=None, session_requests=0)[source]
Update session-level metrics.
This method should be called when a session is created, updated, or ended to track session-level statistics in agent metrics.
- Parameters:
- Return type:
None
Example
# When creating a session
agent.update_session_metrics(session_status="active")

# When ending a session
agent.update_session_metrics(
    session_status="completed",
    session_duration=300.5,
    session_requests=15
)
- track_operation_time(operation_name)[source]
Create a context manager for tracking operation time.
This method returns an OperationTimer that automatically records operation duration and updates agent metrics when the operation completes.
- Parameters:
operation_name (str) – Name of the operation to track
- Returns:
OperationTimer context manager
- Return type:
OperationTimer
Example
with agent.track_operation_time("llm_call") as timer:
    result = await llm.generate(prompt)
    # Metrics are automatically recorded

# Access duration if needed
print(f"Operation took {timer.duration} seconds")
- get_performance_metrics()[source]
Get comprehensive performance metrics.
Returns detailed performance statistics including operation-level metrics, percentiles, and aggregated statistics.
Example
metrics = agent.get_performance_metrics()
print(f"P95 latency: {metrics['p95_operation_time']}s")
print(f"Total operations: {metrics['total_operations']}")
for op_name, stats in metrics['operations'].items():
    print(f"{op_name}: {stats['count']} calls, avg {stats['avg_time']:.3f}s")
- get_health_status()[source]
Get agent health status with health score calculation.
Calculates a health score (0-100) based on multiple factors:
- Success rate (40% weight)
- Error rate (30% weight)
- Performance (20% weight)
- Session health (10% weight)
Example
health = agent.get_health_status()
print(f"Health score: {health['health_score']}/100")
print(f"Status: {health['status']}")  # healthy, degraded, unhealthy
if health['issues']:
    print(f"Issues: {', '.join(health['issues'])}")
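The weighting listed for get_health_status() can be illustrated with a short calculation. This is a sketch under stated assumptions, not the library's formula: the function names, the treatment of the error rate as `1 - error_rate`, the normalisation of every input to a fraction in [0, 1], and the status thresholds are all hypothetical.

```python
def health_score(success_rate, error_rate, performance, session_health):
    """All inputs are fractions in [0, 1]; returns a score in [0, 100]."""
    weights = {"success": 0.40, "error": 0.30, "performance": 0.20, "session": 0.10}
    score = (
        weights["success"] * success_rate
        + weights["error"] * (1.0 - error_rate)  # fewer errors -> higher score
        + weights["performance"] * performance
        + weights["session"] * session_health
    )
    return round(100.0 * score, 1)


def status_for(score, healthy_at=80.0, degraded_at=50.0):
    # Thresholds are illustrative assumptions, not values taken from AIECS.
    if score >= healthy_at:
        return "healthy"
    return "degraded" if score >= degraded_at else "unhealthy"


score = health_score(success_rate=0.9, error_rate=0.1,
                     performance=0.5, session_health=1.0)
status = status_for(score)
```

With these inputs the weighted terms are 36, 27, 10, and 10 points, so a mediocre performance component lowers the total without by itself pushing the agent out of the assumed "healthy" band.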
- get_comprehensive_status()[source]
Get comprehensive agent status combining all metrics.
Returns a complete view of agent state, health, performance, and operational metrics.
Example
status = agent.get_comprehensive_status()
print(f"Agent: {status['agent_id']}")
print(f"State: {status['state']}")
print(f"Health: {status['health']['status']} ({status['health']['health_score']}/100)")
print(f"Tasks: {status['metrics']['total_tasks_executed']}")
print(f"Sessions: {status['metrics']['total_sessions']}")
- reset_metrics()[source]
Reset performance and session metrics.
Resets all metrics to their initial state while preserving agent configuration and state.
Example
# Reset metrics at the start of a new monitoring period
agent.reset_metrics()
- Return type:
None
- to_dict()[source]
Serialize agent to dictionary.
Includes health status and performance metrics for comprehensive agent state representation.
- classmethod from_dict(data)[source]
Deserialize agent from dictionary.
- Parameters:
- Returns:
Agent instance
- Raises:
SerializationError – If deserialization fails
- Return type:
- async save_checkpoint(session_id, checkpoint_id=None)[source]
Save agent state checkpoint.
This method saves the current agent state using the configured checkpointer. If no checkpointer is configured, logs a warning and returns None.
- Parameters:
- Returns:
Checkpoint ID if saved successfully, None otherwise
- Return type:
str | None
Example
# Save checkpoint with auto-generated ID
checkpoint_id = await agent.save_checkpoint(session_id="session-123")

# Save checkpoint with custom ID
checkpoint_id = await agent.save_checkpoint(
    session_id="session-123",
    checkpoint_id="v1.0"
)
Note
Requires a checkpointer to be configured during agent initialization. The checkpoint includes full agent state from to_dict().
- async load_checkpoint(session_id, checkpoint_id=None)[source]
Load agent state from checkpoint.
This method loads agent state from a saved checkpoint using the configured checkpointer. If no checkpointer is configured, logs a warning and returns False.
- Parameters:
- Returns:
True if checkpoint loaded successfully, False otherwise
- Return type:
bool
Example
# Load latest checkpoint
success = await agent.load_checkpoint(session_id="session-123")

# Load specific checkpoint
success = await agent.load_checkpoint(
    session_id="session-123",
    checkpoint_id="v1.0"
)
Note
Requires a checkpointer to be configured during agent initialization. This method updates the agent’s internal state from the checkpoint. Not all state may be restorable (e.g., runtime objects, connections).
- async execute_tool(tool_name, parameters)[source]
Execute a single tool with given parameters.
This is a default implementation that subclasses can override. For ToolAgent, this calls _execute_tool with operation from parameters.
- async execute_tools_parallel(tool_calls, max_concurrency=5)[source]
Execute multiple tools in parallel with concurrency limit.
- Parameters:
- Returns:
List of results in same order as tool_calls
- Return type:
Example
tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "calculator", "parameters": {"expression": "2+2"}},
    {"tool_name": "search", "parameters": {"query": "ML"}},
]
results = await agent.execute_tools_parallel(tool_calls, max_concurrency=2)
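One common way to bound concurrency while keeping results in input order is an asyncio.Semaphore combined with asyncio.gather. The sketch below shows that pattern; it is not necessarily how execute_tools_parallel is implemented, and `run_parallel` and `fake_execute` are hypothetical helpers.

```python
import asyncio


async def run_parallel(calls, execute, max_concurrency=5):
    """Run execute(call) for each call, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(call):
        async with semaphore:
            return await execute(call)

    # gather() returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(guarded(c) for c in calls))


async def fake_execute(call):
    # Hypothetical tool runner used only for this illustration.
    await asyncio.sleep(0.01)
    return f"{call['tool_name']}:{call['parameters']['query']}"


tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "search", "parameters": {"query": "ML"}},
    {"tool_name": "search", "parameters": {"query": "NLP"}},
]
results = asyncio.run(run_parallel(tool_calls, fake_execute, max_concurrency=2))
```

Because gather() preserves argument order, the result list lines up with tool_calls even when later calls finish first.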
- async analyze_tool_dependencies(tool_calls)[source]
Analyze dependencies between tool calls.
Detects if one tool’s output is used as input to another tool.
- Parameters:
- Returns:
Dict mapping tool index to list of dependency indices
- Return type:
Example
tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "summarize", "parameters": {"text": "${0.result}"}},
]
deps = await agent.analyze_tool_dependencies(tool_calls)
# deps = {"1": ["0"]}  # Tool 1 depends on tool 0
- async execute_tools_with_dependencies(tool_calls)[source]
Execute tools respecting dependencies using topological sort.
- Parameters:
- Returns:
List of results in same order as tool_calls
- Return type:
Example
tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "summarize", "parameters": {"text": "${0.result}"}},
]
results = await agent.execute_tools_with_dependencies(tool_calls)
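Dependency detection and ordered execution can be sketched with a regular expression for the `${N.result}` placeholders and the standard library's graphlib.TopologicalSorter. This is an illustration under assumptions: the helper names, the synchronous `fake_execute`, and the use of integer indices (the documented return value uses string keys) are not taken from AIECS.

```python
import re
from graphlib import TopologicalSorter

PLACEHOLDER = re.compile(r"\$\{(\d+)\.result\}")


def find_dependencies(tool_calls):
    """Map each call index to the indices it references via ${N.result}."""
    deps = {}
    for index, call in enumerate(tool_calls):
        refs = set()
        for value in call["parameters"].values():
            if isinstance(value, str):
                refs.update(int(m) for m in PLACEHOLDER.findall(value))
        deps[index] = sorted(refs)
    return deps


def run_in_order(tool_calls, execute):
    deps = find_dependencies(tool_calls)
    results = [None] * len(tool_calls)
    # static_order() yields every index after all of its dependencies.
    for index in TopologicalSorter(deps).static_order():
        call = tool_calls[index]
        params = {}
        for key, value in call["parameters"].items():
            if isinstance(value, str):
                # Substitute each placeholder with the earlier result.
                value = PLACEHOLDER.sub(
                    lambda m: str(results[int(m.group(1))]), value
                )
            params[key] = value
        results[index] = execute(call["tool_name"], params)
    return results


def fake_execute(tool_name, params):
    # Hypothetical synchronous executor used only for this illustration.
    args = ", ".join(f"{k}={v}" for k, v in params.items())
    return f"{tool_name}({args})"


tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "summarize", "parameters": {"text": "${0.result}"}},
]
deps = find_dependencies(tool_calls)
results = run_in_order(tool_calls, fake_execute)
```

TopologicalSorter raises graphlib.CycleError when calls depend on each other circularly, which gives a natural place to reject invalid plans.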
- async execute_tool_with_cache(tool_name, parameters)[source]
Execute tool with caching support.
- Parameters:
- Returns:
Tool result (from cache or fresh execution)
- Return type:
Example
result = await agent.execute_tool_with_cache("search", {"query": "AI"})
- invalidate_cache(tool_name=None, pattern=None)[source]
Invalidate cache entries.
- Parameters:
- Returns:
Number of entries invalidated
- Return type:
Example
# Invalidate all search results
count = agent.invalidate_cache(tool_name="search")

# Invalidate all cache
count = agent.invalidate_cache()
- get_cache_stats()[source]
Get cache statistics.
Example
stats = agent.get_cache_stats()
print(f"Cache size: {stats['size']}")
print(f"Hit rate: {stats['hit_rate']:.1%}")
- async execute_task_streaming(task, context)[source]
Execute a task with streaming results.
This method streams task execution events as they occur, including:
- Status updates (started, thinking, acting, completed)
- LLM tokens (for agents with LLM clients)
- Tool calls and results (for agents with tools)
- Final result
- Parameters:
- Yields:
Dict[str, Any] – Event dictionaries with ‘type’ and event-specific data
- Return type:
AsyncGenerator[Dict[str, Any], None]
- Event types:
‘status’: Status update (e.g., started, thinking, completed)
‘token’: LLM token (for streaming text generation)
‘tool_call’: Tool execution started
‘tool_result’: Tool execution completed
‘result’: Final task result
‘error’: Error occurred
Example
```python
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] == 'tool_call':
        print(f"\nCalling tool: {event['tool_name']}")
    elif event['type'] == 'tool_result':
        print(f"Tool result: {event['result']}")
    elif event['type'] == 'result':
        print(f"\nFinal result: {event['output']}")
```
Note
Subclasses should override this method to provide streaming support. Default implementation falls back to non-streaming execute_task.
- async process_message_streaming(message, sender_id=None)[source]
Process a message with streaming response.
This method streams the response text as it’s generated, providing a better user experience for long responses.
- Parameters:
- Yields:
str – Response text tokens/chunks
- Return type:
AsyncGenerator[str, None]
Example
```python
async for token in agent.process_message_streaming("Hello!"):
    print(token, end='', flush=True)
```
Note
Subclasses should override this method to provide streaming support. Default implementation falls back to non-streaming process_message.
- async delegate_task(task, required_capabilities=None, target_agent_id=None)[source]
Delegate a task to another capable agent.
- Parameters:
- Returns:
Task execution result from delegated agent
- Raises:
ValueError – If collaboration not enabled or no capable agent found
- Return type:
Example
```python
# Delegate to specific agent
result = await agent.delegate_task(
    task={"description": "Search for AI papers"},
    target_agent_id="search_agent"
)

# Delegate to any capable agent
result = await agent.delegate_task(
    task={"description": "Analyze data"},
    required_capabilities=["data_analysis", "statistics"]
)
```
- async request_peer_review(task, result, reviewer_id=None)[source]
Request peer review of a task result.
- Parameters:
- Returns:
Review result with ‘approved’ (bool), ‘feedback’ (str), ‘reviewer_id’ (str)
- Return type:
- async collaborate_on_task(task, collaborator_ids, strategy='parallel')[source]
Collaborate with other agents on a task.
- Parameters:
- Returns:
Aggregated result based on strategy
- Return type:
- Strategies:
parallel: All agents work simultaneously, results aggregated
sequential: Agents work in order, each building on previous results
consensus: All agents work independently, best result selected by voting
Example
```python
# Parallel collaboration
result = await agent.collaborate_on_task(
    task={"description": "Analyze market trends"},
    collaborator_ids=["analyst1", "analyst2", "analyst3"],
    strategy="parallel"
)

# Sequential collaboration (pipeline)
result = await agent.collaborate_on_task(
    task={"description": "Research and summarize"},
    collaborator_ids=["researcher", "summarizer"],
    strategy="sequential"
)

# Consensus collaboration
result = await agent.collaborate_on_task(
    task={"description": "Make recommendation"},
    collaborator_ids=["expert1", "expert2", "expert3"],
    strategy="consensus"
)
```
- async get_relevant_context(query, context_items, max_items=None, min_relevance_score=0.5)[source]
Get relevant context items using semantic search and relevance scoring.
This method filters and ranks context items based on their relevance to the query, helping agents stay within token limits while maintaining the most important context.
- Parameters:
- Returns:
List of relevant context items, sorted by relevance (highest first)
- Return type:
Example
context_items = [
    {"content": "User prefers concise answers", "type": "preference"},
    {"content": "Previous task: data analysis", "type": "history"},
    {"content": "System configuration: prod", "type": "config"},
]

relevant = await agent.get_relevant_context(
    query="Analyze sales data",
    context_items=context_items,
    max_items=2,
    min_relevance_score=0.6
)
- async score_context_relevance(query, context_item)[source]
Score the relevance of a context item to a query.
Uses multiple signals to determine relevance:
- Keyword overlap (basic)
- Semantic similarity (if LLM client with embeddings available)
- Recency (if timestamp available)
- Type priority (if type specified)
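As a rough illustration of the two simplest signals, keyword overlap and type priority, consider the sketch below. It is not the scoring used by score_context_relevance(): the function names, the `TYPE_BOOST` values, and the way the signals are combined are assumptions, and semantic similarity and recency are left out.

```python
import re


def keyword_overlap(query, text):
    """Fraction of distinct query words that also occur in text (0.0 to 1.0)."""
    query_words = set(re.findall(r"\w+", query.lower()))
    text_words = set(re.findall(r"\w+", text.lower()))
    if not query_words:
        return 0.0
    return len(query_words & text_words) / len(query_words)


# Hypothetical per-type boosts; the real priorities are not documented here.
TYPE_BOOST = {"preference": 0.2, "history": 0.1}


def score_item(query, item):
    base = keyword_overlap(query, item.get("content", ""))
    boost = TYPE_BOOST.get(item.get("type"), 0.0)
    return min(1.0, base + boost)  # clamp so the score stays in [0, 1]


history = {"content": "Previous task: data analysis", "type": "history"}
config = {"content": "System configuration: prod", "type": "config"}
history_score = score_item("Analyze sales data", history)
config_score = score_item("Analyze sales data", config)
```

Exact word matching misses related forms such as "analyze" and "analysis", which is one reason an embedding-based similarity signal is useful when it is available.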
- async prune_context(context_items, max_tokens, query=None, preserve_types=None)[source]
Prune context items to fit within token limit.
Uses relevance scoring to keep the most important context while staying within token limits. Optionally preserves certain types of context regardless of relevance.
- Parameters:
- Returns:
Pruned list of context items that fit within token limit
- Return type:
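A greedy strategy is one simple way to prune context to a token budget: keep preserved types first, then add the highest-scoring remaining items while they fit. The sketch below assumes this strategy along with a crude four-characters-per-token estimate; `prune`, `estimate_tokens`, and the `relevance` field are hypothetical and not part of the documented API.

```python
def estimate_tokens(text):
    # Crude assumption: roughly one token per four characters.
    return max(1, len(text) // 4)


def prune(items, max_tokens, score, preserve_types=()):
    """Keep preserved types first, then the highest-scoring items that fit."""
    preserved = [i for i in items if i.get("type") in preserve_types]
    others = sorted(
        (i for i in items if i.get("type") not in preserve_types),
        key=score,
        reverse=True,
    )
    kept, used = [], 0
    for item in preserved + others:
        cost = estimate_tokens(item["content"])
        if used + cost <= max_tokens:
            kept.append(item)
            used += cost
    return kept


items = [
    {"content": "a" * 40, "type": "history", "relevance": 0.9},
    {"content": "b" * 40, "type": "config", "relevance": 0.2},
    {"content": "c" * 40, "type": "preference", "relevance": 0.1},
]
kept = prune(items, max_tokens=20, score=lambda i: i["relevance"],
             preserve_types=("preference",))
```

In this sketch preserved items still count against the budget, so a very large preserved item can crowd out more relevant ones; whether the real method behaves that way is not stated in this reference.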
- async record_experience(task, result, approach, tools_used=None)[source]
Record an experience for learning and adaptation.
- async get_recommended_approach(task)[source]
Get recommended approach based on past experiences.
Analyzes similar past experiences to recommend the best approach for the current task.
- Parameters:
- Returns:
Recommended approach dict with ‘approach’, ‘confidence’, ‘reasoning’ or None if no relevant experiences
- Return type:
Example
```python
recommendation = await agent.get_recommended_approach(
    task={"description": "Analyze sales data", "type": "analysis"}
)
if recommendation:
    print(f"Recommended: {recommendation['approach']}")
    print(f"Confidence: {recommendation['confidence']:.2f}")
    print(f"Reasoning: {recommendation['reasoning']}")
```
- async get_learning_insights()[source]
Get learning insights and analytics.
Provides analytics about agent learning including success rates, common patterns, and areas for improvement.
Example
insights = await agent.get_learning_insights()
print(f"Total experiences: {insights['total_experiences']}")
print(f"Success rate: {insights['overall_success_rate']:.2%}")
print(f"Most common task: {insights['most_common_task_type']}")
- async adapt_strategy(task)[source]
Adapt strategy based on learning insights.
Analyzes past experiences to suggest strategy adaptations for the current task.
- async check_resource_availability()[source]
Check if resources are available for task execution.
Checks against configured resource limits including:
- Concurrent task limits
- Token rate limits
- Tool call rate limits
- async get_resource_usage()[source]
Get current resource usage statistics.
Example
```python
usage = await agent.get_resource_usage()
print(f"Active tasks: {usage['active_tasks']}")
print(f"Tokens/min: {usage['tokens_per_minute']}")
print(f"Tool calls/min: {usage['tool_calls_per_minute']}")
```
- async execute_with_recovery(task, context, strategies=None)[source]
Execute task with advanced error recovery strategies.
Tries multiple recovery strategies in sequence until one succeeds:
1. Retry with exponential backoff
2. Simplify task and retry
3. Use fallback approach
4. Delegate to another agent
- Parameters:
- Returns:
Task execution result
- Raises:
TaskExecutionError – If all recovery strategies fail
- Return type:
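The "try each strategy in turn" pattern described for execute_with_recovery can be sketched as follows. This is a simplified illustration, not the agent's code: `AllStrategiesFailed` is a hypothetical stand-in for TaskExecutionError, and each strategy is assumed to be a zero-argument coroutine function.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

Strategy = Callable[[], Awaitable[Any]]


class AllStrategiesFailed(Exception):
    """Hypothetical stand-in for TaskExecutionError."""


async def retry_with_backoff(run: Strategy, attempts: int = 3, base_delay: float = 0.01) -> Any:
    # Strategy 1: retry, doubling the delay after each failure.
    for attempt in range(attempts):
        try:
            return await run()
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)


async def execute_with_strategies(strategies: List[Strategy]) -> Any:
    # Try each strategy in order and return the first successful result.
    errors: List[Exception] = []
    for strategy in strategies:
        try:
            return await strategy()
        except Exception as exc:
            errors.append(exc)
    raise AllStrategiesFailed(
        f"{len(errors)} strategies failed"
    ) from (errors[-1] if errors else None)
```

In this shape, a caller could pass `lambda: retry_with_backoff(run_task)` as the first strategy, followed by simplified, fallback, and delegation variants.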
Context
Context Engine
ContextEngine: Advanced Context and Session Management Engine
This engine extends TaskContext capabilities to provide comprehensive session management, conversation tracking, and persistent storage for BaseAIService.
Key Features:
1. Multi-session management (extends TaskContext from single task to multiple sessions)
2. Redis backend storage for persistence and scalability
3. Conversation history management with optimization
4. Performance metrics and analytics
5. Resource and lifecycle management
6. Integration with BaseServiceCheckpointer
- class aiecs.domain.context.context_engine.DateTimeEncoder[source]
Bases:
JSONEncoder
Custom JSON encoder to handle datetime objects.
- default(obj)[source]
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
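For the datetime case specifically, a subclass along the following lines would work. This is a minimal sketch that assumes ISO 8601 strings are the desired output; the class name `ISODateTimeEncoder` is hypothetical, and the actual DateTimeEncoder may format values differently.

```python
import json
from datetime import datetime


class ISODateTimeEncoder(json.JSONEncoder):
    """Serializes datetime values as ISO 8601 strings."""

    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(o)


payload = {"created_at": datetime(2024, 1, 15, 9, 30, 0)}
encoded = json.dumps(payload, cls=ISODateTimeEncoder)
print(encoded)  # {"created_at": "2024-01-15T09:30:00"}
```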
- class aiecs.domain.context.context_engine.SessionMetrics[source]
Bases:
object
Session-level performance metrics.
- __init__(session_id, user_id, created_at, last_activity, request_count=0, error_count=0, total_processing_time=0.0, status='active')
- class aiecs.domain.context.context_engine.ConversationMessage[source]
Bases:
object
Structured conversation message.
- class aiecs.domain.context.context_engine.CompressionConfig[source]
Bases:
object
Configuration for conversation compression.
Provides flexible control over compression behavior with multiple strategies to manage conversation history size and reduce token usage.
Compression Strategies:
- truncate: Fast truncation, keeps most recent N messages (no LLM required)
- summarize: LLM-based summarization of older messages
- semantic: Embedding-based deduplication of similar messages
- hybrid: Combination of multiple strategies applied sequentially
Key Features:
- Automatic compression triggers based on message count
- Custom prompt templates for summarization
- Configurable similarity thresholds for semantic deduplication
- Performance timeouts to prevent long-running operations
- Attributes:
    strategy: Compression strategy to use. One of: "truncate", "summarize", "semantic", "hybrid"
    max_messages: Maximum messages to keep (for truncation strategy)
    keep_recent: Always keep N most recent messages (applies to all strategies)
    summary_prompt_template: Custom prompt template for summarization (uses {messages} placeholder)
    summary_max_tokens: Maximum tokens for summary output
    include_summary_in_history: Whether to add summary as system message in history
    similarity_threshold: Similarity threshold for semantic deduplication (0.0-1.0)
    embedding_model: Embedding model name for semantic deduplication
    hybrid_strategies: List of strategies to combine for hybrid mode (default: ["truncate", "summarize"])
    auto_compress_enabled: Enable automatic compression when threshold exceeded
    auto_compress_threshold: Message count threshold to trigger auto-compression
    auto_compress_target: Target message count after auto-compression
    compression_timeout: Maximum time for compression operation in seconds
- Examples:
    # Example 1: Basic truncation configuration
    config = CompressionConfig(
        strategy="truncate",
        max_messages=50,
        keep_recent=10
    )

    # Example 2: LLM-based summarization
    config = CompressionConfig(
        strategy="summarize",
        keep_recent=10,
        summary_max_tokens=500,
        include_summary_in_history=True
    )

    # Example 3: Semantic deduplication
    config = CompressionConfig(
        strategy="semantic",
        keep_recent=10,
        similarity_threshold=0.95,
        embedding_model="text-embedding-ada-002"
    )

    # Example 4: Hybrid strategy (truncate then summarize)
    config = CompressionConfig(
        strategy="hybrid",
        hybrid_strategies=["truncate", "summarize"],
        keep_recent=10,
        summary_max_tokens=500
    )

    # Example 5: Auto-compression enabled
    config = CompressionConfig(
        auto_compress_enabled=True,
        auto_compress_threshold=100,
        auto_compress_target=50,
        strategy="summarize",
        keep_recent=10
    )

    # Example 6: Custom summarization prompt
    config = CompressionConfig(
        strategy="summarize",
        summary_prompt_template=(
            "Summarize the following conversation focusing on "
            "key decisions and action items:\n\n{messages}"
        ),
        summary_max_tokens=300
    )
- __init__(strategy='truncate', max_messages=50, keep_recent=10, summary_prompt_template=None, summary_max_tokens=500, include_summary_in_history=True, similarity_threshold=0.95, embedding_model='text-embedding-ada-002', hybrid_strategies=None, auto_compress_enabled=False, auto_compress_threshold=100, auto_compress_target=50, compression_timeout=30.0)
- Parameters:
strategy (str)
max_messages (int)
keep_recent (int)
summary_prompt_template (str | None)
summary_max_tokens (int)
include_summary_in_history (bool)
similarity_threshold (float)
embedding_model (str)
auto_compress_enabled (bool)
auto_compress_threshold (int)
auto_compress_target (int)
compression_timeout (float)
- Return type:
None
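To make the truncate and semantic strategies more concrete, here is a rough standalone sketch of both ideas. The function names and the plain-list message representation are assumptions for illustration only; the library's own compression code may handle ordering, `keep_recent`, and embeddings differently.

```python
import math
from typing import List, Sequence


def truncate(messages: Sequence[str], max_messages: int) -> List[str]:
    # Keep only the most recent max_messages entries.
    return list(messages[-max_messages:]) if max_messages > 0 else []


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def deduplicate(
    messages: Sequence[str],
    embeddings: Sequence[Sequence[float]],
    threshold: float = 0.95,
    keep_recent: int = 0,
) -> List[str]:
    # The last keep_recent messages are always kept; an earlier message is
    # dropped when it is at least `threshold` similar to one already kept.
    cutoff = len(messages) - keep_recent
    kept: List[int] = []
    for i in range(len(messages)):
        is_recent = i >= cutoff
        is_novel = all(cosine(embeddings[i], embeddings[j]) < threshold for j in kept)
        if is_recent or is_novel:
            kept.append(i)
    return [messages[i] for i in kept]
```

A hybrid strategy in this picture would simply apply such steps one after another, in the order given by `hybrid_strategies`.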
- class aiecs.domain.context.context_engine.ContextEngine[source]
Bases:
IStorageBackend, ICheckpointerBackend
Advanced Context and Session Management Engine.
Implements core storage interfaces to provide comprehensive session management with Redis backend storage for BaseAIService and BaseServiceCheckpointer.
This implementation follows the middleware’s core interface pattern, enabling dependency inversion and clean architecture.
Key Features:
- Multi-session management with Redis backend
- Conversation history management with compression
- Performance metrics and analytics
- Resource and lifecycle management
- Integration with BaseServiceCheckpointer
Compression Strategies:
- truncate: Fast truncation (no LLM required)
- summarize: LLM-based summarization
- semantic: Embedding-based deduplication
- hybrid: Combination of multiple strategies
- Examples:
    # Example 1: Basic ContextEngine initialization
    engine = ContextEngine()
    await engine.initialize()

    # Create session
    session = await engine.create_session(
        session_id="session-123",
        user_id="user-456"
    )

    # Add conversation messages
    await engine.add_conversation_message(
        session_id="session-123",
        role="user",
        content="Hello, I need help"
    )

    # Example 2: ContextEngine with compression (truncation strategy)
    from aiecs.domain.context.context_engine import CompressionConfig

    compression_config = CompressionConfig(
        strategy="truncate",
        max_messages=50,
        keep_recent=10  # Always keep 10 most recent messages
    )

    engine = ContextEngine(compression_config=compression_config)
    await engine.initialize()

    # Add many messages
    for i in range(100):
        await engine.add_conversation_message(
            session_id="session-123",
            role="user" if i % 2 == 0 else "assistant",
            content=f"Message {i}"
        )

    # Compress conversation (truncates to 10 most recent)
    result = await engine.compress_conversation("session-123")
    print(f"Compressed from {result['original_count']} to {result['compressed_count']} messages")

    # Example 3: ContextEngine with LLM-based summarization
    from aiecs.llm import OpenAIClient

    llm_client = OpenAIClient()

    compression_config = CompressionConfig(
        strategy="summarize",
        keep_recent=10,  # Keep 10 most recent messages
        summary_max_tokens=500,
        include_summary_in_history=True
    )

    engine = ContextEngine(
        compression_config=compression_config,
        llm_client=llm_client  # Required for summarization
    )
    await engine.initialize()

    # Add conversation
    for i in range(50):
        await engine.add_conversation_message(
            session_id="session-123",
            role="user" if i % 2 == 0 else "assistant",
            content=f"Message {i}: Important information about topic {i % 5}"
        )

    # Compress using summarization
    result = await engine.compress_conversation("session-123", strategy="summarize")
    print(f"Compressed: {result['original_count']} -> {result['compressed_count']} messages")
    print(f"Compression ratio: {result['compression_ratio']:.1%}")

    # Example 4: ContextEngine with semantic deduplication
    compression_config = CompressionConfig(
        strategy="semantic",
        keep_recent=10,
        similarity_threshold=0.95,  # Remove messages >95% similar
        embedding_model="text-embedding-ada-002"
    )

    engine = ContextEngine(
        compression_config=compression_config,
        llm_client=llm_client  # Required for embeddings
    )
    await engine.initialize()

    # Add conversation with similar messages
    messages = [
        "What's the weather?",
        "What's the weather today?",
        "Tell me about the weather",
        "What's the temperature?"
    ]
    for msg in messages:
        await engine.add_conversation_message(
            session_id="session-123",
            role="user",
            content=msg
        )

    # Compress using semantic deduplication
    result = await engine.compress_conversation("session-123", strategy="semantic")
    print(f"Removed {result['original_count'] - result['compressed_count']} similar messages")

    # Example 5: ContextEngine with hybrid compression
    compression_config = CompressionConfig(
        strategy="hybrid",
        hybrid_strategies=["truncate", "summarize"],  # Apply truncate then summarize
        keep_recent=10,
        summary_max_tokens=500
    )

    engine = ContextEngine(
        compression_config=compression_config,
        llm_client=llm_client
    )
    await engine.initialize()

    # Compress using hybrid strategy
    result = await engine.compress_conversation("session-123", strategy="hybrid")

    # Example 6: Auto-compression on message limit
    compression_config = CompressionConfig(
        auto_compress_enabled=True,
        auto_compress_threshold=100,  # Trigger at 100 messages
        auto_compress_target=50,  # Compress to 50 messages
        strategy="summarize",
        keep_recent=10
    )

    engine = ContextEngine(
        compression_config=compression_config,
        llm_client=llm_client
    )
    await engine.initialize()

    # Add messages - auto-compression triggers at 100
    for i in range(105):
        await engine.add_conversation_message(
            session_id="session-123",
            role="user" if i % 2 == 0 else "assistant",
            content=f"Message {i}"
        )

    # Check if auto-compression was triggered
    result = await engine.auto_compress_on_limit("session-123")
    if result:
        print(f"Auto-compressed: {result['original_count']} -> {result['compressed_count']}")

    # Example 7: Custom compression prompt template
    compression_config = CompressionConfig(
        strategy="summarize",
        summary_prompt_template=(
            "Summarize the following conversation focusing on key decisions, "
            "action items, and important facts. Keep it concise:\n\n{messages}"
        ),
        summary_max_tokens=300
    )

    engine = ContextEngine(
        compression_config=compression_config,
        llm_client=llm_client
    )
    await engine.initialize()

    # Compress with custom prompt
    result = await engine.compress_conversation("session-123")

    # Example 8: Get compressed context in different formats
    engine = ContextEngine(compression_config=compression_config, llm_client=llm_client)
    await engine.initialize()

    # Get as formatted string
    context_string = await engine.get_compressed_context(
        session_id="session-123",
        format="string",
        compress_first=True  # Compress before returning
    )
    print(context_string)

    # Get as messages list
    messages = await engine.get_compressed_context(
        session_id="session-123",
        format="messages",
        compress_first=False  # Use existing compressed version
    )

    # Get as dictionary
    context_dict = await engine.get_compressed_context(
        session_id="session-123",
        format="dict"
    )

    # Example 9: Runtime compression config override
    engine = ContextEngine(
        compression_config=CompressionConfig(strategy="truncate"),
        llm_client=llm_client
    )
    await engine.initialize()

    # Override compression config for specific operation
    custom_config = CompressionConfig(
        strategy="summarize",
        summary_max_tokens=1000
    )

    result = await engine.compress_conversation(
        session_id="session-123",
        config_override=custom_config
    )

    # Example 10: Compression with custom LLM client
    class CustomLLMClient:
        provider_name = "custom"

        async def generate_text(self, messages, **kwargs):
            # Custom summarization logic
            return LLMResponse(content="Custom summary...")

        async def get_embeddings(self, texts, model):
            # Custom embedding logic
            return [[0.1] * 1536 for _ in texts]

    custom_llm = CustomLLMClient()

    compression_config = CompressionConfig(strategy="semantic")
    engine = ContextEngine(
        compression_config=compression_config,
        llm_client=custom_llm  # Custom LLM client for compression
    )
    await engine.initialize()

    # Compress using custom LLM client
    result = await engine.compress_conversation("session-123", strategy="semantic")
- __init__(use_existing_redis=True, compression_config=None, llm_client=None, permanent_backend=None)[source]
Initialize ContextEngine.
- Parameters:
use_existing_redis (bool) – Whether to use the existing Redis client from infrastructure (deprecated: an independent RedisClient instance is now always created to avoid event loop conflicts)
compression_config (CompressionConfig | None) – Optional compression configuration for conversation compression
llm_client (Any | None) – Optional LLM client for summarization and embeddings (must implement LLMClientProtocol)
permanent_backend (IPermanentStorageBackend | None) – Optional backend for dual-write disk persistence (e.g. ClickHouse). Writes are fire-and-forget; failures do not block Redis path.
- async update_session(session_id, updates=None, increment_requests=False, add_processing_time=0.0, mark_error=False)[source]
Update session with activity and metrics.
- async add_conversation_message(session_id, role, content, metadata=None)[source]
Add message to conversation history.
- async get_conversation_history(session_id, limit=50)[source]
Get conversation history for a session.
- async get_task_context(session_id)[source]
Get TaskContext for a session.
- Parameters:
session_id (str)
- Return type:
TaskContext | None
- async store_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None)[source]
Store checkpoint data for LangGraph workflows.
Automatically converts dataclasses to dictionaries to ensure JSON serialization compatibility.
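A conversion of this kind might look like the following sketch. It is illustrative only: `to_serializable` and the `Step` dataclass are hypothetical, and the real method may treat nested values or datetimes differently.

```python
import dataclasses
import json
from datetime import datetime
from typing import Any


def to_serializable(value: Any) -> Any:
    # Recursively turn dataclass instances into plain dictionaries.
    if dataclasses.is_dataclass(value) and not isinstance(value, type):
        return {
            field.name: to_serializable(getattr(value, field.name))
            for field in dataclasses.fields(value)
        }
    if isinstance(value, dict):
        return {str(key): to_serializable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_serializable(item) for item in value]
    if isinstance(value, datetime):
        return value.isoformat()
    return value


@dataclasses.dataclass
class Step:  # hypothetical workflow state object
    name: str
    done: bool


checkpoint = {"steps": [Step("load", True)], "saved_at": datetime(2024, 1, 1)}
encoded = json.dumps(to_serializable(checkpoint))
```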
- async get_checkpoint(thread_id, checkpoint_id=None)[source]
Get checkpoint data. If checkpoint_id is None, get the latest.
- async list_checkpoints(thread_id, limit=10)[source]
List checkpoints for a thread, ordered by creation time (newest first).
- async cleanup_expired_sessions(max_idle_hours=24)[source]
Clean up expired sessions and associated data.
- async put_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None)[source]
Store a checkpoint for LangGraph workflows (ICheckpointerBackend interface).
- async put_writes(thread_id, checkpoint_id, task_id, writes_data)[source]
Store intermediate writes for a checkpoint (ICheckpointerBackend interface).
- async get_writes(thread_id, checkpoint_id)[source]
Get intermediate writes for a checkpoint (ICheckpointerBackend interface).
- async store_task_context(session_id, context)[source]
Store TaskContext for a session (ITaskContextStorage interface).
- async create_conversation_session(session_id, participants, session_type, metadata=None)[source]
Create an isolated conversation session between participants.
- Parameters:
- Returns:
Generated session key for conversation isolation
- Return type:
- async add_agent_communication_message(session_key, sender_id, sender_type, sender_role, recipient_id, recipient_type, recipient_role, content, message_type='communication', metadata=None)[source]
Add a message to an agent communication session.
- Parameters:
session_key (str) – Isolated session key
sender_id (str) – ID of the sender
sender_type (str) – Type of sender (‘master_controller’, ‘agent’, ‘user’)
sender_role (str | None) – Role of sender (for agents)
recipient_id (str) – ID of the recipient
recipient_type (str) – Type of recipient
recipient_role (str | None) – Role of recipient (for agents)
content (str) – Message content
message_type (str) – Type of message
metadata (Dict[str, Any] | None) – Additional message metadata
- Returns:
Success status
- Return type:
- async get_agent_conversation_history(session_key, limit=50, message_types=None)[source]
Get conversation history for an agent communication session.
- async compress_conversation(session_id, strategy=None, config_override=None)[source]
Compress conversation history using specified strategy.
- Parameters:
session_id (str) – Session ID to compress
strategy (str | None) – Compression strategy (overrides config if provided)
config_override (CompressionConfig | None) – Override compression config for this operation
- Returns:
Dictionary with compression results:
    {
        "success": bool,
        "strategy": str,
        "original_count": int,
        "compressed_count": int,
        "compression_ratio": float,
        "tokens_saved": int (if applicable),
        "time_taken": float
    }
Example
    result = await engine.compress_conversation(
        session_id="session-123",
        strategy="summarize"
    )
    print(f"Compressed from {result['original_count']} to {result['compressed_count']} messages")
- async auto_compress_on_limit(session_id)[source]
Automatically compress conversation if it exceeds threshold.
Checks if conversation exceeds auto_compress_threshold and compresses to auto_compress_target if needed.
- Parameters:
session_id (str) – Session ID to check
- Returns:
Compression result dict if compression was triggered, None otherwise
- Return type:
Example
    # Configure auto-compression
    config = CompressionConfig(
        auto_compress_enabled=True,
        auto_compress_threshold=100,
        auto_compress_target=50
    )
    engine = ContextEngine(compression_config=config)

    # Check and auto-compress if needed
    result = await engine.auto_compress_on_limit(session_id)
    if result:
        print(f"Auto-compressed: {result['original_count']} -> {result['compressed_count']}")
- async get_compressed_context(session_id, format='messages', compress_first=False)[source]
Get conversation context in compressed format.
- Parameters:
- Returns:
Conversation in requested format:
- "messages": List[ConversationMessage]
- "string": Formatted string
- "dict": List[Dict[str, Any]]
Example
    # Get as formatted string
    context = await engine.get_compressed_context(
        session_id="session-123",
        format="string"
    )
    print(context)

    # Get as messages, compress first
    messages = await engine.get_compressed_context(
        session_id="session-456",
        format="messages",
        compress_first=True
    )
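As a rough picture of how one message list could be rendered in the three formats, consider the sketch below. The `Message` dataclass is a hypothetical stand-in for ConversationMessage, and the "role: content" string layout is an assumption; the engine's actual formatting may differ.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Union


@dataclass
class Message:  # hypothetical stand-in for ConversationMessage
    role: str
    content: str


def render_context(
    messages: List[Message], format: str = "messages"
) -> Union[List[Message], List[Dict[str, Any]], str]:
    if format == "messages":
        return list(messages)
    if format == "dict":
        return [asdict(message) for message in messages]
    if format == "string":
        # One "role: content" line per message (assumed layout).
        return "\n".join(f"{m.role}: {m.content}" for m in messages)
    raise ValueError(f"Unsupported format: {format!r}")
```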
Task
Task Context
- class aiecs.domain.task.task_context.ContextUpdate[source]
Bases:
object
Represents a single update to the context (e.g., message, metadata, or resource).
- class aiecs.domain.task.task_context.TaskContext[source]
Bases:
object
Enhanced context manager for task execution with:
- Context history tracking and checkpointing
- Resource acquisition and release
- Performance tracking
- File and model tracking
- Persistent storage
- Metadata toggles
- Enhanced error handling
- add_context_update(update_type, data, metadata=None)[source]
Add a context update (e.g., message, metadata change).
- track_file_operation(file_path, operation, source='task')[source]
Track a file operation (e.g., read, edit).
- aiecs.domain.task.task_context.build_context(data)[source]
Build a simple context dictionary (for backward compatibility).
- aiecs.domain.task.task_context.task_context(data, task_dir='./tasks')[source]
Async context manager for task execution.
- Usage:
    async with task_context(request_data, task_dir="/path/to/tasks") as context:
        context.add_context_update("message", "User input", {"source": "user"})
        context.track_file_operation("example.py", "read", "tool")
        result = await service_instance.run(data, context)
- Parameters:
- Return type:
AsyncGenerator[TaskContext, None]
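An async context manager with this return type is typically built with contextlib.asynccontextmanager. The sketch below shows the general shape only; `SimpleContext` and `simple_task_context` are hypothetical stand-ins, and the real task_context performs additional setup and persistence that is not modelled here.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, List, Tuple


class SimpleContext:
    """Hypothetical stand-in for TaskContext."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self.data = data
        self.updates: List[Tuple[str, Any]] = []
        self.closed = False

    def add_update(self, update_type: str, payload: Any) -> None:
        self.updates.append((update_type, payload))


@asynccontextmanager
async def simple_task_context(data: Dict[str, Any]) -> AsyncGenerator[SimpleContext, None]:
    context = SimpleContext(data)
    try:
        yield context
    finally:
        # Cleanup runs even if the body of the `async with` block raises.
        context.closed = True


async def main() -> SimpleContext:
    async with simple_task_context({"task": "demo"}) as context:
        context.add_update("message", "User input")
    return context


finished = asyncio.run(main())
```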