Agent Domain API Reference
This document provides a comprehensive API reference for all agent domain classes, methods, and models.
Table of Contents
BaseAIAgent
Abstract base class for all AI agents providing core functionality for lifecycle management, state management, memory, goals, and metrics tracking.
Module: aiecs.domain.agent.base_agent
from aiecs.domain.agent import BaseAIAgent
from aiecs.domain.agent.models import AgentType, AgentConfiguration
class BaseAIAgent(ABC):
def __init__(
self,
agent_id: str,
name: str,
agent_type: AgentType,
config: AgentConfiguration,
description: Optional[str] = None,
version: str = "1.0.0",
tools: Optional[Union[List[str], Dict[str, BaseTool]]] = None,
llm_client: Optional[LLMClientProtocol] = None,
config_manager: Optional[ConfigManagerProtocol] = None,
checkpointer: Optional[CheckpointerProtocol] = None,
context_engine: Optional[ContextEngine] = None,
collaboration_enabled: bool = False,
agent_registry: Optional[Dict[str, Any]] = None,
learning_enabled: bool = False,
resource_limits: Optional[ResourceLimits] = None,
)
Lifecycle Methods
async initialize() -> None
Initialize the agent and prepare it for use.
Raises:
AgentInitializationError: If initialization fails
Example:
agent = HybridAgent(...)
await agent.initialize()
async activate() -> None
Activate the agent, transitioning it to ACTIVE state.
Raises:
InvalidStateTransitionError: If transition is invalid
async deactivate() -> None
Deactivate the agent, transitioning it to INACTIVE state.
async shutdown() -> None
Shutdown the agent, cleaning up resources.
State Management
state() -> AgentState
Get current agent state (property).
Returns: AgentState - Current agent state
get_state() -> AgentState
Get current agent state.
Returns: AgentState - Current agent state
Task Execution
async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]
Execute a task (abstract method, implemented by subclasses).
Parameters:
task(Dict[str, Any]): Task specificationcontext(Dict[str, Any]): Execution context
Returns: Dict[str, Any] - Task execution result
Raises:
TaskExecutionError: If task execution fails
async process_message(message: str, sender_id: Optional[str] = None) -> Dict[str, Any]
Process an incoming message (abstract method, implemented by subclasses).
Parameters:
message(str): Message contentsender_id(Optional[str]): Optional sender identifier
Returns: Dict[str, Any] - Response dictionary
Tool Execution
async execute_tool(tool_name: str, parameters: Dict[str, Any]) -> Any
Execute a tool by name.
Parameters:
tool_name(str): Name of the toolparameters(Dict[str, Any]): Tool parameters
Returns: Any - Tool execution result
Raises:
ToolAccessDeniedError: If tool access is denied
async execute_tools_parallel(tool_calls: List[Dict[str, Any]], max_concurrency: int = 5) -> List[Dict[str, Any]]
Execute multiple tools in parallel with concurrency limit.
Parameters:
tool_calls(List[Dict[str, Any]]): List of tool call dicts with ‘tool_name’ and ‘parameters’max_concurrency(int): Maximum number of concurrent tool executions (default: 5)
Returns: List[Dict[str, Any]] - List of results in same order as tool_calls
Example:
results = await agent.execute_tools_parallel([
{"tool_name": "search", "parameters": {"query": "AI"}},
{"tool_name": "calculator", "parameters": {"operation": "add", "a": 1, "b": 2}}
], max_concurrency=3)
async execute_tool_with_cache(tool_name: str, parameters: Dict[str, Any]) -> Any
Execute tool with caching support.
Parameters:
tool_name(str): Name of the toolparameters(Dict[str, Any]): Tool parameters
Returns: Any - Tool result (from cache or fresh execution)
Example:
result = await agent.execute_tool_with_cache("search", {"query": "AI"})
invalidate_cache(tool_name: Optional[str] = None, pattern: Optional[str] = None) -> int
Invalidate cache entries.
Parameters:
tool_name(Optional[str]): Specific tool name to invalidate (None = all)pattern(Optional[str]): Pattern to match cache keys
Returns: int - Number of entries invalidated
get_cache_stats() -> Dict[str, Any]
Get cache statistics.
Returns: Dict[str, Any] - Cache statistics including size, hits, misses, hit_rate
Streaming
async execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]
Execute task with streaming response.
Parameters:
task(Dict[str, Any]): Task specificationcontext(Dict[str, Any]): Execution context
Yields: Dict[str, Any] - Event dictionaries with type (‘token’, ‘tool_call’, ‘tool_result’, ‘status’, ‘result’, ‘error’)
Example:
async for event in agent.execute_task_streaming(task, context):
if event['type'] == 'token':
print(event['content'], end='', flush=True)
elif event['type'] == 'tool_call':
print(f"Calling {event['tool_name']}...")
async process_message_streaming(message: str, sender_id: Optional[str] = None) -> AsyncIterator[str]
Process a message with streaming response.
Parameters:
message(str): Message contentsender_id(Optional[str]): Optional sender identifier
Yields: str - Response text tokens/chunks
Memory Management
async add_to_memory(key: str, value: Any, memory_type: MemoryType = MemoryType.SHORT_TERM) -> None
Add value to agent memory.
Parameters:
key(str): Memory keyvalue(Any): Value to storememory_type(MemoryType): Type of memory (default: SHORT_TERM)
async retrieve_memory(key: str, default: Any = None) -> Any
Retrieve value from agent memory.
Parameters:
key(str): Memory keydefault(Any): Default value if key not found
Returns: Any - Stored value or default
async clear_memory(memory_type: Optional[MemoryType] = None) -> None
Clear agent memory.
Parameters:
memory_type(Optional[MemoryType]): Type of memory to clear (None = all)
get_memory_summary() -> Dict[str, Any]
Get memory summary.
Returns: Dict[str, Any] - Memory summary with counts and sizes
Goals Management
set_goal(goal_id: str, description: str, priority: GoalPriority = GoalPriority.MEDIUM, deadline: Optional[datetime] = None) -> AgentGoal
Set a goal for the agent.
Parameters:
goal_id(str): Unique goal identifierdescription(str): Goal descriptionpriority(GoalPriority): Goal priority (default: MEDIUM)deadline(Optional[datetime]): Optional deadline
Returns: AgentGoal - Created goal object
get_goals(status: Optional[GoalStatus] = None) -> List[AgentGoal]
Get agent goals, optionally filtered by status.
Parameters:
status(Optional[GoalStatus]): Filter by status (None = all)
Returns: List[AgentGoal] - List of goals
get_goal(goal_id: str) -> Optional[AgentGoal]
Get a specific goal by ID.
Parameters:
goal_id(str): Goal identifier
Returns: Optional[AgentGoal] - Goal object or None
update_goal_status(goal_id: str, status: GoalStatus, notes: Optional[str] = None) -> None
Update goal status.
Parameters:
goal_id(str): Goal identifierstatus(GoalStatus): New statusnotes(Optional[str]): Optional notes
Configuration Management
get_config() -> AgentConfiguration
Get current agent configuration.
Returns: AgentConfiguration - Current configuration
update_config(updates: Dict[str, Any]) -> None
Update agent configuration.
Parameters:
updates(Dict[str, Any]): Configuration updates
Note: Uses config manager if available, otherwise updates in-memory config
Capabilities
declare_capability(capability_type: str, description: str, level: CapabilityLevel = CapabilityLevel.BASIC) -> None
Declare an agent capability.
Parameters:
capability_type(str): Capability typedescription(str): Capability descriptionlevel(CapabilityLevel): Capability level (default: BASIC)
has_capability(capability_type: str) -> bool
Check if agent has a capability.
Parameters:
capability_type(str): Capability type
Returns: bool - True if agent has capability
get_capabilities() -> List[AgentCapabilityDeclaration]
Get all declared capabilities.
Returns: List[AgentCapabilityDeclaration] - List of capabilities
Metrics and Performance
get_metrics() -> AgentMetrics
Get agent metrics.
Returns: AgentMetrics - Current metrics
update_metrics(execution_time: Optional[float] = None, success: Optional[bool] = None, tokens_used: Optional[int] = None, tool_calls: Optional[int] = None) -> None
Update agent metrics.
Parameters:
execution_time(Optional[float]): Execution time in secondssuccess(Optional[bool]): Whether operation succeededtokens_used(Optional[int]): Tokens usedtool_calls(Optional[int]): Number of tool calls
track_operation_time(operation_name: str) -> OperationTimer
Track operation execution time (context manager).
Parameters:
operation_name(str): Operation name
Returns: OperationTimer - Context manager for tracking
Example:
with agent.track_operation_time("data_processing"):
result = await agent.execute_task(task, context)
get_performance_metrics() -> Dict[str, Any]
Get performance metrics.
Returns: Dict[str, Any] - Performance metrics including avg_response_time, p95_response_time, p99_response_time, operation_metrics
get_health_status() -> Dict[str, Any]
Get agent health status.
Returns: Dict[str, Any] - Health status with score, status, issues, recommendations
get_comprehensive_status() -> Dict[str, Any]
Get comprehensive agent status.
Returns: Dict[str, Any] - Complete status including state, metrics, health, goals, capabilities
reset_metrics() -> None
Reset agent metrics.
Serialization
to_dict() -> Dict[str, Any]
Serialize agent to dictionary.
Returns: Dict[str, Any] - Serialized agent data
@classmethod from_dict(data: Dict[str, Any]) -> BaseAIAgent
Deserialize agent from dictionary.
Parameters:
data(Dict[str, Any]): Serialized agent data
Returns: BaseAIAgent - Deserialized agent instance
Checkpointing
async save_checkpoint(session_id: Optional[str] = None) -> Optional[str]
Save agent state checkpoint.
Parameters:
session_id(Optional[str]): Optional session ID
Returns: Optional[str] - Checkpoint ID if saved
Raises:
SerializationError: If serialization fails
async load_checkpoint(checkpoint_id: str) -> bool
Load agent state from checkpoint.
Parameters:
checkpoint_id(str): Checkpoint ID
Returns: bool - True if loaded successfully
Raises:
SerializationError: If deserialization fails
Agent Collaboration
async delegate_task(task_description: str, target_agent_id: Optional[str] = None, required_capabilities: Optional[List[str]] = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Delegate task to another agent.
Parameters:
task_description(str): Task descriptiontarget_agent_id(Optional[str]): Specific agent ID (None = auto-select)required_capabilities(Optional[List[str]]): Required capabilitiescontext(Optional[Dict[str, Any]]): Additional context
Returns: Dict[str, Any] - Delegated task result
Raises:
ValueError: If collaboration not enabled or agent not found
async find_capable_agents(required_capabilities: List[str]) -> List[Any]
Find agents with required capabilities.
Parameters:
required_capabilities(List[str]): Required capabilities
Returns: List[Any] - List of capable agents
async request_peer_review(task: Dict[str, Any], result: Dict[str, Any], reviewer_id: Optional[str] = None) -> Dict[str, Any]
Request peer review of a task result.
Parameters:
task(Dict[str, Any]): Original task specificationresult(Dict[str, Any]): Task execution result to reviewreviewer_id(Optional[str]): Specific reviewer agent ID (None = auto-select)
Returns: Dict[str, Any] - Review result with ‘approved’ (bool), ‘feedback’ (str), ‘reviewer_id’ (str)
Raises:
ValueError: If collaboration not enabled or reviewer not found
async collaborate_on_task(task: Dict[str, Any], strategy: str = "parallel", required_capabilities: Optional[List[str]] = None, agent_weights: Optional[Dict[str, float]] = None) -> Dict[str, Any]
Collaborate on task with multiple agents.
Parameters:
task(Dict[str, Any]): Task specificationstrategy(str): Collaboration strategy (‘parallel’, ‘sequential’, ‘consensus’, ‘weighted_consensus’)required_capabilities(Optional[List[str]]): Required capabilitiesagent_weights(Optional[Dict[str, float]]): Agent weights for weighted consensus
Returns: Dict[str, Any] - Collaboration result
Learning and Adaptation
async record_experience(task: Dict[str, Any], result: Dict[str, Any], approach: str, tools_used: Optional[List[str]] = None) -> None
Record an experience for learning and adaptation.
Parameters:
task(Dict[str, Any]): Task specificationresult(Dict[str, Any]): Task execution resultapproach(str): Approach/strategy usedtools_used(Optional[List[str]]): List of tools used
async get_recommended_approach(task: Dict[str, Any]) -> Optional[Dict[str, Any]]
Get recommended approach based on past experiences.
Parameters:
task(Dict[str, Any]): Task specification
Returns: Optional[Dict[str, Any]] - Recommended approach with ‘approach’ (str), ‘confidence’ (float), ‘experience_count’ (int)
async get_learning_insights() -> Dict[str, Any]
Get learning insights from experiences.
Returns: Dict[str, Any] - Learning insights including successful_approaches, failed_approaches, recommendations
async adapt_strategy(task: Dict[str, Any]) -> Dict[str, Any]
Adapt strategy based on past experiences.
Parameters:
task(Dict[str, Any]): Task specification
Returns: Dict[str, Any] - Adapted strategy
Resource Management
async check_resource_availability() -> Dict[str, Any]
Check if resources are available for task execution.
Returns: Dict[str, Any] - Resource status with ‘available’ (bool), ‘reason’ (str), details
async wait_for_resources(timeout: Optional[float] = None) -> bool
Wait for resources to become available.
Parameters:
timeout(Optional[float]): Maximum wait time in seconds (None = use default)
Returns: bool - True if resources became available
async get_resource_usage() -> Dict[str, Any]
Get current resource usage.
Returns: Dict[str, Any] - Resource usage statistics
Error Recovery
async execute_with_recovery(task: Dict[str, Any], context: Dict[str, Any], strategies: Optional[List[str]] = None) -> Dict[str, Any]
Execute task with recovery strategies.
Parameters:
task(Dict[str, Any]): Task specificationcontext(Dict[str, Any]): Execution contextstrategies(Optional[List[str]]): Recovery strategies (None = use default)
Returns: Dict[str, Any] - Task execution result
Raises:
TaskExecutionError: If all recovery strategies fail
Context Management
async get_relevant_context(query: str, max_items: int = 10, min_score: float = 0.5) -> List[Dict[str, Any]]
Get relevant context items for a query.
Parameters:
query(str): Query stringmax_items(int): Maximum number of items (default: 10)min_score(float): Minimum relevance score (default: 0.5)
Returns: List[Dict[str, Any]] - List of relevant context items
async score_context_relevance(context_items: List[Dict[str, Any]], query: str) -> List[float]
Score context items for relevance to query.
Parameters:
context_items(List[Dict[str, Any]]): Context items to scorequery(str): Query string
Returns: List[float] - Relevance scores
async prune_context(context_items: List[Dict[str, Any]], max_tokens: int, query: Optional[str] = None, preserve_types: Optional[List[str]] = None) -> List[Dict[str, Any]]
Prune context items to fit within token limit.
Parameters:
context_items(List[Dict[str, Any]]): Context items to prunemax_tokens(int): Maximum tokensquery(Optional[str]): Optional query for relevance scoringpreserve_types(Optional[List[str]]): Types to preserve
Returns: List[Dict[str, Any]] - Pruned context items
Utility Methods
is_available() -> bool
Check if agent is available.
Returns: bool - True if agent is available
is_busy() -> bool
Check if agent is busy.
Returns: bool - True if agent is busy
Concrete Agent Types
LLMAgent
LLM-powered agent for text generation and reasoning.
Module: aiecs.domain.agent.llm_agent
from aiecs.domain.agent import LLMAgent
from aiecs.llm import BaseLLMClient
class LLMAgent(BaseAIAgent):
def __init__(
self,
agent_id: str,
name: str,
llm_client: Union[BaseLLMClient, LLMClientProtocol],
config: AgentConfiguration,
description: Optional[str] = None,
version: str = "1.0.0",
config_manager: Optional[ConfigManagerProtocol] = None,
checkpointer: Optional[CheckpointerProtocol] = None,
context_engine: Optional[ContextEngine] = None,
collaboration_enabled: bool = False,
agent_registry: Optional[Dict[str, Any]] = None,
learning_enabled: bool = False,
resource_limits: Optional[ResourceLimits] = None,
)
Methods
All methods from BaseAIAgent plus:
async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]
Execute a task using LLM.
Parameters:
task(Dict[str, Any]): Task specification with ‘description’, ‘prompt’, or ‘task’ fieldcontext(Dict[str, Any]): Execution context
Returns: Dict[str, Any] - Execution result with ‘output’, ‘execution_time’, ‘tokens_used’
async process_message(message: str, sender_id: Optional[str] = None) -> Dict[str, Any]
Process a message using LLM.
Parameters:
message(str): Message contentsender_id(Optional[str]): Optional sender identifier
Returns: Dict[str, Any] - Response dictionary with ‘response’, ‘execution_time’
async execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]
Execute task with streaming response.
Parameters:
task(Dict[str, Any]): Task specificationcontext(Dict[str, Any]): Execution context
Yields: Dict[str, Any] - Event dictionaries with ‘type’ (‘token’, ‘result’, ‘error’)
ToolAgent
Agent specialized in tool selection and execution.
Module: aiecs.domain.agent.tool_agent
from aiecs.domain.agent import ToolAgent
class ToolAgent(BaseAIAgent):
def __init__(
self,
agent_id: str,
name: str,
tools: Union[List[str], Dict[str, BaseTool]],
config: AgentConfiguration,
description: Optional[str] = None,
version: str = "1.0.0",
config_manager: Optional[ConfigManagerProtocol] = None,
checkpointer: Optional[CheckpointerProtocol] = None,
context_engine: Optional[ContextEngine] = None,
collaboration_enabled: bool = False,
agent_registry: Optional[Dict[str, Any]] = None,
learning_enabled: bool = False,
resource_limits: Optional[ResourceLimits] = None,
)
Methods
All methods from BaseAIAgent plus:
async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]
Execute a task using tools.
Parameters:
task(Dict[str, Any]): Task specification with ‘tool’, ‘operation’, and ‘parameters’context(Dict[str, Any]): Execution context
Returns: Dict[str, Any] - Execution result with ‘output’, ‘tool_used’, ‘execution_time’
Raises:
TaskExecutionError: If task execution failsToolAccessDeniedError: If tool access is denied
HybridAgent
Hybrid agent combining LLM reasoning with tool execution (ReAct pattern).
Module: aiecs.domain.agent.hybrid_agent
from aiecs.domain.agent import HybridAgent
class HybridAgent(BaseAIAgent):
def __init__(
self,
agent_id: str,
name: str,
llm_client: Union[BaseLLMClient, LLMClientProtocol],
tools: Union[List[str], Dict[str, BaseTool]],
config: AgentConfiguration,
description: Optional[str] = None,
version: str = "1.0.0",
config_manager: Optional[ConfigManagerProtocol] = None,
checkpointer: Optional[CheckpointerProtocol] = None,
context_engine: Optional[ContextEngine] = None,
collaboration_enabled: bool = False,
agent_registry: Optional[Dict[str, Any]] = None,
learning_enabled: bool = False,
resource_limits: Optional[ResourceLimits] = None,
max_iterations: int = 10,
)
Methods
All methods from BaseAIAgent plus:
async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]
Execute a task using ReAct pattern (Reason → Act → Observe).
Parameters:
task(Dict[str, Any]): Task specification with ‘description’ or ‘prompt’context(Dict[str, Any]): Execution context
Returns: Dict[str, Any] - Execution result with ‘output’, ‘steps’, ‘iterations’, ‘execution_time’
async process_message(message: str, sender_id: Optional[str] = None) -> Dict[str, Any]
Process a message using ReAct pattern.
Parameters:
message(str): Message contentsender_id(Optional[str]): Optional sender identifier
Returns: Dict[str, Any] - Response dictionary
async execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]
Execute task with streaming response (tokens + tool calls).
Parameters:
task(Dict[str, Any]): Task specificationcontext(Dict[str, Any]): Execution context
Yields: Dict[str, Any] - Event dictionaries with ‘type’ (‘token’, ‘tool_call’, ‘tool_result’, ‘status’, ‘result’)
async _execute_tool_with_observation(tool_name: str, operation: Optional[str], parameters: Dict[str, Any]) -> ToolObservation
Execute a tool and return structured observation.
Parameters:
tool_name(str): Name of the tool to executeoperation(Optional[str]): Optional operation nameparameters(Dict[str, Any]): Tool parameters
Returns: ToolObservation - Structured observation with execution details
get_available_tools() -> List[str]
Get list of available tools.
Returns: List[str] - List of tool names
ConversationMemory and Session
ConversationMemory
Multi-turn conversation handling with session management.
Module: aiecs.domain.agent.memory.conversation
from aiecs.domain.agent.memory import ConversationMemory
class ConversationMemory:
def __init__(
self,
agent_id: str,
context_engine: Optional[ContextEngine] = None,
max_history_length: int = 100,
)
Methods
async add_message(role: str, content: str, session_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> None
Add a message to conversation history.
Parameters:
role(str): Message role (‘user’, ‘assistant’, ‘system’)content(str): Message contentsession_id(Optional[str]): Optional session IDmetadata(Optional[Dict[str, Any]]): Optional metadata
async get_messages(session_id: Optional[str] = None, limit: Optional[int] = None) -> List[LLMMessage]
Get conversation messages.
Parameters:
session_id(Optional[str]): Optional session IDlimit(Optional[int]): Optional limit on number of messages
Returns: List[LLMMessage] - List of messages
async clear(session_id: Optional[str] = None) -> None
Clear conversation history.
Parameters:
session_id(Optional[str]): Optional session ID (None = all sessions)
async get_session(session_id: str) -> Optional[Session]
Get a session by ID.
Parameters:
session_id(str): Session ID
Returns: Optional[Session] - Session object or None
async create_session(session_id: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Session
Create a new session.
Parameters:
session_id(Optional[str]): Optional session ID (auto-generated if None)user_id(Optional[str]): Optional user IDmetadata(Optional[Dict[str, Any]]): Optional metadata
Returns: Session - Created session
async end_session(session_id: str, status: str = "completed") -> None
End a session.
Parameters:
session_id(str): Session IDstatus(str): Session status (‘completed’, ‘failed’, ‘expired’)
async list_sessions(active_only: bool = False) -> List[Session]
List all sessions.
Parameters:
active_only(bool): Only return active sessions
Returns: List[Session] - List of sessions
Session
Conversation session with lifecycle management and metrics tracking.
Module: aiecs.domain.agent.memory.conversation
from aiecs.domain.agent.memory import Session
@dataclass
class Session:
session_id: str
agent_id: str
user_id: Optional[str] = None
created_at: datetime = field(default_factory=datetime.utcnow)
last_activity: datetime = field(default_factory=datetime.utcnow)
status: str = "active"
metadata: Dict[str, Any] = field(default_factory=dict)
messages: List[LLMMessage] = field(default_factory=list)
request_count: int = 0
error_count: int = 0
total_processing_time: float = 0.0
Methods
add_message(role: str, content: str, metadata: Optional[Dict[str, Any]] = None) -> None
Add a message to the session.
Parameters:
role(str): Message rolecontent(str): Message contentmetadata(Optional[Dict[str, Any]]): Optional metadata
track_request(processing_time: float, is_error: bool = False) -> None
Track a request in the session.
Parameters:
processing_time(float): Processing time in secondsis_error(bool): Whether request resulted in error
get_metrics() -> Dict[str, Any]
Get session metrics.
Returns: Dict[str, Any] - Metrics including request_count, error_count, average_processing_time, message_count
is_active() -> bool
Check if session is active.
Returns: bool - True if session is active
end(status: str = "completed") -> None
End the session.
Parameters:
status(str): Session status (‘completed’, ‘failed’, ‘expired’)
is_expired(max_idle_seconds: int = 1800) -> bool
Check if session is expired.
Parameters:
max_idle_seconds(int): Maximum idle time in seconds (default: 1800 = 30 minutes)
Returns: bool - True if session is expired
ContextEngine
Advanced context and session management engine with Redis backend storage.
Module: aiecs.domain.context.context_engine
from aiecs.domain.context import ContextEngine
class ContextEngine:
def __init__(
self,
storage_backend: Optional[IStorageBackend] = None,
compression_config: Optional[CompressionConfig] = None,
)
Initialization
async initialize() -> None
Initialize the context engine.
Raises:
RuntimeError: If initialization fails
Session Management
async create_session(session_id: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> str
Create a new session.
Parameters:
session_id(Optional[str]): Optional session ID (auto-generated if None)user_id(Optional[str]): Optional user IDmetadata(Optional[Dict[str, Any]]): Optional metadata
Returns: str - Session ID
async get_session(session_id: str) -> Optional[SessionMetrics]
Get session metrics.
Parameters:
session_id(str): Session ID
Returns: Optional[SessionMetrics] - Session metrics or None
async end_session(session_id: str, status: str = "completed") -> None
End a session.
Parameters:
session_id(str): Session IDstatus(str): Session status
Conversation History
async add_message(session_id: str, role: str, content: str, metadata: Optional[Dict[str, Any]] = None) -> None
Add a message to conversation history.
Parameters:
session_id(str): Session IDrole(str): Message rolecontent(str): Message contentmetadata(Optional[Dict[str, Any]]): Optional metadata
async get_messages(session_id: str, limit: Optional[int] = None, offset: int = 0) -> List[ConversationMessage]
Get conversation messages.
Parameters:
session_id(str): Session IDlimit(Optional[int]): Optional limitoffset(int): Offset for pagination
Returns: List[ConversationMessage] - List of messages
async clear_messages(session_id: str) -> None
Clear conversation messages for a session.
Parameters:
session_id(str): Session ID
Compression Methods
async compress_conversation(session_id: str, strategy: Optional[str] = None, target_length: Optional[int] = None, custom_prompt: Optional[str] = None) -> Dict[str, Any]
Compress conversation history.
Parameters:
session_id(str): Session IDstrategy(Optional[str]): Compression strategy (‘truncate’, ‘summarize’, ‘semantic’, ‘hybrid’) (None = use config)target_length(Optional[int]): Target length in messages (None = use config)custom_prompt(Optional[str]): Custom compression prompt (None = use default)
Returns: Dict[str, Any] - Compression result with ‘original_length’, ‘compressed_length’, ‘strategy’, ‘summary’
Example:
result = await context_engine.compress_conversation(
session_id="session-123",
strategy="summarize",
target_length=10
)
async summarize_conversation(session_id: str, max_messages: Optional[int] = None, custom_prompt: Optional[str] = None) -> str
Summarize conversation history.
Parameters:
session_id(str): Session IDmax_messages(Optional[int]): Maximum messages to summarize (None = all)custom_prompt(Optional[str]): Custom summarization prompt
Returns: str - Conversation summary
async truncate_conversation(session_id: str, keep_last: int) -> int
Truncate conversation history, keeping only the last N messages.
Parameters:
session_id(str): Session IDkeep_last(int): Number of messages to keep
Returns: int - Number of messages removed
async semantic_compress_conversation(session_id: str, query: Optional[str] = None, max_messages: Optional[int] = None) -> Dict[str, Any]
Semantically compress conversation based on relevance.
Parameters:
session_id(str): Session IDquery(Optional[str]): Optional query for relevance scoringmax_messages(Optional[int]): Maximum messages to keep
Returns: Dict[str, Any] - Compression result
Context Storage
async store_context(session_id: str, key: str, value: Any, metadata: Optional[Dict[str, Any]] = None) -> None
Store context data.
Parameters:
session_id(str): Session IDkey(str): Context keyvalue(Any): Context valuemetadata(Optional[Dict[str, Any]]): Optional metadata
async get_context(session_id: str, key: str, default: Any = None) -> Any
Get context data.
Parameters:
session_id(str): Session IDkey(str): Context keydefault(Any): Default value if not found
Returns: Any - Context value or default
async list_context_keys(session_id: str) -> List[str]
List all context keys for a session.
Parameters:
session_id(str): Session ID
Returns: List[str] - List of context keys
Cleanup
async cleanup_expired_sessions(max_idle_seconds: int = 1800) -> int
Clean up expired sessions.
Parameters:
max_idle_seconds(int): Maximum idle time in seconds
Returns: int - Number of sessions cleaned up
Models and Configuration
CompressionConfig
Configuration for conversation compression.
Module: aiecs.domain.context.context_engine
from aiecs.domain.context.context_engine import CompressionConfig
@dataclass
class CompressionConfig:
strategy: str = "truncate"
max_messages: int = 50
keep_recent: int = 10
summary_prompt_template: Optional[str] = None
summary_max_tokens: int = 500
include_summary_in_history: bool = True
similarity_threshold: float = 0.95
embedding_model: str = "text-embedding-ada-002"
hybrid_strategies: List[str] = None
auto_compress_enabled: bool = False
auto_compress_threshold: int = 100
auto_compress_target: int = 50
compression_timeout: float = 30.0
Fields
strategy: str = "truncate"
Compression strategy to use.
Values:
"truncate": Fast truncation, keeps most recent N messages (no LLM required)"summarize": LLM-based summarization of older messages"semantic": Embedding-based deduplication of similar messages"hybrid": Combination of multiple strategies applied sequentially
max_messages: int = 50
Maximum messages to keep (for truncation strategy).
keep_recent: int = 10
Always keep N most recent messages (applies to all strategies).
summary_prompt_template: Optional[str] = None
Custom prompt template for summarization. Uses {messages} placeholder.
Example:
config = CompressionConfig(
summary_prompt_template="Summarize focusing on key decisions:\n\n{messages}"
)
summary_max_tokens: int = 500
Maximum tokens for summary output.
include_summary_in_history: bool = True
Whether to add summary as system message in history.
similarity_threshold: float = 0.95
Similarity threshold for semantic deduplication (0.0-1.0). Messages above this similarity are considered duplicates.
embedding_model: str = "text-embedding-ada-002"
Embedding model name for semantic deduplication.
hybrid_strategies: List[str] = None
List of strategies to combine for hybrid mode. Default: ["truncate", "summarize"].
auto_compress_enabled: bool = False
Enable automatic compression when threshold exceeded.
auto_compress_threshold: int = 100
Message count threshold to trigger auto-compression.
auto_compress_target: int = 50
Target message count after auto-compression.
compression_timeout: float = 30.0
Maximum time for compression operation in seconds.
CacheConfig
Configuration for tool result caching.
Module: aiecs.domain.agent.base_agent
from aiecs.domain.agent.base_agent import CacheConfig
@dataclass
class CacheConfig:
enabled: bool = True
default_ttl: int = 300
tool_specific_ttl: Dict[str, int] = None
max_cache_size: int = 1000
max_memory_mb: int = 100
cleanup_interval: int = 60
cleanup_threshold: float = 0.9
include_timestamp_in_key: bool = False
hash_large_inputs: bool = True
Fields
enabled: bool = True
Enable/disable caching globally.
default_ttl: int = 300
Default time-to-live in seconds for cached entries (default: 300 = 5 minutes).
tool_specific_ttl: Dict[str, int] = None
Dictionary mapping tool names to custom TTL values (overrides default_ttl).
Example:
config = CacheConfig(
tool_specific_ttl={
"search": 600, # 10 minutes
"calculator": 3600 # 1 hour
}
)
max_cache_size: int = 1000
Maximum number of cached entries before cleanup (default: 1000).
max_memory_mb: int = 100
Maximum cache memory usage in MB (approximate, default: 100).
cleanup_interval: int = 60
Interval in seconds between cleanup checks (default: 60).
cleanup_threshold: float = 0.9
Capacity threshold (0.0-1.0) to trigger cleanup (default: 0.9 = 90%).
include_timestamp_in_key: bool = False
Whether to include timestamp in cache key (default: False).
hash_large_inputs: bool = True
Whether to hash inputs larger than 1KB for cache keys (default: True).
Methods
get_ttl(tool_name: str) -> int
Get TTL for a specific tool.
Parameters:
tool_name(str): Name of the tool
Returns: int - TTL in seconds (tool-specific if set, otherwise default)
ResourceLimits
Configuration for agent resource limits and rate limiting.
Module: aiecs.domain.agent.models
from aiecs.domain.agent.models import ResourceLimits
class ResourceLimits(BaseModel):
max_concurrent_tasks: int = 10
max_tokens_per_minute: Optional[int] = None
max_tokens_per_hour: Optional[int] = None
token_burst_size: Optional[int] = None
max_tool_calls_per_minute: Optional[int] = None
max_tool_calls_per_hour: Optional[int] = None
max_memory_mb: Optional[int] = None
task_timeout_seconds: Optional[int] = None
resource_wait_timeout_seconds: int = 60
enforce_limits: bool = True
reject_on_limit: bool = False
Fields
max_concurrent_tasks: int = 10
Maximum number of concurrent tasks (default: 10, minimum: 1).
max_tokens_per_minute: Optional[int] = None
Maximum tokens per minute (None = unlimited).
max_tokens_per_hour: Optional[int] = None
Maximum tokens per hour (None = unlimited).
token_burst_size: Optional[int] = None
Token burst size for token bucket algorithm. If None, uses max_tokens_per_minute.
Example:
limits = ResourceLimits(
max_tokens_per_minute=10000,
token_burst_size=20000 # Allow 2x burst
)
max_tool_calls_per_minute: Optional[int] = None
Maximum tool calls per minute (None = unlimited).
max_tool_calls_per_hour: Optional[int] = None
Maximum tool calls per hour (None = unlimited).
max_memory_mb: Optional[int] = None
Maximum memory usage in MB (None = unlimited).
task_timeout_seconds: Optional[int] = None
Maximum task execution time in seconds (None = unlimited).
resource_wait_timeout_seconds: int = 60
Maximum time to wait for resources in seconds (default: 60).
enforce_limits: bool = True
Whether to enforce resource limits (default: True). If False, limits are monitored but not enforced.
reject_on_limit: bool = False
Reject requests when limit reached vs wait (default: False). If True, requests are rejected immediately when limits are reached. If False, requests wait for resources to become available.
Experience
Model for recording agent learning experiences.
Module: aiecs.domain.agent.models
from aiecs.domain.agent.models import Experience
class Experience(BaseModel):
experience_id: str
agent_id: str
task_type: str
task_description: str
task_complexity: Optional[str] = None
approach: str
tools_used: List[str] = []
execution_time: float
success: bool
quality_score: Optional[float] = None
error_type: Optional[str] = None
error_message: Optional[str] = None
context_size: Optional[int] = None
iterations: Optional[int] = None
lessons_learned: Optional[str] = None
recommended_improvements: Optional[str] = None
timestamp: datetime
metadata: Dict[str, Any] = {}
Fields
experience_id: str
Unique identifier for the experience (auto-generated UUID if not provided).
agent_id: str
ID of the agent that had this experience.
task_type: str
Type/category of task (e.g., “data_analysis”, “search”, “translation”).
task_description: str
Human-readable task description.
task_complexity: Optional[str] = None
Task complexity level. Common values: “simple”, “medium”, “complex”.
approach: str
Approach/strategy used (e.g., “parallel_tools”, “sequential”, “react_loop”).
tools_used: List[str] = []
List of tool names used in execution.
execution_time: float
Execution time in seconds (must be >= 0).
success: bool
Whether task execution succeeded.
quality_score: Optional[float] = None
Quality score from 0.0 to 1.0 (None if not available).
error_type: Optional[str] = None
Type of error if failed (e.g., “timeout”, “validation_error”, “network_error”).
error_message: Optional[str] = None
Error message if execution failed.
context_size: Optional[int] = None
Context size in tokens (if applicable, must be >= 0).
iterations: Optional[int] = None
Number of iterations/attempts (if applicable, must be >= 0).
lessons_learned: Optional[str] = None
Human-readable lessons learned from this experience.
recommended_improvements: Optional[str] = None
Recommended improvements for future tasks.
timestamp: datetime
When the experience occurred (defaults to current UTC time if not provided).
metadata: Dict[str, Any] = {}
Additional experience metadata.
RecoveryStrategy
Recovery strategies for error handling.
Module: aiecs.domain.agent.models
from aiecs.domain.agent.models import RecoveryStrategy
class RecoveryStrategy(str, Enum):
RETRY = "retry"
SIMPLIFY = "simplify"
FALLBACK = "fallback"
DELEGATE = "delegate"
ABORT = "abort"
Enum Values
RETRY = "retry"
Retry the same task with exponential backoff.
Use When:
Transient errors (network, timeout, rate limits)
Temporary failures
Errors likely to succeed on retry
Example:
strategies = [RecoveryStrategy.RETRY]
result = await agent.execute_with_recovery(task, context, strategies)
SIMPLIFY = "simplify"
Simplify the task and retry.
Use When:
Task is too complex
Breaking down helps
Simpler version likely to succeed
Example:
strategies = [RecoveryStrategy.SIMPLIFY]
result = await agent.execute_with_recovery(task, context, strategies)
FALLBACK = "fallback"
Use a fallback approach or alternative method.
Use When:
Alternative approach available
Primary method failed
Fallback method acceptable
Example:
strategies = [RecoveryStrategy.FALLBACK]
result = await agent.execute_with_recovery(task, context, strategies)
DELEGATE = "delegate"
Delegate the task to another capable agent.
Use When:
Other agents available
Current agent lacks capability
Delegation appropriate
Example:
strategies = [RecoveryStrategy.DELEGATE]
result = await agent.execute_with_recovery(task, context, strategies)
Note: Requires collaboration_enabled=True and agent_registry to be configured.
ABORT = "abort"
Abort execution and return error (terminal strategy).
Use When:
All recovery attempts exhausted
Error is terminal
No further recovery possible
Example:
strategies = [RecoveryStrategy.ABORT]
try:
result = await agent.execute_with_recovery(task, context, strategies)
except Exception as e:
# Task aborted
print(f"Task aborted: {e}")
Usage Pattern
Strategies are typically chained together, trying each in sequence:
# Full recovery chain
strategies = [
RecoveryStrategy.RETRY, # Try retry first
RecoveryStrategy.SIMPLIFY, # Then simplify
RecoveryStrategy.FALLBACK, # Then fallback
RecoveryStrategy.DELEGATE, # Then delegate
RecoveryStrategy.ABORT # Finally abort
]
result = await agent.execute_with_recovery(task, context, strategies)
Protocols
Protocols define interfaces for duck typing, enabling flexible integration of custom components without inheritance requirements.
LLMClientProtocol
Protocol for LLM clients enabling duck typing integration.
Module: aiecs.llm.protocols
from aiecs.llm.protocols import LLMClientProtocol
from aiecs.llm.clients.base_client import LLMMessage, LLMResponse
@runtime_checkable
class LLMClientProtocol(Protocol):
provider_name: str
async def generate_text(
self,
messages: List[LLMMessage],
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs,
) -> LLMResponse
async def stream_text(
self,
messages: List[LLMMessage],
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs,
) -> AsyncGenerator[str, None]
async def close(self) -> None
async def get_embeddings(
self,
texts: List[str],
model: Optional[str] = None,
**kwargs,
) -> List[List[float]]
Required Attributes
provider_name: str
Provider name identifier (e.g., “openai”, “xai”, “custom”).
Required Methods
async generate_text(messages: List[LLMMessage], model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs) -> LLMResponse
Generate text using the LLM provider’s API.
Parameters:
messages(List[LLMMessage]): List of conversation messagesmodel(Optional[str]): Model name (optional, uses default if not provided)temperature(float): Sampling temperature (0.0 to 1.0, default: 0.7)max_tokens(Optional[int]): Maximum tokens to generate**kwargs: Additional provider-specific parameters
Returns: LLMResponse - Response with generated text and metadata
async stream_text(messages: List[LLMMessage], model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs) -> AsyncGenerator[str, None]
Stream text generation using the LLM provider’s API.
Parameters:
messages(List[LLMMessage]): List of conversation messagesmodel(Optional[str]): Model name (optional, uses default if not provided)temperature(float): Sampling temperature (0.0 to 1.0, default: 0.7)max_tokens(Optional[int]): Maximum tokens to generate**kwargs: Additional provider-specific parameters
Yields: str - Text tokens as they are generated
async close() -> None
Clean up resources (connections, sessions, etc.).
async get_embeddings(texts: List[str], model: Optional[str] = None, **kwargs) -> List[List[float]]
Get embeddings for a list of texts (optional, for semantic compression).
Parameters:
texts(List[str]): List of texts to embedmodel(Optional[str]): Embedding model name**kwargs: Additional provider-specific parameters
Returns: List[List[float]] - List of embedding vectors
Note: Not all LLM clients support embeddings. If not supported, raise NotImplementedError.
Example Implementation
class CustomLLMClient:
provider_name = "custom"
async def generate_text(self, messages, model=None, temperature=0.7, max_tokens=None, **kwargs):
# Custom implementation
return LLMResponse(content="...", provider="custom")
async def stream_text(self, messages, model=None, temperature=0.7, max_tokens=None, **kwargs):
async for token in self._custom_stream():
yield token
async def close(self):
# Cleanup
pass
# Use with agent (no inheritance required!)
agent = HybridAgent(
llm_client=CustomLLMClient(),
...
)
ConfigManagerProtocol
Protocol for custom configuration managers enabling dynamic configuration loading.
Module: aiecs.domain.agent.integration.protocols
from aiecs.domain.agent.integration.protocols import ConfigManagerProtocol
@runtime_checkable
class ConfigManagerProtocol(Protocol):
async def get_config(self, key: str, default: Any = None) -> Any
async def set_config(self, key: str, value: Any) -> None
async def reload_config(self) -> None
Required Methods
async get_config(key: str, default: Any = None) -> Any
Get configuration value by key.
Parameters:
key(str): Configuration keydefault(Any): Default value if key not found
Returns: Any - Configuration value or default
async set_config(key: str, value: Any) -> None
Set configuration value.
Parameters:
key(str): Configuration keyvalue(Any): Configuration value
async reload_config() -> None
Reload configuration from source. This method should refresh any cached configuration data.
Example Implementation
class DatabaseConfigManager:
async def get_config(self, key: str, default: Any = None) -> Any:
return await db.get_config(key, default)
async def set_config(self, key: str, value: Any) -> None:
await db.set_config(key, value)
async def reload_config(self) -> None:
await db.refresh_cache()
# Use with agent
agent = HybridAgent(
config_manager=DatabaseConfigManager(),
...
)
CheckpointerProtocol
Protocol for custom checkpointers enabling state persistence (LangGraph compatible).
Module: aiecs.domain.agent.integration.protocols
from aiecs.domain.agent.integration.protocols import CheckpointerProtocol
@runtime_checkable
class CheckpointerProtocol(Protocol):
async def save_checkpoint(
self, agent_id: str, session_id: str, checkpoint_data: Dict[str, Any]
) -> str
async def load_checkpoint(
self, agent_id: str, session_id: str, checkpoint_id: Optional[str] = None
) -> Optional[Dict[str, Any]]
async def list_checkpoints(self, agent_id: str, session_id: str) -> list[str]
Required Methods
async save_checkpoint(agent_id: str, session_id: str, checkpoint_data: Dict[str, Any]) -> str
Save checkpoint and return checkpoint ID.
Parameters:
agent_id(str): Agent identifiersession_id(str): Session identifiercheckpoint_data(Dict[str, Any]): Checkpoint data to save
Returns: str - Checkpoint ID for later retrieval
async load_checkpoint(agent_id: str, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]
Load checkpoint data.
Parameters:
agent_id(str): Agent identifiersession_id(str): Session identifiercheckpoint_id(Optional[str]): Specific checkpoint ID (loads latest if None)
Returns: Optional[Dict[str, Any]] - Checkpoint data or None if not found
async list_checkpoints(agent_id: str, session_id: str) -> list[str]
List all checkpoint IDs for a session.
Parameters:
agent_id(str): Agent identifiersession_id(str): Session identifier
Returns: list[str] - List of checkpoint IDs
Example Implementation
class RedisCheckpointer:
async def save_checkpoint(self, agent_id: str, session_id: str, checkpoint_data: Dict[str, Any]) -> str:
checkpoint_id = str(uuid.uuid4())
await redis.set(f"checkpoint:{checkpoint_id}", json.dumps(checkpoint_data))
return checkpoint_id
async def load_checkpoint(self, agent_id: str, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
if checkpoint_id:
data = await redis.get(f"checkpoint:{checkpoint_id}")
return json.loads(data) if data else None
return await self._load_latest(agent_id, session_id)
async def list_checkpoints(self, agent_id: str, session_id: str) -> list[str]:
return await redis.keys(f"checkpoint:{agent_id}:{session_id}:*")
# Use with agent
agent = HybridAgent(
checkpointer=RedisCheckpointer(),
...
)
AgentCollaborationProtocol
Protocol for agent collaboration enabling multi-agent workflows.
Module: aiecs.domain.agent.integration.protocols
from aiecs.domain.agent.integration.protocols import AgentCollaborationProtocol
@runtime_checkable
class AgentCollaborationProtocol(Protocol):
agent_id: str
name: str
capabilities: List[str]
async def execute_task(
self, task: Dict[str, Any], context: Dict[str, Any]
) -> Dict[str, Any]
async def review_result(
self, task: Dict[str, Any], result: Dict[str, Any]
) -> Dict[str, Any]
Required Attributes
agent_id: str
Unique identifier for the agent.
name: str
Human-readable agent name.
capabilities: List[str]
List of capability strings (e.g., [“search”, “analysis”, “web_scraping”]).
Required Methods
async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]
Execute a task.
Parameters:
task(Dict[str, Any]): Task specificationcontext(Dict[str, Any]): Execution context
Returns: Dict[str, Any] - Task execution result
async review_result(task: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]
Review another agent’s task result.
Parameters:
task(Dict[str, Any]): Original task specificationresult(Dict[str, Any]): Task execution result to review
Returns: Dict[str, Any] - Review result with ‘approved’ (bool) and ‘feedback’ (str)
Example Implementation
class CollaborativeAgent(BaseAIAgent):
agent_id: str
name: str
capabilities: List[str]
async def execute_task(self, task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
return {"success": True, "output": "result"}
async def review_result(self, task: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
return {"approved": True, "feedback": "Looks good"}
# Use with agent registry
agent_registry = {
"agent1": CollaborativeAgent(...),
"agent2": CollaborativeAgent(...)
}
agent = HybridAgent(
collaboration_enabled=True,
agent_registry=agent_registry,
...
)
ToolObservation
Structured observation of tool execution results.
Module: aiecs.domain.agent.models
from aiecs.domain.agent.models import ToolObservation
class ToolObservation(BaseModel):
tool_name: str
parameters: Dict[str, Any] = {}
result: Any = None
success: bool
error: Optional[str] = None
execution_time_ms: Optional[float] = None
timestamp: str
Fields
Field Descriptions
tool_name: str
Name of the tool that was executed.
parameters: Dict[str, Any] = {}
Dictionary of parameters passed to the tool.
result: Any = None
Tool execution result (any type).
success: bool
Whether tool execution succeeded (True/False).
error: Optional[str] = None
Error message if execution failed (None if successful).
execution_time_ms: Optional[float] = None
Execution time in milliseconds (None if not measured, must be >= 0).
timestamp: str
ISO format timestamp of execution (auto-generated if not provided).
Methods
Method Descriptions
to_dict() -> Dict[str, Any]
Convert observation to dictionary for serialization.
Returns: Dict[str, Any] - Dictionary representation
Example:
obs = ToolObservation(tool_name="search", success=True, result="data")
data = obs.to_dict()
# {'tool_name': 'search', 'success': True, 'result': 'data', ...}
to_text() -> str
Format observation as text for LLM context.
Returns: str - Formatted text representation
Example:
obs = ToolObservation(
tool_name="search",
parameters={"query": "AI"},
success=True,
result="Found 10 results",
execution_time_ms=250.5
)
text = obs.to_text()
# "Tool: search
# Parameters: {'query': 'AI'}
# Status: SUCCESS
# Result: Found 10 results
# Execution time: 250.5ms
# Timestamp: 2024-01-01T12:00:00"
Enhanced Methods
_execute_tool_with_observation()
Execute a tool and return structured observation.
Module: aiecs.domain.agent.hybrid_agent
Class: HybridAgent
async def _execute_tool_with_observation(
self,
tool_name: str,
operation: Optional[str],
parameters: Dict[str, Any],
) -> ToolObservation
Description
Wraps tool execution with automatic success/error tracking, execution time measurement, and structured result formatting. Returns a ToolObservation object that can be used for debugging, analysis, and LLM reasoning loops.
Parameters
tool_name(str): Name of the tool to executeoperation(Optional[str]): Optional operation nameparameters(Dict[str, Any]): Tool parameters
Returns
ToolObservation - Structured observation with execution details including:
tool_name: Name of the toolparameters: Parameters passedresult: Tool execution result (or None if failed)success: Whether execution succeedederror: Error message if failedexecution_time_ms: Execution time in millisecondstimestamp: ISO timestamp
Example
# Execute tool with observation
obs = await agent._execute_tool_with_observation(
tool_name="search",
operation="query",
parameters={"q": "Python"}
)
# Check success
if obs.success:
print(f"Found results: {obs.result}")
print(f"Execution time: {obs.execution_time_ms}ms")
else:
print(f"Error: {obs.error}")
# Format for LLM context
observation_text = obs.to_text()
# Include in LLM prompt
Notes
Automatically measures execution time
Catches exceptions and returns error observation
Returns observation even on failure (with
success=False)Essential for MasterController compatibility and observation-based reasoning
Performance Enhancement APIs
Caching APIs
execute_tool_with_cache(tool_name: str, parameters: Dict[str, Any]) -> Any
Execute tool with caching support. See Tool Execution section.
invalidate_cache(tool_name: Optional[str] = None, pattern: Optional[str] = None) -> int
Invalidate cache entries. See Tool Execution section.
get_cache_stats() -> Dict[str, Any]
Get cache statistics. See Tool Execution section.
Parallel Execution APIs
execute_tools_parallel(tool_calls: List[Dict[str, Any]], max_concurrency: int = 5) -> List[Dict[str, Any]]
Execute multiple tools in parallel. See Tool Execution section.
analyze_tool_dependencies(tool_calls: List[Dict[str, Any]]) -> Dict[str, List[str]]
Analyze dependencies between tool calls.
Parameters:
tool_calls(List[Dict[str, Any]]): List of tool calls
Returns: Dict[str, List[str]] - Dependency graph mapping tool names to dependent tools
execute_tools_with_dependencies(tool_calls: List[Dict[str, Any]], max_concurrency: int = 5) -> List[Dict[str, Any]]
Execute tools respecting dependencies.
Parameters:
tool_calls(List[Dict[str, Any]]): List of tool callsmax_concurrency(int): Maximum concurrent executions
Returns: List[Dict[str, Any]] - Results in execution order
Streaming APIs
execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]
Execute task with streaming response. See Streaming section.
process_message_streaming(message: str, sender_id: Optional[str] = None) -> AsyncIterator[str]
Process message with streaming response. See Streaming section.
Reliability Enhancement APIs
Error Recovery APIs
execute_with_recovery(task: Dict[str, Any], context: Dict[str, Any], strategies: Optional[List[str]] = None) -> Dict[str, Any]
Execute task with recovery strategies. See Error Recovery section.
Resource Management APIs
check_resource_availability() -> Dict[str, Any]
Check if resources are available. See Resource Management section.
wait_for_resources(timeout: Optional[float] = None) -> bool
Wait for resources to become available. See Resource Management section.
get_resource_usage() -> Dict[str, Any]
Get current resource usage. See Resource Management section.
Checkpointing APIs
save_checkpoint(session_id: Optional[str] = None) -> Optional[str]
Save agent state checkpoint. See Checkpointing section.
load_checkpoint(checkpoint_id: str) -> bool
Load agent state from checkpoint. See Checkpointing section.
Observability Enhancement APIs
Metrics APIs
get_metrics() -> AgentMetrics
Get agent metrics. See Metrics and Performance section.
update_metrics(execution_time: Optional[float] = None, success: Optional[bool] = None, tokens_used: Optional[int] = None, tool_calls: Optional[int] = None) -> None
Update agent metrics. See Metrics and Performance section.
get_performance_metrics() -> Dict[str, Any]
Get performance metrics. See Metrics and Performance section.
reset_metrics() -> None
Reset agent metrics. See Metrics and Performance section.
Health Status APIs
get_health_status() -> Dict[str, Any]
Get agent health status. See Metrics and Performance section.
get_comprehensive_status() -> Dict[str, Any]
Get comprehensive agent status. See Metrics and Performance section.
Operation Tracking APIs
track_operation_time(operation_name: str) -> OperationTimer
Track operation execution time (context manager). See Metrics and Performance section.
Session Metrics APIs
update_session_metrics(session_id: str, processing_time: float, is_error: bool = False) -> None
Update session-level metrics.
Parameters:
session_id(str): Session IDprocessing_time(float): Processing time in secondsis_error(bool): Whether request resulted in error
This API reference covers the main classes, methods, models, configuration classes, protocols, and enhancement APIs. For additional examples and usage patterns, see the integration guides and examples documentation.