Agent Domain API Reference

This document provides a comprehensive API reference for all agent domain classes, methods, and models.

Table of Contents

  1. BaseAIAgent

  2. Concrete Agent Types

  3. ConversationMemory and Session

  4. ContextEngine

  5. Models and Configuration


BaseAIAgent

Abstract base class for all AI agents providing core functionality for lifecycle management, state management, memory, goals, and metrics tracking.

Module: aiecs.domain.agent.base_agent

from aiecs.domain.agent import BaseAIAgent
from aiecs.domain.agent.models import AgentType, AgentConfiguration

class BaseAIAgent(ABC):
    def __init__(
        self,
        agent_id: str,
        name: str,
        agent_type: AgentType,
        config: AgentConfiguration,
        description: Optional[str] = None,
        version: str = "1.0.0",
        tools: Optional[Union[List[str], Dict[str, BaseTool]]] = None,
        llm_client: Optional[LLMClientProtocol] = None,
        config_manager: Optional[ConfigManagerProtocol] = None,
        checkpointer: Optional[CheckpointerProtocol] = None,
        context_engine: Optional[ContextEngine] = None,
        collaboration_enabled: bool = False,
        agent_registry: Optional[Dict[str, Any]] = None,
        learning_enabled: bool = False,
        resource_limits: Optional[ResourceLimits] = None,
    )

Lifecycle Methods

async initialize() -> None

Initialize the agent and prepare it for use.

Raises:

  • AgentInitializationError: If initialization fails

Example:

agent = HybridAgent(...)
await agent.initialize()

async activate() -> None

Activate the agent, transitioning it to ACTIVE state.

Raises:

  • InvalidStateTransitionError: If transition is invalid

async deactivate() -> None

Deactivate the agent, transitioning it to INACTIVE state.

async shutdown() -> None

Shutdown the agent, cleaning up resources.

State Management

state() -> AgentState

Get current agent state (property).

Returns: AgentState - Current agent state

get_state() -> AgentState

Get current agent state.

Returns: AgentState - Current agent state

Task Execution

async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]

Execute a task (abstract method, implemented by subclasses).

Parameters:

  • task (Dict[str, Any]): Task specification

  • context (Dict[str, Any]): Execution context

Returns: Dict[str, Any] - Task execution result

Raises:

  • TaskExecutionError: If task execution fails

async process_message(message: str, sender_id: Optional[str] = None) -> Dict[str, Any]

Process an incoming message (abstract method, implemented by subclasses).

Parameters:

  • message (str): Message content

  • sender_id (Optional[str]): Optional sender identifier

Returns: Dict[str, Any] - Response dictionary

Tool Execution

async execute_tool(tool_name: str, parameters: Dict[str, Any]) -> Any

Execute a tool by name.

Parameters:

  • tool_name (str): Name of the tool

  • parameters (Dict[str, Any]): Tool parameters

Returns: Any - Tool execution result

Raises:

  • ToolAccessDeniedError: If tool access is denied

async execute_tools_parallel(tool_calls: List[Dict[str, Any]], max_concurrency: int = 5) -> List[Dict[str, Any]]

Execute multiple tools in parallel with concurrency limit.

Parameters:

  • tool_calls (List[Dict[str, Any]]): List of tool call dicts with ‘tool_name’ and ‘parameters’

  • max_concurrency (int): Maximum number of concurrent tool executions (default: 5)

Returns: List[Dict[str, Any]] - List of results in same order as tool_calls

Example:

results = await agent.execute_tools_parallel([
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "calculator", "parameters": {"operation": "add", "a": 1, "b": 2}}
], max_concurrency=3)

async execute_tool_with_cache(tool_name: str, parameters: Dict[str, Any]) -> Any

Execute tool with caching support.

Parameters:

  • tool_name (str): Name of the tool

  • parameters (Dict[str, Any]): Tool parameters

Returns: Any - Tool result (from cache or fresh execution)

Example:

result = await agent.execute_tool_with_cache("search", {"query": "AI"})

invalidate_cache(tool_name: Optional[str] = None, pattern: Optional[str] = None) -> int

Invalidate cache entries.

Parameters:

  • tool_name (Optional[str]): Specific tool name to invalidate (None = all)

  • pattern (Optional[str]): Pattern to match cache keys

Returns: int - Number of entries invalidated

get_cache_stats() -> Dict[str, Any]

Get cache statistics.

Returns: Dict[str, Any] - Cache statistics including size, hits, misses, hit_rate

Streaming

async execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]

Execute task with streaming response.

Parameters:

  • task (Dict[str, Any]): Task specification

  • context (Dict[str, Any]): Execution context

Yields: Dict[str, Any] - Event dictionaries with type (‘token’, ‘tool_call’, ‘tool_result’, ‘status’, ‘result’, ‘error’)

Example:

async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] == 'tool_call':
        print(f"Calling {event['tool_name']}...")

async process_message_streaming(message: str, sender_id: Optional[str] = None) -> AsyncIterator[str]

Process a message with streaming response.

Parameters:

  • message (str): Message content

  • sender_id (Optional[str]): Optional sender identifier

Yields: str - Response text tokens/chunks

Memory Management

async add_to_memory(key: str, value: Any, memory_type: MemoryType = MemoryType.SHORT_TERM) -> None

Add value to agent memory.

Parameters:

  • key (str): Memory key

  • value (Any): Value to store

  • memory_type (MemoryType): Type of memory (default: SHORT_TERM)

async retrieve_memory(key: str, default: Any = None) -> Any

Retrieve value from agent memory.

Parameters:

  • key (str): Memory key

  • default (Any): Default value if key not found

Returns: Any - Stored value or default

async clear_memory(memory_type: Optional[MemoryType] = None) -> None

Clear agent memory.

Parameters:

  • memory_type (Optional[MemoryType]): Type of memory to clear (None = all)

get_memory_summary() -> Dict[str, Any]

Get memory summary.

Returns: Dict[str, Any] - Memory summary with counts and sizes

Goals Management

set_goal(goal_id: str, description: str, priority: GoalPriority = GoalPriority.MEDIUM, deadline: Optional[datetime] = None) -> AgentGoal

Set a goal for the agent.

Parameters:

  • goal_id (str): Unique goal identifier

  • description (str): Goal description

  • priority (GoalPriority): Goal priority (default: MEDIUM)

  • deadline (Optional[datetime]): Optional deadline

Returns: AgentGoal - Created goal object

get_goals(status: Optional[GoalStatus] = None) -> List[AgentGoal]

Get agent goals, optionally filtered by status.

Parameters:

  • status (Optional[GoalStatus]): Filter by status (None = all)

Returns: List[AgentGoal] - List of goals

get_goal(goal_id: str) -> Optional[AgentGoal]

Get a specific goal by ID.

Parameters:

  • goal_id (str): Goal identifier

Returns: Optional[AgentGoal] - Goal object or None

update_goal_status(goal_id: str, status: GoalStatus, notes: Optional[str] = None) -> None

