Source code for aiecs.domain.agent.base_agent

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
"""
Base AI Agent

Abstract base class for all AI agents in the AIECS system.
"""

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable, Union, TYPE_CHECKING, AsyncGenerator, Set, cast
from dataclasses import dataclass
import logging
import time
import asyncio
import json

from .models import (
    AgentState,
    AgentType,
    AgentConfiguration,
    AgentGoal,
    AgentMetrics,
    AgentCapabilityDeclaration,
    GoalStatus,
    GoalPriority,
    MemoryType,
)
from .exceptions import (
    InvalidStateTransitionError,
    ConfigurationError,
    AgentInitializationError,
    SerializationError,
    TaskExecutionError,
)

# Import protocols for type hints
from aiecs.domain.agent.plugins.errors import raise_plugin_reload_error
from aiecs.domain.agent.plugins.manager import PluginManager
from aiecs.domain.agent.plugins.models import PluginLoadResult
from aiecs.domain.agent.plugins.registry import PluginRegistry

if TYPE_CHECKING:
    from aiecs.llm.protocols import LLMClientProtocol
    from aiecs.domain.agent.integration.protocols import (
        ConfigManagerProtocol,
        CheckpointerProtocol,
    )
    from aiecs.tools.base_tool import BaseTool
    from aiecs.domain.context.context_engine import ContextEngine
    from aiecs.domain.agent.tools import SkillScriptRegistry, Tool
    from aiecs.domain.agent.skills import SkillRegistry
    from aiecs.domain.agent.plugins.base import BaseAgentPlugin
    from aiecs.domain.temporal_memory.engine import TemporalMemoryEngine
    from aiecs.domain.agent.plugins.models import PluginConfig
    from aiecs.domain.agent.plugins.schema.manifest import PluginManifest

logger = logging.getLogger(__name__)