Update goal status.

Parameters:

  • goal_id (str): Goal identifier

  • status (GoalStatus): New status

  • notes (Optional[str]): Optional notes

Configuration Management

get_config() -> AgentConfiguration

Get current agent configuration.

Returns: AgentConfiguration - Current configuration

update_config(updates: Dict[str, Any]) -> None

Update agent configuration.

Parameters:

  • updates (Dict[str, Any]): Configuration updates

Note: Uses config manager if available, otherwise updates in-memory config

Capabilities

declare_capability(capability_type: str, description: str, level: CapabilityLevel = CapabilityLevel.BASIC) -> None

Declare an agent capability.

Parameters:

  • capability_type (str): Capability type

  • description (str): Capability description

  • level (CapabilityLevel): Capability level (default: BASIC)

has_capability(capability_type: str) -> bool

Check if agent has a capability.

Parameters:

  • capability_type (str): Capability type

Returns: bool - True if agent has capability

get_capabilities() -> List[AgentCapabilityDeclaration]

Get all declared capabilities.

Returns: List[AgentCapabilityDeclaration] - List of capabilities

Metrics and Performance

get_metrics() -> AgentMetrics

Get agent metrics.

Returns: AgentMetrics - Current metrics

update_metrics(execution_time: Optional[float] = None, success: Optional[bool] = None, tokens_used: Optional[int] = None, tool_calls: Optional[int] = None) -> None

Update agent metrics.

Parameters:

  • execution_time (Optional[float]): Execution time in seconds

  • success (Optional[bool]): Whether operation succeeded

  • tokens_used (Optional[int]): Tokens used

  • tool_calls (Optional[int]): Number of tool calls

track_operation_time(operation_name: str) -> OperationTimer

Track operation execution time (context manager).

Parameters:

  • operation_name (str): Operation name

Returns: OperationTimer - Context manager for tracking

Example:

with agent.track_operation_time("data_processing"):
    result = await agent.execute_task(task, context)

get_performance_metrics() -> Dict[str, Any]

Get performance metrics.

Returns: Dict[str, Any] - Performance metrics including avg_response_time, p95_response_time, p99_response_time, operation_metrics

get_health_status() -> Dict[str, Any]

Get agent health status.

Returns: Dict[str, Any] - Health status with score, status, issues, recommendations

get_comprehensive_status() -> Dict[str, Any]

Get comprehensive agent status.

Returns: Dict[str, Any] - Complete status including state, metrics, health, goals, capabilities

reset_metrics() -> None

Reset agent metrics.

Serialization

to_dict() -> Dict[str, Any]

Serialize agent to dictionary.

Returns: Dict[str, Any] - Serialized agent data

@classmethod from_dict(data: Dict[str, Any]) -> BaseAIAgent

Deserialize agent from dictionary.

Parameters:

  • data (Dict[str, Any]): Serialized agent data

Returns: BaseAIAgent - Deserialized agent instance

Checkpointing

async save_checkpoint(session_id: Optional[str] = None) -> Optional[str]

Save agent state checkpoint.

Parameters:

  • session_id (Optional[str]): Optional session ID

Returns: Optional[str] - Checkpoint ID if saved

Raises:

  • SerializationError: If serialization fails

async load_checkpoint(checkpoint_id: str) -> bool

Load agent state from checkpoint.

Parameters:

  • checkpoint_id (str): Checkpoint ID

Returns: bool - True if loaded successfully

Raises:

  • SerializationError: If deserialization fails

Agent Collaboration

async delegate_task(task_description: str, target_agent_id: Optional[str] = None, required_capabilities: Optional[List[str]] = None, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Delegate task to another agent.

Parameters:

  • task_description (str): Task description

  • target_agent_id (Optional[str]): Specific agent ID (None = auto-select)

  • required_capabilities (Optional[List[str]]): Required capabilities

  • context (Optional[Dict[str, Any]]): Additional context

Returns: Dict[str, Any] - Delegated task result

Raises:

  • ValueError: If collaboration not enabled or agent not found

async find_capable_agents(required_capabilities: List[str]) -> List[Any]

Find agents with required capabilities.

Parameters:

  • required_capabilities (List[str]): Required capabilities

Returns: List[Any] - List of capable agents

async request_peer_review(task: Dict[str, Any], result: Dict[str, Any], reviewer_id: Optional[str] = None) -> Dict[str, Any]

Request peer review of a task result.

Parameters:

  • task (Dict[str, Any]): Original task specification

  • result (Dict[str, Any]): Task execution result to review

  • reviewer_id (Optional[str]): Specific reviewer agent ID (None = auto-select)

Returns: Dict[str, Any] - Review result with ‘approved’ (bool), ‘feedback’ (str), ‘reviewer_id’ (str)

Raises:

  • ValueError: If collaboration not enabled or reviewer not found

async collaborate_on_task(task: Dict[str, Any], strategy: str = "parallel", required_capabilities: Optional[List[str]] = None, agent_weights: Optional[Dict[str, float]] = None) -> Dict[str, Any]

Collaborate on task with multiple agents.

Parameters:

  • task (Dict[str, Any]): Task specification

  • strategy (str): Collaboration strategy (‘parallel’, ‘sequential’, ‘consensus’, ‘weighted_consensus’)

  • required_capabilities (Optional[List[str]]): Required capabilities

  • agent_weights (Optional[Dict[str, float]]): Agent weights for weighted consensus

Returns: Dict[str, Any] - Collaboration result

Learning and Adaptation

async record_experience(task: Dict[str, Any], result: Dict[str, Any], approach: str, tools_used: Optional[List[str]] = None) -> None

Record an experience for learning and adaptation.

Parameters:

  • task (Dict[str, Any]): Task specification

  • result (Dict[str, Any]): Task execution result

  • approach (str): Approach/strategy used

  • tools_used (Optional[List[str]]): List of tools used

async get_learning_insights() -> Dict[str, Any]

Get learning insights from experiences.

Returns: Dict[str, Any] - Learning insights including successful_approaches, failed_approaches, recommendations

async adapt_strategy(task: Dict[str, Any]) -> Dict[str, Any]

Adapt strategy based on past experiences.

Parameters:

  • task (Dict[str, Any]): Task specification

Returns: Dict[str, Any] - Adapted strategy

Resource Management

async check_resource_availability() -> Dict[str, Any]

Check if resources are available for task execution.

Returns: Dict[str, Any] - Resource status with ‘available’ (bool), ‘reason’ (str), details

async wait_for_resources(timeout: Optional[float] = None) -> bool

Wait for resources to become available.

Parameters:

  • timeout (Optional[float]): Maximum wait time in seconds (None = use default)

Returns: bool - True if resources became available

async get_resource_usage() -> Dict[str, Any]

Get current resource usage.

Returns: Dict[str, Any] - Resource usage statistics

Error Recovery

async execute_with_recovery(task: Dict[str, Any], context: Dict[str, Any], strategies: Optional[List[str]] = None) -> Dict[str, Any]

Execute task with recovery strategies.

Parameters:

  • task (Dict[str, Any]): Task specification

  • context (Dict[str, Any]): Execution context

  • strategies (Optional[List[str]]): Recovery strategies (None = use default)

Returns: Dict[str, Any] - Task execution result

Raises:

  • TaskExecutionError: If all recovery strategies fail

Context Management

async get_relevant_context(query: str, max_items: int = 10, min_score: float = 0.5) -> List[Dict[str, Any]]

Get relevant context items for a query.

Parameters:

  • query (str): Query string

  • max_items (int): Maximum number of items (default: 10)

  • min_score (float): Minimum relevance score (default: 0.5)

Returns: List[Dict[str, Any]] - List of relevant context items

async score_context_relevance(context_items: List[Dict[str, Any]], query: str) -> List[float]

Score context items for relevance to query.

Parameters:

  • context_items (List[Dict[str, Any]]): Context items to score

  • query (str): Query string

Returns: List[float] - Relevance scores

async prune_context(context_items: List[Dict[str, Any]], max_tokens: int, query: Optional[str] = None, preserve_types: Optional[List[str]] = None) -> List[Dict[str, Any]]

Prune context items to fit within token limit.

Parameters:

  • context_items (List[Dict[str, Any]]): Context items to prune

  • max_tokens (int): Maximum tokens

  • query (Optional[str]): Optional query for relevance scoring

  • preserve_types (Optional[List[str]]): Types to preserve

Returns: List[Dict[str, Any]] - Pruned context items

Utility Methods

is_available() -> bool

Check if agent is available.

Returns: bool - True if agent is available

is_busy() -> bool

Check if agent is busy.

Returns: bool - True if agent is busy


Concrete Agent Types

LLMAgent

LLM-powered agent for text generation and reasoning.

Module: aiecs.domain.agent.llm_agent

from aiecs.domain.agent import LLMAgent
from aiecs.llm import BaseLLMClient

class LLMAgent(BaseAIAgent):
    def __init__(
        self,
        agent_id: str,
        name: str,
        llm_client: Union[BaseLLMClient, LLMClientProtocol],
        config: AgentConfiguration,
        description: Optional[str] = None,
        version: str = "1.0.0",
        config_manager: Optional[ConfigManagerProtocol] = None,
        checkpointer: Optional[CheckpointerProtocol] = None,
        context_engine: Optional[ContextEngine] = None,
        collaboration_enabled: bool = False,
        agent_registry: Optional[Dict[str, Any]] = None,
        learning_enabled: bool = False,
        resource_limits: Optional[ResourceLimits] = None,
    )

Methods

All methods from BaseAIAgent plus:

async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]

Execute a task using LLM.

Parameters:

  • task (Dict[str, Any]): Task specification with ‘description’, ‘prompt’, or ‘task’ field

  • context (Dict[str, Any]): Execution context

Returns: Dict[str, Any] - Execution result with ‘output’, ‘execution_time’, ‘tokens_used’

async process_message(message: str, sender_id: Optional[str] = None) -> Dict[str, Any]

Process a message using LLM.

Parameters:

  • message (str): Message content

  • sender_id (Optional[str]): Optional sender identifier

Returns: Dict[str, Any] - Response dictionary with ‘response’, ‘execution_time’

async execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]

Execute task with streaming response.

Parameters:

  • task (Dict[str, Any]): Task specification

  • context (Dict[str, Any]): Execution context

Yields: Dict[str, Any] - Event dictionaries with ‘type’ (‘token’, ‘result’, ‘error’)

ToolAgent

Agent specialized in tool selection and execution.

Module: aiecs.domain.agent.tool_agent

from aiecs.domain.agent import ToolAgent

class ToolAgent(BaseAIAgent):
    def __init__(
        self,
        agent_id: str,
        name: str,
        tools: Union[List[str], Dict[str, BaseTool]],
        config: AgentConfiguration,
        description: Optional[str] = None,
        version: str = "1.0.0",
        config_manager: Optional[ConfigManagerProtocol] = None,
        checkpointer: Optional[CheckpointerProtocol] = None,
        context_engine: Optional[ContextEngine] = None,
        collaboration_enabled: bool = False,
        agent_registry: Optional[Dict[str, Any]] = None,
        learning_enabled: bool = False,
        resource_limits: Optional[ResourceLimits] = None,
    )

Methods

All methods from BaseAIAgent plus:

async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]

Execute a task using tools.

Parameters:

  • task (Dict[str, Any]): Task specification with ‘tool’, ‘operation’, and ‘parameters’

  • context (Dict[str, Any]): Execution context

Returns: Dict[str, Any] - Execution result with ‘output’, ‘tool_used’, ‘execution_time’

Raises:

  • TaskExecutionError: If task execution fails

  • ToolAccessDeniedError: If tool access is denied

HybridAgent

Hybrid agent combining LLM reasoning with tool execution (ReAct pattern).

Module: aiecs.domain.agent.hybrid_agent

from aiecs.domain.agent import HybridAgent

class HybridAgent(BaseAIAgent):
    def __init__(
        self,
        agent_id: str,
        name: str,
        llm_client: Union[BaseLLMClient, LLMClientProtocol],
        tools: Union[List[str], Dict[str, BaseTool]],
        config: AgentConfiguration,
        description: Optional[str] = None,
        version: str = "1.0.0",
        config_manager: Optional[ConfigManagerProtocol] = None,
        checkpointer: Optional[CheckpointerProtocol] = None,
        context_engine: Optional[ContextEngine] = None,
        collaboration_enabled: bool = False,
        agent_registry: Optional[Dict[str, Any]] = None,
        learning_enabled: bool = False,
        resource_limits: Optional[ResourceLimits] = None,
        max_iterations: int = 10,
    )

Methods

All methods from BaseAIAgent plus:

async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]

Execute a task using ReAct pattern (Reason → Act → Observe).

Parameters:

  • task (Dict[str, Any]): Task specification with ‘description’ or ‘prompt’

  • context (Dict[str, Any]): Execution context

Returns: Dict[str, Any] - Execution result with ‘output’, ‘steps’, ‘iterations’, ‘execution_time’

async process_message(message: str, sender_id: Optional[str] = None) -> Dict[str, Any]

Process a message using ReAct pattern.

Parameters:

  • message (str): Message content

  • sender_id (Optional[str]): Optional sender identifier

Returns: Dict[str, Any] - Response dictionary

async execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]

Execute task with streaming response (tokens + tool calls).

Parameters:

  • task (Dict[str, Any]): Task specification

  • context (Dict[str, Any]): Execution context

Yields: Dict[str, Any] - Event dictionaries with ‘type’ (‘token’, ‘tool_call’, ‘tool_result’, ‘status’, ‘result’)

async _execute_tool_with_observation(tool_name: str, operation: Optional[str], parameters: Dict[str, Any]) -> ToolObservation

Execute a tool and return structured observation.