[docs] class OperationTimer: """ Context manager for timing operations and tracking metrics. Automatically records operation duration and can be used to track operation-level performance metrics. Example: with agent.track_operation_time("llm_call") as timer: result = llm.generate(prompt) # timer.duration contains the elapsed time in seconds """
[docs] def __init__(self, operation_name: str, agent: Optional["BaseAIAgent"] = None): """ Initialize operation timer. Args: operation_name: Name of the operation being timed agent: Optional agent instance for automatic metrics recording """ self.operation_name = operation_name self.agent = agent self.start_time: Optional[float] = None self.end_time: Optional[float] = None self.duration: Optional[float] = None self.error: Optional[Exception] = None
[docs] def __enter__(self) -> "OperationTimer": """Start timing.""" self.start_time = time.time() return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb) -> None: """ Stop timing and record metrics. Args: exc_type: Exception type if an error occurred exc_val: Exception value if an error occurred exc_tb: Exception traceback if an error occurred Returns: False to propagate exceptions """ self.end_time = time.time() if self.start_time is not None: self.duration = self.end_time - self.start_time # Track error if one occurred if exc_val is not None: self.error = exc_val # Record metrics if agent is provided if self.agent and self.duration is not None: self.agent._record_operation_metrics( operation_name=self.operation_name, duration=self.duration, success=exc_val is None, ) # Don't suppress exceptions return None
[docs] def get_duration_ms(self) -> Optional[float]: """Get duration in milliseconds.""" return self.duration * 1000 if self.duration is not None else None
[docs] @dataclass class CacheConfig: """ Configuration for tool result caching. Provides control over caching behavior to improve performance and reduce costs by avoiding redundant tool executions. Supports TTL-based expiration, size limits, and automatic cleanup. **Key Features:** - TTL-based cache expiration (default and per-tool) - Size limits to prevent memory exhaustion - Automatic cleanup when capacity threshold reached - Configurable cache key generation - Input hashing for large parameters Attributes: enabled: Enable/disable caching globally default_ttl: Default time-to-live in seconds for cached entries (default: 300 = 5 minutes) tool_specific_ttl: Dictionary mapping tool names to custom TTL values (overrides default_ttl) max_cache_size: Maximum number of cached entries before cleanup (default: 1000) max_memory_mb: Maximum cache memory usage in MB (approximate, default: 100) cleanup_interval: Interval in seconds between cleanup checks (default: 60) cleanup_threshold: Capacity threshold (0.0-1.0) to trigger cleanup (default: 0.9 = 90%) include_timestamp_in_key: Whether to include timestamp in cache key (default: False) hash_large_inputs: Whether to hash inputs larger than 1KB for cache keys (default: True) Examples: # Example 1: Basic caching configuration config = CacheConfig( enabled=True, default_ttl=300, # 5 minutes max_cache_size=1000 ) # Example 2: Per-tool TTL overrides config = CacheConfig( enabled=True, default_ttl=300, tool_specific_ttl={ "search": 600, # Search results cached for 10 minutes "calculator": 3600, # Calculator results cached for 1 hour "weather": 1800 # Weather data cached for 30 minutes } ) # Example 3: Aggressive caching for expensive operations config = CacheConfig( enabled=True, default_ttl=3600, # 1 hour default max_cache_size=5000, max_memory_mb=500, cleanup_threshold=0.95 # Cleanup at 95% capacity ) # Example 4: Disable caching for time-sensitive tools config = CacheConfig( enabled=False # Disable caching entirely ) # Example 5: Cache with timestamp-aware keys config = CacheConfig( enabled=True, default_ttl=300, include_timestamp_in_key=True # Include timestamp for time-sensitive caching ) """ # Cache enablement enabled: bool = True # Enable/disable caching # TTL settings default_ttl: int = 300 # Default TTL in seconds (5 minutes) tool_specific_ttl: Optional[Dict[str, int]] = None # Per-tool TTL overrides # Size limits max_cache_size: int = 1000 # Maximum number of cached entries max_memory_mb: int = 100 # Maximum cache memory in MB (approximate) # Cleanup settings cleanup_interval: int = 60 # Cleanup interval in seconds cleanup_threshold: float = 0.9 # Trigger cleanup at 90% capacity # Cache key settings include_timestamp_in_key: bool = False # Include timestamp in cache key hash_large_inputs: bool = True # Hash inputs larger than 1KB
[docs] def __post_init__(self): """Initialize defaults.""" if self.tool_specific_ttl is None: self.tool_specific_ttl = {}
[docs] def get_ttl(self, tool_name: str) -> int: """ Get TTL for a specific tool. Args: tool_name: Name of the tool Returns: TTL in seconds """ if self.tool_specific_ttl is None: return self.default_ttl return self.tool_specific_ttl.get(tool_name, self.default_ttl)
from .skills.mixin import SkillCapableMixin # noqa: E402
[docs] class BaseAIAgent(SkillCapableMixin, ABC): """ Abstract base class for AI agents. Provides common functionality for agent lifecycle management, state management, memory, goals, and metrics tracking. This base class supports extensive flexibility and advanced features: **Tool Flexibility:** - Accept tool names (List[str]) for backward compatibility - Accept pre-configured tool instances (Dict[str, BaseTool]) with preserved state - Automatic tool loading and validation **LLM Client Flexibility:** - Accept any object implementing LLMClientProtocol (duck typing) - No requirement for BaseLLMClient inheritance - Custom LLM client wrappers fully supported **Advanced Features:** - ContextEngine integration for persistent conversation history - Custom config managers for dynamic configuration - Checkpointers for state persistence (LangGraph compatible) - Agent collaboration (delegation, peer review, consensus) - Agent learning from experiences - Resource management (rate limiting, quotas) - Performance tracking and health monitoring - Tool result caching - Parallel tool execution - Streaming responses - Error recovery strategies Examples: # Example 1: Basic agent with tool names (backward compatible) agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search", "calculator"] # Tool names loaded by subclass ) # Example 2: Agent with tool instances (preserves tool state) from aiecs.tools import BaseTool class StatefulSearchTool(BaseTool): def __init__(self, api_key: str): self.api_key = api_key self.call_count = 0 # State preserved async def run_async(self, query: str): self.call_count += 1 return f"Search results for: {query}" agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools={ "search": StatefulSearchTool(api_key="..."), "calculator": CalculatorTool() }, llm_client=OpenAIClient() ) # Tool state (call_count) is preserved across agent operations # Example 3: Agent with custom LLM client (no BaseLLMClient inheritance) class CustomLLMClient: provider_name = "custom" async def generate_text(self, messages, **kwargs): # Custom implementation return LLMResponse(content="...", provider="custom") async def stream_text(self, messages, **kwargs): async for token in self._custom_stream(): yield token async def close(self): # Cleanup pass agent = LLMAgent( agent_id="agent1", name="My LLM Agent", agent_type=AgentType.CONVERSATIONAL, config=config, llm_client=CustomLLMClient() # Works without BaseLLMClient! ) # Example 4: Agent with ContextEngine for persistent storage from aiecs.domain.context import ContextEngine context_engine = ContextEngine() await context_engine.initialize() agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient(), context_engine=context_engine # Enables persistent conversation history ) # Conversation history persists across agent restarts # Example 5: Agent with custom config manager class DatabaseConfigManager: async def get_config(self, key: str): # Load from database return await db.get_config(key) async def update_config(self, key: str, value: Any): # Update in database await db.update_config(key, value) agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient(), config_manager=DatabaseConfigManager() # Dynamic config loading ) # Example 6: Agent with checkpointer for LangGraph integration class RedisCheckpointer: async def save(self, agent_id: str, state: Dict[str, Any]): await redis.set(f"checkpoint:{agent_id}", json.dumps(state)) async def load(self, agent_id: str) -> Optional[Dict[str, Any]]: data = await redis.get(f"checkpoint:{agent_id}") return json.loads(data) if data else None agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient(), checkpointer=RedisCheckpointer() # LangGraph-compatible checkpointing ) # Example 7: Agent with collaboration features agent_registry = { "agent2": other_agent_instance, "agent3": another_agent_instance } agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient(), collaboration_enabled=True, agent_registry=agent_registry # Enable delegation and peer review ) # Delegate task to another agent result = await agent.delegate_task( task_description="Analyze this data", target_agent_id="agent2" ) # Example 8: Agent with learning enabled from aiecs.domain.agent.models import ResourceLimits agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient(), learning_enabled=True # Learn from past experiences ) # Record experience await agent.record_experience( task_type="data_analysis", approach="parallel_tools", success=True, execution_time=2.5 ) # Get recommended approach based on history approach = await agent.get_recommended_approach("data_analysis") print(f"Recommended: {approach}") # Example 9: Agent with resource limits from aiecs.domain.agent.models import ResourceLimits resource_limits = ResourceLimits( max_concurrent_tasks=5, max_tokens_per_minute=10000, max_tool_calls_per_minute=100 ) agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient(), resource_limits=resource_limits # Rate limiting and quotas ) # Check resource availability before executing if await agent.check_resource_availability(): result = await agent.execute_task(task, context) else: await agent.wait_for_resources(timeout=30.0) # Example 10: Agent with performance tracking agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient() ) # Track operation performance with agent.track_operation_time("data_processing"): result = await agent.execute_task(task, context) # Get performance metrics metrics = agent.get_performance_metrics() print(f"Average response time: {metrics['avg_response_time']}s") print(f"P95 response time: {metrics['p95_response_time']}s") # Get health status health = agent.get_health_status() print(f"Health score: {health['score']}") print(f"Status: {health['status']}") # Example 11: Agent with tool caching agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient() ) # Execute tool with caching (30 second TTL) result1 = await agent.execute_tool_with_cache( tool_name="search", operation="query", parameters={"q": "AI"}, cache_ttl=30 ) # Second call uses cache (no API call) result2 = await agent.execute_tool_with_cache( tool_name="search", operation="query", parameters={"q": "AI"}, cache_ttl=30 ) # Get cache statistics stats = agent.get_cache_stats() print(f"Cache hit rate: {stats['hit_rate']:.1%}") # Example 12: Agent with parallel tool execution agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search", "calculator", "translator"], llm_client=OpenAIClient() ) # Execute multiple independent tools in parallel (3-5x faster) results = await agent.execute_tools_parallel([ {"tool": "search", "operation": "query", "parameters": {"q": "AI"}}, {"tool": "calculator", "operation": "add", "parameters": {"a": 1, "b": 2}}, {"tool": "translator", "operation": "translate", "parameters": {"text": "Hello"}} ], max_concurrency=3) # Example 13: Agent with streaming responses agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient() ) # Stream task execution (tokens + tool calls) async for event in agent.execute_task_streaming(task, context): if event['type'] == 'token': print(event['content'], end='', flush=True) elif event['type'] == 'tool_call': print(f"\\nCalling {event['tool_name']}...") elif event['type'] == 'result': print(f"\\nFinal result: {event['output']}") # Example 14: Agent with error recovery agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search"], llm_client=OpenAIClient() ) # Execute with automatic recovery strategies result = await agent.execute_with_recovery( task=task, context=context, strategies=["retry", "simplify", "fallback", "delegate"] ) # Automatically tries retry → simplify → fallback → delegate if errors occur """ _plugin_manager: Optional["PluginManager"] = None _plugin_registry: Optional["PluginRegistry"] = None graph_store: Any | None = None enable_graph_reasoning: bool = True _knowledge_context: dict[str, dict[str, Any]] | None = None temporal_memory_enabled: bool = False temporal_memory_engine: Optional["TemporalMemoryEngine"] = None
[docs] def __init__( self, agent_id: str, name: str, agent_type: AgentType, config: AgentConfiguration, description: Optional[str] = None, version: str = "1.0.0", tools: Optional[Union[List[str], Dict[str, "BaseTool"]]] = None, llm_client: Optional["LLMClientProtocol"] = None, config_manager: Optional["ConfigManagerProtocol"] = None, checkpointer: Optional["CheckpointerProtocol"] = None, context_engine: Optional["ContextEngine"] = None, collaboration_enabled: bool = False, agent_registry: Optional[Dict[str, Any]] = None, learning_enabled: bool = False, resource_limits: Optional[Any] = None, skill_script_registry: Optional["SkillScriptRegistry"] = None, skill_registry: Optional["SkillRegistry"] = None, plugin_registry: Optional["PluginRegistry"] = None, ): """ Initialize the base agent. Args: agent_id: Unique identifier for the agent name: Agent name agent_type: Type of agent config: Agent configuration description: Optional agent description version: Agent version tools: Optional tools - either list of tool names or dict of tool instances. List[str]: Tool names to be loaded by subclass Dict[str, BaseTool]: Pre-configured tool instances with state llm_client: Optional LLM client (any object implementing LLMClientProtocol). Supports custom LLM clients without BaseLLMClient inheritance. config_manager: Optional configuration manager for dynamic config loading checkpointer: Optional checkpointer for state persistence (LangGraph compatible) context_engine: Optional ContextEngine instance for persistent conversation history collaboration_enabled: Enable agent collaboration features (delegation, peer review) agent_registry: Registry of other agents for collaboration (agent_id -> agent instance) learning_enabled: Enable agent learning from experiences resource_limits: Optional resource limits configuration and session management. If provided, enables persistent storage across agent restarts. skill_script_registry: Optional SkillScriptRegistry for managing tools from skill scripts. If provided, enables dynamic tool registration via add_tool(), remove_tool(), etc. skill_registry: Optional SkillRegistry for loading skills by name. If provided along with config.skills_enabled=True, enables skill support. plugin_registry: Optional PluginRegistry for builtin and extension plugins. Defaults to ``PluginRegistry.default()`` when omitted. Example: # With tool instances and ContextEngine from aiecs.domain.context import ContextEngine context_engine = ContextEngine() await context_engine.initialize() agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools={ "search": SearchTool(api_key="..."), "calculator": CalculatorTool() }, llm_client=CustomLLMClient(), # Custom client, no inheritance needed config_manager=DatabaseConfigManager(), checkpointer=RedisCheckpointer(), context_engine=context_engine # Enables persistent storage ) # With tool names (backward compatible) agent = HybridAgent( agent_id="agent1", name="My Agent", agent_type=AgentType.HYBRID, config=config, tools=["search", "calculator"] # Loaded by subclass ) """ # Identity self.agent_id = agent_id self.name = name self.agent_type = agent_type self.description = description or f"{agent_type.value} agent" self.version = version # Configuration self._config = config self._config_manager = config_manager # State self._state = AgentState.CREATED self._previous_state: Optional[AgentState] = None # Memory storage (in-memory dict, can be replaced with sophisticated # storage) self._memory: Dict[str, Any] = {} self._memory_metadata: Dict[str, Dict[str, Any]] = {} # Goals self._goals: Dict[str, AgentGoal] = {} # Capabilities self._capabilities: Dict[str, AgentCapabilityDeclaration] = {} # Metrics self._metrics = AgentMetrics() # type: ignore[call-arg] # Timestamps self.created_at = datetime.utcnow() self.updated_at = datetime.utcnow() self.last_active_at: Optional[datetime] = None # Current task tracking self._current_task_id: Optional[str] = None # Tools (optional - only set if tools provided) self._tools_input = tools # Store original input self._available_tools: Optional[List[str]] = None self._tool_instances: Optional[Dict[str, "BaseTool"]] = None # Skill script registry (optional - for dynamic tool management from skills) self._skill_script_registry: Optional["SkillScriptRegistry"] = skill_script_registry # LLM client (optional) self._llm_client = llm_client # Checkpointer (optional) self._checkpointer = checkpointer # ContextEngine (optional - Phase 4 enhancement) self._context_engine = context_engine # Tool result cache (Phase 7 enhancement) self._cache_config = CacheConfig() self._tool_cache: Dict[str, Any] = {} # Cache key -> result self._cache_timestamps: Dict[str, float] = {} # Cache key -> timestamp self._cache_access_count: Dict[str, int] = {} # Cache key -> access count self._last_cleanup_time = time.time() # Agent collaboration (Phase 7 enhancement - tasks 1.15.15-1.15.22) self._collaboration_enabled = collaboration_enabled self._agent_registry = agent_registry or {} # Agent learning (Phase 8 enhancement - tasks 1.16.4-1.16.10) self._learning_enabled = learning_enabled self._experiences: List[Any] = [] # List of Experience objects self._max_experiences = 1000 # Limit stored experiences # Resource management (Phase 8 enhancement - tasks 1.16.11-1.16.17) from .models import ResourceLimits self._resource_limits = resource_limits or ResourceLimits() # type: ignore[call-arg] self._active_tasks: set = set() # Set of active task IDs self._token_usage_window: List[tuple] = [] # List of (timestamp, token_count) self._tool_call_window: List[float] = [] # List of timestamps # Skill support (via SkillCapableMixin) # Initialize skill-related state from the mixin self.__init_skills__( skill_registry=skill_registry, tool_registry=skill_script_registry, # Use skill_script_registry as tool_registry ) self.graph_store = None self.enable_graph_reasoning = True self._knowledge_context = None self.temporal_memory_enabled = False self.temporal_memory_engine = None features = [] if context_engine: features.append("ContextEngine") if collaboration_enabled: features.append("collaboration") if learning_enabled: features.append("learning") if resource_limits: features.append("resource limits") if config.skills_enabled: features.append("skills") feature_str = f" with {', '.join(features)}" if features else "" logger.info(f"Agent initialized: {self.agent_id} ({self.name}, {self.agent_type.value}){feature_str}") self._plugin_registry = plugin_registry or PluginRegistry.default() self._loaded_plugin_manifests: list[PluginManifest] = [] plugin_configs, _ = self._resolve_plugin_configs() self._plugin_manager = PluginManager( agent=self, plugin_configs=plugin_configs, registry=self._plugin_registry, )
def _resolve_plugin_configs( self, task: Optional[Dict[str, Any]] = None, context: Optional[Dict[str, Any]] = None, ) -> tuple[list["PluginConfig"], list[str]]: """ Merge plugin configuration for agent startup or per-task refresh (§6.3, §8.1). Merged with optional per-task ``task`` / ``context`` overlays (§6.3, P2-09). """ from aiecs.domain.agent.plugins.defaults import derive_plugin_configs return derive_plugin_configs(self._config, self, task=task, context=context) def _resolve_manifest_plugin_class( self, manifest: "PluginManifest", ) -> type["BaseAgentPlugin"] | None: """ Optional hook: return a plugin class for a loaded external manifest. When ``None`` (default), manifests from ``plugin_manifest_paths`` / ``extra_plugin_dirs`` are validated for dependencies only; register factories via :meth:`PluginRegistry.register_from_manifest` before init or override this hook in subclasses. """ return None def _apply_plugin_manifest_paths(self) -> None: """Load manifest files from config and validate dependency order (§9.1).""" from aiecs.domain.agent.plugins.manifest_loader import ( collect_manifests_from_config, sort_manifests_by_dependencies, ) manifests = collect_manifests_from_config(self._config) if not manifests: self._loaded_plugin_manifests = [] return registry = self._plugin_registry or PluginRegistry.default() ordered = sort_manifests_by_dependencies(manifests, registry) self._loaded_plugin_manifests = ordered for manifest in ordered: if registry.get_entry(manifest.name) is not None: continue plugin_class = self._resolve_manifest_plugin_class(manifest) if plugin_class is not None: registry.register_from_manifest(manifest, plugin_class) def _apply_task_plugin_configs( self, task: Optional[Dict[str, Any]] = None, context: Optional[Dict[str, Any]] = None, ) -> None: """Refresh resolved plugin configs on the manager for this task (§8.2, P2-09).""" if self._plugin_manager is None: return plugin_configs, _ = self._resolve_plugin_configs(task=task, context=context) self._plugin_manager._plugin_configs = list(plugin_configs) # ==================== State Management ==================== @property def state(self) -> AgentState: """Get current agent state.""" return self._state
[docs] def get_state(self) -> AgentState: """Get current agent state.""" return self._state
def _transition_state(self, new_state: AgentState) -> None: """ Transition to a new state with validation. Args: new_state: Target state Raises: InvalidStateTransitionError: If transition is invalid """ # Define valid transitions valid_transitions = { AgentState.CREATED: {AgentState.INITIALIZING}, AgentState.INITIALIZING: {AgentState.ACTIVE, AgentState.ERROR}, AgentState.ACTIVE: { AgentState.BUSY, AgentState.IDLE, AgentState.STOPPED, AgentState.ERROR, }, AgentState.BUSY: {AgentState.ACTIVE, AgentState.ERROR}, AgentState.IDLE: {AgentState.ACTIVE, AgentState.STOPPED}, AgentState.ERROR: {AgentState.ACTIVE, AgentState.STOPPED}, AgentState.STOPPED: set(), # Terminal state } if new_state not in valid_transitions.get(self._state, set()): raise InvalidStateTransitionError( agent_id=self.agent_id, current_state=self._state.value, attempted_state=new_state.value, ) self._previous_state = self._state self._state = new_state self.updated_at = datetime.utcnow() logger.info(f"Agent {self.agent_id} state: {self._previous_state.value}{new_state.value}") # ==================== Lifecycle Methods ====================
[docs] async def initialize(self) -> None: """ Initialize the agent. This method should be called before the agent can be used. Override in subclasses to add initialization logic. Raises: AgentInitializationError: If initialization fails """ try: self._transition_state(AgentState.INITIALIZING) logger.info(f"Initializing agent {self.agent_id}...") # Subclass initialization await self._initialize() self._transition_state(AgentState.ACTIVE) self.last_active_at = datetime.utcnow() logger.info(f"Agent {self.agent_id} initialized successfully") except Exception as e: self._transition_state(AgentState.ERROR) logger.error(f"Agent {self.agent_id} initialization failed: {e}") raise AgentInitializationError( f"Failed to initialize agent {self.agent_id}: {str(e)}", agent_id=self.agent_id, )
async def _initialize(self, *, force_reload_plugins: bool = False) -> None: """ Base initialization: builtin/extension plugins via ``PluginManager``. Subclasses must call ``await super()._initialize()`` before their own setup. Args: force_reload_plugins: When True, run ``reload_plugins()`` instead of a first-time ``initialize()`` on the existing manager (§8.1, §9.7). """ if force_reload_plugins: await self.reload_plugins() else: self._apply_plugin_manifest_paths() if self._plugin_manager is not None: await self._plugin_manager.initialize()
[docs] async def reload_plugins(self) -> PluginLoadResult: """ Explicit plugin reload: shutdown → re-derive config → new manager → initialize (§9.7). Rejected while the agent is BUSY or ``_current_task_id`` is set. Does not run automatically on configuration changes mid-task. """ if self._state == AgentState.BUSY or self._current_task_id is not None: raise_plugin_reload_error( "Cannot reload plugins while a task is in progress", details={ "agent_id": self.agent_id, "state": self._state.value, "current_task_id": self._current_task_id, }, ) if self._plugin_manager is not None: await self._plugin_manager.shutdown() self._apply_plugin_manifest_paths() plugin_configs, _ = self._resolve_plugin_configs() self._plugin_manager = PluginManager( agent=self, plugin_configs=plugin_configs, registry=self._plugin_registry, ) await self._plugin_manager.initialize() return self._plugin_manager.last_load_result
[docs] async def activate(self) -> None: """Activate the agent.""" if self._state == AgentState.IDLE: self._transition_state(AgentState.ACTIVE) self.last_active_at = datetime.utcnow() logger.info(f"Agent {self.agent_id} activated") else: logger.warning(f"Agent {self.agent_id} cannot be activated from state {self._state.value}")
[docs] async def deactivate(self) -> None: """Deactivate the agent (enter idle state).""" if self._state == AgentState.ACTIVE: self._transition_state(AgentState.IDLE) logger.info(f"Agent {self.agent_id} deactivated") else: logger.warning(f"Agent {self.agent_id} cannot be deactivated from state {self._state.value}")
[docs] async def shutdown(self) -> None: """ Shutdown the agent. Override in subclasses to add cleanup logic. """ logger.info(f"Shutting down agent {self.agent_id}...") await self._shutdown() self._transition_state(AgentState.STOPPED) logger.info(f"Agent {self.agent_id} shut down")
async def _shutdown(self) -> None: """ Base shutdown: release plugin resources. Subclasses must call ``await super()._shutdown()`` after their own cleanup. """ if self._plugin_manager is not None: await self._plugin_manager.shutdown() # ==================== Tool and LLM Client Helper Methods ==================== def _load_tools(self) -> None: """ Load tools from the tools input parameter. Handles both List[str] (tool names) and Dict[str, BaseTool] (tool instances). Sets _available_tools and _tool_instances appropriately. This helper method should be called by subclasses during initialization if they want to use BaseAIAgent's tool management. Raises: ConfigurationError: If tools input is invalid """ if self._tools_input is None: # No tools provided return if isinstance(self._tools_input, list): # Tool names - store for subclass to load self._available_tools = self._tools_input logger.debug(f"Agent {self.agent_id}: Registered {len(self._tools_input)} tool names") elif isinstance(self._tools_input, dict): # Tool instances - validate and store from aiecs.tools.base_tool import BaseTool for tool_name, tool_instance in self._tools_input.items(): if not isinstance(tool_instance, BaseTool): raise ConfigurationError(f"Tool '{tool_name}' must be a BaseTool instance, got {type(tool_instance)}") self._tool_instances = self._tools_input self._available_tools = list(self._tools_input.keys()) logger.debug(f"Agent {self.agent_id}: Registered {len(self._tools_input)} tool instances") else: raise ConfigurationError(f"Tools must be List[str] or Dict[str, BaseTool], got {type(self._tools_input)}") def _validate_llm_client(self) -> None: """ Validate that the LLM client implements the required protocol. Checks that the LLM client has the required methods: - generate_text - stream_text - close - provider_name (property) This helper method should be called by subclasses during initialization if they want to use BaseAIAgent's LLM client validation. Raises: ConfigurationError: If LLM client doesn't implement required methods """ if self._llm_client is None: return required_methods = ["generate_text", "stream_text", "close"] required_properties = ["provider_name"] for method_name in required_methods: if not hasattr(self._llm_client, method_name): raise ConfigurationError(f"LLM client must implement '{method_name}' method") if not callable(getattr(self._llm_client, method_name)): raise ConfigurationError(f"LLM client '{method_name}' must be callable") for prop_name in required_properties: if not hasattr(self._llm_client, prop_name): raise ConfigurationError(f"LLM client must have '{prop_name}' property") logger.debug(f"Agent {self.agent_id}: LLM client validated successfully") def _get_tool_instances(self) -> Optional[Dict[str, "BaseTool"]]: """ Get tool instances dictionary. Returns: Dictionary of tool instances, or None if no tool instances available """ return self._tool_instances # ==================== Skill Script Tool Management Methods ====================
[docs] def add_tool(self, tool: "Tool", replace: bool = False) -> None: """ Add a tool from a skill script to the agent's registry. This method registers a lightweight Tool instance (from skill scripts) to the agent's SkillScriptRegistry. This is separate from BaseTool instances managed via _tool_instances. Args: tool: Tool instance to register replace: If True, replace existing tool with same name Raises: RuntimeError: If no SkillScriptRegistry is configured SkillScriptRegistryError: If tool already exists and replace=False Example: tool = Tool( name="my-tool", description="A custom tool", execute=my_async_function ) agent.add_tool(tool) """ if self._skill_script_registry is None: raise RuntimeError("Cannot add tool: no SkillScriptRegistry configured. " "Pass skill_script_registry to agent constructor.") self._skill_script_registry.register_tool(tool, replace=replace) logger.debug(f"Agent {self.agent_id}: Added tool '{tool.name}'")
[docs] def has_tool(self, tool_name: str) -> bool: """ Check if a tool exists in the skill script registry. Args: tool_name: Name of the tool to check Returns: True if tool exists, False otherwise """ if self._skill_script_registry is None: return False return self._skill_script_registry.has_tool(tool_name)
[docs] def remove_tool(self, tool_name: str) -> bool: """ Remove a tool from the skill script registry. Args: tool_name: Name of the tool to remove Returns: True if tool was removed, False if not found Raises: RuntimeError: If no SkillScriptRegistry is configured """ if self._skill_script_registry is None: raise RuntimeError("Cannot remove tool: no SkillScriptRegistry configured. " "Pass skill_script_registry to agent constructor.") result = self._skill_script_registry.unregister_tool(tool_name) if result: logger.debug(f"Agent {self.agent_id}: Removed tool '{tool_name}'") return result
[docs] def get_tool(self, tool_name: str) -> Optional["Tool"]: """ Get a tool from the skill script registry by name. Args: tool_name: Name of the tool to retrieve Returns: Tool instance if found, None otherwise """ if self._skill_script_registry is None: return None return self._skill_script_registry.get_tool(tool_name)
[docs] def list_skill_tools( self, tags: Optional[List[str]] = None, source: Optional[str] = None, ) -> List["Tool"]: """ List tools from the skill script registry. Args: tags: Optional list of tags to filter by (tools must have all tags) source: Optional source to filter by (e.g., skill name) Returns: List of matching Tool instances """ if self._skill_script_registry is None: return [] return self._skill_script_registry.list_tools(tags=tags, source=source)
@property def skill_script_registry(self) -> Optional["SkillScriptRegistry"]: """Get the skill script registry, if configured.""" return self._skill_script_registry # ==================== SkillCapableMixin Hook Overrides ==================== # These methods override the default implementations in SkillCapableMixin # to integrate with BaseAIAgent's tool management system. def _has_tool(self, tool_name: str) -> bool: """ Check if a tool exists (SkillCapableMixin hook override). Integrates with BaseAIAgent's has_tool() method and also checks _skill_tools from the mixin. """ # Check mixin's skill tools first if tool_name in self._skill_tools: return True # Then check the skill script registry return self.has_tool(tool_name) def _add_tool(self, tool: "Tool") -> None: """ Add a tool to the agent (SkillCapableMixin hook override). Integrates with BaseAIAgent's add_tool() method. """ if self._skill_script_registry is not None: self.add_tool(tool, replace=False) def _remove_tool(self, tool_name: str) -> None: """ Remove a tool from the agent (SkillCapableMixin hook override). Integrates with BaseAIAgent's remove_tool() method. """ if self._skill_script_registry is not None: try: self.remove_tool(tool_name) except RuntimeError: # Registry not configured, ignore pass def _build_base_system_prompt(self) -> str: """ Build base system prompt from configuration. This shared method builds a system prompt from AgentConfiguration fields. Subclasses can use this directly or extend it with additional instructions. Precedence order: 1. config.system_prompt - Direct custom prompt (highest priority) 2. Assembled from goal/backstory/domain_knowledge/reasoning_guidance 3. Default fallback: "You are a helpful AI assistant." Returns: Formatted system prompt string Example: ```python # In LLMAgent - use directly def _build_system_prompt(self) -> str: return self._build_base_system_prompt() # In HybridAgent - extend with ReAct instructions def _build_system_prompt(self) -> str: base = self._build_base_system_prompt() react_instructions = "Follow the ReAct pattern..." return f"{base}\\n\\n{react_instructions}" ``` """ # 1. Custom system_prompt takes precedence if self._config.system_prompt: return self._config.system_prompt # 2. Assemble from individual fields parts = [] if self._config.goal: parts.append(f"Goal: {self._config.goal}") if self._config.backstory: parts.append(f"Background: {self._config.backstory}") if self._config.domain_knowledge: parts.append(f"Domain Knowledge: {self._config.domain_knowledge}") if self._config.reasoning_guidance: parts.append(f"Reasoning Approach: {self._config.reasoning_guidance}") if parts: return "\n\n".join(parts) # 3. Default fallback return "You are a helpful AI assistant." def _build_system_prompts(self) -> List[Dict[str, Any]]: """ Build multiple system prompts from configuration. Returns a list of system prompt dictionaries, each with: - content: str - The prompt content - cache_control: Optional[bool] - Whether to enable caching (None = use global setting) Precedence order: 1. config.system_prompts - Multiple prompts with individual cache control (highest priority) 2. config.system_prompt - Single prompt (backward compatibility) 3. Assembled from goal/backstory/domain_knowledge/reasoning_guidance 4. Default fallback: "You are a helpful AI assistant." Returns: List of dictionaries with 'content' and optional 'cache_control' keys Example: ```python prompts = agent._build_system_prompts() # [ # {"content": "Fixed instructions...", "cache_control": True}, # {"content": "Dynamic context", "cache_control": False} # ] ``` """ # 1. Multiple system prompts take precedence if self._config.system_prompts: result = [] for prompt in self._config.system_prompts: if isinstance(prompt, str): # Simple string - use global cache setting result.append({"content": prompt, "cache_control": None}) # None means use global enable_prompt_caching elif isinstance(prompt, dict): # Dict with content and optional cache_control content = prompt.get("content", "") cache_control = prompt.get("cache_control") if cache_control is None: cache_control = None # Use global setting result.append({"content": content, "cache_control": cache_control}) return result # 2. Single system_prompt (backward compatibility) if self._config.system_prompt: return [{"content": self._config.system_prompt, "cache_control": None}] # Use global setting # 3. Assemble from individual fields parts = [] if self._config.goal: parts.append(f"Goal: {self._config.goal}") if self._config.backstory: parts.append(f"Background: {self._config.backstory}") if self._config.domain_knowledge: parts.append(f"Domain Knowledge: {self._config.domain_knowledge}") if self._config.reasoning_guidance: parts.append(f"Reasoning Approach: {self._config.reasoning_guidance}") if parts: return [{"content": "\n\n".join(parts), "cache_control": None}] # Use global setting # 4. Default fallback return [{"content": "You are a helpful AI assistant.", "cache_control": None}] # Use global setting def _initialize_tools_from_config(self) -> Dict[str, "BaseTool"]: """ Initialize and return tool instances from configuration. This shared method handles both tool names (List[str]) and tool instances (Dict[str, BaseTool]). It consolidates the tool loading logic used by ToolAgent and HybridAgent. Returns: Dictionary of tool name to BaseTool instance Example: ```python async def _initialize(self) -> None: # Load tools using shared method self._tool_instances = self._initialize_tools_from_config() ``` """ from aiecs.tools import get_tool # Process tools input: sets _tool_instances (dict) or _available_tools (list) self._load_tools() # Dict case: _load_tools() already populated _tool_instances directly if self._tool_instances: logger.debug(f"Agent {self.agent_id}: Using {len(self._tool_instances)} pre-configured tool instances") return self._tool_instances # Tool names were provided - load them tool_instances: Dict[str, "BaseTool"] = {} if self._available_tools: for tool_name in self._available_tools: try: tool_instances[tool_name] = get_tool(tool_name) logger.debug(f"Agent {self.agent_id}: Loaded tool: {tool_name}") except Exception as e: logger.warning(f"Failed to load tool {tool_name}: {e}") logger.debug(f"Agent {self.agent_id}: Initialized {len(tool_instances)} tools") return tool_instances def _extract_task_description(self, task: Dict[str, Any]) -> str: """ Extract task description from task dictionary. This shared method extracts the task description from various possible field names, providing a unified interface for all agent types. Args: task: Task specification dictionary Returns: Extracted task description string Raises: TaskExecutionError: If no description field is found Example: ```python async def execute_task(self, task, context): description = self._extract_task_description(task) # Use description for task execution ``` """ description = task.get("description") or task.get("prompt") or task.get("task") if not description: raise TaskExecutionError( "Task must contain 'description', 'prompt', or 'task' field", agent_id=self.agent_id, ) return str(description)
[docs] def get_config_manager(self) -> Optional["ConfigManagerProtocol"]: """ Get the configuration manager. Returns: Configuration manager instance, or None if not configured """ return self._config_manager
# ==================== Abstract Execution Methods ====================
[docs] @abstractmethod async def execute_task(self, task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: """ Execute a task. Args: task: Task specification context: Execution context Returns: Task execution result Raises: TaskExecutionError: If task execution fails Note: Subclasses can use `_execute_with_retry()` to wrap task execution with automatic retry logic based on agent configuration. """
[docs] @abstractmethod async def process_message(self, message: str, sender_id: Optional[str] = None) -> Dict[str, Any]: """ Process an incoming message. Args: message: Message content sender_id: Optional sender identifier Returns: Response dictionary Note: Subclasses can use `_execute_with_retry()` to wrap message processing with automatic retry logic based on agent configuration. """
# ==================== Retry Logic Integration ==================== async def _execute_with_retry(self, func: Callable, *args, **kwargs) -> Any: """ Execute a function with retry logic using agent's retry policy. This helper method wraps function execution with automatic retry based on the agent's configuration. It uses EnhancedRetryPolicy for sophisticated error handling with exponential backoff and error classification. Args: func: Async function to execute *args: Function positional arguments **kwargs: Function keyword arguments Returns: Function result Raises: Exception: If all retries are exhausted Example: ```python async def _execute_task_internal(self, task, context): # Actual task execution logic return result async def execute_task(self, task, context): return await self._execute_with_retry( self._execute_task_internal, task, context ) ``` """ from .integration.retry_policy import EnhancedRetryPolicy # Get retry policy from configuration retry_config = self._config.retry_policy # Create retry policy instance retry_policy = EnhancedRetryPolicy( max_retries=retry_config.max_retries, base_delay=retry_config.base_delay, max_delay=retry_config.max_delay, exponential_base=retry_config.exponential_factor, jitter_factor=retry_config.jitter_factor, ) # Execute with retry return await retry_policy.execute_with_retry(func, *args, **kwargs) # ==================== Memory Management ====================
[docs] async def add_to_memory( self, key: str, value: Any, memory_type: MemoryType = MemoryType.SHORT_TERM, metadata: Optional[Dict[str, Any]] = None, ) -> None: """ Add an item to agent memory. Args: key: Memory key value: Memory value memory_type: Type of memory (short_term or long_term) metadata: Optional metadata """ self._memory[key] = value self._memory_metadata[key] = { "type": memory_type.value, "timestamp": datetime.utcnow(), "metadata": metadata or {}, } logger.debug(f"Agent {self.agent_id} added memory: {key} ({memory_type.value})")
[docs] async def retrieve_memory(self, key: str, default: Any = None) -> Any: """ Retrieve an item from memory. Args: key: Memory key default: Default value if key not found Returns: Memory value or default """ return self._memory.get(key, default)
[docs] async def clear_memory(self, memory_type: Optional[MemoryType] = None) -> None: """ Clear agent memory. Args: memory_type: If specified, clear only this type of memory """ if memory_type is None: self._memory.clear() self._memory_metadata.clear() logger.info(f"Agent {self.agent_id} cleared all memory") else: keys_to_remove = [k for k, v in self._memory_metadata.items() if v.get("type") == memory_type.value] for key in keys_to_remove: del self._memory[key] del self._memory_metadata[key] logger.info(f"Agent {self.agent_id} cleared {memory_type.value} memory")
[docs] def get_memory_summary(self) -> Dict[str, Any]: """Get a summary of agent memory.""" return { "total_items": len(self._memory), "short_term_count": sum(1 for v in self._memory_metadata.values() if v.get("type") == MemoryType.SHORT_TERM.value), "long_term_count": sum(1 for v in self._memory_metadata.values() if v.get("type") == MemoryType.LONG_TERM.value), }
# ==================== Goal Management ====================
[docs] def set_goal( self, description: str, priority: GoalPriority = GoalPriority.MEDIUM, success_criteria: Optional[str] = None, deadline: Optional[datetime] = None, ) -> str: """ Set a new goal for the agent. Args: description: Goal description priority: Goal priority success_criteria: Success criteria deadline: Goal deadline Returns: Goal ID """ goal = AgentGoal( # type: ignore[call-arg] description=description, priority=priority, success_criteria=success_criteria, deadline=deadline, ) self._goals[goal.goal_id] = goal logger.info(f"Agent {self.agent_id} set goal: {goal.goal_id} ({priority.value})") return goal.goal_id
[docs] def get_goals(self, status: Optional[GoalStatus] = None) -> List[AgentGoal]: """ Get agent goals. Args: status: Filter by status (optional) Returns: List of goals """ if status is None: return list(self._goals.values()) return [g for g in self._goals.values() if g.status == status]
[docs] def get_goal(self, goal_id: str) -> Optional[AgentGoal]: """Get a specific goal by ID.""" return self._goals.get(goal_id)
[docs] def update_goal_status( self, goal_id: str, status: GoalStatus, progress: Optional[float] = None, ) -> None: """ Update goal status. Args: goal_id: Goal ID status: New status progress: Optional progress percentage """ if goal_id not in self._goals: logger.warning(f"Goal {goal_id} not found for agent {self.agent_id}") return goal = self._goals[goal_id] goal.status = status if progress is not None: goal.progress = progress if status == GoalStatus.IN_PROGRESS and goal.started_at is None: goal.started_at = datetime.utcnow() elif status == GoalStatus.ACHIEVED: goal.achieved_at = datetime.utcnow() logger.info(f"Agent {self.agent_id} updated goal {goal_id}: {status.value}")
# ==================== Configuration Management ====================
[docs] def get_config(self) -> AgentConfiguration: """Get agent configuration.""" return self._config
[docs] def update_config(self, updates: Dict[str, Any]) -> None: """ Update agent configuration. Args: updates: Configuration updates Raises: ConfigurationError: If configuration is invalid """ try: # Update configuration for key, value in updates.items(): if hasattr(self._config, key): setattr(self._config, key, value) else: logger.warning(f"Unknown config key: {key}") self.updated_at = datetime.utcnow() logger.info(f"Agent {self.agent_id} configuration updated") except Exception as e: raise ConfigurationError( f"Failed to update configuration: {str(e)}", agent_id=self.agent_id, )
# ==================== Capability Management ====================
[docs] def declare_capability( self, capability_type: str, level: str, description: Optional[str] = None, constraints: Optional[Dict[str, Any]] = None, ) -> None: """ Declare an agent capability. Args: capability_type: Type of capability level: Proficiency level description: Capability description constraints: Capability constraints """ from .models import CapabilityLevel capability = AgentCapabilityDeclaration( capability_type=capability_type, level=CapabilityLevel(level), description=description, constraints=constraints or {}, ) self._capabilities[capability_type] = capability logger.info(f"Agent {self.agent_id} declared capability: {capability_type} ({level})")
[docs] def has_capability(self, capability_type: str) -> bool: """Check if agent has a capability.""" return capability_type in self._capabilities
[docs] def get_capabilities(self) -> List[AgentCapabilityDeclaration]: """Get all agent capabilities.""" return list(self._capabilities.values())
# ==================== Metrics Tracking ====================
[docs] def get_metrics(self) -> AgentMetrics: """Get agent metrics.""" return self._metrics
[docs] def update_metrics( self, execution_time: Optional[float] = None, success: bool = True, quality_score: Optional[float] = None, tokens_used: Optional[int] = None, tool_calls: Optional[int] = None, ) -> None: """ Update agent metrics. Args: execution_time: Task execution time success: Whether task succeeded quality_score: Quality score (0-1) tokens_used: Tokens used tool_calls: Number of tool calls """ self._metrics.total_tasks_executed += 1 if success: self._metrics.successful_tasks += 1 else: self._metrics.failed_tasks += 1 # Update success rate self._metrics.success_rate = self._metrics.successful_tasks / self._metrics.total_tasks_executed * 100 # Update execution time if execution_time is not None: self._metrics.total_execution_time += execution_time self._metrics.average_execution_time = self._metrics.total_execution_time / self._metrics.total_tasks_executed if self._metrics.min_execution_time is None or execution_time < self._metrics.min_execution_time: self._metrics.min_execution_time = execution_time if self._metrics.max_execution_time is None or execution_time > self._metrics.max_execution_time: self._metrics.max_execution_time = execution_time # Update quality score if quality_score is not None: if self._metrics.average_quality_score is None: self._metrics.average_quality_score = quality_score else: # Running average total_quality = self._metrics.average_quality_score * (self._metrics.total_tasks_executed - 1) self._metrics.average_quality_score = (total_quality + quality_score) / self._metrics.total_tasks_executed # Update resource usage if tokens_used is not None: self._metrics.total_tokens_used += tokens_used if tool_calls is not None: self._metrics.total_tool_calls += tool_calls self._metrics.updated_at = datetime.utcnow()
[docs] def update_cache_metrics( self, cache_read_tokens: Optional[int] = None, cache_creation_tokens: Optional[int] = None, cache_hit: Optional[bool] = None, ) -> None: """ Update prompt cache metrics from LLM response. This method tracks provider-level prompt caching statistics to monitor cache hit rates and token savings. Args: cache_read_tokens: Tokens read from cache (indicates cache hit) cache_creation_tokens: Tokens used to create a new cache entry cache_hit: Whether the request hit a cached prompt prefix Example: # After receiving LLM response agent.update_cache_metrics( cache_read_tokens=response.cache_read_tokens, cache_creation_tokens=response.cache_creation_tokens, cache_hit=response.cache_hit ) """ # Track LLM request count self._metrics.total_llm_requests += 1 # Track cache hit/miss if cache_hit is True: self._metrics.cache_hits += 1 elif cache_hit is False: self._metrics.cache_misses += 1 elif cache_read_tokens is not None and cache_read_tokens > 0: # Infer cache hit from tokens self._metrics.cache_hits += 1 elif cache_creation_tokens is not None and cache_creation_tokens > 0: # Infer cache miss from creation tokens self._metrics.cache_misses += 1 # Update cache hit rate total_cache_requests = self._metrics.cache_hits + self._metrics.cache_misses if total_cache_requests > 0: self._metrics.cache_hit_rate = self._metrics.cache_hits / total_cache_requests # Track cache tokens if cache_read_tokens is not None and cache_read_tokens > 0: self._metrics.total_cache_read_tokens += cache_read_tokens # Provider-level caching saves ~90% of token cost for cached tokens self._metrics.estimated_cache_savings_tokens += int(cache_read_tokens * 0.9) if cache_creation_tokens is not None and cache_creation_tokens > 0: self._metrics.total_cache_creation_tokens += cache_creation_tokens self._metrics.updated_at = datetime.utcnow() logger.debug(f"Agent {self.agent_id} cache metrics updated: " f"hit_rate={self._metrics.cache_hit_rate:.2%}, " f"read_tokens={cache_read_tokens}, creation_tokens={cache_creation_tokens}")
[docs] def update_session_metrics( self, session_status: str, session_duration: Optional[float] = None, session_requests: int = 0, ) -> None: """ Update session-level metrics. This method should be called when a session is created, updated, or ended to track session-level statistics in agent metrics. Args: session_status: Session status (active, completed, failed, expired) session_duration: Session duration in seconds (for ended sessions) session_requests: Number of requests in the session Example: # When creating a session agent.update_session_metrics(session_status="active") # When ending a session agent.update_session_metrics( session_status="completed", session_duration=300.5, session_requests=15 ) """ # Update session counts based on status if session_status == "active": self._metrics.total_sessions += 1 self._metrics.active_sessions += 1 elif session_status == "completed": self._metrics.completed_sessions += 1 if self._metrics.active_sessions > 0: self._metrics.active_sessions -= 1 elif session_status == "failed": self._metrics.failed_sessions += 1 if self._metrics.active_sessions > 0: self._metrics.active_sessions -= 1 elif session_status == "expired": self._metrics.expired_sessions += 1 if self._metrics.active_sessions > 0: self._metrics.active_sessions -= 1 # Update session request tracking if session_requests > 0: self._metrics.total_session_requests += session_requests # Update average session duration if session_duration is not None and session_duration > 0: completed_count = self._metrics.completed_sessions + self._metrics.failed_sessions + self._metrics.expired_sessions if completed_count > 0: if self._metrics.average_session_duration is None: self._metrics.average_session_duration = session_duration else: # Running average total_duration = self._metrics.average_session_duration * (completed_count - 1) self._metrics.average_session_duration = (total_duration + session_duration) / completed_count # Update average requests per session if self._metrics.total_sessions > 0: self._metrics.average_requests_per_session = self._metrics.total_session_requests / self._metrics.total_sessions self._metrics.updated_at = datetime.utcnow() logger.debug(f"Agent {self.agent_id} session metrics updated: " f"status={session_status}, total_sessions={self._metrics.total_sessions}, " f"active_sessions={self._metrics.active_sessions}")
# ==================== Performance Tracking ====================
[docs] def track_operation_time(self, operation_name: str) -> OperationTimer: """ Create a context manager for tracking operation time. This method returns an OperationTimer that automatically records operation duration and updates agent metrics when the operation completes. Args: operation_name: Name of the operation to track Returns: OperationTimer context manager Example: with agent.track_operation_time("llm_call") as timer: result = await llm.generate(prompt) # Metrics are automatically recorded # Access duration if needed print(f"Operation took {timer.duration} seconds") """ return OperationTimer(operation_name=operation_name, agent=self)
def _record_operation_metrics(self, operation_name: str, duration: float, success: bool = True) -> None: """ Record operation-level metrics. This method is called automatically by OperationTimer but can also be called manually to record operation metrics. Args: operation_name: Name of the operation duration: Operation duration in seconds success: Whether the operation succeeded Example: # Manual recording start = time.time() try: result = perform_operation() agent._record_operation_metrics("custom_op", time.time() - start, True) except Exception: agent._record_operation_metrics("custom_op", time.time() - start, False) raise """ # Update operation counts if operation_name not in self._metrics.operation_counts: self._metrics.operation_counts[operation_name] = 0 self._metrics.operation_total_time[operation_name] = 0.0 self._metrics.operation_error_counts[operation_name] = 0 self._metrics.operation_counts[operation_name] += 1 self._metrics.operation_total_time[operation_name] += duration if not success: self._metrics.operation_error_counts[operation_name] += 1 # Add to operation history (keep last 100 operations) operation_record = { "operation": operation_name, "duration": duration, "success": success, "timestamp": datetime.utcnow().isoformat(), } self._metrics.operation_history.append(operation_record) # Keep only last 100 operations if len(self._metrics.operation_history) > 100: self._metrics.operation_history = self._metrics.operation_history[-100:] # Recalculate percentiles self._update_operation_percentiles() self._metrics.updated_at = datetime.utcnow() logger.debug(f"Agent {self.agent_id} operation metrics recorded: " f"operation={operation_name}, duration={duration:.3f}s, success={success}") def _update_operation_percentiles(self) -> None: """Update operation time percentiles from operation history.""" if not self._metrics.operation_history: return # Extract durations from operation history durations = [op["duration"] for op in self._metrics.operation_history] # Calculate percentiles self._metrics.p50_operation_time = self._calculate_percentile(durations, 50) self._metrics.p95_operation_time = self._calculate_percentile(durations, 95) self._metrics.p99_operation_time = self._calculate_percentile(durations, 99) def _calculate_percentile(self, values: List[float], percentile: int) -> Optional[float]: """ Calculate percentile from a list of values. Args: values: List of numeric values percentile: Percentile to calculate (0-100) Returns: Percentile value or None if values is empty Example: p95 = agent._calculate_percentile([1.0, 2.0, 3.0, 4.0, 5.0], 95) """ if not values: return None sorted_values = sorted(values) index = int(len(sorted_values) * percentile / 100) # Handle edge cases if index >= len(sorted_values): index = len(sorted_values) - 1 return sorted_values[index]
[docs] def get_performance_metrics(self) -> Dict[str, Any]: """ Get comprehensive performance metrics. Returns detailed performance statistics including operation-level metrics, percentiles, and aggregated statistics. Returns: Dictionary with performance metrics Example: metrics = agent.get_performance_metrics() print(f"P95 latency: {metrics['p95_operation_time']}s") print(f"Total operations: {metrics['total_operations']}") for op_name, stats in metrics['operations'].items(): print(f"{op_name}: {stats['count']} calls, avg {stats['avg_time']:.3f}s") """ # Calculate per-operation statistics operations = {} for op_name, count in self._metrics.operation_counts.items(): total_time = self._metrics.operation_total_time.get(op_name, 0.0) error_count = self._metrics.operation_error_counts.get(op_name, 0) operations[op_name] = { "count": count, "total_time": total_time, "average_time": total_time / count if count > 0 else 0.0, "error_count": error_count, "error_rate": (error_count / count * 100) if count > 0 else 0.0, } return { "total_operations": sum(self._metrics.operation_counts.values()), "operations": operations, "p50_operation_time": self._metrics.p50_operation_time, "p95_operation_time": self._metrics.p95_operation_time, "p99_operation_time": self._metrics.p99_operation_time, "recent_operations": self._metrics.operation_history[-10:], # Last 10 operations # Prompt cache metrics "prompt_cache": { "total_llm_requests": self._metrics.total_llm_requests, "cache_hits": self._metrics.cache_hits, "cache_misses": self._metrics.cache_misses, "cache_hit_rate": self._metrics.cache_hit_rate, "cache_hit_rate_pct": f"{self._metrics.cache_hit_rate * 100:.1f}%", "total_cache_read_tokens": self._metrics.total_cache_read_tokens, "total_cache_creation_tokens": self._metrics.total_cache_creation_tokens, "estimated_cache_savings_tokens": self._metrics.estimated_cache_savings_tokens, "estimated_cache_savings_cost": self._metrics.estimated_cache_savings_cost, }, }
[docs] def get_health_status(self) -> Dict[str, Any]: """ Get agent health status with health score calculation. Calculates a health score (0-100) based on multiple factors: - Success rate (40% weight) - Error rate (30% weight) - Performance (20% weight) - Session health (10% weight) Returns: Dictionary with health status and score Example: health = agent.get_health_status() print(f"Health score: {health['health_score']}/100") print(f"Status: {health['status']}") # healthy, degraded, unhealthy if health['issues']: print(f"Issues: {', '.join(health['issues'])}") """ issues = [] health_score = 100.0 # Factor 1: Success rate (40% weight) success_rate = self._metrics.success_rate if success_rate < 50: issues.append("Low success rate") health_score -= 40 elif success_rate < 80: issues.append("Moderate success rate") health_score -= 20 elif success_rate < 95: health_score -= 10 # Factor 2: Error rate (30% weight) total_tasks = self._metrics.total_tasks_executed if total_tasks > 0: error_rate = (self._metrics.failed_tasks / total_tasks) * 100 if error_rate > 50: issues.append("High error rate") health_score -= 30 elif error_rate > 20: issues.append("Elevated error rate") health_score -= 15 elif error_rate > 5: health_score -= 5 # Factor 3: Performance (20% weight) if self._metrics.p95_operation_time is not None: # Consider p95 > 5s as slow if self._metrics.p95_operation_time > 10: issues.append("Very slow operations (p95 > 10s)") health_score -= 20 elif self._metrics.p95_operation_time > 5: issues.append("Slow operations (p95 > 5s)") health_score -= 10 # Factor 4: Session health (10% weight) if self._metrics.total_sessions > 0: session_failure_rate = (self._metrics.failed_sessions + self._metrics.expired_sessions) / self._metrics.total_sessions * 100 if session_failure_rate > 30: issues.append("High session failure rate") health_score -= 10 elif session_failure_rate > 10: health_score -= 5 # Ensure health score is in valid range health_score = max(0.0, min(100.0, health_score)) # Determine status if health_score >= 80: status = "healthy" elif health_score >= 50: status = "degraded" else: status = "unhealthy" return { "health_score": health_score, "status": status, "issues": issues, "metrics_summary": { "success_rate": success_rate, "total_tasks": total_tasks, "total_sessions": self._metrics.total_sessions, "active_sessions": self._metrics.active_sessions, "p95_operation_time": self._metrics.p95_operation_time, }, "timestamp": datetime.utcnow().isoformat(), }
[docs] def get_comprehensive_status(self) -> Dict[str, Any]: """ Get comprehensive agent status combining all metrics. Returns a complete view of agent state, health, performance, and operational metrics. Returns: Dictionary with comprehensive status information Example: status = agent.get_comprehensive_status() print(f"Agent: {status['agent_id']}") print(f"State: {status['state']}") print(f"Health: {status['health']['status']} ({status['health']['health_score']}/100)") print(f"Tasks: {status['metrics']['total_tasks_executed']}") print(f"Sessions: {status['metrics']['total_sessions']}") """ return { "agent_id": self.agent_id, "name": self.name, "type": self.agent_type.value, "version": self.version, "state": self._state.value, "health": self.get_health_status(), "performance": self.get_performance_metrics(), "metrics": { # Task metrics "total_tasks_executed": self._metrics.total_tasks_executed, "successful_tasks": self._metrics.successful_tasks, "failed_tasks": self._metrics.failed_tasks, "success_rate": self._metrics.success_rate, # Execution time metrics "average_execution_time": self._metrics.average_execution_time, "total_execution_time": self._metrics.total_execution_time, # Session metrics "total_sessions": self._metrics.total_sessions, "active_sessions": self._metrics.active_sessions, "completed_sessions": self._metrics.completed_sessions, "failed_sessions": self._metrics.failed_sessions, "expired_sessions": self._metrics.expired_sessions, # Resource usage "total_tokens_used": self._metrics.total_tokens_used, "total_tool_calls": self._metrics.total_tool_calls, # Error tracking "error_count": self._metrics.error_count, "error_types": self._metrics.error_types, # Prompt cache metrics "cache_hit_rate": self._metrics.cache_hit_rate, "cache_hits": self._metrics.cache_hits, "cache_misses": self._metrics.cache_misses, "total_cache_read_tokens": self._metrics.total_cache_read_tokens, "estimated_cache_savings_tokens": self._metrics.estimated_cache_savings_tokens, }, "capabilities": [cap.capability_type for cap in self.get_capabilities()], "active_goals": len([g for g in self._goals.values() if g.status == GoalStatus.IN_PROGRESS]), "timestamp": datetime.utcnow().isoformat(), }
[docs] def reset_metrics(self) -> None: """ Reset performance and session metrics. Resets all metrics to their initial state while preserving agent configuration and state. Example: # Reset metrics at the start of a new monitoring period agent.reset_metrics() """ self._metrics = AgentMetrics(last_reset_at=datetime.utcnow()) # type: ignore[call-arg] logger.info(f"Agent {self.agent_id} metrics reset")
# ==================== Serialization ====================
[docs] def to_dict(self) -> Dict[str, Any]: """ Serialize agent to dictionary. Includes health status and performance metrics for comprehensive agent state representation. Returns: Dictionary representation Raises: SerializationError: If serialization fails """ try: result: Dict[str, Any] = { "agent_id": self.agent_id, "name": self.name, "agent_type": self.agent_type.value, "description": self.description, "version": self.version, "state": self._state.value, "config": self._config.model_dump(), "goals": [g.model_dump() for g in self._goals.values()], "capabilities": [c.model_dump() for c in self._capabilities.values()], "metrics": self._metrics.model_dump(), "health_status": self.get_health_status(), # Phase 3 enhancement "performance_metrics": self.get_performance_metrics(), # Phase 3 enhancement "memory_summary": self.get_memory_summary(), "created_at": self.created_at.isoformat(), "updated_at": self.updated_at.isoformat(), "last_active_at": (self.last_active_at.isoformat() if self.last_active_at else None), } # Add skill state if skills are enabled (Phase 4 - Agent Skills Extension) if self._config.skills_enabled and self._attached_skills: result["attached_skills"] = [ { "name": skill.metadata.name, "version": skill.metadata.version, "description": skill.metadata.description, } for skill in self._attached_skills ] result["skill_tools"] = list(self._skill_tools.keys()) return result except Exception as e: raise SerializationError( f"Failed to serialize agent: {str(e)}", agent_id=self.agent_id, )
[docs] @classmethod def from_dict(cls, data: Dict[str, Any]) -> "BaseAIAgent": """ Deserialize agent from dictionary. Args: data: Dictionary representation Returns: Agent instance Raises: SerializationError: If deserialization fails """ raise NotImplementedError("from_dict must be implemented by subclasses")
# ==================== Checkpointer Support ====================
[docs] async def save_checkpoint(self, session_id: str, checkpoint_id: Optional[str] = None) -> Optional[str]: """ Save agent state checkpoint. This method saves the current agent state using the configured checkpointer. If no checkpointer is configured, logs a warning and returns None. Args: session_id: Session identifier for the checkpoint checkpoint_id: Optional checkpoint identifier (auto-generated if None) Returns: Checkpoint ID if saved successfully, None otherwise Example: # Save checkpoint with auto-generated ID checkpoint_id = await agent.save_checkpoint(session_id="session-123") # Save checkpoint with custom ID checkpoint_id = await agent.save_checkpoint( session_id="session-123", checkpoint_id="v1.0" ) Note: Requires a checkpointer to be configured during agent initialization. The checkpoint includes full agent state from to_dict(). """ if not self._checkpointer: logger.warning(f"Agent {self.agent_id}: No checkpointer configured, cannot save checkpoint") return None try: # Get current agent state checkpoint_data = self.to_dict() # Add checkpoint metadata checkpoint_data["checkpoint_metadata"] = { "session_id": session_id, "checkpoint_id": checkpoint_id, "saved_at": datetime.utcnow().isoformat(), "agent_version": self.version, } # Save using checkpointer saved_checkpoint_id = await self._checkpointer.save_checkpoint( agent_id=self.agent_id, session_id=session_id, checkpoint_data=checkpoint_data, ) logger.info(f"Agent {self.agent_id}: Checkpoint saved successfully " f"(session={session_id}, checkpoint={saved_checkpoint_id})") return saved_checkpoint_id except Exception as e: logger.error(f"Agent {self.agent_id}: Failed to save checkpoint " f"(session={session_id}): {e}") return None
[docs] async def load_checkpoint(self, session_id: str, checkpoint_id: Optional[str] = None) -> bool: """ Load agent state from checkpoint. This method loads agent state from a saved checkpoint using the configured checkpointer. If no checkpointer is configured, logs a warning and returns False. Args: session_id: Session identifier for the checkpoint checkpoint_id: Optional checkpoint identifier (loads latest if None) Returns: True if checkpoint loaded successfully, False otherwise Example: # Load latest checkpoint success = await agent.load_checkpoint(session_id="session-123") # Load specific checkpoint success = await agent.load_checkpoint( session_id="session-123", checkpoint_id="v1.0" ) Note: Requires a checkpointer to be configured during agent initialization. This method updates the agent's internal state from the checkpoint. Not all state may be restorable (e.g., runtime objects, connections). """ if not self._checkpointer: logger.warning(f"Agent {self.agent_id}: No checkpointer configured, cannot load checkpoint") return False try: # Load checkpoint data checkpoint_data = await self._checkpointer.load_checkpoint( agent_id=self.agent_id, session_id=session_id, checkpoint_id=checkpoint_id, ) if not checkpoint_data: logger.warning(f"Agent {self.agent_id}: No checkpoint found " f"(session={session_id}, checkpoint={checkpoint_id or 'latest'})") return False # Restore agent state from checkpoint self._restore_from_checkpoint(checkpoint_data) logger.info(f"Agent {self.agent_id}: Checkpoint loaded successfully " f"(session={session_id}, checkpoint={checkpoint_id or 'latest'})") return True except Exception as e: logger.error(f"Agent {self.agent_id}: Failed to load checkpoint " f"(session={session_id}, checkpoint={checkpoint_id or 'latest'}): {e}") return False
def _restore_from_checkpoint(self, checkpoint_data: Dict[str, Any]) -> None: """ Restore agent state from checkpoint data. This is an internal method that updates the agent's state from checkpoint data. Subclasses can override this to customize restoration logic. Args: checkpoint_data: Checkpoint data dictionary Note: This method restores basic agent state. Runtime objects like connections, file handles, etc. are not restored. """ # Restore basic state if "state" in checkpoint_data: try: self._state = AgentState(checkpoint_data["state"]) except (ValueError, KeyError): logger.warning("Could not restore state from checkpoint") # Restore metrics if "metrics" in checkpoint_data: try: self._metrics = AgentMetrics(**checkpoint_data["metrics"]) except Exception as e: logger.warning(f"Could not restore metrics from checkpoint: {e}") # Restore goals if "goals" in checkpoint_data: try: self._goals = {} for goal_data in checkpoint_data["goals"]: goal = AgentGoal(**goal_data) self._goals[goal.goal_id] = goal except Exception as e: logger.warning(f"Could not restore goals from checkpoint: {e}") # Update timestamps self.updated_at = datetime.utcnow() logger.debug(f"Agent {self.agent_id}: State restored from checkpoint") # ==================== Utility Methods ====================
[docs] def is_available(self) -> bool: """Check if agent is available for tasks.""" return self._state == AgentState.ACTIVE
[docs] def is_busy(self) -> bool: """Check if agent is currently busy.""" return self._state == AgentState.BUSY
[docs] async def execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Any: """ Execute a single tool with given parameters. This is a default implementation that subclasses can override. For ToolAgent, this calls _execute_tool with operation from parameters. Args: tool_name: Name of the tool to execute parameters: Tool parameters (may include 'operation' key) Returns: Tool execution result """ # Check if we have tool instances if hasattr(self, "_tool_instances") and self._tool_instances: tool = self._tool_instances.get(tool_name) if tool: # Make a copy to avoid modifying the original params = parameters.copy() # Try to execute the tool directly (for custom tools with execute method) if hasattr(tool, "execute"): return await tool.execute(**params) # For standard tools with run_async elif hasattr(tool, "run_async"): # Check if operation is specified operation = params.pop("operation", None) if operation: return await tool.run_async(operation, **params) else: return await tool.run_async(**params) raise NotImplementedError(f"execute_tool not implemented for {self.__class__.__name__}. " "Tool {tool_name} not found or doesn't have execute/run_async method.")
# ==================== Parallel Tool Execution (Phase 7) ====================
[docs] async def execute_tools_parallel( self, tool_calls: List[Dict[str, Any]], max_concurrency: int = 5, ) -> List[Dict[str, Any]]: """ Execute multiple tools in parallel with concurrency limit. Args: tool_calls: List of tool call dicts with 'tool_name' and 'parameters' max_concurrency: Maximum number of concurrent tool executions Returns: List of results in same order as tool_calls Example: tool_calls = [ {"tool_name": "search", "parameters": {"query": "AI"}}, {"tool_name": "calculator", "parameters": {"expression": "2+2"}}, {"tool_name": "search", "parameters": {"query": "ML"}}, ] results = await agent.execute_tools_parallel(tool_calls, max_concurrency=2) """ if not tool_calls: return [] # Create semaphore for concurrency control semaphore = asyncio.Semaphore(max_concurrency) async def execute_with_semaphore(tool_call: Dict[str, Any], index: int): """Execute tool with semaphore.""" async with semaphore: tool_name = tool_call.get("tool_name") parameters = tool_call.get("parameters", {}) if tool_name is None: raise ValueError("tool_name is required in tool_call") try: # Execute tool (subclass should implement execute_tool) result = await self.execute_tool(tool_name, parameters) return {"index": index, "success": True, "result": result} except Exception as e: logger.error(f"Tool {tool_name} failed: {e}") return { "index": index, "success": False, "error": str(e), "tool_name": tool_name, } # Execute all tools in parallel tasks = [execute_with_semaphore(tool_call, i) for i, tool_call in enumerate(tool_calls)] results_unordered = await asyncio.gather(*tasks, return_exceptions=True) # Sort results by index to maintain order valid_results = [r for r in results_unordered if not isinstance(r, Exception) and isinstance(r, dict) and "index" in r] results_sorted = sorted( valid_results, key=lambda x: x["index"], ) # Remove index from results return [{k: v for k, v in r.items() if k != "index"} for r in results_sorted]
[docs] async def analyze_tool_dependencies(self, tool_calls: List[Dict[str, Any]]) -> Dict[str, List[str]]: """ Analyze dependencies between tool calls. Detects if one tool's output is used as input to another tool. Args: tool_calls: List of tool call dicts Returns: Dict mapping tool index to list of dependency indices Example: tool_calls = [ {"tool_name": "search", "parameters": {"query": "AI"}}, {"tool_name": "summarize", "parameters": {"text": "${0.result}"}}, ] deps = await agent.analyze_tool_dependencies(tool_calls) # deps = {"1": ["0"]} # Tool 1 depends on tool 0 """ dependencies: Dict[str, List[str]] = {} for i, tool_call in enumerate(tool_calls): deps = [] parameters = tool_call.get("parameters", {}) # Check if parameters reference other tool results param_str = json.dumps(parameters) # Look for ${index.field} patterns import re matches = re.findall(r"\$\{(\d+)\.", param_str) deps = list(set(matches)) # Remove duplicates if deps: dependencies[str(i)] = deps return dependencies
[docs] async def execute_tools_with_dependencies(self, tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Execute tools respecting dependencies using topological sort. Args: tool_calls: List of tool call dicts Returns: List of results in same order as tool_calls Example: tool_calls = [ {"tool_name": "search", "parameters": {"query": "AI"}}, {"tool_name": "summarize", "parameters": {"text": "${0.result}"}}, ] results = await agent.execute_tools_with_dependencies(tool_calls) """ # Analyze dependencies dependencies = await self.analyze_tool_dependencies(tool_calls) # Topological sort executed: Set[int] = set() results: List[Optional[Dict[str, Any]]] = [None] * len(tool_calls) def can_execute(index: int) -> bool: """Check if tool can be executed.""" deps = dependencies.get(str(index), []) return all(int(dep) in executed for dep in deps) # Execute tools in dependency order while len(executed) < len(tool_calls): # Find tools that can be executed ready = [i for i in range(len(tool_calls)) if i not in executed and can_execute(i)] if not ready: # Circular dependency or error logger.error("Circular dependency detected or no tools ready") break # Execute ready tools in parallel ready_calls = [tool_calls[i] for i in ready] ready_results = await self.execute_tools_parallel(ready_calls) # Store results and mark as executed for i, result in zip(ready, ready_results): if result is not None: results[i] = result executed.add(i) # Substitute results in dependent tool calls for j in range(len(tool_calls)): if j not in executed: tool_calls[j] = self._substitute_tool_result(tool_calls[j], i, result) # Filter out None values and return return [r for r in results if r is not None]
def _substitute_tool_result(self, tool_call: Dict[str, Any], source_index: int, source_result: Dict[str, Any]) -> Dict[str, Any]: """ Substitute tool result references in parameters. Args: tool_call: Tool call dict source_index: Index of source tool source_result: Result from source tool Returns: Updated tool call dict """ import re param_str = json.dumps(tool_call.get("parameters", {})) # Replace ${index.field} with actual values pattern = rf"\$\{{{source_index}\.(\w+)\}}" def replacer(match): field = match.group(1) value = source_result.get(field) return json.dumps(value) if value is not None else "null" param_str = re.sub(pattern, replacer, param_str) tool_call["parameters"] = json.loads(param_str) return tool_call # ==================== Tool Result Caching (Phase 7) ==================== def _generate_cache_key(self, tool_name: str, parameters: Dict[str, Any]) -> str: """ Generate cache key for tool result. Args: tool_name: Name of the tool parameters: Tool parameters Returns: Cache key string Example: key = agent._generate_cache_key("search", {"query": "AI"}) """ # Sort parameters for consistent keys param_str = json.dumps(parameters, sort_keys=True) # Hash large inputs if self._cache_config.hash_large_inputs and len(param_str) > 1024: import hashlib param_hash = hashlib.md5(param_str.encode()).hexdigest() cache_key = f"{tool_name}:{param_hash}" else: cache_key = f"{tool_name}:{param_str}" # Include timestamp if configured if self._cache_config.include_timestamp_in_key: timestamp = int(time.time() / 60) # Minute-level granularity cache_key = f"{cache_key}:{timestamp}" return cache_key
[docs] async def execute_tool_with_cache(self, tool_name: str, parameters: Dict[str, Any]) -> Any: """ Execute tool with caching support. Args: tool_name: Name of the tool parameters: Tool parameters Returns: Tool result (from cache or fresh execution) Example: result = await agent.execute_tool_with_cache("search", {"query": "AI"}) """ if not self._cache_config.enabled: # Cache disabled, execute directly return await self.execute_tool(tool_name, parameters) # Generate cache key cache_key = self._generate_cache_key(tool_name, parameters) # Check cache if cache_key in self._tool_cache: # Check TTL cached_time = self._cache_timestamps.get(cache_key, 0) ttl = self._cache_config.get_ttl(tool_name) age = time.time() - cached_time if age < ttl: # Cache hit self._cache_access_count[cache_key] = self._cache_access_count.get(cache_key, 0) + 1 logger.debug(f"Cache hit for {tool_name} (age: {age:.1f}s)") return self._tool_cache[cache_key] else: # Cache expired logger.debug(f"Cache expired for {tool_name} (age: {age:.1f}s)") del self._tool_cache[cache_key] del self._cache_timestamps[cache_key] if cache_key in self._cache_access_count: del self._cache_access_count[cache_key] # Cache miss - execute tool logger.debug(f"Cache miss for {tool_name}") result = await self.execute_tool(tool_name, parameters) # Store in cache self._tool_cache[cache_key] = result self._cache_timestamps[cache_key] = time.time() self._cache_access_count[cache_key] = 0 # Cleanup if needed await self._cleanup_cache() return result
[docs] def invalidate_cache(self, tool_name: Optional[str] = None, pattern: Optional[str] = None) -> int: """ Invalidate cache entries. Args: tool_name: Invalidate all entries for this tool (optional) pattern: Invalidate entries matching pattern (optional) Returns: Number of entries invalidated Example: # Invalidate all search results count = agent.invalidate_cache(tool_name="search") # Invalidate all cache count = agent.invalidate_cache() """ if tool_name is None and pattern is None: # Invalidate all count = len(self._tool_cache) self._tool_cache.clear() self._cache_timestamps.clear() self._cache_access_count.clear() logger.info(f"Invalidated all cache ({count} entries)") return count # Invalidate matching entries keys_to_delete = [] for key in list(self._tool_cache.keys()): if tool_name and key.startswith(f"{tool_name}:"): keys_to_delete.append(key) elif pattern and pattern in key: keys_to_delete.append(key) for key in keys_to_delete: del self._tool_cache[key] del self._cache_timestamps[key] if key in self._cache_access_count: del self._cache_access_count[key] logger.info(f"Invalidated {len(keys_to_delete)} cache entries") return len(keys_to_delete)
[docs] def get_cache_stats(self) -> Dict[str, Any]: """ Get cache statistics. Returns: Dictionary with cache statistics Example: stats = agent.get_cache_stats() print(f"Cache size: {stats['size']}") print(f"Hit rate: {stats['hit_rate']:.1%}") """ total_entries = len(self._tool_cache) total_accesses = sum(self._cache_access_count.values()) # Calculate hit rate (approximate) cache_hits = sum(count for count in self._cache_access_count.values() if count > 0) hit_rate = cache_hits / total_accesses if total_accesses > 0 else 0.0 # Calculate memory usage (approximate) import sys memory_bytes = sum(sys.getsizeof(v) for v in self._tool_cache.values()) memory_mb = memory_bytes / (1024 * 1024) # Per-tool stats tool_stats = {} for key in self._tool_cache.keys(): tool_name = key.split(":")[0] if tool_name not in tool_stats: tool_stats[tool_name] = {"count": 0, "accesses": 0} tool_stats[tool_name]["count"] += 1 tool_stats[tool_name]["accesses"] += self._cache_access_count.get(key, 0) return { "enabled": self._cache_config.enabled, "size": total_entries, "max_size": self._cache_config.max_cache_size, "memory_mb": memory_mb, "max_memory_mb": self._cache_config.max_memory_mb, "total_accesses": total_accesses, "hit_rate": hit_rate, "tool_stats": tool_stats, }
async def _cleanup_cache(self) -> None: """ Cleanup cache based on size and memory limits. Removes least recently used entries when limits are exceeded. """ # Check if cleanup needed current_time = time.time() if current_time - self._last_cleanup_time < self._cache_config.cleanup_interval: return self._last_cleanup_time = current_time # Check size limit if len(self._tool_cache) > self._cache_config.max_cache_size * self._cache_config.cleanup_threshold: # Remove oldest entries entries_to_remove = int(len(self._tool_cache) - self._cache_config.max_cache_size * 0.8) # Sort by timestamp (oldest first) sorted_keys = sorted(self._cache_timestamps.items(), key=lambda x: x[1]) for key, _ in sorted_keys[:entries_to_remove]: del self._tool_cache[key] del self._cache_timestamps[key] if key in self._cache_access_count: del self._cache_access_count[key] logger.debug(f"Cleaned up {entries_to_remove} cache entries (size limit)") # ==================== Image Helpers ==================== @staticmethod def _merge_task_images(task: Dict[str, Any], context: Dict[str, Any]) -> None: """Merge images from *task* dict into *context*, normalising to a flat list. Images may be provided in two places: * ``task["images"]`` – attached directly to the task payload. * ``context["images"]`` – passed by the caller as part of the execution context (e.g. a previous multimodal message). Both sources are merged into ``context["images"]`` so that ``_build_initial_messages`` (and equivalent methods in sub-classes) has a single, canonical location to read images from. Scalar values are promoted to single-element lists so that downstream code can always iterate safely. """ task_images = task.get("images") if not task_images: return task_list: List[Any] = task_images if isinstance(task_images, list) else [task_images] existing = context.get("images") if existing is None: context["images"] = task_list else: existing_list: List[Any] = existing if isinstance(existing, list) else [existing] context["images"] = existing_list + task_list # ==================== Streaming Support (Phase 7 - Tasks 1.15.11-1.15.12) ====================
[docs] async def execute_task_streaming(self, task: Dict[str, Any], context: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]: """ Execute a task with streaming results. This method streams task execution events as they occur, including: - Status updates (started, thinking, acting, completed) - LLM tokens (for agents with LLM clients) - Tool calls and results (for agents with tools) - Final result Args: task: Task specification context: Execution context Yields: Dict[str, Any]: Event dictionaries with 'type' and event-specific data Event types: - 'status': Status update (e.g., started, thinking, completed) - 'token': LLM token (for streaming text generation) - 'tool_call': Tool execution started - 'tool_result': Tool execution completed - 'result': Final task result - 'error': Error occurred Example: ```python async for event in agent.execute_task_streaming(task, context): if event['type'] == 'token': print(event['content'], end='', flush=True) elif event['type'] == 'tool_call': print(f"\\nCalling tool: {event['tool_name']}") elif event['type'] == 'tool_result': print(f"Tool result: {event['result']}") elif event['type'] == 'result': print(f"\\nFinal result: {event['output']}") ``` Note: Subclasses should override this method to provide streaming support. Default implementation falls back to non-streaming execute_task. """ # Default implementation: execute task and yield result yield {"type": "status", "status": "started", "timestamp": datetime.utcnow().isoformat()} try: result = await self.execute_task(task, context) yield {"type": "result", **result} except Exception as e: yield { "type": "error", "error": str(e), "timestamp": datetime.utcnow().isoformat(), } raise
[docs] async def process_message_streaming(self, message: str, sender_id: Optional[str] = None) -> AsyncGenerator[str, None]: """ Process a message with streaming response. This method streams the response text as it's generated, providing a better user experience for long responses. Args: message: Message content sender_id: Optional sender identifier Yields: str: Response text tokens/chunks Example: ```python async for token in agent.process_message_streaming("Hello!"): print(token, end='', flush=True) ``` Note: Subclasses should override this method to provide streaming support. Default implementation falls back to non-streaming process_message. """ # Default implementation: process message and yield result try: result = await self.process_message(message, sender_id) response = result.get("response", "") yield response except Exception as e: logger.error(f"Streaming message processing failed: {e}") raise
# ==================== Agent Collaboration (Phase 7 - Tasks 1.15.15-1.15.22) ====================
[docs] async def delegate_task( self, task: Dict[str, Any], required_capabilities: Optional[List[str]] = None, target_agent_id: Optional[str] = None, ) -> Dict[str, Any]: """ Delegate a task to another capable agent. Args: task: Task specification to delegate required_capabilities: Required capabilities for the task target_agent_id: Specific agent to delegate to (if None, finds capable agent) Returns: Task execution result from delegated agent Raises: ValueError: If collaboration not enabled or no capable agent found Example: ```python # Delegate to specific agent result = await agent.delegate_task( task={"description": "Search for AI papers"}, target_agent_id="search_agent" ) # Delegate to any capable agent result = await agent.delegate_task( task={"description": "Analyze data"}, required_capabilities=["data_analysis", "statistics"] ) ``` """ if not self._collaboration_enabled: raise ValueError("Agent collaboration is not enabled") # Find target agent if target_agent_id: target_agent = self._agent_registry.get(target_agent_id) if not target_agent: raise ValueError(f"Agent {target_agent_id} not found in registry") elif required_capabilities: capable_agents = await self.find_capable_agents(required_capabilities) if not capable_agents: raise ValueError(f"No capable agents found for capabilities: {required_capabilities}") target_agent = capable_agents[0] # Use first capable agent else: raise ValueError("Either target_agent_id or required_capabilities must be provided") logger.info(f"Agent {self.agent_id} delegating task to {target_agent.agent_id}") # Delegate task try: result = await target_agent.execute_task(task, context={"delegated_by": self.agent_id}) logger.info(f"Task delegation successful: {self.agent_id} -> {target_agent.agent_id}") return cast(Dict[str, Any], result) except Exception as e: logger.error(f"Task delegation failed: {e}") raise
[docs] async def find_capable_agents(self, required_capabilities: List[str]) -> List[Any]: """ Find agents with required capabilities. Args: required_capabilities: List of required capability names Returns: List of agents that have all required capabilities Example: ```python agents = await agent.find_capable_agents(["search", "summarize"]) for capable_agent in agents: print(f"Found: {capable_agent.name}") ``` """ if not self._collaboration_enabled: return [] capable_agents = [] for agent_id, agent in self._agent_registry.items(): # Skip self if agent_id == self.agent_id: continue # Check if agent has all required capabilities agent_capabilities = getattr(agent, "capabilities", []) if all(cap in agent_capabilities for cap in required_capabilities): capable_agents.append(agent) logger.debug(f"Found {len(capable_agents)} capable agents for {required_capabilities}") return capable_agents
[docs] async def request_peer_review( self, task: Dict[str, Any], result: Dict[str, Any], reviewer_id: Optional[str] = None, ) -> Dict[str, Any]: """ Request peer review of a task result. Args: task: Original task specification result: Task execution result to review reviewer_id: Specific reviewer agent ID (if None, selects automatically) Returns: Review result with 'approved' (bool), 'feedback' (str), 'reviewer_id' (str) Example: ```python result = await agent.execute_task(task, context) review = await agent.request_peer_review(task, result) if review['approved']: print(f"Approved: {review['feedback']}") else: print(f"Needs revision: {review['feedback']}") ``` """ if not self._collaboration_enabled: raise ValueError("Agent collaboration is not enabled") # Find reviewer if reviewer_id: reviewer = self._agent_registry.get(reviewer_id) if not reviewer: raise ValueError(f"Reviewer {reviewer_id} not found in registry") else: # Select first available agent (excluding self) available_reviewers = [agent for agent_id, agent in self._agent_registry.items() if agent_id != self.agent_id] if not available_reviewers: raise ValueError("No reviewers available") reviewer = available_reviewers[0] logger.info(f"Agent {self.agent_id} requesting review from {reviewer.agent_id}") # Request review try: if hasattr(reviewer, "review_result"): review = await reviewer.review_result(task, result) else: # Fallback: use execute_task with review prompt task_desc = task.get("description", "") task_result = result.get("output", "") review_task = { "description": (f"Review this task result:\nTask: {task_desc}\nResult: {task_result}"), "task_id": f"review_{task.get('task_id', 'unknown')}", } review_result = await reviewer.execute_task(review_task, context={}) review = { "approved": True, # Assume approved if no explicit review method "feedback": review_result.get("output", ""), "reviewer_id": reviewer.agent_id, } logger.info(f"Review received from {reviewer.agent_id}") return cast(Dict[str, Any], review) except Exception as e: logger.error(f"Peer review failed: {e}") raise
[docs] async def collaborate_on_task( self, task: Dict[str, Any], collaborator_ids: List[str], strategy: str = "parallel", ) -> Dict[str, Any]: """ Collaborate with other agents on a task. Args: task: Task specification collaborator_ids: List of agent IDs to collaborate with strategy: Collaboration strategy - 'parallel', 'sequential', or 'consensus' Returns: Aggregated result based on strategy Strategies: - parallel: All agents work simultaneously, results aggregated - sequential: Agents work in order, each building on previous results - consensus: All agents work independently, best result selected by voting Example: ```python # Parallel collaboration result = await agent.collaborate_on_task( task={"description": "Analyze market trends"}, collaborator_ids=["analyst1", "analyst2", "analyst3"], strategy="parallel" ) # Sequential collaboration (pipeline) result = await agent.collaborate_on_task( task={"description": "Research and summarize"}, collaborator_ids=["researcher", "summarizer"], strategy="sequential" ) # Consensus collaboration result = await agent.collaborate_on_task( task={"description": "Make recommendation"}, collaborator_ids=["expert1", "expert2", "expert3"], strategy="consensus" ) ``` """ if not self._collaboration_enabled: raise ValueError("Agent collaboration is not enabled") # Get collaborator agents collaborators = [] for agent_id in collaborator_ids: agent = self._agent_registry.get(agent_id) if not agent: logger.warning(f"Collaborator {agent_id} not found, skipping") continue collaborators.append(agent) if not collaborators: raise ValueError("No valid collaborators found") logger.info(f"Agent {self.agent_id} collaborating with {len(collaborators)} agents " f"using {strategy} strategy") # Execute based on strategy if strategy == "parallel": return await self._collaborate_parallel(task, collaborators) elif strategy == "sequential": return await self._collaborate_sequential(task, collaborators) elif strategy == "consensus": return await self._collaborate_consensus(task, collaborators) else: raise ValueError(f"Unknown collaboration strategy: {strategy}")
async def _collaborate_parallel(self, task: Dict[str, Any], collaborators: List[Any]) -> Dict[str, Any]: """ Parallel collaboration: all agents work simultaneously. Args: task: Task specification collaborators: List of collaborator agents Returns: Aggregated result """ # Execute task on all agents in parallel tasks = [agent.execute_task(task, context={"collaboration": "parallel"}) for agent in collaborators] results = await asyncio.gather(*tasks, return_exceptions=True) # Aggregate results return await self._aggregate_results(task, results, collaborators) async def _collaborate_sequential(self, task: Dict[str, Any], collaborators: List[Any]) -> Dict[str, Any]: """ Sequential collaboration: agents work in order, building on previous results. Args: task: Task specification collaborators: List of collaborator agents (in execution order) Returns: Final result from last agent """ current_task = task.copy() results = [] for i, agent in enumerate(collaborators): logger.debug(f"Sequential step {i + 1}/{len(collaborators)}: {agent.agent_id}") # Execute task result = await agent.execute_task(current_task, context={"collaboration": "sequential", "step": i + 1}) results.append(result) # Update task for next agent with previous result if i < len(collaborators) - 1: current_task = { "description": f"{task.get('description')}\n\nPrevious result: {result.get('output')}", "task_id": f"{task.get('task_id', 'unknown')}_step_{i + 2}", } # Return final result return { "success": True, "output": results[-1].get("output") if results else "", "collaboration_strategy": "sequential", "steps": len(results), "all_results": results, "timestamp": datetime.utcnow().isoformat(), } async def _collaborate_consensus(self, task: Dict[str, Any], collaborators: List[Any]) -> Dict[str, Any]: """ Consensus collaboration: all agents work independently, best result selected. Args: task: Task specification collaborators: List of collaborator agents Returns: Best result selected by consensus """ # Execute task on all agents in parallel tasks = [agent.execute_task(task, context={"collaboration": "consensus"}) for agent in collaborators] results = await asyncio.gather(*tasks, return_exceptions=True) # Select best result by consensus return await self._select_consensus_result(task, results, collaborators) async def _aggregate_results(self, task: Dict[str, Any], results: List[Any], collaborators: List[Any]) -> Dict[str, Any]: """ Aggregate results from parallel collaboration. Args: task: Original task results: List of results from collaborators collaborators: List of collaborator agents Returns: Aggregated result """ successful_results = [] errors = [] for i, result in enumerate(results): if isinstance(result, Exception): errors.append({"agent": collaborators[i].agent_id, "error": str(result)}) else: successful_results.append({"agent": collaborators[i].agent_id, "result": result}) # Combine outputs combined_output = "\n\n".join([f"[{r['agent']}]: {r['result'].get('output', '')}" for r in successful_results]) return { "success": len(successful_results) > 0, "output": combined_output, "collaboration_strategy": "parallel", "successful_agents": len(successful_results), "failed_agents": len(errors), "results": successful_results, "errors": errors if errors else None, "timestamp": datetime.utcnow().isoformat(), } async def _select_consensus_result(self, task: Dict[str, Any], results: List[Any], collaborators: List[Any]) -> Dict[str, Any]: """ Select best result by consensus voting. Args: task: Original task results: List of results from collaborators collaborators: List of collaborator agents Returns: Best result selected by consensus """ successful_results = [] for i, result in enumerate(results): if not isinstance(result, Exception): successful_results.append({"agent": collaborators[i].agent_id, "result": result, "votes": 0}) if not successful_results: return { "success": False, "output": "All collaborators failed", "collaboration_strategy": "consensus", "timestamp": datetime.utcnow().isoformat(), } # Simple voting: each agent votes for best result (excluding their own) # In a real implementation, this could use LLM to evaluate quality for voter_idx, voter_result in enumerate(successful_results): # For now, use simple heuristic: longest output is "best" # In production, use LLM-based evaluation best_idx = max( range(len(successful_results)), key=lambda i: (len(successful_results[i]["result"].get("output", "")) if i != voter_idx else 0), ) successful_results[best_idx]["votes"] += 1 # Select result with most votes best_result = max(successful_results, key=lambda r: r["votes"]) return { "success": True, "output": best_result["result"].get("output", ""), "collaboration_strategy": "consensus", "selected_agent": best_result["agent"], "votes": best_result["votes"], "total_agents": len(successful_results), "all_results": successful_results, "timestamp": datetime.utcnow().isoformat(), } # ==================== Smart Context Management (Phase 8 - Tasks 1.16.1-1.16.3) ====================
[docs] async def get_relevant_context( self, query: str, context_items: List[Dict[str, Any]], max_items: Optional[int] = None, min_relevance_score: float = 0.5, ) -> List[Dict[str, Any]]: """ Get relevant context items using semantic search and relevance scoring. This method filters and ranks context items based on their relevance to the query, helping agents stay within token limits while maintaining the most important context. Args: query: Query or task description to match against context_items: List of context items (dicts with 'content' field) max_items: Maximum number of items to return (None = no limit) min_relevance_score: Minimum relevance score (0.0-1.0) Returns: List of relevant context items, sorted by relevance (highest first) Example: ```python context_items = [ {"content": "User prefers concise answers", "type": "preference"}, {"content": "Previous task: data analysis", "type": "history"}, {"content": "System configuration: prod", "type": "config"}, ] relevant = await agent.get_relevant_context( query="Analyze sales data", context_items=context_items, max_items=2, min_relevance_score=0.6 ) # Returns top 2 most relevant items with score >= 0.6 ``` """ if not context_items: return [] # Score all items scored_items = [] for item in context_items: score = await self.score_context_relevance(query, item) if score >= min_relevance_score: scored_items.append({**item, "_relevance_score": score}) # Sort by relevance (highest first) scored_items.sort(key=lambda x: x["_relevance_score"], reverse=True) # Limit number of items if max_items is not None: scored_items = scored_items[:max_items] logger.debug(f"Selected {len(scored_items)}/{len(context_items)} relevant context items " f"(min_score={min_relevance_score})") return scored_items
[docs] async def score_context_relevance(self, query: str, context_item: Dict[str, Any]) -> float: """ Score the relevance of a context item to a query. Uses multiple signals to determine relevance: - Keyword overlap (basic) - Semantic similarity (if LLM client with embeddings available) - Recency (if timestamp available) - Type priority (if type specified) Args: query: Query or task description context_item: Context item to score (dict with 'content' field) Returns: Relevance score between 0.0 (not relevant) and 1.0 (highly relevant) Example: ```python score = await agent.score_context_relevance( query="Analyze sales data", context_item={"content": "Previous analysis results", "type": "history"} ) print(f"Relevance: {score:.2f}") ``` """ content = context_item.get("content", "") if not content: return 0.0 # Convert to lowercase for comparison query_lower = query.lower() content_lower = content.lower() # 1. Keyword overlap score (0.0-0.5) query_words = set(query_lower.split()) content_words = set(content_lower.split()) if not query_words: keyword_score = 0.0 else: overlap = len(query_words & content_words) keyword_score = min(0.5, (overlap / len(query_words)) * 0.5) # 2. Semantic similarity score (0.0-0.3) # If LLM client with embeddings is available, use it semantic_score = 0.0 if self._llm_client and hasattr(self._llm_client, "get_embeddings"): try: embeddings = await self._llm_client.get_embeddings([query, content]) if len(embeddings) == 2: # Calculate cosine similarity import math vec1, vec2 = embeddings[0], embeddings[1] dot_product = sum(a * b for a, b in zip(vec1, vec2)) mag1 = math.sqrt(sum(a * a for a in vec1)) mag2 = math.sqrt(sum(b * b for b in vec2)) if mag1 > 0 and mag2 > 0: similarity = dot_product / (mag1 * mag2) semantic_score = max(0.0, similarity) * 0.3 except Exception as e: logger.debug(f"Semantic similarity calculation failed: {e}") # 3. Recency score (0.0-0.1) recency_score = 0.0 if "timestamp" in context_item: try: from datetime import datetime timestamp = context_item["timestamp"] if isinstance(timestamp, str): timestamp = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) age_seconds = (datetime.utcnow() - timestamp).total_seconds() # Decay over 24 hours recency_score = max(0.0, 0.1 * (1.0 - min(1.0, age_seconds / 86400))) except Exception as e: logger.debug(f"Recency calculation failed: {e}") # 4. Type priority score (0.0-0.1) type_score = 0.0 item_type = context_item.get("type", "") priority_types = {"preference": 0.1, "constraint": 0.1, "requirement": 0.09} type_score = priority_types.get(item_type, 0.05) # Combine scores total_score = keyword_score + semantic_score + recency_score + type_score return min(1.0, total_score)
[docs] async def prune_context( self, context_items: List[Dict[str, Any]], max_tokens: int, query: Optional[str] = None, preserve_types: Optional[List[str]] = None, ) -> List[Dict[str, Any]]: """ Prune context items to fit within token limit. Uses relevance scoring to keep the most important context while staying within token limits. Optionally preserves certain types of context regardless of relevance. Args: context_items: List of context items to prune max_tokens: Maximum total tokens allowed query: Optional query for relevance scoring preserve_types: Optional list of types to always preserve Returns: Pruned list of context items that fit within token limit Example: ```python pruned = await agent.prune_context( context_items=all_context, max_tokens=2000, query="Analyze data", preserve_types=["constraint", "requirement"] ) print(f"Pruned from {len(all_context)} to {len(pruned)} items") ``` """ if not context_items: return [] preserve_types = preserve_types or [] # Separate preserved and regular items preserved_items = [] regular_items = [] for item in context_items: if item.get("type") in preserve_types: preserved_items.append(item) else: regular_items.append(item) # Score regular items if query provided if query and regular_items: scored_items = [] for item in regular_items: score = await self.score_context_relevance(query, item) scored_items.append({**item, "_relevance_score": score}) # Sort by relevance scored_items.sort(key=lambda x: x["_relevance_score"], reverse=True) regular_items = scored_items # Estimate tokens (rough approximation: 1 token ≈ 4 characters) def estimate_tokens(item: Dict[str, Any]) -> int: content = str(item.get("content", "")) return len(content) // 4 # Add preserved items first result = [] current_tokens = 0 for item in preserved_items: item_tokens = estimate_tokens(item) if current_tokens + item_tokens <= max_tokens: result.append(item) current_tokens += item_tokens else: logger.warning(f"Preserved item exceeds token limit, skipping: {item.get('type')}") # Add regular items until token limit for item in regular_items: item_tokens = estimate_tokens(item) if current_tokens + item_tokens <= max_tokens: result.append(item) current_tokens += item_tokens else: break logger.info(f"Pruned context from {len(context_items)} to {len(result)} items " f"({current_tokens}/{max_tokens} tokens)") return result
# ==================== Agent Learning (Phase 8 - Tasks 1.16.4-1.16.10) ====================
[docs] async def record_experience( self, task: Dict[str, Any], result: Dict[str, Any], approach: str, tools_used: Optional[List[str]] = None, ) -> None: """ Record an experience for learning and adaptation. Args: task: Task specification result: Task execution result approach: Approach/strategy used tools_used: List of tools used (if any) Example: ```python await agent.record_experience( task={"description": "Analyze data", "type": "analysis"}, result={"success": True, "execution_time": 5.2}, approach="statistical_analysis", tools_used=["pandas", "numpy"] ) ``` """ if not self._learning_enabled: return from .models import Experience # Classify task task_type = await self._classify_task(task) # Create experience record experience = Experience( # type: ignore[call-arg] agent_id=self.agent_id, task_type=task_type, task_description=task.get("description", ""), task_complexity=task.get("complexity"), approach=approach, tools_used=tools_used or [], execution_time=result.get("execution_time", 0.0), success=result.get("success", False), quality_score=result.get("quality_score"), error_type=result.get("error_type"), error_message=result.get("error"), context_size=result.get("context_size"), iterations=result.get("iterations"), metadata={"task_id": task.get("task_id")}, ) # Add to experiences self._experiences.append(experience) # Limit stored experiences if len(self._experiences) > self._max_experiences: self._experiences = self._experiences[-self._max_experiences :] logger.debug(f"Recorded experience: {task_type} - " f"{'success' if experience.success else 'failure'} " f"({experience.execution_time:.2f}s)")
[docs] async def get_learning_insights(self) -> Dict[str, Any]: """ Get learning insights and analytics. Provides analytics about agent learning including success rates, common patterns, and areas for improvement. Returns: Dict with learning insights and statistics Example: ```python insights = await agent.get_learning_insights() print(f"Total experiences: {insights['total_experiences']}") print(f"Success rate: {insights['overall_success_rate']:.2%}") print(f"Most common task: {insights['most_common_task_type']}") ``` """ if not self._learning_enabled or not self._experiences: return { "total_experiences": 0, "learning_enabled": self._learning_enabled, } total = len(self._experiences) successful = sum(1 for exp in self._experiences if exp.success) failed = total - successful # Task type distribution task_types: Dict[str, int] = {} for exp in self._experiences: task_types[exp.task_type] = task_types.get(exp.task_type, 0) + 1 # Approach effectiveness approach_success: Dict[str, Dict[str, int]] = {} for exp in self._experiences: if exp.approach not in approach_success: approach_success[exp.approach] = {"success": 0, "failure": 0} if exp.success: approach_success[exp.approach]["success"] += 1 else: approach_success[exp.approach]["failure"] += 1 # Calculate success rates approach_rates = {approach: stats["success"] / (stats["success"] + stats["failure"]) for approach, stats in approach_success.items()} # Error patterns error_types: Dict[str, int] = {} for exp in self._experiences: if not exp.success and exp.error_type: error_types[exp.error_type] = error_types.get(exp.error_type, 0) + 1 return { "total_experiences": total, "successful_experiences": successful, "failed_experiences": failed, "overall_success_rate": successful / total if total > 0 else 0.0, "task_type_distribution": task_types, "most_common_task_type": (max(task_types.items(), key=lambda x: x[1])[0] if task_types else None), "approach_effectiveness": approach_rates, "best_approach": (max(approach_rates.items(), key=lambda x: x[1])[0] if approach_rates else None), "error_patterns": error_types, "most_common_error": (max(error_types.items(), key=lambda x: x[1])[0] if error_types else None), "learning_enabled": self._learning_enabled, }
[docs] async def adapt_strategy(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Adapt strategy based on learning insights. Analyzes past experiences to suggest strategy adaptations for the current task. Args: task: Task specification Returns: Dict with strategy adaptations and recommendations Example: ```python adaptations = await agent.adapt_strategy( task={"description": "Complex analysis", "type": "analysis"} ) print(f"Recommended approach: {adaptations['recommended_approach']}") print(f"Suggested tools: {adaptations['suggested_tools']}") ``` """ if not self._learning_enabled: return {"adapted": False, "reason": "Learning not enabled"} # Get recommended approach recommendation = await self.get_recommended_approach(task) if not recommendation: return { "adapted": False, "reason": "No relevant experiences found", } # Classify task task_type = await self._classify_task(task) # Find similar successful experiences similar_successful = [exp for exp in self._experiences if exp.task_type == task_type and exp.success] # Analyze tool usage patterns tool_usage: Dict[str, int] = {} for exp in similar_successful: for tool in exp.tools_used: tool_usage[tool] = tool_usage.get(tool, 0) + 1 # Get most commonly used tools suggested_tools = sorted(tool_usage.items(), key=lambda x: x[1], reverse=True)[:5] # Top 5 tools return { "adapted": True, "recommended_approach": recommendation["approach"], "confidence": recommendation["confidence"], "reasoning": recommendation["reasoning"], "suggested_tools": [tool for tool, _ in suggested_tools], "tool_usage_stats": dict(suggested_tools), "based_on_experiences": len(similar_successful), }
async def _classify_task(self, task: Dict[str, Any]) -> str: """ Classify task into a type/category. Uses simple heuristics to classify tasks. Can be overridden by subclasses for more sophisticated classification. Args: task: Task specification Returns: Task type string Example: ```python task_type = await agent._classify_task( {"description": "Analyze sales data"} ) # Returns: "analysis" ``` """ # Check explicit type if "type" in task: return str(task["type"]) # Simple keyword-based classification description = task.get("description", "").lower() if any(word in description for word in ["analyze", "analysis", "examine"]): return "analysis" elif any(word in description for word in ["search", "find", "lookup"]): return "search" elif any(word in description for word in ["create", "generate", "write"]): return "generation" elif any(word in description for word in ["summarize", "summary"]): return "summarization" elif any(word in description for word in ["calculate", "compute"]): return "calculation" elif any(word in description for word in ["translate", "convert"]): return "translation" else: return "general" # ==================== Resource Management (Phase 8 - Tasks 1.16.11-1.16.17) ====================
[docs] async def check_resource_availability(self) -> Dict[str, Any]: """ Check if resources are available for task execution. Checks against configured resource limits including: - Concurrent task limits - Token rate limits - Tool call rate limits Returns: Dict with 'available' (bool) and details about resource status Example: ```python status = await agent.check_resource_availability() if status['available']: await agent.execute_task(task, context) else: print(f"Resources unavailable: {status['reason']}") ``` """ if not self._resource_limits.enforce_limits: return {"available": True, "reason": "Limits not enforced"} # Check concurrent task limit if len(self._active_tasks) >= self._resource_limits.max_concurrent_tasks: return { "available": False, "reason": "Concurrent task limit reached", "active_tasks": len(self._active_tasks), "max_tasks": self._resource_limits.max_concurrent_tasks, } # Check token rate limits token_check = await self._check_token_rate_limit() if not token_check["available"]: return token_check # Check tool call rate limits tool_check = await self._check_tool_call_rate_limit() if not tool_check["available"]: return tool_check return { "available": True, "active_tasks": len(self._active_tasks), "max_tasks": self._resource_limits.max_concurrent_tasks, }
[docs] async def wait_for_resources(self, timeout: Optional[float] = None) -> bool: """ Wait for resources to become available. Args: timeout: Maximum time to wait in seconds (uses resource_wait_timeout_seconds if None) Returns: True if resources became available, False if timeout Example: ```python if await agent.wait_for_resources(timeout=30): await agent.execute_task(task, context) else: print("Timeout waiting for resources") ``` """ if timeout is None: timeout = self._resource_limits.resource_wait_timeout_seconds start_time = time.time() check_interval = 0.5 # Check every 500ms while time.time() - start_time < timeout: status = await self.check_resource_availability() if status["available"]: return True # Wait before next check await asyncio.sleep(check_interval) logger.warning(f"Timeout waiting for resources after {timeout}s") return False
[docs] async def get_resource_usage(self) -> Dict[str, Any]: """ Get current resource usage statistics. Returns: Dict with resource usage information Example: ```python usage = await agent.get_resource_usage() print(f"Active tasks: {usage['active_tasks']}") print(f"Tokens/min: {usage['tokens_per_minute']}") print(f"Tool calls/min: {usage['tool_calls_per_minute']}") ``` """ current_time = time.time() # Calculate token usage rates tokens_last_minute = sum(count for ts, count in self._token_usage_window if current_time - ts < 60) tokens_last_hour = sum(count for ts, count in self._token_usage_window if current_time - ts < 3600) # Calculate tool call rates tool_calls_last_minute = sum(1 for ts in self._tool_call_window if current_time - ts < 60) tool_calls_last_hour = sum(1 for ts in self._tool_call_window if current_time - ts < 3600) return { "active_tasks": len(self._active_tasks), "max_concurrent_tasks": self._resource_limits.max_concurrent_tasks, "task_utilization": len(self._active_tasks) / self._resource_limits.max_concurrent_tasks, "tokens_per_minute": tokens_last_minute, "tokens_per_hour": tokens_last_hour, "max_tokens_per_minute": self._resource_limits.max_tokens_per_minute, "max_tokens_per_hour": self._resource_limits.max_tokens_per_hour, "tool_calls_per_minute": tool_calls_last_minute, "tool_calls_per_hour": tool_calls_last_hour, "max_tool_calls_per_minute": self._resource_limits.max_tool_calls_per_minute, "max_tool_calls_per_hour": self._resource_limits.max_tool_calls_per_hour, "limits_enforced": self._resource_limits.enforce_limits, }
async def _check_token_rate_limit(self) -> Dict[str, Any]: """ Check token rate limits. Returns: Dict with 'available' (bool) and limit details """ if not self._resource_limits.enforce_limits: return {"available": True} current_time = time.time() # Clean old entries (older than 1 hour) self._token_usage_window = [(ts, count) for ts, count in self._token_usage_window if current_time - ts < 3600] # Check per-minute limit if self._resource_limits.max_tokens_per_minute is not None: tokens_last_minute = sum(count for ts, count in self._token_usage_window if current_time - ts < 60) if tokens_last_minute >= self._resource_limits.max_tokens_per_minute: return { "available": False, "reason": "Token rate limit (per minute) reached", "tokens_used": tokens_last_minute, "limit": self._resource_limits.max_tokens_per_minute, "window": "minute", } # Check per-hour limit if self._resource_limits.max_tokens_per_hour is not None: tokens_last_hour = sum(count for ts, count in self._token_usage_window) if tokens_last_hour >= self._resource_limits.max_tokens_per_hour: return { "available": False, "reason": "Token rate limit (per hour) reached", "tokens_used": tokens_last_hour, "limit": self._resource_limits.max_tokens_per_hour, "window": "hour", } return {"available": True} async def _check_tool_call_rate_limit(self) -> Dict[str, Any]: """ Check tool call rate limits. Returns: Dict with 'available' (bool) and limit details """ if not self._resource_limits.enforce_limits: return {"available": True} current_time = time.time() # Clean old entries (older than 1 hour) self._tool_call_window = [ts for ts in self._tool_call_window if current_time - ts < 3600] # Check per-minute limit if self._resource_limits.max_tool_calls_per_minute is not None: calls_last_minute = sum(1 for ts in self._tool_call_window if current_time - ts < 60) if calls_last_minute >= self._resource_limits.max_tool_calls_per_minute: return { "available": False, "reason": "Tool call rate limit (per minute) reached", "calls_made": calls_last_minute, "limit": self._resource_limits.max_tool_calls_per_minute, "window": "minute", } # Check per-hour limit if self._resource_limits.max_tool_calls_per_hour is not None: calls_last_hour = len(self._tool_call_window) if calls_last_hour >= self._resource_limits.max_tool_calls_per_hour: return { "available": False, "reason": "Tool call rate limit (per hour) reached", "calls_made": calls_last_hour, "limit": self._resource_limits.max_tool_calls_per_hour, "window": "hour", } return {"available": True} # ==================== Error Recovery (Phase 8 - Tasks 1.16.18-1.16.22) ====================
[docs] async def execute_with_recovery( self, task: Dict[str, Any], context: Dict[str, Any], strategies: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Execute task with advanced error recovery strategies. Tries multiple recovery strategies in sequence until one succeeds: 1. Retry with exponential backoff 2. Simplify task and retry 3. Use fallback approach 4. Delegate to another agent Args: task: Task specification context: Execution context strategies: List of strategy names to try (uses default chain if None) Returns: Task execution result Raises: TaskExecutionError: If all recovery strategies fail Example: ```python result = await agent.execute_with_recovery( task={"description": "Complex analysis"}, context={}, strategies=["retry", "simplify", "delegate"] ) ``` """ from .models import RecoveryStrategy from .exceptions import TaskExecutionError # Default strategy chain if strategies is None: strategies = [ RecoveryStrategy.RETRY, RecoveryStrategy.SIMPLIFY, RecoveryStrategy.FALLBACK, RecoveryStrategy.DELEGATE, ] errors = [] for strategy in strategies: try: logger.info(f"Attempting recovery strategy: {strategy}") if strategy == RecoveryStrategy.RETRY: # Retry with exponential backoff (using existing retry mechanism) result = await self._execute_with_retry(self.execute_task, task, context) logger.info(f"Recovery successful with strategy: {strategy}") return cast(Dict[str, Any], result) elif strategy == RecoveryStrategy.SIMPLIFY: # Simplify task and retry simplified_task = await self._simplify_task(task) result = await self.execute_task(simplified_task, context) logger.info(f"Recovery successful with strategy: {strategy}") return result elif strategy == RecoveryStrategy.FALLBACK: # Use fallback approach result = await self._execute_with_fallback(task, context) logger.info(f"Recovery successful with strategy: {strategy}") return result elif strategy == RecoveryStrategy.DELEGATE: # Delegate to another agent if self._collaboration_enabled: result = await self._delegate_to_capable_agent(task, context) logger.info(f"Recovery successful with strategy: {strategy}") return result else: logger.warning("Delegation not available (collaboration disabled)") continue except Exception as e: logger.warning(f"Recovery strategy {strategy} failed: {e}") errors.append({"strategy": strategy, "error": str(e)}) continue # All strategies failed error_summary = "; ".join([f"{e['strategy']}: {e['error']}" for e in errors]) raise TaskExecutionError( f"All recovery strategies failed. Errors: {error_summary}", agent_id=self.agent_id, task_id=task.get("task_id"), )
async def _simplify_task(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Simplify a task to make it easier to execute. Strategies: - Reduce complexity by breaking into smaller parts - Remove optional requirements - Use simpler language Args: task: Original task specification Returns: Simplified task specification Example: ```python simplified = await agent._simplify_task( {"description": "Perform comprehensive analysis with visualizations"} ) # Returns: {"description": "Perform basic analysis"} ``` """ description = task.get("description", "") # Simple heuristics for simplification simplified_description = description # Remove complexity keywords complexity_words = [ "comprehensive", "detailed", "thorough", "extensive", "in-depth", "complete", "full", "exhaustive", ] for word in complexity_words: simplified_description = simplified_description.replace(word, "basic") # Remove optional requirements optional_phrases = [ "with visualizations", "with charts", "with graphs", "with examples", "with details", "with explanations", ] for phrase in optional_phrases: simplified_description = simplified_description.replace(phrase, "") # Clean up extra spaces simplified_description = " ".join(simplified_description.split()) simplified_task = task.copy() simplified_task["description"] = simplified_description simplified_task["simplified"] = True simplified_task["original_description"] = description logger.debug(f"Simplified task: '{description}' -> '{simplified_description}'") return simplified_task async def _execute_with_fallback(self, task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: """ Execute task with fallback approach. Uses a simpler, more reliable approach when the primary approach fails. Args: task: Task specification context: Execution context Returns: Task execution result Example: ```python result = await agent._execute_with_fallback(task, context) ``` """ # Create fallback task with reduced requirements fallback_task = task.copy() fallback_task["fallback_mode"] = True # Reduce max_tokens if specified if "max_tokens" in context: context = context.copy() context["max_tokens"] = min(context["max_tokens"], 1000) # Reduce temperature for more deterministic output if "temperature" in context: context = context.copy() context["temperature"] = 0.3 logger.info("Executing with fallback approach (reduced requirements)") # Execute with modified parameters result = await self.execute_task(fallback_task, context) result["fallback_used"] = True return result async def _delegate_to_capable_agent(self, task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: """ Delegate task to a capable agent as recovery strategy. Finds an agent capable of handling the task and delegates to it. Args: task: Task specification context: Execution context Returns: Task execution result from delegated agent Raises: ValueError: If no capable agent found Example: ```python result = await agent._delegate_to_capable_agent(task, context) ``` """ if not self._collaboration_enabled: raise ValueError("Collaboration not enabled, cannot delegate") # Try to classify task and find capable agents task_type = await self._classify_task(task) # Look for agents with matching capabilities capable_agents = [] for agent_id, agent in self._agent_registry.items(): if agent_id == self.agent_id: continue # Skip self # Check if agent has relevant capabilities agent_capabilities = getattr(agent, "capabilities", []) if task_type in agent_capabilities or "general" in agent_capabilities: capable_agents.append(agent) if not capable_agents: # Try any available agent as last resort capable_agents = [agent for agent_id, agent in self._agent_registry.items() if agent_id != self.agent_id] if not capable_agents: raise ValueError("No capable agents available for delegation") # Delegate to first capable agent target_agent = capable_agents[0] logger.info(f"Delegating task to {target_agent.agent_id} for recovery") result: Dict[str, Any] = cast(Dict[str, Any], await target_agent.execute_task(task, context={**context, "delegated_by": self.agent_id, "recovery_delegation": True})) result["delegated_to"] = target_agent.agent_id result["recovery_delegation"] = True return result
[docs] def __str__(self) -> str: """String representation.""" return f"Agent({self.agent_id}, {self.name}, {self.agent_type.value}, {self._state.value})"
[docs] def __repr__(self) -> str: """Detailed representation.""" return f"BaseAIAgent(agent_id='{self.agent_id}', name='{self.name}', " f"type='{self.agent_type.value}', state='{self._state.value}')"