Parameters:

  • tool_name (str): Name of the tool to execute

  • operation (Optional[str]): Optional operation name

  • parameters (Dict[str, Any]): Tool parameters

Returns: ToolObservation - Structured observation with execution details

get_available_tools() -> List[str]

Get list of available tools.

Returns: List[str] - List of tool names


ConversationMemory and Session

ConversationMemory

Multi-turn conversation handling with session management.

Module: aiecs.domain.agent.memory.conversation

from aiecs.domain.agent.memory import ConversationMemory

class ConversationMemory:
    def __init__(
        self,
        agent_id: str,
        context_engine: Optional[ContextEngine] = None,
        max_history_length: int = 100,
    )

Methods

async add_message(role: str, content: str, session_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> None

Add a message to conversation history.

Parameters:

  • role (str): Message role (‘user’, ‘assistant’, ‘system’)

  • content (str): Message content

  • session_id (Optional[str]): Optional session ID

  • metadata (Optional[Dict[str, Any]]): Optional metadata

async get_messages(session_id: Optional[str] = None, limit: Optional[int] = None) -> List[LLMMessage]

Get conversation messages.

Parameters:

  • session_id (Optional[str]): Optional session ID

  • limit (Optional[int]): Optional limit on number of messages

Returns: List[LLMMessage] - List of messages

async clear(session_id: Optional[str] = None) -> None

Clear conversation history.

Parameters:

  • session_id (Optional[str]): Optional session ID (None = all sessions)

async get_session(session_id: str) -> Optional[Session]

Get a session by ID.

Parameters:

  • session_id (str): Session ID

Returns: Optional[Session] - Session object or None

async create_session(session_id: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> Session

Create a new session.

Parameters:

  • session_id (Optional[str]): Optional session ID (auto-generated if None)

  • user_id (Optional[str]): Optional user ID

  • metadata (Optional[Dict[str, Any]]): Optional metadata

Returns: Session - Created session

async end_session(session_id: str, status: str = "completed") -> None

End a session.

Parameters:

  • session_id (str): Session ID

  • status (str): Session status (‘completed’, ‘failed’, ‘expired’)

async list_sessions(active_only: bool = False) -> List[Session]

List all sessions.

Parameters:

  • active_only (bool): Only return active sessions

Returns: List[Session] - List of sessions

Session

Conversation session with lifecycle management and metrics tracking.

Module: aiecs.domain.agent.memory.conversation

from aiecs.domain.agent.memory import Session

@dataclass
class Session:
    session_id: str
    agent_id: str
    user_id: Optional[str] = None
    created_at: datetime = field(default_factory=datetime.utcnow)
    last_activity: datetime = field(default_factory=datetime.utcnow)
    status: str = "active"
    metadata: Dict[str, Any] = field(default_factory=dict)
    messages: List[LLMMessage] = field(default_factory=list)
    request_count: int = 0
    error_count: int = 0
    total_processing_time: float = 0.0

Methods

add_message(role: str, content: str, metadata: Optional[Dict[str, Any]] = None) -> None

Add a message to the session.

Parameters:

  • role (str): Message role

  • content (str): Message content

  • metadata (Optional[Dict[str, Any]]): Optional metadata

track_request(processing_time: float, is_error: bool = False) -> None

Track a request in the session.

Parameters:

  • processing_time (float): Processing time in seconds

  • is_error (bool): Whether request resulted in error

get_metrics() -> Dict[str, Any]

Get session metrics.

Returns: Dict[str, Any] - Metrics including request_count, error_count, average_processing_time, message_count

is_active() -> bool

Check if session is active.

Returns: bool - True if session is active

end(status: str = "completed") -> None

End the session.

Parameters:

  • status (str): Session status (‘completed’, ‘failed’, ‘expired’)

is_expired(max_idle_seconds: int = 1800) -> bool

Check if session is expired.

Parameters:

  • max_idle_seconds (int): Maximum idle time in seconds (default: 1800 = 30 minutes)

Returns: bool - True if session is expired


ContextEngine

Advanced context and session management engine with Redis backend storage.

Module: aiecs.domain.context.context_engine

from aiecs.domain.context import ContextEngine

class ContextEngine:
    def __init__(
        self,
        storage_backend: Optional[IStorageBackend] = None,
        compression_config: Optional[CompressionConfig] = None,
    )

Initialization

async initialize() -> None

Initialize the context engine.

Raises:

  • RuntimeError: If initialization fails

Session Management

async create_session(session_id: Optional[str] = None, user_id: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> str

Create a new session.

Parameters:

  • session_id (Optional[str]): Optional session ID (auto-generated if None)

  • user_id (Optional[str]): Optional user ID

  • metadata (Optional[Dict[str, Any]]): Optional metadata

Returns: str - Session ID

async get_session(session_id: str) -> Optional[SessionMetrics]

Get session metrics.

Parameters:

  • session_id (str): Session ID

Returns: Optional[SessionMetrics] - Session metrics or None

async end_session(session_id: str, status: str = "completed") -> None

End a session.

Parameters:

  • session_id (str): Session ID

  • status (str): Session status

Conversation History

async add_message(session_id: str, role: str, content: str, metadata: Optional[Dict[str, Any]] = None) -> None

Add a message to conversation history.

Parameters:

  • session_id (str): Session ID

  • role (str): Message role

  • content (str): Message content

  • metadata (Optional[Dict[str, Any]]): Optional metadata

async get_messages(session_id: str, limit: Optional[int] = None, offset: int = 0) -> List[ConversationMessage]

Get conversation messages.

Parameters:

  • session_id (str): Session ID

  • limit (Optional[int]): Optional limit

  • offset (int): Offset for pagination

Returns: List[ConversationMessage] - List of messages

async clear_messages(session_id: str) -> None

Clear conversation messages for a session.

Parameters:

  • session_id (str): Session ID

Compression Methods

async compress_conversation(session_id: str, strategy: Optional[str] = None, target_length: Optional[int] = None, custom_prompt: Optional[str] = None) -> Dict[str, Any]

Compress conversation history.

Parameters:

  • session_id (str): Session ID

  • strategy (Optional[str]): Compression strategy (‘truncate’, ‘summarize’, ‘semantic’, ‘hybrid’) (None = use config)

  • target_length (Optional[int]): Target length in messages (None = use config)

  • custom_prompt (Optional[str]): Custom compression prompt (None = use default)

Returns: Dict[str, Any] - Compression result with ‘original_length’, ‘compressed_length’, ‘strategy’, ‘summary’

Example:

result = await context_engine.compress_conversation(
    session_id="session-123",
    strategy="summarize",
    target_length=10
)

async summarize_conversation(session_id: str, max_messages: Optional[int] = None, custom_prompt: Optional[str] = None) -> str

Summarize conversation history.

Parameters:

  • session_id (str): Session ID

  • max_messages (Optional[int]): Maximum messages to summarize (None = all)

  • custom_prompt (Optional[str]): Custom summarization prompt

Returns: str - Conversation summary

async truncate_conversation(session_id: str, keep_last: int) -> int

Truncate conversation history, keeping only the last N messages.

Parameters:

  • session_id (str): Session ID

  • keep_last (int): Number of messages to keep

Returns: int - Number of messages removed

async semantic_compress_conversation(session_id: str, query: Optional[str] = None, max_messages: Optional[int] = None) -> Dict[str, Any]

Semantically compress conversation based on relevance.

Parameters:

  • session_id (str): Session ID

  • query (Optional[str]): Optional query for relevance scoring

  • max_messages (Optional[int]): Maximum messages to keep

Returns: Dict[str, Any] - Compression result

Context Storage

async store_context(session_id: str, key: str, value: Any, metadata: Optional[Dict[str, Any]] = None) -> None

Store context data.

Parameters:

  • session_id (str): Session ID

  • key (str): Context key

  • value (Any): Context value

  • metadata (Optional[Dict[str, Any]]): Optional metadata

async get_context(session_id: str, key: str, default: Any = None) -> Any

Get context data.

Parameters:

  • session_id (str): Session ID

  • key (str): Context key

  • default (Any): Default value if not found

Returns: Any - Context value or default

async list_context_keys(session_id: str) -> List[str]

List all context keys for a session.

Parameters:

  • session_id (str): Session ID

Returns: List[str] - List of context keys

Cleanup

async cleanup_expired_sessions(max_idle_seconds: int = 1800) -> int

Clean up expired sessions.

Parameters:

  • max_idle_seconds (int): Maximum idle time in seconds

Returns: int - Number of sessions cleaned up


Models and Configuration

CompressionConfig

Configuration for conversation compression.

Module: aiecs.domain.context.context_engine

from aiecs.domain.context.context_engine import CompressionConfig

@dataclass
class CompressionConfig:
    strategy: str = "truncate"
    max_messages: int = 50
    keep_recent: int = 10
    summary_prompt_template: Optional[str] = None
    summary_max_tokens: int = 500
    include_summary_in_history: bool = True
    similarity_threshold: float = 0.95
    embedding_model: str = "text-embedding-ada-002"
    hybrid_strategies: List[str] = None
    auto_compress_enabled: bool = False
    auto_compress_threshold: int = 100
    auto_compress_target: int = 50
    compression_timeout: float = 30.0

Fields

strategy: str = "truncate"

Compression strategy to use.

Values:

  • "truncate": Fast truncation, keeps most recent N messages (no LLM required)

  • "summarize": LLM-based summarization of older messages

  • "semantic": Embedding-based deduplication of similar messages

  • "hybrid": Combination of multiple strategies applied sequentially

max_messages: int = 50

Maximum messages to keep (for truncation strategy).

keep_recent: int = 10

Always keep N most recent messages (applies to all strategies).

summary_prompt_template: Optional[str] = None

Custom prompt template for summarization. Uses {messages} placeholder.

Example:

config = CompressionConfig(
    summary_prompt_template="Summarize focusing on key decisions:\n\n{messages}"
)
summary_max_tokens: int = 500

Maximum tokens for summary output.

include_summary_in_history: bool = True

Whether to add summary as system message in history.

similarity_threshold: float = 0.95

Similarity threshold for semantic deduplication (0.0-1.0). Messages above this similarity are considered duplicates.

embedding_model: str = "text-embedding-ada-002"

Embedding model name for semantic deduplication.

hybrid_strategies: List[str] = None

List of strategies to combine for hybrid mode. Default: ["truncate", "summarize"].

auto_compress_enabled: bool = False

Enable automatic compression when threshold exceeded.

auto_compress_threshold: int = 100

Message count threshold to trigger auto-compression.

auto_compress_target: int = 50

Target message count after auto-compression.

compression_timeout: float = 30.0

Maximum time for compression operation in seconds.


CacheConfig

Configuration for tool result caching.

Module: aiecs.domain.agent.base_agent

from aiecs.domain.agent.base_agent import CacheConfig

@dataclass
class CacheConfig:
    enabled: bool = True
    default_ttl: int = 300
    tool_specific_ttl: Dict[str, int] = None
    max_cache_size: int = 1000
    max_memory_mb: int = 100
    cleanup_interval: int = 60
    cleanup_threshold: float = 0.9
    include_timestamp_in_key: bool = False
    hash_large_inputs: bool = True

Fields

enabled: bool = True

Enable/disable caching globally.

default_ttl: int = 300

Default time-to-live in seconds for cached entries (default: 300 = 5 minutes).

tool_specific_ttl: Dict[str, int] = None

Dictionary mapping tool names to custom TTL values (overrides default_ttl).

Example:

config = CacheConfig(
    tool_specific_ttl={
        "search": 600,  # 10 minutes
        "calculator": 3600  # 1 hour
    }
)
max_cache_size: int = 1000

Maximum number of cached entries before cleanup (default: 1000).

max_memory_mb: int = 100

Maximum cache memory usage in MB (approximate, default: 100).

cleanup_interval: int = 60

Interval in seconds between cleanup checks (default: 60).

cleanup_threshold: float = 0.9

Capacity threshold (0.0-1.0) to trigger cleanup (default: 0.9 = 90%).

include_timestamp_in_key: bool = False

Whether to include timestamp in cache key (default: False).

hash_large_inputs: bool = True

Whether to hash inputs larger than 1KB for cache keys (default: True).

Methods

get_ttl(tool_name: str) -> int

Get TTL for a specific tool.

Parameters:

  • tool_name (str): Name of the tool

Returns: int - TTL in seconds (tool-specific if set, otherwise default)


ResourceLimits

Configuration for agent resource limits and rate limiting.

Module: aiecs.domain.agent.models

from aiecs.domain.agent.models import ResourceLimits

class ResourceLimits(BaseModel):
    max_concurrent_tasks: int = 10
    max_tokens_per_minute: Optional[int] = None
    max_tokens_per_hour: Optional[int] = None
    token_burst_size: Optional[int] = None
    max_tool_calls_per_minute: Optional[int] = None
    max_tool_calls_per_hour: Optional[int] = None
    max_memory_mb: Optional[int] = None
    task_timeout_seconds: Optional[int] = None
    resource_wait_timeout_seconds: int = 60
    enforce_limits: bool = True
    reject_on_limit: bool = False

Fields

max_concurrent_tasks: int = 10

Maximum number of concurrent tasks (default: 10, minimum: 1).

max_tokens_per_minute: Optional[int] = None

Maximum tokens per minute (None = unlimited).

max_tokens_per_hour: Optional[int] = None

Maximum tokens per hour (None = unlimited).

token_burst_size: Optional[int] = None

Token burst size for token bucket algorithm. If None, uses max_tokens_per_minute.

Example:

limits = ResourceLimits(
    max_tokens_per_minute=10000,
    token_burst_size=20000  # Allow 2x burst
)
max_tool_calls_per_minute: Optional[int] = None

Maximum tool calls per minute (None = unlimited).

max_tool_calls_per_hour: Optional[int] = None

Maximum tool calls per hour (None = unlimited).

max_memory_mb: Optional[int] = None

Maximum memory usage in MB (None = unlimited).

task_timeout_seconds: Optional[int] = None

Maximum task execution time in seconds (None = unlimited).

resource_wait_timeout_seconds: int = 60

Maximum time to wait for resources in seconds (default: 60).

enforce_limits: bool = True

Whether to enforce resource limits (default: True). If False, limits are monitored but not enforced.

reject_on_limit: bool = False

Reject requests when limit reached vs wait (default: False). If True, requests are rejected immediately when limits are reached. If False, requests wait for resources to become available.


Experience

Model for recording agent learning experiences.

Module: aiecs.domain.agent.models

from aiecs.domain.agent.models import Experience

class Experience(BaseModel):
    experience_id: str
    agent_id: str
    task_type: str
    task_description: str
    task_complexity: Optional[str] = None
    approach: str
    tools_used: List[str] = []
    execution_time: float
    success: bool
    quality_score: Optional[float] = None
    error_type: Optional[str] = None
    error_message: Optional[str] = None
    context_size: Optional[int] = None
    iterations: Optional[int] = None
    lessons_learned: Optional[str] = None
    recommended_improvements: Optional[str] = None
    timestamp: datetime
    metadata: Dict[str, Any] = {}

Fields

experience_id: str

Unique identifier for the experience (auto-generated UUID if not provided).

agent_id: str

ID of the agent that had this experience.

task_type: str

Type/category of task (e.g., “data_analysis”, “search”, “translation”).

task_description: str

Human-readable task description.

task_complexity: Optional[str] = None

Task complexity level. Common values: “simple”, “medium”, “complex”.

approach: str

Approach/strategy used (e.g., “parallel_tools”, “sequential”, “react_loop”).

tools_used: List[str] = []

List of tool names used in execution.

execution_time: float

Execution time in seconds (must be >= 0).

success: bool

Whether task execution succeeded.

quality_score: Optional[float] = None

Quality score from 0.0 to 1.0 (None if not available).

error_type: Optional[str] = None

Type of error if failed (e.g., “timeout”, “validation_error”, “network_error”).

error_message: Optional[str] = None

Error message if execution failed.

context_size: Optional[int] = None

Context size in tokens (if applicable, must be >= 0).

iterations: Optional[int] = None

Number of iterations/attempts (if applicable, must be >= 0).

lessons_learned: Optional[str] = None

Human-readable lessons learned from this experience.

timestamp: datetime

When the experience occurred (defaults to current UTC time if not provided).

metadata: Dict[str, Any] = {}

Additional experience metadata.


RecoveryStrategy

Recovery strategies for error handling.

Module: aiecs.domain.agent.models

from aiecs.domain.agent.models import RecoveryStrategy

class RecoveryStrategy(str, Enum):
    RETRY = "retry"
    SIMPLIFY = "simplify"
    FALLBACK = "fallback"
    DELEGATE = "delegate"
    ABORT = "abort"

Enum Values

RETRY = "retry"

Retry the same task with exponential backoff.

Use When:

  • Transient errors (network, timeout, rate limits)

  • Temporary failures

  • Errors likely to succeed on retry

Example:

strategies = [RecoveryStrategy.RETRY]
result = await agent.execute_with_recovery(task, context, strategies)
SIMPLIFY = "simplify"

Simplify the task and retry.

Use When:

  • Task is too complex

  • Breaking down helps

  • Simpler version likely to succeed

Example:

strategies = [RecoveryStrategy.SIMPLIFY]
result = await agent.execute_with_recovery(task, context, strategies)
FALLBACK = "fallback"

Use a fallback approach or alternative method.

Use When:

  • Alternative approach available

  • Primary method failed

  • Fallback method acceptable

Example:

strategies = [RecoveryStrategy.FALLBACK]
result = await agent.execute_with_recovery(task, context, strategies)
DELEGATE = "delegate"

Delegate the task to another capable agent.

Use When:

  • Other agents available

  • Current agent lacks capability

  • Delegation appropriate

Example:

strategies = [RecoveryStrategy.DELEGATE]
result = await agent.execute_with_recovery(task, context, strategies)

Note: Requires collaboration_enabled=True and agent_registry to be configured.

ABORT = "abort"

Abort execution and return error (terminal strategy).

Use When:

  • All recovery attempts exhausted

  • Error is terminal

  • No further recovery possible

Example:

strategies = [RecoveryStrategy.ABORT]
try:
    result = await agent.execute_with_recovery(task, context, strategies)
except Exception as e:
    # Task aborted
    print(f"Task aborted: {e}")

Usage Pattern

Strategies are typically chained together, trying each in sequence:

# Full recovery chain
strategies = [
    RecoveryStrategy.RETRY,      # Try retry first
    RecoveryStrategy.SIMPLIFY,   # Then simplify
    RecoveryStrategy.FALLBACK,   # Then fallback
    RecoveryStrategy.DELEGATE,   # Then delegate
    RecoveryStrategy.ABORT      # Finally abort
]

result = await agent.execute_with_recovery(task, context, strategies)

Protocols

Protocols define interfaces for duck typing, enabling flexible integration of custom components without inheritance requirements.

LLMClientProtocol

Protocol for LLM clients enabling duck typing integration.

Module: aiecs.llm.protocols

from aiecs.llm.protocols import LLMClientProtocol
from aiecs.llm.clients.base_client import LLMMessage, LLMResponse

@runtime_checkable
class LLMClientProtocol(Protocol):
    provider_name: str
    
    async def generate_text(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs,
    ) -> LLMResponse
    
    async def stream_text(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs,
    ) -> AsyncGenerator[str, None]
    
    async def close(self) -> None
    
    async def get_embeddings(
        self,
        texts: List[str],
        model: Optional[str] = None,
        **kwargs,
    ) -> List[List[float]]

Required Attributes

provider_name: str

Provider name identifier (e.g., “openai”, “xai”, “custom”).

Required Methods

async generate_text(messages: List[LLMMessage], model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs) -> LLMResponse

Generate text using the LLM provider’s API.

Parameters:

  • messages (List[LLMMessage]): List of conversation messages

  • model (Optional[str]): Model name (optional, uses default if not provided)

  • temperature (float): Sampling temperature (0.0 to 1.0, default: 0.7)

  • max_tokens (Optional[int]): Maximum tokens to generate

  • **kwargs: Additional provider-specific parameters

Returns: LLMResponse - Response with generated text and metadata

async stream_text(messages: List[LLMMessage], model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs) -> AsyncGenerator[str, None]

Stream text generation using the LLM provider’s API.

Parameters:

  • messages (List[LLMMessage]): List of conversation messages

  • model (Optional[str]): Model name (optional, uses default if not provided)

  • temperature (float): Sampling temperature (0.0 to 1.0, default: 0.7)

  • max_tokens (Optional[int]): Maximum tokens to generate

  • **kwargs: Additional provider-specific parameters

Yields: str - Text tokens as they are generated

async close() -> None

Clean up resources (connections, sessions, etc.).

async get_embeddings(texts: List[str], model: Optional[str] = None, **kwargs) -> List[List[float]]

Get embeddings for a list of texts (optional, for semantic compression).

Parameters:

  • texts (List[str]): List of texts to embed

  • model (Optional[str]): Embedding model name

  • **kwargs: Additional provider-specific parameters

Returns: List[List[float]] - List of embedding vectors

Note: Not all LLM clients support embeddings. If not supported, raise NotImplementedError.

Example Implementation

class CustomLLMClient:
    provider_name = "custom"
    
    async def generate_text(self, messages, model=None, temperature=0.7, max_tokens=None, **kwargs):
        # Custom implementation
        return LLMResponse(content="...", provider="custom")
    
    async def stream_text(self, messages, model=None, temperature=0.7, max_tokens=None, **kwargs):
        async for token in self._custom_stream():
            yield token
    
    async def close(self):
        # Cleanup
        pass

# Use with agent (no inheritance required!)
agent = HybridAgent(
    llm_client=CustomLLMClient(),
    ...
)

ConfigManagerProtocol

Protocol for custom configuration managers enabling dynamic configuration loading.

Module: aiecs.domain.agent.integration.protocols

from aiecs.domain.agent.integration.protocols import ConfigManagerProtocol

@runtime_checkable
class ConfigManagerProtocol(Protocol):
    async def get_config(self, key: str, default: Any = None) -> Any
    async def set_config(self, key: str, value: Any) -> None
    async def reload_config(self) -> None

Required Methods

async get_config(key: str, default: Any = None) -> Any

Get configuration value by key.

Parameters:

  • key (str): Configuration key

  • default (Any): Default value if key not found

Returns: Any - Configuration value or default

async set_config(key: str, value: Any) -> None

Set configuration value.

Parameters:

  • key (str): Configuration key

  • value (Any): Configuration value

async reload_config() -> None

Reload configuration from source. This method should refresh any cached configuration data.

Example Implementation

class DatabaseConfigManager:
    async def get_config(self, key: str, default: Any = None) -> Any:
        return await db.get_config(key, default)
    
    async def set_config(self, key: str, value: Any) -> None:
        await db.set_config(key, value)
    
    async def reload_config(self) -> None:
        await db.refresh_cache()

# Use with agent
agent = HybridAgent(
    config_manager=DatabaseConfigManager(),
    ...
)

CheckpointerProtocol

Protocol for custom checkpointers enabling state persistence (LangGraph compatible).

Module: aiecs.domain.agent.integration.protocols

from aiecs.domain.agent.integration.protocols import CheckpointerProtocol

@runtime_checkable
class CheckpointerProtocol(Protocol):
    async def save_checkpoint(
        self, agent_id: str, session_id: str, checkpoint_data: Dict[str, Any]
    ) -> str
    
    async def load_checkpoint(
        self, agent_id: str, session_id: str, checkpoint_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]
    
    async def list_checkpoints(self, agent_id: str, session_id: str) -> list[str]

Required Methods

async save_checkpoint(agent_id: str, session_id: str, checkpoint_data: Dict[str, Any]) -> str

Save checkpoint and return checkpoint ID.

Parameters:

  • agent_id (str): Agent identifier

  • session_id (str): Session identifier

  • checkpoint_data (Dict[str, Any]): Checkpoint data to save

Returns: str - Checkpoint ID for later retrieval

async load_checkpoint(agent_id: str, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]

Load checkpoint data.

Parameters:

  • agent_id (str): Agent identifier

  • session_id (str): Session identifier

  • checkpoint_id (Optional[str]): Specific checkpoint ID (loads latest if None)

Returns: Optional[Dict[str, Any]] - Checkpoint data or None if not found

async list_checkpoints(agent_id: str, session_id: str) -> list[str]

List all checkpoint IDs for a session.

Parameters:

  • agent_id (str): Agent identifier

  • session_id (str): Session identifier

Returns: list[str] - List of checkpoint IDs

Example Implementation

class RedisCheckpointer:
    async def save_checkpoint(self, agent_id: str, session_id: str, checkpoint_data: Dict[str, Any]) -> str:
        checkpoint_id = str(uuid.uuid4())
        await redis.set(f"checkpoint:{checkpoint_id}", json.dumps(checkpoint_data))
        return checkpoint_id
    
    async def load_checkpoint(self, agent_id: str, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        if checkpoint_id:
            data = await redis.get(f"checkpoint:{checkpoint_id}")
            return json.loads(data) if data else None
        return await self._load_latest(agent_id, session_id)
    
    async def list_checkpoints(self, agent_id: str, session_id: str) -> list[str]:
        return await redis.keys(f"checkpoint:{agent_id}:{session_id}:*")

# Use with agent
agent = HybridAgent(
    checkpointer=RedisCheckpointer(),
    ...
)

AgentCollaborationProtocol

Protocol for agent collaboration enabling multi-agent workflows.

Module: aiecs.domain.agent.integration.protocols

from aiecs.domain.agent.integration.protocols import AgentCollaborationProtocol

@runtime_checkable
class AgentCollaborationProtocol(Protocol):
    agent_id: str
    name: str
    capabilities: List[str]
    
    async def execute_task(
        self, task: Dict[str, Any], context: Dict[str, Any]
    ) -> Dict[str, Any]
    
    async def review_result(
        self, task: Dict[str, Any], result: Dict[str, Any]
    ) -> Dict[str, Any]

Required Attributes

agent_id: str

Unique identifier for the agent.

name: str

Human-readable agent name.

capabilities: List[str]

List of capability strings (e.g., [“search”, “analysis”, “web_scraping”]).

Required Methods

async execute_task(task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]

Execute a task.

Parameters:

  • task (Dict[str, Any]): Task specification

  • context (Dict[str, Any]): Execution context

Returns: Dict[str, Any] - Task execution result

async review_result(task: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]

Review another agent’s task result.

Parameters:

  • task (Dict[str, Any]): Original task specification

  • result (Dict[str, Any]): Task execution result to review

Returns: Dict[str, Any] - Review result with ‘approved’ (bool) and ‘feedback’ (str)

Example Implementation

class CollaborativeAgent(BaseAIAgent):
    agent_id: str
    name: str
    capabilities: List[str]
    
    async def execute_task(self, task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        return {"success": True, "output": "result"}
    
    async def review_result(self, task: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        return {"approved": True, "feedback": "Looks good"}

# Use with agent registry
agent_registry = {
    "agent1": CollaborativeAgent(...),
    "agent2": CollaborativeAgent(...)
}

agent = HybridAgent(
    collaboration_enabled=True,
    agent_registry=agent_registry,
    ...
)

ToolObservation

Structured observation of tool execution results.

Module: aiecs.domain.agent.models

from aiecs.domain.agent.models import ToolObservation

class ToolObservation(BaseModel):
    tool_name: str
    parameters: Dict[str, Any] = {}
    result: Any = None
    success: bool
    error: Optional[str] = None
    execution_time_ms: Optional[float] = None
    timestamp: str

Fields

Field Descriptions

tool_name: str

Name of the tool that was executed.

parameters: Dict[str, Any] = {}

Dictionary of parameters passed to the tool.

result: Any = None

Tool execution result (any type).

success: bool

Whether tool execution succeeded (True/False).

error: Optional[str] = None

Error message if execution failed (None if successful).

execution_time_ms: Optional[float] = None

Execution time in milliseconds (None if not measured, must be >= 0).

timestamp: str

ISO format timestamp of execution (auto-generated if not provided).

Methods

Method Descriptions

to_dict() -> Dict[str, Any]

Convert observation to dictionary for serialization.

Returns: Dict[str, Any] - Dictionary representation

Example:

obs = ToolObservation(tool_name="search", success=True, result="data")
data = obs.to_dict()
# {'tool_name': 'search', 'success': True, 'result': 'data', ...}
to_text() -> str

Format observation as text for LLM context.

Returns: str - Formatted text representation

Example:

obs = ToolObservation(
    tool_name="search",
    parameters={"query": "AI"},
    success=True,
    result="Found 10 results",
    execution_time_ms=250.5
)
text = obs.to_text()
# "Tool: search
# Parameters: {'query': 'AI'}
# Status: SUCCESS
# Result: Found 10 results
# Execution time: 250.5ms
# Timestamp: 2024-01-01T12:00:00"

Enhanced Methods

_execute_tool_with_observation()

Execute a tool and return structured observation.

Module: aiecs.domain.agent.hybrid_agent

Class: HybridAgent

async def _execute_tool_with_observation(
    self,
    tool_name: str,
    operation: Optional[str],
    parameters: Dict[str, Any],
) -> ToolObservation

Description

Wraps tool execution with automatic success/error tracking, execution time measurement, and structured result formatting. Returns a ToolObservation object that can be used for debugging, analysis, and LLM reasoning loops.

Parameters

  • tool_name (str): Name of the tool to execute

  • operation (Optional[str]): Optional operation name

  • parameters (Dict[str, Any]): Tool parameters

Returns

ToolObservation - Structured observation with execution details including:

  • tool_name: Name of the tool

  • parameters: Parameters passed

  • result: Tool execution result (or None if failed)

  • success: Whether execution succeeded

  • error: Error message if failed

  • execution_time_ms: Execution time in milliseconds

  • timestamp: ISO timestamp

Example

# Execute tool with observation
obs = await agent._execute_tool_with_observation(
    tool_name="search",
    operation="query",
    parameters={"q": "Python"}
)

# Check success
if obs.success:
    print(f"Found results: {obs.result}")
    print(f"Execution time: {obs.execution_time_ms}ms")
else:
    print(f"Error: {obs.error}")

# Format for LLM context
observation_text = obs.to_text()
# Include in LLM prompt

Notes

  • Automatically measures execution time

  • Catches exceptions and returns error observation

  • Returns observation even on failure (with success=False)

  • Essential for MasterController compatibility and observation-based reasoning


Performance Enhancement APIs

Caching APIs

execute_tool_with_cache(tool_name: str, parameters: Dict[str, Any]) -> Any

Execute tool with caching support. See Tool Execution section.

invalidate_cache(tool_name: Optional[str] = None, pattern: Optional[str] = None) -> int

Invalidate cache entries. See Tool Execution section.

get_cache_stats() -> Dict[str, Any]

Get cache statistics. See Tool Execution section.

Parallel Execution APIs

execute_tools_parallel(tool_calls: List[Dict[str, Any]], max_concurrency: int = 5) -> List[Dict[str, Any]]

Execute multiple tools in parallel. See Tool Execution section.

analyze_tool_dependencies(tool_calls: List[Dict[str, Any]]) -> Dict[str, List[str]]

Analyze dependencies between tool calls.

Parameters:

  • tool_calls (List[Dict[str, Any]]): List of tool calls

Returns: Dict[str, List[str]] - Dependency graph mapping tool names to dependent tools

execute_tools_with_dependencies(tool_calls: List[Dict[str, Any]], max_concurrency: int = 5) -> List[Dict[str, Any]]

Execute tools respecting dependencies.

Parameters:

  • tool_calls (List[Dict[str, Any]]): List of tool calls

  • max_concurrency (int): Maximum concurrent executions

Returns: List[Dict[str, Any]] - Results in execution order

Streaming APIs

execute_task_streaming(task: Dict[str, Any], context: Dict[str, Any]) -> AsyncIterator[Dict[str, Any]]

Execute task with streaming response. See Streaming section.

process_message_streaming(message: str, sender_id: Optional[str] = None) -> AsyncIterator[str]

Process message with streaming response. See Streaming section.


Reliability Enhancement APIs

Error Recovery APIs

execute_with_recovery(task: Dict[str, Any], context: Dict[str, Any], strategies: Optional[List[str]] = None) -> Dict[str, Any]

Execute task with recovery strategies. See Error Recovery section.

Resource Management APIs

check_resource_availability() -> Dict[str, Any]

Check if resources are available. See Resource Management section.

wait_for_resources(timeout: Optional[float] = None) -> bool

Wait for resources to become available. See Resource Management section.

get_resource_usage() -> Dict[str, Any]

Get current resource usage. See Resource Management section.

Checkpointing APIs

save_checkpoint(session_id: Optional[str] = None) -> Optional[str]

Save agent state checkpoint. See Checkpointing section.

load_checkpoint(checkpoint_id: str) -> bool

Load agent state from checkpoint. See Checkpointing section.


Observability Enhancement APIs

Metrics APIs

get_metrics() -> AgentMetrics

Get agent metrics. See Metrics and Performance section.

update_metrics(execution_time: Optional[float] = None, success: Optional[bool] = None, tokens_used: Optional[int] = None, tool_calls: Optional[int] = None) -> None

Update agent metrics. See Metrics and Performance section.

get_performance_metrics() -> Dict[str, Any]

Get performance metrics. See Metrics and Performance section.

reset_metrics() -> None

Reset agent metrics. See Metrics and Performance section.

Health Status APIs

get_health_status() -> Dict[str, Any]

Get agent health status. See Metrics and Performance section.

get_comprehensive_status() -> Dict[str, Any]

Get comprehensive agent status. See Metrics and Performance section.

Operation Tracking APIs

track_operation_time(operation_name: str) -> OperationTimer

Track operation execution time (context manager). See Metrics and Performance section.

Session Metrics APIs

update_session_metrics(session_id: str, processing_time: float, is_error: bool = False) -> None

Update session-level metrics.

Parameters:

  • session_id (str): Session ID

  • processing_time (float): Processing time in seconds

  • is_error (bool): Whether request resulted in error


This API reference covers the main classes, methods, models, configuration classes, protocols, and enhancement APIs. For additional examples and usage patterns, see the integration guides and examples documentation.