Domain API

This section documents the domain layer components.

Agent

Base Agent

Base AI Agent

Abstract base class for all AI agents in the AIECS system.

class aiecs.domain.agent.base_agent.OperationTimer

Bases: object

Context manager for timing operations and tracking metrics.

Automatically records operation duration and can be used to track operation-level performance metrics.

Example

```python
with agent.track_operation_time("llm_call") as timer:
    result = llm.generate(prompt)

# timer.duration contains the elapsed time in seconds
```

__init__(operation_name, agent=None)

Initialize operation timer.

Parameters:
  • operation_name (str) – Name of the operation being timed

  • agent (BaseAIAgent | None) – Optional agent instance for automatic metrics recording

__enter__()

Start timing.

Return type:

OperationTimer

__exit__(exc_type, exc_val, exc_tb)

Stop timing and record metrics.

Parameters:
  • exc_type – Exception type if an error occurred

  • exc_val – Exception value if an error occurred

  • exc_tb – Exception traceback if an error occurred

Returns:

A falsy value, so exceptions raised inside the with block are propagated rather than suppressed

Return type:

None

get_duration_ms()

Get duration in milliseconds.

Return type:

float | None
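To make the timer's contract concrete, here is a minimal, self-contained sketch of a context manager with the same shape: `duration` in seconds, `get_duration_ms()`, and exceptions propagated. `SimpleOperationTimer` is a hypothetical name. It assumes `time.perf_counter()` as the clock and omits the optional `agent` hook, because how `OperationTimer` records into an agent's metrics is not documented here.

```python
import time
from typing import Optional


class SimpleOperationTimer:
    """Hypothetical stand-in for OperationTimer: measures one `with` block."""

    def __init__(self, operation_name: str) -> None:
        self.operation_name = operation_name
        self.start_time: Optional[float] = None
        self.duration: Optional[float] = None  # seconds; set when the block exits

    def __enter__(self) -> "SimpleOperationTimer":
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.duration = time.perf_counter() - self.start_time
        # Returning None (falsy) means exceptions from the block are re-raised.
        return None

    def get_duration_ms(self) -> Optional[float]:
        return None if self.duration is None else self.duration * 1000.0


with SimpleOperationTimer("llm_call") as timer:
    sum(range(100_000))  # placeholder for the real work
print(f"{timer.operation_name} took {timer.get_duration_ms():.2f} ms")
```

The duration is still recorded when the block raises, because `__exit__` runs before the exception continues upward.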

class aiecs.domain.agent.base_agent.CacheConfig

Bases: object

Configuration for tool result caching.

Provides control over caching behavior to improve performance and reduce costs by avoiding redundant tool executions. Supports TTL-based expiration, size limits, and automatic cleanup.

Key Features:
  • TTL-based cache expiration (default and per-tool)

  • Size limits to prevent memory exhaustion

  • Automatic cleanup when the capacity threshold is reached

  • Configurable cache key generation

  • Input hashing for large parameters

enabled

Enable/disable caching globally

Type:

bool

default_ttl

Default time-to-live in seconds for cached entries (default: 300 = 5 minutes)

Type:

int

tool_specific_ttl

Dictionary mapping tool names to custom TTL values (overrides default_ttl)

Type:

Dict[str, int] | None

max_cache_size

Maximum number of cached entries before cleanup (default: 1000)

Type:

int

max_memory_mb

Maximum cache memory usage in MB (approximate, default: 100)

Type:

int

cleanup_interval

Interval in seconds between cleanup checks (default: 60)

Type:

int

cleanup_threshold

Capacity threshold (0.0-1.0) to trigger cleanup (default: 0.9 = 90%)

Type:

float

include_timestamp_in_key

Whether to include timestamp in cache key (default: False)

Type:

bool

hash_large_inputs

Whether to hash inputs larger than 1KB for cache keys (default: True)

Type:

bool
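`include_timestamp_in_key` and `hash_large_inputs` both affect how cache keys are built, which is not specified further here. The sketch below shows one plausible scheme, assuming a key is the tool name plus the JSON-serialized parameters and that "larger than 1KB" means more than 1024 UTF-8 bytes of that JSON. `make_cache_key` is a hypothetical helper, not the library's function, and the timestamp option is not modeled.

```python
import hashlib
import json
from typing import Any, Dict


def make_cache_key(tool_name: str, parameters: Dict[str, Any],
                   hash_large_inputs: bool = True, threshold: int = 1024) -> str:
    """Hypothetical cache-key builder; not the library's actual scheme."""
    # sort_keys makes the key independent of dict insertion order.
    payload = json.dumps(parameters, sort_keys=True, default=str)
    if hash_large_inputs and len(payload.encode("utf-8")) > threshold:
        # Replace large payloads with a fixed-length SHA-256 digest.
        payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{tool_name}:{payload}"
```

Hashing keeps keys short for large inputs at the cost of no longer being human-readable.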

Examples

```python
# Example 1: Basic caching configuration
config = CacheConfig(
    enabled=True,
    default_ttl=300,  # 5 minutes
    max_cache_size=1000,
)
```

```python
# Example 2: Per-tool TTL overrides
config = CacheConfig(
    enabled=True,
    default_ttl=300,
    tool_specific_ttl={
        "search": 600,       # Search results cached for 10 minutes
        "calculator": 3600,  # Calculator results cached for 1 hour
        "weather": 1800,     # Weather data cached for 30 minutes
    },
)
```

```python
# Example 3: Aggressive caching for expensive operations
config = CacheConfig(
    enabled=True,
    default_ttl=3600,  # 1 hour default
    max_cache_size=5000,
    max_memory_mb=500,
    cleanup_threshold=0.95,  # Cleanup at 95% capacity
)
```

```python
# Example 4: Disable caching entirely (e.g., when all tools are time-sensitive)
config = CacheConfig(
    enabled=False,  # Disables caching for every tool
)
```

```python
# Example 5: Cache with timestamp-aware keys
config = CacheConfig(
    enabled=True,
    default_ttl=300,
    include_timestamp_in_key=True,  # Include timestamp for time-sensitive caching
)
```

enabled: bool = True
default_ttl: int = 300
tool_specific_ttl: Dict[str, int] | None = None
max_cache_size: int = 1000
max_memory_mb: int = 100
cleanup_interval: int = 60
cleanup_threshold: float = 0.9
include_timestamp_in_key: bool = False
hash_large_inputs: bool = True
__post_init__()

Initialize defaults.

get_ttl(tool_name)

Get TTL for a specific tool.

Parameters:

tool_name (str) – Name of the tool

Returns:

TTL in seconds

Return type:

int

__init__(enabled=True, default_ttl=300, tool_specific_ttl=None, max_cache_size=1000, max_memory_mb=100, cleanup_interval=60, cleanup_threshold=0.9, include_timestamp_in_key=False, hash_large_inputs=True)
Parameters:
  • enabled (bool)

  • default_ttl (int)

  • tool_specific_ttl (Dict[str, int] | None)

  • max_cache_size (int)

  • max_memory_mb (int)

  • cleanup_interval (int)

  • cleanup_threshold (float)

  • include_timestamp_in_key (bool)

  • hash_large_inputs (bool)

Return type:

None
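The following reduced, hypothetical mirror of `CacheConfig` shows how `get_ttl()` can resolve a per-tool override against `default_ttl`, and how `cleanup_threshold` relates to `max_cache_size`. The `__post_init__` behaviour (turning `None` into an empty dict) and the `needs_cleanup()` helper are assumptions for illustration, not documented API.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class CacheConfigSketch:
    enabled: bool = True
    default_ttl: int = 300
    tool_specific_ttl: Optional[Dict[str, int]] = None
    max_cache_size: int = 1000
    cleanup_threshold: float = 0.9

    def __post_init__(self) -> None:
        # Assumed default handling: replace None with an empty mapping.
        if self.tool_specific_ttl is None:
            self.tool_specific_ttl = {}

    def get_ttl(self, tool_name: str) -> int:
        # A per-tool override wins; otherwise fall back to default_ttl.
        return self.tool_specific_ttl.get(tool_name, self.default_ttl)

    def needs_cleanup(self, current_size: int) -> bool:
        # Hypothetical helper: clean up once size reaches threshold * max size.
        return current_size >= self.cleanup_threshold * self.max_cache_size
```

With the defaults, cleanup would start at 900 entries (90% of 1000).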

class aiecs.domain.agent.base_agent.BaseAIAgent

Bases: SkillCapableMixin, ABC

Abstract base class for AI agents.

Provides common functionality for agent lifecycle management, state management, memory, goals, and metrics tracking.

This base class supports extensive flexibility and advanced features:

Tool Flexibility:
  • Accept tool names (List[str]) for backward compatibility

  • Accept pre-configured tool instances (Dict[str, BaseTool]) with preserved state

  • Automatic tool loading and validation

LLM Client Flexibility:
  • Accept any object implementing LLMClientProtocol (duck typing)

  • No requirement for BaseLLMClient inheritance

  • Custom LLM client wrappers fully supported

Advanced Features:
  • ContextEngine integration for persistent conversation history

  • Custom config managers for dynamic configuration

  • Checkpointers for state persistence (LangGraph compatible)

  • Agent collaboration (delegation, peer review, consensus)

  • Agent learning from experiences

  • Resource management (rate limiting, quotas)

  • Performance tracking and health monitoring

  • Tool result caching

  • Parallel tool execution

  • Streaming responses

  • Error recovery strategies
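LLM client flexibility relies on structural (duck) typing. As a rough illustration, the sketch below models a protocol with the members used by `CustomLLMClient` in Example 3 (`provider_name`, `generate_text`, `stream_text`, `close`) using Python's `typing.Protocol`. `LLMClientLike` and `EchoClient` are hypothetical names; returning a plain `str` from `generate_text` is a simplification (Example 3 returns an `LLMResponse`), and the real `LLMClientProtocol` may declare more or different members.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class LLMClientLike(Protocol):
    """Hypothetical approximation of an LLM client protocol."""

    provider_name: str

    async def generate_text(self, messages: List[Dict[str, str]], **kwargs: Any) -> str: ...

    def stream_text(self, messages: List[Dict[str, str]], **kwargs: Any) -> AsyncIterator[str]: ...

    async def close(self) -> None: ...


class EchoClient:
    """Inherits from nothing; satisfies the protocol structurally."""

    provider_name = "echo"

    async def generate_text(self, messages, **kwargs):
        return messages[-1]["content"]

    async def stream_text(self, messages, **kwargs):
        # An async generator function returns an async iterator of tokens.
        for word in messages[-1]["content"].split():
            yield word

    async def close(self):
        pass


async def demo() -> List[str]:
    client = EchoClient()
    msgs = [{"role": "user", "content": "hello duck typing"}]
    text = await client.generate_text(msgs)
    tokens = [t async for t in client.stream_text(msgs)]
    await client.close()
    return [text, *tokens]
```

`runtime_checkable` only checks that the members exist, not their signatures, which matches the "no inheritance required" idea.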

Examples

```python
# Example 1: Basic agent with tool names (backward compatible)
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search", "calculator"],  # Tool names loaded by subclass
)
```

```python
# Example 2: Agent with tool instances (preserves tool state)
from aiecs.tools import BaseTool

class StatefulSearchTool(BaseTool):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.call_count = 0  # State preserved

    async def run_async(self, query: str):
        self.call_count += 1
        return f"Search results for: {query}"

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools={
        "search": StatefulSearchTool(api_key="…"),
        "calculator": CalculatorTool(),
    },
    llm_client=OpenAIClient(),
)
# Tool state (call_count) is preserved across agent operations
```

```python
# Example 3: Agent with custom LLM client (no BaseLLMClient inheritance)
class CustomLLMClient:
    provider_name = "custom"

    async def generate_text(self, messages, **kwargs):
        # Custom implementation
        return LLMResponse(content="…", provider="custom")

    async def stream_text(self, messages, **kwargs):
        async for token in self._custom_stream():
            yield token

    async def close(self):
        # Cleanup
        pass

agent = LLMAgent(
    agent_id="agent1",
    name="My LLM Agent",
    agent_type=AgentType.CONVERSATIONAL,
    config=config,
    llm_client=CustomLLMClient(),  # Works without BaseLLMClient!
)
```

```python
# Example 4: Agent with ContextEngine for persistent storage
from aiecs.domain.context import ContextEngine

context_engine = ContextEngine()
await context_engine.initialize()

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    context_engine=context_engine,  # Enables persistent conversation history
)
# Conversation history persists across agent restarts
```

```python
# Example 5: Agent with custom config manager
from typing import Any

class DatabaseConfigManager:
    async def get_config(self, key: str):
        # Load from database (`db` is your own async database client)
        return await db.get_config(key)

    async def update_config(self, key: str, value: Any):
        # Update in database
        await db.update_config(key, value)

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    config_manager=DatabaseConfigManager(),  # Dynamic config loading
)
```

```python
# Example 6: Agent with checkpointer for LangGraph integration
import json
from typing import Any, Dict, Optional

class RedisCheckpointer:
    async def save(self, agent_id: str, state: Dict[str, Any]):
        # `redis` is an async Redis client created elsewhere
        await redis.set(f"checkpoint:{agent_id}", json.dumps(state))

    async def load(self, agent_id: str) -> Optional[Dict[str, Any]]:
        data = await redis.get(f"checkpoint:{agent_id}")
        return json.loads(data) if data else None

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    checkpointer=RedisCheckpointer(),  # LangGraph-compatible checkpointing
)
```

```python
# Example 7: Agent with collaboration features
agent_registry = {
    "agent2": other_agent_instance,
    "agent3": another_agent_instance,
}

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    collaboration_enabled=True,
    agent_registry=agent_registry,  # Enable delegation and peer review
)

# Delegate task to another agent
result = await agent.delegate_task(
    task_description="Analyze this data",
    target_agent_id="agent2",
)
```

```python
# Example 8: Agent with learning enabled
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    learning_enabled=True,  # Learn from past experiences
)

# Record experience
await agent.record_experience(
    task_type="data_analysis",
    approach="parallel_tools",
    success=True,
    execution_time=2.5,
)

# Get recommended approach based on history
approach = await agent.get_recommended_approach("data_analysis")
print(f"Recommended: {approach}")
```

```python
# Example 9: Agent with resource limits
from aiecs.domain.agent.models import ResourceLimits

resource_limits = ResourceLimits(
    max_concurrent_tasks=5,
    max_tokens_per_minute=10000,
    max_tool_calls_per_minute=100,
)

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
    resource_limits=resource_limits,  # Rate limiting and quotas
)

# Check resource availability before executing
if await agent.check_resource_availability():
    result = await agent.execute_task(task, context)
else:
    await agent.wait_for_resources(timeout=30.0)
```

```python
# Example 10: Agent with performance tracking
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
)

# Track operation performance
with agent.track_operation_time("data_processing"):
    result = await agent.execute_task(task, context)

# Get performance metrics
metrics = agent.get_performance_metrics()
print(f"Total operations: {metrics['total_operations']}")
print(f"P95 operation time: {metrics['p95_operation_time']}s")

# Get health status
health = agent.get_health_status()
print(f"Health score: {health['health_score']}")
print(f"Status: {health['status']}")
```

```python
# Example 11: Agent with tool caching
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
)

# First call executes the tool and caches the result
# (entry lifetime is governed by the cache TTL settings, see CacheConfig)
result1 = await agent.execute_tool_with_cache("search", {"query": "AI"})

# An identical call within the TTL is served from the cache (no API call)
result2 = await agent.execute_tool_with_cache("search", {"query": "AI"})

# Get cache statistics
stats = agent.get_cache_stats()
print(f"Cache hit rate: {stats['hit_rate']:.1%}")
```

```python
# Example 12: Agent with parallel tool execution
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search", "calculator", "translator"],
    llm_client=OpenAIClient(),
)

# Execute independent tool calls concurrently (at most 3 at a time)
results = await agent.execute_tools_parallel([
    {"tool_name": "search", "parameters": {"operation": "query", "q": "AI"}},
    {"tool_name": "calculator", "parameters": {"operation": "add", "a": 1, "b": 2}},
    {"tool_name": "translator", "parameters": {"operation": "translate", "text": "Hello"}},
], max_concurrency=3)
```

```python
# Example 13: Agent with streaming responses
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
)

# Stream task execution (tokens + tool calls)
async for event in agent.execute_task_streaming(task, context):
    if event["type"] == "token":
        print(event["content"], end="", flush=True)
    elif event["type"] == "tool_call":
        print(f"\nCalling {event['tool_name']}...")
    elif event["type"] == "result":
        print(f"\nFinal result: {event['output']}")
```

```python
# Example 14: Agent with error recovery
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search"],
    llm_client=OpenAIClient(),
)

# Execute with automatic recovery strategies
result = await agent.execute_with_recovery(
    task=task,
    context=context,
    strategies=["retry", "simplify", "fallback", "delegate"],
)
# Automatically tries retry → simplify → fallback → delegate if errors occur
```

__init__(agent_id, name, agent_type, config, description=None, version='1.0.0', tools=None, llm_client=None, config_manager=None, checkpointer=None, context_engine=None, collaboration_enabled=False, agent_registry=None, learning_enabled=False, resource_limits=None, skill_script_registry=None, skill_registry=None, plugin_registry=None)

Initialize the base agent.

Parameters:
  • agent_id (str) – Unique identifier for the agent

  • name (str) – Agent name

  • agent_type (AgentType) – Type of agent

  • config (AgentConfiguration) – Agent configuration

  • description (str | None) – Optional agent description

  • version (str) – Agent version

  • tools (List[str] | Dict[str, BaseTool] | None) – Optional tools, given either as a list of tool names or as a dict of tool instances. A List[str] supplies tool names to be loaded by the subclass; a Dict[str, BaseTool] supplies pre-configured tool instances whose state is preserved.

  • llm_client (LLMClientProtocol | None) – Optional LLM client (any object implementing LLMClientProtocol). Supports custom LLM clients without BaseLLMClient inheritance.

  • config_manager (ConfigManagerProtocol | None) – Optional configuration manager for dynamic config loading

  • checkpointer (CheckpointerProtocol | None) – Optional checkpointer for state persistence (LangGraph compatible)

  • context_engine (ContextEngine | None) – Optional ContextEngine instance for persistent conversation history and session management. If provided, enables persistent storage across agent restarts.

  • collaboration_enabled (bool) – Enable agent collaboration features (delegation, peer review)

  • agent_registry (Dict[str, Any] | None) – Registry of other agents for collaboration (agent_id -> agent instance)

  • learning_enabled (bool) – Enable agent learning from experiences

  • resource_limits (Any | None) – Optional resource limits configuration (for example, concurrent task, token, and tool-call rate limits)

  • skill_script_registry (SkillScriptRegistry | None) – Optional SkillScriptRegistry for managing tools from skill scripts. If provided, enables dynamic tool registration via add_tool(), remove_tool(), etc.

  • skill_registry (SkillRegistry | None) – Optional SkillRegistry for loading skills by name. If provided along with config.skills_enabled=True, enables skill support.

  • plugin_registry (PluginRegistry | None) – Optional PluginRegistry for builtin and extension plugins. Defaults to PluginRegistry.default() when omitted.
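Because `tools` accepts two shapes, an implementation has to normalize them into one mapping. The sketch below is one hypothetical way to do that: `normalize_tools` and its `loader` callback (standing in for the subclass's tool loading) are illustrative names, not part of the documented API. It shows why passing instances preserves state: the same objects are kept rather than re-created.

```python
from typing import Any, Callable, Dict, List, Union


def normalize_tools(
    tools: Union[List[str], Dict[str, Any], None],
    loader: Callable[[str], Any],
) -> Dict[str, Any]:
    """Hypothetical helper: turn either accepted `tools` form into name -> instance."""
    if tools is None:
        return {}
    if isinstance(tools, dict):
        # Pre-configured instances are reused as-is, so their state is kept.
        return dict(tools)
    if isinstance(tools, list):
        # Names are resolved by a loader (the role the subclass plays).
        return {name: loader(name) for name in tools}
    raise TypeError("tools must be a list of names or a dict of instances")
```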

Example

```python
# With tool instances and ContextEngine
from aiecs.domain.context import ContextEngine

context_engine = ContextEngine()
await context_engine.initialize()

agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools={
        "search": SearchTool(api_key="…"),
        "calculator": CalculatorTool(),
    },
    llm_client=CustomLLMClient(),  # Custom client, no inheritance needed
    config_manager=DatabaseConfigManager(),
    checkpointer=RedisCheckpointer(),
    context_engine=context_engine,  # Enables persistent storage
)

# With tool names (backward compatible)
agent = HybridAgent(
    agent_id="agent1",
    name="My Agent",
    agent_type=AgentType.HYBRID,
    config=config,
    tools=["search", "calculator"],  # Loaded by subclass
)
```

property state: AgentState

Get current agent state.

get_state()

Get current agent state.

Return type:

AgentState

async initialize()

Initialize the agent.

This method should be called before the agent can be used. Override in subclasses to add initialization logic.

Raises:

AgentInitializationError – If initialization fails

Return type:

None

async reload_plugins()

Explicit plugin reload: shutdown → re-derive config → new manager → initialize (§9.7).

Rejected while the agent is BUSY or _current_task_id is set. Does not run automatically on configuration changes mid-task.

Return type:

PluginLoadResult

async activate()

Activate the agent.

Return type:

None

async deactivate()

Deactivate the agent (enter idle state).

Return type:

None

async shutdown()

Shutdown the agent.

Override in subclasses to add cleanup logic.

Return type:

None
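The lifecycle methods above imply a small state machine. This sketch assumes hypothetical state names (`CREATED`, `ACTIVE`, `IDLE`, `BUSY`, `STOPPED`) and a simple availability rule; the real `AgentState` members and transitions may differ. The one documented rule it mirrors is that `reload_plugins()` is rejected while the agent is BUSY or a current task id is set.

```python
import asyncio
from enum import Enum
from typing import Optional


class State(Enum):  # hypothetical; the real AgentState members may differ
    CREATED = "created"
    ACTIVE = "active"
    IDLE = "idle"
    BUSY = "busy"
    STOPPED = "stopped"


class LifecycleSketch:
    def __init__(self) -> None:
        self.state = State.CREATED
        self.current_task_id: Optional[str] = None

    async def initialize(self) -> None:
        self.state = State.IDLE

    async def activate(self) -> None:
        self.state = State.ACTIVE

    async def deactivate(self) -> None:
        self.state = State.IDLE

    async def shutdown(self) -> None:
        self.state = State.STOPPED

    def is_available(self) -> bool:
        # Assumed rule: available when initialized and not busy or stopped.
        return self.state in (State.ACTIVE, State.IDLE)

    def is_busy(self) -> bool:
        return self.state is State.BUSY

    async def reload_plugins(self) -> str:
        # Mirrors the documented guard: refuse while busy or mid-task.
        if self.state is State.BUSY or self.current_task_id is not None:
            raise RuntimeError("cannot reload plugins while a task is running")
        return "reloaded"
```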

add_tool(tool, replace=False)

Add a tool from a skill script to the agent’s registry.

This method registers a lightweight Tool instance (from skill scripts) to the agent’s SkillScriptRegistry. This is separate from BaseTool instances managed via _tool_instances.

Parameters:
  • tool (Tool) – Tool instance to register

  • replace (bool) – If True, replace existing tool with same name

Raises:
  • RuntimeError – If no SkillScriptRegistry is configured

  • SkillScriptRegistryError – If tool already exists and replace=False

Return type:

None

Example

```python
tool = Tool(
    name="my-tool",
    description="A custom tool",
    execute=my_async_function,
)
agent.add_tool(tool)
```

has_tool(tool_name)

Check if a tool exists in the skill script registry.

Parameters:

tool_name (str) – Name of the tool to check

Returns:

True if tool exists, False otherwise

Return type:

bool

remove_tool(tool_name)

Remove a tool from the skill script registry.

Parameters:

tool_name (str) – Name of the tool to remove

Returns:

True if tool was removed, False if not found

Raises:

RuntimeError – If no SkillScriptRegistry is configured

Return type:

bool

get_tool(tool_name)

Get a tool from the skill script registry by name.

Parameters:

tool_name (str) – Name of the tool to retrieve

Returns:

Tool instance if found, None otherwise

Return type:

Tool | None

list_skill_tools(tags=None, source=None)

List tools from the skill script registry.

Parameters:
  • tags (List[str] | None) – Optional list of tags to filter by (tools must have all tags)

  • source (str | None) – Optional source to filter by (e.g., skill name)

Returns:

List of matching Tool instances

Return type:

List[Tool]

property skill_script_registry: SkillScriptRegistry | None

Get the skill script registry, if configured.
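The skill-script tool methods (`add_tool`, `has_tool`, `remove_tool`, `get_tool`, `list_skill_tools`) behave like a name-keyed registry. Below is a hypothetical in-memory version, assuming a tool record with `name`, `description`, `execute`, `tags` and `source` fields. It raises `ValueError` where the documented method raises `SkillScriptRegistryError`, and it implements the documented "must have all tags" filter as a subset test.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class SketchTool:  # hypothetical stand-in for the skill-script Tool type
    name: str
    description: str
    execute: Callable[..., Awaitable[Any]]
    tags: List[str] = field(default_factory=list)
    source: Optional[str] = None


class SketchToolRegistry:
    def __init__(self) -> None:
        self._tools: Dict[str, SketchTool] = {}

    def add_tool(self, tool: SketchTool, replace: bool = False) -> None:
        if tool.name in self._tools and not replace:
            raise ValueError(f"tool {tool.name!r} already registered")
        self._tools[tool.name] = tool

    def has_tool(self, name: str) -> bool:
        return name in self._tools

    def get_tool(self, name: str) -> Optional[SketchTool]:
        return self._tools.get(name)

    def remove_tool(self, name: str) -> bool:
        return self._tools.pop(name, None) is not None

    def list_tools(self, tags: Optional[List[str]] = None,
                   source: Optional[str] = None) -> List[SketchTool]:
        result = []
        for tool in self._tools.values():
            if tags and not set(tags).issubset(tool.tags):
                continue  # a tool must carry *all* requested tags
            if source is not None and tool.source != source:
                continue
            result.append(tool)
        return result
```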

get_config_manager()

Get the configuration manager.

Returns:

Configuration manager instance, or None if not configured

Return type:

ConfigManagerProtocol | None

abstract async execute_task(task, context)

Execute a task.

Parameters:
  • task – Task to execute

  • context – Execution context for the task

Returns:

Task execution result

Raises:

TaskExecutionError – If task execution fails

Return type:

Dict[str, Any]

Note

Subclasses can use _execute_with_retry() to wrap task execution with automatic retry logic based on agent configuration.

abstract async process_message(message, sender_id=None)

Process an incoming message.

Parameters:
  • message (str) – Message content

  • sender_id (str | None) – Optional sender identifier

Returns:

Response dictionary

Return type:

Dict[str, Any]

Note

Subclasses can use _execute_with_retry() to wrap message processing with automatic retry logic based on agent configuration.

async add_to_memory(key, value, memory_type=MemoryType.SHORT_TERM, metadata=None)

Add an item to agent memory.

Parameters:
  • key (str) – Memory key

  • value (Any) – Memory value

  • memory_type (MemoryType) – Type of memory (short_term or long_term)

  • metadata (Dict[str, Any] | None) – Optional metadata

Return type:

None

async retrieve_memory(key, default=None)

Retrieve an item from memory.

Parameters:
  • key (str) – Memory key

  • default (Any) – Default value if key not found

Returns:

Memory value or default

Return type:

Any

async clear_memory(memory_type=None)

Clear agent memory.

Parameters:

memory_type (MemoryType | None) – If specified, clear only this type of memory

Return type:

None

get_memory_summary()

Get a summary of agent memory.

Return type:

Dict[str, Any]

set_goal(description, priority=GoalPriority.MEDIUM, success_criteria=None, deadline=None)

Set a new goal for the agent.

Parameters:
  • description (str) – Goal description

  • priority (GoalPriority) – Goal priority

  • success_criteria (str | None) – Success criteria

  • deadline (datetime | None) – Goal deadline

Returns:

Goal ID

Return type:

str

get_goals(status=None)

Get agent goals.

Parameters:

status (GoalStatus | None) – Filter by status (optional)

Returns:

List of goals

Return type:

List[AgentGoal]

get_goal(goal_id)

Get a specific goal by ID.

Parameters:

goal_id (str)

Return type:

AgentGoal | None

update_goal_status(goal_id, status, progress=None)

Update goal status.

Parameters:
  • goal_id (str) – Goal ID

  • status (GoalStatus) – New status

  • progress (float | None) – Optional progress percentage

Return type:

None

get_config()

Get agent configuration.

Return type:

AgentConfiguration

update_config(updates)

Update agent configuration.

Parameters:

updates (Dict[str, Any]) – Configuration updates

Raises:

ConfigurationError – If configuration is invalid

Return type:

None

declare_capability(capability_type, level, description=None, constraints=None)

Declare an agent capability.

Parameters:
  • capability_type (str) – Type of capability

  • level (str) – Proficiency level

  • description (str | None) – Capability description

  • constraints (Dict[str, Any] | None) – Capability constraints

Return type:

None

has_capability(capability_type)

Check if agent has a capability.

Parameters:

capability_type (str)

Return type:

bool

get_capabilities()

Get all agent capabilities.

Return type:

List[AgentCapabilityDeclaration]

get_metrics()

Get agent metrics.

Return type:

AgentMetrics

update_metrics(execution_time=None, success=True, quality_score=None, tokens_used=None, tool_calls=None)

Update agent metrics.

Parameters:
  • execution_time (float | None) – Task execution time

  • success (bool) – Whether task succeeded

  • quality_score (float | None) – Quality score (0-1)

  • tokens_used (int | None) – Tokens used

  • tool_calls (int | None) – Number of tool calls

Return type:

None

update_cache_metrics(cache_read_tokens=None, cache_creation_tokens=None, cache_hit=None)

Update prompt cache metrics from LLM response.

This method tracks provider-level prompt caching statistics to monitor cache hit rates and token savings.

Parameters:
  • cache_read_tokens (int | None) – Tokens read from cache (indicates cache hit)

  • cache_creation_tokens (int | None) – Tokens used to create a new cache entry

  • cache_hit (bool | None) – Whether the request hit a cached prompt prefix

Return type:

None

Example

```python
# After receiving LLM response
agent.update_cache_metrics(
    cache_read_tokens=response.cache_read_tokens,
    cache_creation_tokens=response.cache_creation_tokens,
    cache_hit=response.cache_hit,
)
```

update_session_metrics(session_status, session_duration=None, session_requests=0)

Update session-level metrics.

This method should be called when a session is created, updated, or ended to track session-level statistics in agent metrics.

Parameters:
  • session_status (str) – Session status (active, completed, failed, expired)

  • session_duration (float | None) – Session duration in seconds (for ended sessions)

  • session_requests (int) – Number of requests in the session

Return type:

None

Example

```python
# When creating a session
agent.update_session_metrics(session_status="active")

# When ending a session
agent.update_session_metrics(
    session_status="completed",
    session_duration=300.5,
    session_requests=15,
)
```

track_operation_time(operation_name)

Create a context manager for tracking operation time.

This method returns an OperationTimer that automatically records operation duration and updates agent metrics when the operation completes.

Parameters:

operation_name (str) – Name of the operation to track

Returns:

OperationTimer context manager

Return type:

OperationTimer

Example

```python
with agent.track_operation_time("llm_call") as timer:
    result = await llm.generate(prompt)
    # Metrics are automatically recorded

# Access duration if needed
print(f"Operation took {timer.duration} seconds")
```

get_performance_metrics()

Get comprehensive performance metrics.

Returns detailed performance statistics including operation-level metrics, percentiles, and aggregated statistics.

Returns:

Dictionary with performance metrics

Return type:

Dict[str, Any]

Example

```python
metrics = agent.get_performance_metrics()
print(f"P95 latency: {metrics['p95_operation_time']}s")
print(f"Total operations: {metrics['total_operations']}")
for op_name, stats in metrics["operations"].items():
    print(f"{op_name}: {stats['count']} calls, avg {stats['avg_time']:.3f}s")
```
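`get_performance_metrics()` reports percentiles such as P95, but the calculation method is not stated here. One common choice is the nearest-rank percentile; the hypothetical `OperationStats` class below records per-operation durations and summarizes them with that method. The key names `count` and `avg_time` follow the example above, while `p95_time` is an assumed name.

```python
import math
from collections import defaultdict
from typing import Dict, List


class OperationStats:
    """Hypothetical per-operation latency store; not the library's metric code."""

    def __init__(self) -> None:
        self._samples: Dict[str, List[float]] = defaultdict(list)

    def record(self, operation: str, seconds: float) -> None:
        self._samples[operation].append(seconds)

    @staticmethod
    def percentile(values: List[float], pct: float) -> float:
        # Nearest-rank: smallest value with at least pct% of samples <= it.
        ordered = sorted(values)
        rank = max(1, math.ceil(pct * len(ordered) / 100))
        return ordered[rank - 1]

    def summary(self) -> Dict[str, Dict[str, float]]:
        return {
            op: {
                "count": len(v),
                "avg_time": sum(v) / len(v),
                "p95_time": self.percentile(v, 95),
            }
            for op, v in self._samples.items()
        }
```

For 20 samples of 1 s through 20 s, the P95 rank is ceil(95 × 20 / 100) = 19, so P95 is 19 s.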

get_health_status()

Get agent health status with health score calculation.

Calculates a health score (0-100) based on multiple factors:
  • Success rate (40% weight)

  • Error rate (30% weight)

  • Performance (20% weight)

  • Session health (10% weight)

Returns:

Dictionary with health status and score

Return type:

Dict[str, Any]

Example

```python
health = agent.get_health_status()
print(f"Health score: {health['health_score']}/100")
print(f"Status: {health['status']}")  # healthy, degraded, unhealthy
if health["issues"]:
    print(f"Issues: {', '.join(health['issues'])}")
```
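The documented weights (40% success rate, 30% error rate, 20% performance, 10% session health) can be combined as a weighted sum. In this worked sketch each factor is assumed to be a fraction in [0, 1], the error rate is inverted so fewer errors raise the score, and the status cut-offs (80 and 50) are invented for illustration; `health_score()` is a hypothetical helper. For instance, 0.9 success, 0.1 errors, 0.5 performance and full session health give 100 × (0.36 + 0.27 + 0.10 + 0.10) = 83.

```python
from typing import Dict


def health_score(success_rate: float, error_rate: float,
                 performance: float, session_health: float) -> Dict[str, object]:
    """Hypothetical weighted score; all inputs are fractions in [0, 1]."""
    score = 100 * (
        0.40 * success_rate
        + 0.30 * (1 - error_rate)  # fewer errors -> higher score
        + 0.20 * performance
        + 0.10 * session_health
    )
    score = round(max(0.0, min(100.0, score)), 1)
    # Assumed thresholds; the real cut-offs are not documented here.
    status = "healthy" if score >= 80 else "degraded" if score >= 50 else "unhealthy"
    return {"health_score": score, "status": status}
```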

get_comprehensive_status()

Get comprehensive agent status combining all metrics.

Returns a complete view of agent state, health, performance, and operational metrics.

Returns:

Dictionary with comprehensive status information

Return type:

Dict[str, Any]

Example

```python
status = agent.get_comprehensive_status()
print(f"Agent: {status['agent_id']}")
print(f"State: {status['state']}")
print(f"Health: {status['health']['status']} ({status['health']['health_score']}/100)")
print(f"Tasks: {status['metrics']['total_tasks_executed']}")
print(f"Sessions: {status['metrics']['total_sessions']}")
```

reset_metrics()

Reset performance and session metrics.

Resets all metrics to their initial state while preserving agent configuration and state.

Example

```python
# Reset metrics at the start of a new monitoring period
agent.reset_metrics()
```

Return type:

None

to_dict()

Serialize agent to dictionary.

Includes health status and performance metrics for comprehensive agent state representation.

Returns:

Dictionary representation

Raises:

SerializationError – If serialization fails

Return type:

Dict[str, Any]

classmethod from_dict(data)

Deserialize agent from dictionary.

Parameters:

data (Dict[str, Any]) – Dictionary representation

Returns:

Agent instance

Raises:

SerializationError – If deserialization fails

Return type:

BaseAIAgent

async save_checkpoint(session_id, checkpoint_id=None)

Save agent state checkpoint.

This method saves the current agent state using the configured checkpointer. If no checkpointer is configured, logs a warning and returns None.

Parameters:
  • session_id (str) – Session identifier for the checkpoint

  • checkpoint_id (str | None) – Optional checkpoint identifier (auto-generated if None)

Returns:

Checkpoint ID if saved successfully, None otherwise

Return type:

str | None

Example

```python
# Save checkpoint with auto-generated ID
checkpoint_id = await agent.save_checkpoint(session_id="session-123")

# Save checkpoint with custom ID
checkpoint_id = await agent.save_checkpoint(
    session_id="session-123",
    checkpoint_id="v1.0",
)
```

Note

Requires a checkpointer to be configured during agent initialization. The checkpoint includes full agent state from to_dict().

async load_checkpoint(session_id, checkpoint_id=None)

Load agent state from checkpoint.

This method loads agent state from a saved checkpoint using the configured checkpointer. If no checkpointer is configured, logs a warning and returns False.

Parameters:
  • session_id (str) – Session identifier for the checkpoint

  • checkpoint_id (str | None) – Optional checkpoint identifier (loads latest if None)

Returns:

True if checkpoint loaded successfully, False otherwise

Return type:

bool

Example

```python
# Load latest checkpoint
success = await agent.load_checkpoint(session_id="session-123")

# Load specific checkpoint
success = await agent.load_checkpoint(
    session_id="session-123",
    checkpoint_id="v1.0",
)
```

Note

Requires a checkpointer to be configured during agent initialization. This method updates the agent’s internal state from the checkpoint. Not all state may be restorable (e.g., runtime objects, connections).

is_available()

Check if agent is available for tasks.

Return type:

bool

is_busy()

Check if agent is currently busy.

Return type:

bool

async execute_tool(tool_name, parameters)

Execute a single tool with given parameters.

This is a default implementation that subclasses can override. For ToolAgent, this calls _execute_tool with operation from parameters.

Parameters:
  • tool_name (str) – Name of the tool to execute

  • parameters (Dict[str, Any]) – Tool parameters (may include ‘operation’ key)

Returns:

Tool execution result

Return type:

Any

async execute_tools_parallel(tool_calls, max_concurrency=5)

Execute multiple tools in parallel with concurrency limit.

Parameters:
  • tool_calls (List[Dict[str, Any]]) – List of tool call dicts with ‘tool_name’ and ‘parameters’

  • max_concurrency (int) – Maximum number of concurrent tool executions

Returns:

List of results in same order as tool_calls

Return type:

List[Dict[str, Any]]

Example

```python
tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "calculator", "parameters": {"expression": "2+2"}},
    {"tool_name": "search", "parameters": {"query": "ML"}},
]
results = await agent.execute_tools_parallel(tool_calls, max_concurrency=2)
```
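Bounded parallel execution is commonly built from `asyncio.Semaphore` plus `asyncio.gather`, which returns results in argument order regardless of completion order. The hypothetical sketch below assumes tools are plain async callables looked up by `tool_name` and that a failed call yields an error record instead of cancelling the batch; the result-dict shape is an assumption, not the documented return format.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

ToolFn = Callable[..., Awaitable[Any]]


async def run_tools_parallel(tool_calls: List[Dict[str, Any]],
                             tools: Dict[str, ToolFn],
                             max_concurrency: int = 5) -> List[Dict[str, Any]]:
    """Hypothetical sketch: bounded concurrency, results in input order."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(call: Dict[str, Any]) -> Dict[str, Any]:
        async with semaphore:  # at most max_concurrency calls in flight
            try:
                result = await tools[call["tool_name"]](**call["parameters"])
                return {"tool_name": call["tool_name"], "success": True, "result": result}
            except Exception as exc:  # one failure does not cancel the others
                return {"tool_name": call["tool_name"], "success": False, "error": str(exc)}

    # gather preserves the order of its arguments, not completion order.
    return list(await asyncio.gather(*(run_one(c) for c in tool_calls)))
```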

async analyze_tool_dependencies(tool_calls)

Analyze dependencies between tool calls.

Detects if one tool’s output is used as input to another tool.

Parameters:

tool_calls (List[Dict[str, Any]]) – List of tool call dicts

Returns:

Dict mapping tool index to list of dependency indices

Return type:

Dict[str, List[str]]

Example

```python
tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "summarize", "parameters": {"text": "${0.result}"}},
]
deps = await agent.analyze_tool_dependencies(tool_calls)
# deps == {"1": ["0"]}  (tool 1 depends on tool 0)
```

async execute_tools_with_dependencies(tool_calls)

Execute tools respecting dependencies using topological sort.

Parameters:

tool_calls (List[Dict[str, Any]]) – List of tool call dicts

Returns:

List of results in same order as tool_calls

Return type:

List[Dict[str, Any]]

Example

```python
tool_calls = [
    {"tool_name": "search", "parameters": {"query": "AI"}},
    {"tool_name": "summarize", "parameters": {"text": "${0.result}"}},
]
results = await agent.execute_tools_with_dependencies(tool_calls)
```
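Dependency analysis and dependency-ordered execution can be sketched together. The `${N.result}` placeholder convention comes from the examples above; the regex, the string substitution of earlier results, and the use of the standard library's `graphlib.TopologicalSorter` (Python 3.9+) are assumptions of this sketch. `analyze_dependencies` and `execute_with_dependencies` are hypothetical names, and the sketch runs calls one at a time in dependency order rather than reproducing the library's scheduler.

```python
import asyncio
import re
from graphlib import TopologicalSorter
from typing import Any, Awaitable, Callable, Dict, List

# Placeholder syntax taken from the examples: "${<index>.result}"
REF = re.compile(r"\$\{(\d+)\.result\}")
ToolFn = Callable[..., Awaitable[Any]]


def analyze_dependencies(tool_calls: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Map each call index (as a string) to the indices it references."""
    deps: Dict[str, List[str]] = {}
    for i, call in enumerate(tool_calls):
        refs = {
            ref
            for value in call["parameters"].values()
            if isinstance(value, str)
            for ref in REF.findall(value)
        }
        if refs:
            deps[str(i)] = sorted(refs, key=int)
    return deps


async def execute_with_dependencies(
    tool_calls: List[Dict[str, Any]], tools: Dict[str, ToolFn]
) -> List[Any]:
    """Run calls in topological order, substituting earlier results into strings."""
    deps = analyze_dependencies(tool_calls)
    graph = {i: {int(d) for d in deps.get(str(i), [])} for i in range(len(tool_calls))}
    results: Dict[int, Any] = {}
    # static_order() yields predecessors first and raises CycleError on cycles.
    for i in TopologicalSorter(graph).static_order():
        call = tool_calls[i]
        params = {
            key: (REF.sub(lambda m: str(results[int(m.group(1))]), value)
                  if isinstance(value, str) else value)
            for key, value in call["parameters"].items()
        }
        results[i] = await tools[call["tool_name"]](**params)
    return [results[i] for i in range(len(tool_calls))]
```

Results are returned in the original call order even when execution order differs.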

async execute_tool_with_cache(tool_name, parameters)

Execute tool with caching support.

Parameters:
  • tool_name (str) – Name of the tool

  • parameters (Dict[str, Any]) – Tool parameters

Returns:

Tool result (from cache or fresh execution)

Return type:

Any

Example

result = await agent.execute_tool_with_cache("search", {"query": "AI"})
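The following sketches one plausible way caching can work, under two assumptions. First, the key is the tool name plus a SHA-256 hash of the JSON-serialized parameters, with sorted keys so that parameter order does not matter. Second, entries expire after a TTL. The 300-second default mirrors the CacheConfig default_ttl. make_cache_key and TTLCache are hypothetical names, and the injectable clock exists only to make expiry testable.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


def make_cache_key(tool_name: str, parameters: Dict[str, Any]) -> str:
    # sort_keys makes parameter order irrelevant; hashing keeps keys short
    payload = json.dumps(parameters, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{tool_name}:{digest}"


class TTLCache:
    def __init__(self, default_ttl: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.default_ttl = default_ttl
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}  # key -> (expires_at, value)

    def get(self, key: str) -> Tuple[bool, Any]:
        """Return (hit, value); expired entries count as misses."""
        entry = self._entries.get(key)
        if entry is None:
            return False, None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # lazily evict the expired entry
            return False, None
        return True, value

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        # A per-call ttl plays the role of a tool-specific TTL override.
        ttl = self.default_ttl if ttl is None else ttl
        self._entries[key] = (self._clock() + ttl, value)
```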

invalidate_cache(tool_name=None, pattern=None)[source]

Invalidate cache entries.

Parameters:
  • tool_name (str | None) – Invalidate all entries for this tool (optional)

  • pattern (str | None) – Invalidate entries matching pattern (optional)

Returns:

Number of entries invalidated

Return type:

int

Example

# Invalidate all search results
count = agent.invalidate_cache(tool_name="search")

# Invalidate all cache entries
count = agent.invalidate_cache()
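The following sketches invalidation by tool name or pattern. It assumes cache keys of the form "tool_name:..." and glob-style patterns matched with fnmatch. When both arguments are given, the sketch removes the union of matches. The real key format and pattern syntax may differ, and invalidate is a hypothetical helper.

```python
from fnmatch import fnmatch
from typing import Any, Dict, Optional


def invalidate(entries: Dict[str, Any], tool_name: Optional[str] = None,
               pattern: Optional[str] = None) -> int:
    """Delete matching cache entries in place and return how many were removed."""
    if tool_name is None and pattern is None:
        count = len(entries)
        entries.clear()  # no filter: drop everything
        return count
    doomed = [
        key for key in entries
        if (tool_name is not None and key.startswith(f"{tool_name}:"))
        or (pattern is not None and fnmatch(key, pattern))
    ]
    for key in doomed:
        del entries[key]
    return len(doomed)
```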

get_cache_stats()[source]

Get cache statistics.

Returns:

Dictionary with cache statistics

Return type:

Dict[str, Any]

Example

stats = agent.get_cache_stats()
print(f"Cache size: {stats['size']}")
print(f"Hit rate: {stats['hit_rate']:.1%}")

async execute_task_streaming(task, context)[source]

Execute a task with streaming results.

This method streams task execution events as they occur, including:

  • Status updates (started, thinking, acting, completed)

  • LLM tokens (for agents with LLM clients)

  • Tool calls and results (for agents with tools)

  • Final result

Parameters:
  • task (Dict[str, Any]) – Task specification

  • context (Dict[str, Any]) – Execution context

Yields:

Dict[str, Any] – Event dictionaries with ‘type’ and event-specific data

Return type:

AsyncGenerator[Dict[str, Any], None]

Event types:
  • ‘status’: Status update (e.g., started, thinking, completed)

  • ‘token’: LLM token (for streaming text generation)

  • ‘tool_call’: Tool execution started

  • ‘tool_result’: Tool execution completed

  • ‘result’: Final task result

  • ‘error’: Error occurred

Example

```python
async for event in agent.execute_task_streaming(task, context):
    if event['type'] == 'token':
        print(event['content'], end='', flush=True)
    elif event['type'] == 'tool_call':
        print(f"\nCalling tool: {event['tool_name']}")
    elif event['type'] == 'tool_result':
        print(f"Tool result: {event['result']}")
    elif event['type'] == 'result':
        print(f"\nFinal result: {event['output']}")
```
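The default implementation is described as falling back to execute_task. The following is a minimal sketch of such a fallback. It assumes the fallback wraps the single non-streaming call in status and result events. The 'type' and 'output' keys follow the example above. The 'status' and 'error' fields and the NonStreamingAgent class are hypothetical.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict


class NonStreamingAgent:
    """Hypothetical agent that only implements a non-streaming execute_task."""

    async def execute_task(self, task: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        return {"output": f"done: {task['description']}"}

    async def execute_task_streaming(
        self, task: Dict[str, Any], context: Dict[str, Any]
    ) -> AsyncGenerator[Dict[str, Any], None]:
        # Fallback: emit coarse events around one blocking call.
        yield {"type": "status", "status": "started"}
        try:
            result = await self.execute_task(task, context)
        except Exception as exc:
            yield {"type": "error", "error": str(exc)}
            return
        yield {"type": "result", "output": result.get("output")}
        yield {"type": "status", "status": "completed"}
```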

Note

Subclasses should override this method to provide streaming support. The default implementation falls back to the non-streaming execute_task.

async process_message_streaming(message, sender_id=None)[source]

Process a message with streaming response.

This method streams the response text as it’s generated, providing a better user experience for long responses.

Parameters:
  • message (str) – Message content

  • sender_id (str | None) – Optional sender identifier

Yields:

str – Response text tokens/chunks

Return type:

AsyncGenerator[str, None]

Example

```python
async for token in agent.process_message_streaming("Hello!"):
    print(token, end='', flush=True)
```

Note

Subclasses should override this method to provide streaming support. The default implementation falls back to the non-streaming process_message.

async delegate_task(task, required_capabilities=None, target_agent_id=None)[source]

Delegate a task to another capable agent.

Parameters:
  • task (Dict[str, Any]) – Task specification to delegate

  • required_capabilities (List[str] | None) – Required capabilities for the task

  • target_agent_id (str | None) – Specific agent to delegate to (if None, finds capable agent)

Returns:

Task execution result from delegated agent

Raises:

ValueError – If collaboration not enabled or no capable agent found

Return type:

Dict[str, Any]

Example

```python
# Delegate to a specific agent
result = await agent.delegate_task(
    task={"description": "Search for AI papers"},
    target_agent_id="search_agent",
)

# Delegate to any capable agent
result = await agent.delegate_task(
    task={"description": "Analyze data"},
    required_capabilities=["data_analysis", "statistics"],
)
```

async find_capable_agents(required_capabilities)[source]

Find agents with required capabilities.

Parameters:

required_capabilities (List[str]) – List of required capability names

Returns:

List of agents that have all required capabilities

Return type:

List[Any]

Example

```python
agents = await agent.find_capable_agents(["search", "summarize"])
for capable_agent in agents:
    print(f"Found: {capable_agent.name}")
```
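Capability matching reduces to a subset test: an agent qualifies only if it has every required capability. The following is a minimal sketch of that test. AgentInfo is a hypothetical stand-in for a registered agent.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class AgentInfo:
    """Hypothetical stand-in for a registered agent."""
    name: str
    capabilities: Set[str] = field(default_factory=set)


def find_capable(agents: List[AgentInfo], required: List[str]) -> List[AgentInfo]:
    needed = set(required)
    # "needed <= caps" is True only if every required capability is present.
    return [a for a in agents if needed <= a.capabilities]
```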

async request_peer_review(task, result, reviewer_id=None)[source]

Request peer review of a task result.

Parameters:
  • task (Dict[str, Any]) – Original task specification

  • result (Dict[str, Any]) – Task execution result to review

  • reviewer_id (str | None) – Specific reviewer agent ID (if None, selects automatically)

Returns:

Review result with ‘approved’ (bool), ‘feedback’ (str), ‘reviewer_id’ (str)

Return type:

Dict[str, Any]

Example

```python
result = await agent.execute_task(task, context)
review = await agent.request_peer_review(task, result)
if review['approved']:
    print(f"Approved: {review['feedback']}")
else:
    print(f"Needs revision: {review['feedback']}")
```

async collaborate_on_task(task, collaborator_ids, strategy='parallel')[source]

Collaborate with other agents on a task.

Parameters:
  • task (Dict[str, Any]) – Task specification

  • collaborator_ids (List[str]) – List of agent IDs to collaborate with

  • strategy (str) – Collaboration strategy - ‘parallel’, ‘sequential’, or ‘consensus’

Returns:

Aggregated result based on strategy

Return type:

Dict[str, Any]

Strategies:
  • parallel: All agents work simultaneously, results aggregated

  • sequential: Agents work in order, each building on previous results

  • consensus: All agents work independently, best result selected by voting
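The three strategies can be sketched with asyncio, under these assumptions:

  • Each collaborator is an async callable that returns a string.

  • Sequential mode passes the prior output forward under a hypothetical previous_result key.

  • Consensus means a simple majority vote over identical outputs.

The real method works with agent IDs and may aggregate or vote differently.

```python
import asyncio
from collections import Counter
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical: a collaborator is an async callable that takes a task dict
# and returns its output as a string.
Collaborator = Callable[[Dict[str, Any]], Awaitable[str]]


async def collaborate(
    task: Dict[str, Any],
    collaborators: List[Collaborator],
    strategy: str = "parallel",
) -> Dict[str, Any]:
    if not collaborators:
        raise ValueError("At least one collaborator is required")

    if strategy == "parallel":
        # Everyone works at once; all outputs are returned for aggregation.
        outputs = await asyncio.gather(*(c(task) for c in collaborators))
        return {"strategy": strategy, "results": list(outputs)}

    if strategy == "sequential":
        # Pipeline: each collaborator sees the previous collaborator's output.
        current, output = dict(task), ""
        for c in collaborators:
            output = await c(current)
            current = {**current, "previous_result": output}
        return {"strategy": strategy, "result": output}

    if strategy == "consensus":
        # Independent work, then a simple majority vote over identical outputs.
        outputs = await asyncio.gather(*(c(task) for c in collaborators))
        winner, votes = Counter(outputs).most_common(1)[0]
        return {"strategy": strategy, "result": winner, "votes": votes}

    raise ValueError(f"Unknown strategy: {strategy!r}")
```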

Example

```python
# Parallel collaboration
result = await agent.collaborate_on_task(
    task={"description": "Analyze market trends"},
    collaborator_ids=["analyst1", "analyst2", "analyst3"],
    strategy="parallel",
)

# Sequential collaboration (pipeline)
result = await agent.collaborate_on_task(
    task={"description": "Research and summarize"},
    collaborator_ids=["researcher", "summarizer"],
    strategy="sequential",
)

# Consensus collaboration
result = await agent.collaborate_on_task(
    task={"description": "Make recommendation"},
    collaborator_ids=["expert1", "expert2", "expert3"],
    strategy="consensus",
)
```

async get_relevant_context(query, context_items, max_items=None, min_relevance_score=0.5)[source]

Get relevant context items using semantic search and relevance scoring.

This method filters and ranks context items based on their relevance to the query, helping agents stay within token limits while maintaining the most important context.

Parameters:
  • query (str) – Query or task description to match against

  • context_items (List[Dict[str, Any]]) – List of context items (dicts with ‘content’ field)

  • max_items (int | None) – Maximum number of items to return (None = no limit)

  • min_relevance_score (float) – Minimum relevance score (0.0-1.0)

Returns:

List of relevant context items, sorted by relevance (highest first)

Return type:

List[Dict[str, Any]]

Example

```python
context_items = [
    {"content": "User prefers concise answers", "type": "preference"},
    {"content": "Previous task: data analysis", "type": "history"},
    {"content": "System configuration: prod", "type": "config"},
]

relevant = await agent.get_relevant_context(
    query="Analyze sales data",
    context_items=context_items,
    max_items=2,
    min_relevance_score=0.6,
)
# Returns top 2 most relevant items with score >= 0.6
```
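The filter, sort, and cap steps can be sketched as follows. The scoring function is passed in; a keyword-overlap score is one example. select_relevant is a hypothetical helper, not the method's implementation.

```python
from typing import Any, Callable, Dict, List, Optional

Item = Dict[str, Any]


def select_relevant(
    items: List[Item],
    score: Callable[[Item], float],
    max_items: Optional[int] = None,
    min_relevance_score: float = 0.5,
) -> List[Item]:
    scored = [(score(item), item) for item in items]
    kept = [(s, item) for s, item in scored if s >= min_relevance_score]
    kept.sort(key=lambda pair: pair[0], reverse=True)  # stable: ties keep input order
    if max_items is not None:
        kept = kept[:max_items]
    return [item for _, item in kept]
```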

async score_context_relevance(query, context_item)[source]

Score the relevance of a context item to a query.

Uses multiple signals to determine relevance:

  • Keyword overlap (basic)

  • Semantic similarity (if an LLM client with embeddings is available)

  • Recency (if a timestamp is available)

  • Type priority (if a type is specified)
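The keyword-overlap signal alone can be sketched as a Jaccard similarity over lowercase word tokens. This reading of "keyword overlap" is an assumption. The real scorer combines it with the other signals listed.

```python
import re
from typing import Any, Dict, Set


def _tokens(text: str) -> Set[str]:
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def keyword_overlap_score(query: str, context_item: Dict[str, Any]) -> float:
    q = _tokens(query)
    c = _tokens(str(context_item.get("content", "")))
    if not q or not c:
        return 0.0
    return len(q & c) / len(q | c)  # Jaccard similarity, always in [0.0, 1.0]
```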

Parameters:
  • query (str) – Query or task description

  • context_item (Dict[str, Any]) – Context item to score (dict with ‘content’ field)

Returns:

Relevance score between 0.0 (not relevant) and 1.0 (highly relevant)

Return type:

float

Example

```python
score = await agent.score_context_relevance(
    query="Analyze sales data",
    context_item={"content": "Previous analysis results", "type": "history"},
)
print(f"Relevance: {score:.2f}")
```

async prune_context(context_items, max_tokens, query=None, preserve_types=None)[source]

Prune context items to fit within token limit.

Uses relevance scoring to keep the most important context while staying within token limits. Optionally preserves certain types of context regardless of relevance.

Parameters:
  • context_items (List[Dict[str, Any]]) – List of context items to prune

  • max_tokens (int) – Maximum total tokens allowed

  • query (str | None) – Optional query for relevance scoring

  • preserve_types (List[str] | None) – Optional list of types to always preserve

Returns:

Pruned list of context items that fit within token limit

Return type:

List[Dict[str, Any]]

Example

```python
pruned = await agent.prune_context(
    context_items=all_context,
    max_tokens=2000,
    query="Analyze data",
    preserve_types=["constraint", "requirement"],
)
print(f"Pruned from {len(all_context)} to {len(pruned)} items")
```
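The following is a greedy sketch of token-budget pruning, under two assumptions. Token counts are estimated at roughly 4 characters per token, which is a common rough heuristic and not the library's tokenizer. Preserved types are kept even if they alone exceed the budget. prune_context_items and estimate_tokens are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional

Item = Dict[str, Any]


def estimate_tokens(item: Item) -> int:
    # Rough heuristic (assumption): about 4 characters per token.
    return max(1, len(str(item.get("content", ""))) // 4)


def prune_context_items(
    items: List[Item],
    max_tokens: int,
    score: Optional[Callable[[Item], float]] = None,
    preserve_types: Optional[List[str]] = None,
) -> List[Item]:
    preserve = set(preserve_types or [])
    preserved = [it for it in items if it.get("type") in preserve]
    others = [it for it in items if it.get("type") not in preserve]
    if score is not None:
        others.sort(key=score, reverse=True)  # most relevant first

    kept: List[Item] = []
    used = 0
    for item in preserved + others:
        cost = estimate_tokens(item)
        # Preserved types are always kept; everything else must fit the budget.
        if item.get("type") in preserve or used + cost <= max_tokens:
            kept.append(item)
            used += cost
    return kept
```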

async record_experience(task, result, approach, tools_used=None)[source]

Record an experience for learning and adaptation.

Parameters:
  • task (Dict[str, Any]) – Task specification

  • result (Dict[str, Any]) – Task execution result

  • approach (str) – Approach/strategy used

  • tools_used (List[str] | None) – List of tools used (if any)

Return type:

None

Example

```python
await agent.record_experience(
    task={"description": "Analyze data", "type": "analysis"},
    result={"success": True, "execution_time": 5.2},
    approach="statistical_analysis",
    tools_used=["pandas", "numpy"],
)
```

async get_recommended_approach(task)[source]

Get recommended approach based on past experiences.

Analyzes similar past experiences to recommend the best approach for the current task.

Parameters:

task (Dict[str, Any]) – Task specification

Returns:

Recommended approach dict with ‘approach’, ‘confidence’, ‘reasoning’ or None if no relevant experiences

Return type:

Dict[str, Any] | None

Example

```python
recommendation = await agent.get_recommended_approach(
    task={"description": "Analyze sales data", "type": "analysis"}
)
if recommendation:
    print(f"Recommended: {recommendation['approach']}")
    print(f"Confidence: {recommendation['confidence']:.2f}")
    print(f"Reasoning: {recommendation['reasoning']}")
```

async get_learning_insights()[source]

Get learning insights and analytics.

Provides analytics about agent learning including success rates, common patterns, and areas for improvement.

Returns:

Dict with learning insights and statistics

Return type:

Dict[str, Any]

Example

insights = await agent.get_learning_insights()
print(f"Total experiences: {insights['total_experiences']}")
print(f"Success rate: {insights['overall_success_rate']:.2%}")
print(f"Most common task: {insights['most_common_task_type']}")
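The statistics in the example can be derived from a list of experience records roughly as follows. The record shape ({"task_type", "success"}) and learning_insights are hypothetical. The keys of the returned dict follow the example.

```python
from collections import Counter
from typing import Any, Dict, List


def learning_insights(experiences: List[Dict[str, Any]]) -> Dict[str, Any]:
    total = len(experiences)
    if total == 0:
        return {"total_experiences": 0, "overall_success_rate": 0.0,
                "most_common_task_type": None}
    successes = sum(1 for e in experiences if e.get("success"))
    task_types = Counter(e.get("task_type", "unknown") for e in experiences)
    return {
        "total_experiences": total,
        "overall_success_rate": successes / total,
        "most_common_task_type": task_types.most_common(1)[0][0],
    }
```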

async adapt_strategy(task)[source]

Adapt strategy based on learning insights.

Analyzes past experiences to suggest strategy adaptations for the current task.

Parameters:

task (Dict[str, Any]) – Task specification

Returns:

Dict with strategy adaptations and recommendations

Return type:

Dict[str, Any]

Example

```python
adaptations = await agent.adapt_strategy(
    task={"description": "Complex analysis", "type": "analysis"}
)
print(f"Recommended approach: {adaptations['recommended_approach']}")
print(f"Suggested tools: {adaptations['suggested_tools']}")
```

async check_resource_availability()[source]

Check if resources are available for task execution.

Checks against configured resource limits, including:

  • Concurrent task limits

  • Token rate limits

  • Tool call rate limits
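Per-minute token and tool-call limits are commonly enforced with a sliding window. The following is a minimal sketch, assuming a 60-second window. SlidingWindowLimiter is hypothetical, and the clock is injectable for testing.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class SlidingWindowLimiter:
    """Caps the total amount consumed within the last window_seconds."""

    def __init__(self, limit: int, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window_seconds
        self._clock = clock
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, amount)
        self._total = 0

    def _evict(self, now: float) -> None:
        # Drop events that have slid out of the window.
        while self._events and now - self._events[0][0] >= self.window:
            _, amount = self._events.popleft()
            self._total -= amount

    def usage(self) -> int:
        self._evict(self._clock())
        return self._total

    def try_consume(self, amount: int = 1) -> bool:
        """Record usage if it fits under the limit; otherwise return False."""
        now = self._clock()
        self._evict(now)
        if self._total + amount > self.limit:
            return False
        self._events.append((now, amount))
        self._total += amount
        return True
```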

Returns:

Dict with ‘available’ (bool) and details about resource status

Return type:

Dict[str, Any]

Example

```python
status = await agent.check_resource_availability()
if status['available']:
    await agent.execute_task(task, context)
else:
    print(f"Resources unavailable: {status['reason']}")
```

async wait_for_resources(timeout=None)[source]

Wait for resources to become available.

Parameters:

timeout (float | None) – Maximum time to wait in seconds (uses resource_wait_timeout_seconds if None)

Returns:

True if resources became available, False if timeout

Return type:

bool

Example

```python
if await agent.wait_for_resources(timeout=30):
    await agent.execute_task(task, context)
else:
    print("Timeout waiting for resources")
```

async get_resource_usage()[source]

Get current resource usage statistics.

Returns:

Dict with resource usage information

Return type:

Dict[str, Any]

Example

usage = await agent.get_resource_usage()
print(f"Active tasks: {usage['active_tasks']}")
print(f"Tokens/min: {usage['tokens_per_minute']}")
print(f"Tool calls/min: {usage['tool_calls_per_minute']}")

async execute_with_recovery(task, context, strategies=None)[source]

Execute task with advanced error recovery strategies.

Tries multiple recovery strategies in sequence until one succeeds:

  1. Retry with exponential backoff

  2. Simplify task and retry

  3. Use fallback approach

  4. Delegate to another agent
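The first strategy, retry with exponential backoff, can be sketched as follows. Delays double from base_delay up to max_delay, with optional jitter. The parameter values are illustrative assumptions, not the library's defaults, and retry_with_backoff is hypothetical.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    jitter: bool = True,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    for attempt in range(max_attempts):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the next recovery strategy take over
            delay = min(max_delay, base_delay * (2 ** attempt))  # 0.5, 1.0, 2.0, ...
            if jitter:
                delay *= random.uniform(0.5, 1.0)  # spread out simultaneous retries
            await sleep(delay)
    raise ValueError("max_attempts must be at least 1")
```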

Parameters:
  • task (Dict[str, Any]) – Task specification

  • context (Dict[str, Any]) – Execution context

  • strategies (List[str] | None) – List of strategy names to try (uses default chain if None)

Returns:

Task execution result

Raises:

TaskExecutionError – If all recovery strategies fail

Return type:

Dict[str, Any]

Example

```python
result = await agent.execute_with_recovery(
    task={"description": "Complex analysis"},
    context={},
    strategies=["retry", "simplify", "delegate"],
)
```

__str__()[source]

String representation.

Return type:

str

__repr__()[source]

Detailed representation.

Return type:

str

Context

Context Engine

ContextEngine: Advanced Context and Session Management Engine

This engine extends TaskContext capabilities to provide comprehensive session management, conversation tracking, and persistent storage for BaseAIService.

Key Features:

  1. Multi-session management (extends TaskContext from a single task to multiple sessions)

  2. Redis backend storage for persistence and scalability

  3. Conversation history management with optimization

  4. Performance metrics and analytics

  5. Resource and lifecycle management

  6. Integration with BaseServiceCheckpointer

class aiecs.domain.context.context_engine.DateTimeEncoder[source]

Bases: JSONEncoder

Custom JSON encoder to handle datetime objects.

default(obj)[source]

Return a JSON-serializable object for obj, or call the base implementation (which raises TypeError). DateTimeEncoder overrides this method to handle datetime objects.

The json.JSONEncoder documentation gives this pattern for supporting arbitrary iterators:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
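The following is a minimal sketch of a datetime-aware encoder. It assumes datetime and date values are serialized as ISO 8601 strings via isoformat(); the actual output format of DateTimeEncoder is not documented here. DateTimeEncoderSketch is a hypothetical name.

```python
import json
from datetime import date, datetime, timezone


class DateTimeEncoderSketch(json.JSONEncoder):
    def default(self, obj):
        # datetime is a subclass of date, so this branch covers both.
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return super().default(obj)  # raises TypeError for unsupported types


# Usage: pass the encoder class to json.dumps via cls=.
encoded = json.dumps({"t": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)},
                     cls=DateTimeEncoderSketch)
```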

class aiecs.domain.context.context_engine.SessionMetrics[source]

Bases: object

Session-level performance metrics.

session_id: str
user_id: str
created_at: datetime
last_activity: datetime
request_count: int = 0
error_count: int = 0
total_processing_time: float = 0.0
status: str = 'active'
to_dict()[source]
Return type:

Dict[str, Any]

classmethod from_dict(data)[source]
Parameters:

data (Dict[str, Any])

Return type:

SessionMetrics

__init__(session_id, user_id, created_at, last_activity, request_count=0, error_count=0, total_processing_time=0.0, status='active')
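Parameters:
  • session_id (str)

  • user_id (str)

  • created_at (datetime)

  • last_activity (datetime)

  • request_count (int)

  • error_count (int)

  • total_processing_time (float)

  • status (str)
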
Return type:

None
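The following is a round-trip sketch for the to_dict/from_dict pair. It assumes the datetime fields are stored as ISO 8601 strings so that the dict is JSON-friendly. SessionMetricsSketch mirrors the documented fields but is not the library class.

```python
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Any, Dict


@dataclass
class SessionMetricsSketch:
    session_id: str
    user_id: str
    created_at: datetime
    last_activity: datetime
    request_count: int = 0
    error_count: int = 0
    total_processing_time: float = 0.0
    status: str = "active"

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        # datetimes are not JSON-serializable; store them as ISO 8601 strings
        data["created_at"] = self.created_at.isoformat()
        data["last_activity"] = self.last_activity.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SessionMetricsSketch":
        data = dict(data)  # avoid mutating the caller's dict
        data["created_at"] = datetime.fromisoformat(data["created_at"])
        data["last_activity"] = datetime.fromisoformat(data["last_activity"])
        return cls(**data)
```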

class aiecs.domain.context.context_engine.ConversationMessage[source]

Bases: object

Structured conversation message.

role: str
content: str
timestamp: datetime
metadata: Dict[str, Any] | None = None
to_dict()[source]
Return type:

Dict[str, Any]

classmethod from_dict(data)[source]
Parameters:

data (Dict[str, Any])

Return type:

ConversationMessage

__init__(role, content, timestamp, metadata=None)
Parameters:
  • role (str)

  • content (str)

  • timestamp (datetime)

  • metadata (Dict[str, Any] | None)

Return type:

None

class aiecs.domain.context.context_engine.CompressionConfig[source]

Bases: object

Configuration for conversation compression.

Provides flexible control over compression behavior with multiple strategies to manage conversation history size and reduce token usage.

Compression Strategies:

  • truncate: Fast truncation, keeps most recent N messages (no LLM required)

  • summarize: LLM-based summarization of older messages

  • semantic: Embedding-based deduplication of similar messages

  • hybrid: Combination of multiple strategies applied sequentially

Key Features:

  • Automatic compression triggers based on message count

  • Custom prompt templates for summarization

  • Configurable similarity thresholds for semantic deduplication

  • Performance timeouts to prevent long-running operations

Attributes:

  • strategy: Compression strategy to use. One of: "truncate", "summarize", "semantic", "hybrid"

  • max_messages: Maximum messages to keep (for truncation strategy)

  • keep_recent: Always keep N most recent messages (applies to all strategies)

  • summary_prompt_template: Custom prompt template for summarization (uses {messages} placeholder)

  • summary_max_tokens: Maximum tokens for summary output

  • include_summary_in_history: Whether to add summary as system message in history

  • similarity_threshold: Similarity threshold for semantic deduplication (0.0-1.0)

  • embedding_model: Embedding model name for semantic deduplication

  • hybrid_strategies: List of strategies to combine for hybrid mode (default: ["truncate", "summarize"])

  • auto_compress_enabled: Enable automatic compression when threshold exceeded

  • auto_compress_threshold: Message count threshold to trigger auto-compression

  • auto_compress_target: Target message count after auto-compression

  • compression_timeout: Maximum time for compression operation in seconds

Examples:

# Example 1: Basic truncation configuration
config = CompressionConfig(
    strategy="truncate",
    max_messages=50,
    keep_recent=10,
)

# Example 2: LLM-based summarization
config = CompressionConfig(
    strategy="summarize",
    keep_recent=10,
    summary_max_tokens=500,
    include_summary_in_history=True,
)

# Example 3: Semantic deduplication
config = CompressionConfig(
    strategy="semantic",
    keep_recent=10,
    similarity_threshold=0.95,
    embedding_model="text-embedding-ada-002",
)

# Example 4: Hybrid strategy (truncate then summarize)
config = CompressionConfig(
    strategy="hybrid",
    hybrid_strategies=["truncate", "summarize"],
    keep_recent=10,
    summary_max_tokens=500,
)

# Example 5: Auto-compression enabled
config = CompressionConfig(
    auto_compress_enabled=True,
    auto_compress_threshold=100,
    auto_compress_target=50,
    strategy="summarize",
    keep_recent=10,
)

# Example 6: Custom summarization prompt
config = CompressionConfig(
    strategy="summarize",
    summary_prompt_template=(
        "Summarize the following conversation focusing on "
        "key decisions and action items:\n\n{messages}"
    ),
    summary_max_tokens=300,
)
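The truncate strategy can be sketched as keeping the newest messages. The sketch assumes the history is ordered oldest to newest and that keep_recent acts as a floor on how many messages survive. How the real engine combines max_messages and keep_recent is not specified here. truncate_messages is hypothetical.

```python
from typing import Any, Dict, List


def truncate_messages(
    messages: List[Dict[str, Any]],
    max_messages: int = 50,
    keep_recent: int = 10,
) -> List[Dict[str, Any]]:
    """Keep only the newest messages (list assumed ordered oldest to newest)."""
    # Assumption: keep_recent acts as a floor, so at least that many survive.
    limit = max(max_messages, keep_recent)
    if limit <= 0:
        return []  # guard: messages[-0:] would return the whole list
    return list(messages[-limit:])
```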

strategy: str = 'truncate'
max_messages: int = 50
keep_recent: int = 10
summary_prompt_template: str | None = None
summary_max_tokens: int = 500
include_summary_in_history: bool = True
similarity_threshold: float = 0.95
embedding_model: str = 'text-embedding-ada-002'
hybrid_strategies: List[str] | None = None
auto_compress_enabled: bool = False
auto_compress_threshold: int = 100
auto_compress_target: int = 50
compression_timeout: float = 30.0
__post_init__()[source]

Validate and set defaults.
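The following sketches the kind of checks __post_init__ can perform, under these assumptions:

  • The strategy must be one of the four documented names.

  • similarity_threshold must lie in [0.0, 1.0].

  • hybrid_strategies defaults to ["truncate", "summarize"], as documented.

  • (Hypothetically) auto_compress_target must be below auto_compress_threshold.

CompressionConfigSketch includes only the fields it validates.

```python
from dataclasses import dataclass
from typing import List, Optional

VALID_STRATEGIES = {"truncate", "summarize", "semantic", "hybrid"}


@dataclass
class CompressionConfigSketch:
    strategy: str = "truncate"
    similarity_threshold: float = 0.95
    hybrid_strategies: Optional[List[str]] = None
    auto_compress_threshold: int = 100
    auto_compress_target: int = 50

    def __post_init__(self) -> None:
        if self.strategy not in VALID_STRATEGIES:
            raise ValueError(f"Invalid strategy: {self.strategy!r}")
        if not 0.0 <= self.similarity_threshold <= 1.0:
            raise ValueError("similarity_threshold must be between 0.0 and 1.0")
        if self.hybrid_strategies is None:
            # Documented default for hybrid mode.
            self.hybrid_strategies = ["truncate", "summarize"]
        # Hypothetical check: compressing "down" to a larger size makes no sense.
        if self.auto_compress_target >= self.auto_compress_threshold:
            raise ValueError("auto_compress_target must be below auto_compress_threshold")
```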

__init__(strategy='truncate', max_messages=50, keep_recent=10, summary_prompt_template=None, summary_max_tokens=500, include_summary_in_history=True, similarity_threshold=0.95, embedding_model='text-embedding-ada-002', hybrid_strategies=None, auto_compress_enabled=False, auto_compress_threshold=100, auto_compress_target=50, compression_timeout=30.0)
Parameters:
  • strategy (str)

  • max_messages (int)

  • keep_recent (int)

  • summary_prompt_template (str | None)

  • summary_max_tokens (int)

  • include_summary_in_history (bool)

  • similarity_threshold (float)

  • embedding_model (str)

  • hybrid_strategies (List[str] | None)

  • auto_compress_enabled (bool)

  • auto_compress_threshold (int)

  • auto_compress_target (int)

  • compression_timeout (float)

Return type:

None

class aiecs.domain.context.context_engine.ContextEngine[source]

Bases: IStorageBackend, ICheckpointerBackend

Advanced Context and Session Management Engine.

Implements core storage interfaces to provide comprehensive session management with Redis backend storage for BaseAIService and BaseServiceCheckpointer.

This implementation follows the middleware’s core interface pattern, enabling dependency inversion and clean architecture.

Key Features:

  • Multi-session management with Redis backend

  • Conversation history management with compression

  • Performance metrics and analytics

  • Resource and lifecycle management

  • Integration with BaseServiceCheckpointer

Compression Strategies:

  • truncate: Fast truncation (no LLM required)

  • summarize: LLM-based summarization

  • semantic: Embedding-based deduplication

  • hybrid: Combination of multiple strategies
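Semantic deduplication can be sketched as dropping any message whose embedding has a cosine similarity above similarity_threshold with an already-kept message. The sketch makes three assumptions: it takes precomputed embeddings, it keeps the earliest message of each near-duplicate group, and it ignores keep_recent.

```python
import math
from typing import List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0  # treat zero vectors as dissimilar


def dedupe_by_embedding(messages: List[str], embeddings: List[Sequence[float]],
                        threshold: float = 0.95) -> List[str]:
    kept_msgs: List[str] = []
    kept_vecs: List[Sequence[float]] = []
    for msg, vec in zip(messages, embeddings):
        # Skip a message that is too similar to one already kept.
        if any(cosine_similarity(vec, k) > threshold for k in kept_vecs):
            continue
        kept_msgs.append(msg)
        kept_vecs.append(vec)
    return kept_msgs
```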

Examples:

# Example 1: Basic ContextEngine initialization
engine = ContextEngine()
await engine.initialize()

# Create session
session = await engine.create_session(
    session_id="session-123",
    user_id="user-456",
)

# Add conversation messages
await engine.add_conversation_message(
    session_id="session-123",
    role="user",
    content="Hello, I need help",
)

# Example 2: ContextEngine with compression (truncation strategy)
from aiecs.domain.context.context_engine import CompressionConfig

compression_config = CompressionConfig(
    strategy="truncate",
    max_messages=50,
    keep_recent=10,  # Always keep 10 most recent messages
)

engine = ContextEngine(compression_config=compression_config)
await engine.initialize()

# Add many messages
for i in range(100):
    await engine.add_conversation_message(
        session_id="session-123",
        role="user" if i % 2 == 0 else "assistant",
        content=f"Message {i}",
    )

# Compress conversation (truncates to 10 most recent)
result = await engine.compress_conversation("session-123")
print(f"Compressed from {result['original_count']} to {result['compressed_count']} messages")

# Example 3: ContextEngine with LLM-based summarization
from aiecs.llm import OpenAIClient

llm_client = OpenAIClient()

compression_config = CompressionConfig(
    strategy="summarize",
    keep_recent=10,  # Keep 10 most recent messages
    summary_max_tokens=500,
    include_summary_in_history=True,
)

engine = ContextEngine(
    compression_config=compression_config,
    llm_client=llm_client,  # Required for summarization
)
await engine.initialize()

# Add conversation
for i in range(50):
    await engine.add_conversation_message(
        session_id="session-123",
        role="user" if i % 2 == 0 else "assistant",
        content=f"Message {i}: Important information about topic {i % 5}",
    )

# Compress using summarization
result = await engine.compress_conversation("session-123", strategy="summarize")
print(f"Compressed: {result['original_count']} -> {result['compressed_count']} messages")
print(f"Compression ratio: {result['compression_ratio']:.1%}")

# Example 4: ContextEngine with semantic deduplication
compression_config = CompressionConfig(
    strategy="semantic",
    keep_recent=10,
    similarity_threshold=0.95,  # Remove messages >95% similar
    embedding_model="text-embedding-ada-002",
)

engine = ContextEngine(
    compression_config=compression_config,
    llm_client=llm_client,  # Required for embeddings
)
await engine.initialize()

# Add conversation with similar messages
messages = [
    "What's the weather?",
    "What's the weather today?",
    "Tell me about the weather",
    "What's the temperature?",
]
for msg in messages:
    await engine.add_conversation_message(
        session_id="session-123",
        role="user",
        content=msg,
    )

# Compress using semantic deduplication
result = await engine.compress_conversation("session-123", strategy="semantic")
print(f"Removed {result['original_count'] - result['compressed_count']} similar messages")

# Example 5: ContextEngine with hybrid compression
compression_config = CompressionConfig(
    strategy="hybrid",
    hybrid_strategies=["truncate", "summarize"],  # Apply truncate then summarize
    keep_recent=10,
    summary_max_tokens=500,
)

engine = ContextEngine(
    compression_config=compression_config,
    llm_client=llm_client,
)
await engine.initialize()

# Compress using hybrid strategy
result = await engine.compress_conversation("session-123", strategy="hybrid")

# Example 6: Auto-compression on message limit
compression_config = CompressionConfig(
    auto_compress_enabled=True,
    auto_compress_threshold=100,  # Trigger at 100 messages
    auto_compress_target=50,  # Compress to 50 messages
    strategy="summarize",
    keep_recent=10,
)

engine = ContextEngine(
    compression_config=compression_config,
    llm_client=llm_client,
)
await engine.initialize()

# Add messages - auto-compression triggers at 100
for i in range(105):
    await engine.add_conversation_message(
        session_id="session-123",
        role="user" if i % 2 == 0 else "assistant",
        content=f"Message {i}",
    )

# Check if auto-compression was triggered
result = await engine.auto_compress_on_limit("session-123")
if result:
    print(f"Auto-compressed: {result['original_count']} -> {result['compressed_count']}")

# Example 7: Custom compression prompt template
compression_config = CompressionConfig(
    strategy="summarize",
    summary_prompt_template=(
        "Summarize the following conversation focusing on key decisions, "
        "action items, and important facts. Keep it concise:\n\n{messages}"
    ),
    summary_max_tokens=300,
)

engine = ContextEngine(
    compression_config=compression_config,
    llm_client=llm_client,
)
await engine.initialize()

# Compress with custom prompt
result = await engine.compress_conversation("session-123")

# Example 8: Get compressed context in different formats
engine = ContextEngine(compression_config=compression_config, llm_client=llm_client)
await engine.initialize()

# Get as formatted string
context_string = await engine.get_compressed_context(
    session_id="session-123",
    format="string",
    compress_first=True,  # Compress before returning
)
print(context_string)

# Get as messages list
messages = await engine.get_compressed_context(
    session_id="session-123",
    format="messages",
    compress_first=False,  # Use existing compressed version
)

# Get as dictionary
context_dict = await engine.get_compressed_context(
    session_id="session-123",
    format="dict",
)

# Example 9: Runtime compression config override
engine = ContextEngine(
    compression_config=CompressionConfig(strategy="truncate"),
    llm_client=llm_client,
)
await engine.initialize()

# Override compression config for a specific operation
custom_config = CompressionConfig(
    strategy="summarize",
    summary_max_tokens=1000,
)

result = await engine.compress_conversation(
    session_id="session-123",
    config_override=custom_config,
)

# Example 10: Compression with a custom LLM client
class CustomLLMClient:
    provider_name = "custom"

    async def generate_text(self, messages, **kwargs):
        # Custom summarization logic
        return LLMResponse(content="Custom summary…")

    async def get_embeddings(self, texts, model):
        # Custom embedding logic
        return [[0.1] * 1536 for _ in texts]

custom_llm = CustomLLMClient()

compression_config = CompressionConfig(strategy="semantic")
engine = ContextEngine(
    compression_config=compression_config,
    llm_client=custom_llm,  # Custom LLM client for compression
)
await engine.initialize()

# Compress using custom LLM client
result = await engine.compress_conversation("session-123", strategy="semantic")

__init__(use_existing_redis=True, compression_config=None, llm_client=None, permanent_backend=None)[source]

Initialize ContextEngine.

Parameters:
  • use_existing_redis (bool) – Whether to use the existing Redis client from infrastructure (deprecated: a separate RedisClient instance is now always created to avoid event-loop conflicts)

  • compression_config (CompressionConfig | None) – Optional compression configuration for conversation compression

  • llm_client (Any | None) – Optional LLM client for summarization and embeddings (must implement LLMClientProtocol)

  • permanent_backend (IPermanentStorageBackend | None) – Optional backend for dual-write disk persistence (e.g. ClickHouse). Writes are fire-and-forget; failures do not block Redis path.

async initialize()[source]

Initialize Redis connection and validate setup.

Return type:

bool

async close()[source]

Close Redis connection and permanent backend.

async create_session(session_id, user_id, metadata=None)[source]

Create a new session.

Parameters:
  • session_id (str)

  • user_id (str)

  • metadata (Dict[str, Any] | None)

Return type:

Dict[str, Any]

async get_session(session_id)[source]

Get session by ID.

Parameters:

session_id (str)

Return type:

Dict[str, Any] | None

async update_session(session_id, updates=None, increment_requests=False, add_processing_time=0.0, mark_error=False)[source]

Update session with activity and metrics.

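Parameters:
  • session_id (str)

  • updates (Dict[str, Any] | None)

  • increment_requests (bool)

  • add_processing_time (float)

  • mark_error (bool)
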
Return type:

bool

async end_session(session_id, status='completed')[source]

End a session and update metrics.

Parameters:
  • session_id (str)

  • status (str)

Return type:

bool

async add_conversation_message(session_id, role, content, metadata=None)[source]

Add message to conversation history.

Parameters:
  • session_id (str)

  • role (str)

  • content (str)

  • metadata (Dict[str, Any] | None)

Return type:

bool

async get_conversation_history(session_id, limit=50)[source]

Get conversation history for a session.

Parameters:
  • session_id (str)

  • limit (int)

Return type:

List[Dict[str, Any]]

async get_task_context(session_id)[source]

Get TaskContext for a session.

Parameters:

session_id (str)

Return type:

TaskContext | None

async store_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None)[source]

Store checkpoint data for LangGraph workflows.

Automatically converts dataclasses to dictionaries to ensure JSON serialization compatibility.

Parameters:
  • thread_id (str)

  • checkpoint_id (str)

  • checkpoint_data

  • metadata (Dict[str, Any] | None)

Return type:

bool

async get_checkpoint(thread_id, checkpoint_id=None)[source]

Get checkpoint data. If checkpoint_id is None, the latest checkpoint is returned.

Parameters:
  • thread_id (str)

  • checkpoint_id (str | None)

Return type:

Dict[str, Any] | None

async list_checkpoints(thread_id, limit=10)[source]

List checkpoints for a thread, ordered by creation time (newest first).

Parameters:
  • thread_id (str)

  • limit (int)

Return type:

List[Dict[str, Any]]

async cleanup_expired_sessions(max_idle_hours=24)[source]

Clean up expired sessions and associated data.

Parameters:

max_idle_hours (int)

Return type:

int
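Identifying idle sessions reduces to comparing each session's last activity against a cutoff. The following is a minimal sketch over an in-memory mapping; the real method works against Redis and also removes associated data. find_expired_sessions is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_expired_sessions(
    last_activity: Dict[str, datetime],
    now: datetime,
    max_idle_hours: int = 24,
) -> List[str]:
    """Return IDs of sessions idle for longer than max_idle_hours."""
    cutoff = now - timedelta(hours=max_idle_hours)
    # Strictly older than the cutoff; a session exactly at the cutoff survives.
    return sorted(sid for sid, ts in last_activity.items() if ts < cutoff)
```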

async get_metrics()[source]

Get comprehensive metrics.

Return type:

Dict[str, Any]

async health_check()[source]

Perform health check.

Return type:

Dict[str, Any]

async put_checkpoint(thread_id, checkpoint_id, checkpoint_data, metadata=None)[source]

Store a checkpoint for LangGraph workflows (ICheckpointerBackend interface).

Parameters:
  • thread_id (str)

  • checkpoint_id (str)

  • checkpoint_data

  • metadata (Dict[str, Any] | None)

Return type:

bool

async put_writes(thread_id, checkpoint_id, task_id, writes_data)[source]

Store intermediate writes for a checkpoint (ICheckpointerBackend interface).

Parameters:
  • thread_id (str)

  • checkpoint_id (str)

  • task_id

  • writes_data

Return type:

bool

async get_writes(thread_id, checkpoint_id)[source]

Get intermediate writes for a checkpoint (ICheckpointerBackend interface).

Parameters:
  • thread_id (str)

  • checkpoint_id (str)

Return type:

List[tuple]

async store_task_context(session_id, context)[source]

Store TaskContext for a session (ITaskContextStorage interface).

Parameters:
  • session_id (str)

  • context (Any)

Return type:

bool

async create_conversation_session(session_id, participants, session_type, metadata=None)[source]

Create an isolated conversation session between participants.

Parameters:
  • session_id (str) – Base session ID

  • participants (List[Dict[str, Any]]) – List of participant dictionaries with id, type, role

  • session_type (str) – Type of conversation (‘user_to_mc’, ‘mc_to_agent’, ‘agent_to_agent’, ‘user_to_agent’)

  • metadata (Dict[str, Any] | None) – Additional session metadata

Returns:

Generated session key for conversation isolation

Return type:

str

async add_agent_communication_message(session_key, sender_id, sender_type, sender_role, recipient_id, recipient_type, recipient_role, content, message_type='communication', metadata=None)[source]

Add a message to an agent communication session.

Parameters:
  • session_key (str) – Isolated session key

  • sender_id (str) – ID of the sender

  • sender_type (str) – Type of sender (‘master_controller’, ‘agent’, ‘user’)

  • sender_role (str | None) – Role of sender (for agents)

  • recipient_id (str) – ID of the recipient

  • recipient_type (str) – Type of recipient

  • recipient_role (str | None) – Role of recipient (for agents)

  • content (str) – Message content

  • message_type (str) – Type of message

  • metadata (Dict[str, Any] | None) – Additional message metadata

Returns:

Success status

Return type:

bool

async get_agent_conversation_history(session_key, limit=50, message_types=None)[source]

Get conversation history for an agent communication session.

Parameters:
  • session_key (str) – Isolated session key

  • limit (int) – Maximum number of messages to retrieve

  • message_types (List[str] | None) – Filter by message types

Returns:

List of conversation messages

Return type:

List[Dict[str, Any]]
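How limit and message_types might combine, as a standalone sketch. filter_history is a hypothetical helper. It assumes history is stored oldest first, that type filtering happens before the limit is applied, and that each message dict carries a message_type key matching the message_type argument of add_agent_communication_message.

```python
from typing import Any, Dict, List, Optional


def filter_history(messages: List[Dict[str, Any]], limit: int = 50,
                   message_types: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Keep only the requested message types, then return the newest `limit` of them."""
    if message_types is not None:
        wanted = set(message_types)
        messages = [m for m in messages if m.get("message_type") in wanted]
    # Guard limit <= 0: messages[-0:] would return the whole list.
    return messages[-limit:] if limit > 0 else []
```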

async compress_conversation(session_id, strategy=None, config_override=None)[source]

Compress conversation history using specified strategy.

Parameters:
  • session_id (str) – Session ID to compress

  • strategy (str | None) – Compression strategy (overrides config if provided)

  • config_override (CompressionConfig | None) – Override compression config for this operation

Returns:

Dictionary with compression results:

{
    "success": bool,
    "strategy": str,
    "original_count": int,
    "compressed_count": int,
    "compression_ratio": float,
    "tokens_saved": int (if applicable),
    "time_taken": float
}

Example

result = await engine.compress_conversation(
    session_id="session-123", strategy="summarize"
)
print(f"Compressed from {result['original_count']} to {result['compressed_count']} messages")

async auto_compress_on_limit(session_id)[source]

Automatically compress conversation if it exceeds threshold.

Checks if conversation exceeds auto_compress_threshold and compresses to auto_compress_target if needed.

Parameters:

session_id (str) – Session ID to check

Returns:

Compression result dict if compression was triggered, None otherwise

Return type:

Dict[str, Any] | None

Example

# Configure auto-compression
config = CompressionConfig(
    auto_compress_enabled=True,
    auto_compress_threshold=100,
    auto_compress_target=50,
)
engine = ContextEngine(compression_config=config)

# Check and auto-compress if needed
result = await engine.auto_compress_on_limit(session_id)
if result:
    print(f"Auto-compressed: {result['original_count']} -> {result['compressed_count']}")
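The threshold check can be sketched in a few lines. This is an illustration under assumptions, not the ContextEngine implementation: AutoCompressSettings and auto_compress are hypothetical, "exceeds" is read as strictly greater than, the stand-in strategy simply keeps the newest auto_compress_target messages, and compression_ratio is assumed to be compressed_count / original_count.

```python
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class AutoCompressSettings:
    # Hypothetical mirror of the three CompressionConfig fields used for auto-compression.
    auto_compress_enabled: bool = True
    auto_compress_threshold: int = 100
    auto_compress_target: int = 50


def auto_compress(messages: List[Any], cfg: AutoCompressSettings) -> Optional[Dict[str, Any]]:
    """Truncate to the newest auto_compress_target messages once the threshold is exceeded."""
    original = len(messages)
    if not cfg.auto_compress_enabled or original <= cfg.auto_compress_threshold:
        return None  # no compression triggered
    start = time.perf_counter()
    kept = messages[-cfg.auto_compress_target:] if cfg.auto_compress_target > 0 else []
    messages[:] = kept  # compress in place
    return {
        "success": True,
        "strategy": "truncate",  # stand-in strategy, not necessarily the library's default
        "original_count": original,
        "compressed_count": len(kept),
        "compression_ratio": len(kept) / original,  # assumed definition
        "time_taken": time.perf_counter() - start,
    }
```

Returning None when nothing happens lets callers use the same "if result:" check as the documented example.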

async get_compressed_context(session_id, format='messages', compress_first=False)[source]

Get conversation context in compressed format.

Parameters:
  • session_id (str) – Session ID

  • format (str) – Output format - “messages”, “string”, or “dict”

  • compress_first (bool) – Whether to compress before returning

Returns:

Conversation in the requested format:

  • "messages": List[ConversationMessage]

  • "string": Formatted string

  • "dict": List[Dict[str, Any]]

Example

# Get as formatted string
context = await engine.get_compressed_context(
    session_id="session-123", format="string"
)
print(context)

# Get as messages, compress first
messages = await engine.get_compressed_context(
    session_id="session-456", format="messages", compress_first=True
)
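One way the three output formats could be produced from a single message list. Message is a hypothetical stand-in for ConversationMessage, the "role: content" string layout is an assumption, and raising ValueError for unknown formats is this sketch's choice rather than documented behavior. The parameter is named format to mirror the documented signature, even though that shadows the built-in.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Union


@dataclass
class Message:
    # Hypothetical stand-in for ConversationMessage; the real fields may differ.
    role: str
    content: str


def render_context(messages: List[Message], format: str = "messages"
                   ) -> Union[List[Message], str, List[Dict[str, Any]]]:
    """Return the conversation as message objects, a formatted string, or plain dicts."""
    if format == "messages":
        return list(messages)
    if format == "string":
        # Assumed layout: one "role: content" line per message.
        return "\n".join(f"{m.role}: {m.content}" for m in messages)
    if format == "dict":
        return [asdict(m) for m in messages]
    raise ValueError(f"unsupported format: {format!r}")
```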

Task

Task Context

class aiecs.domain.task.task_context.ContextUpdate[source]

Bases: object

Represents a single update to the context (e.g., message, metadata, or resource).

timestamp: float

update_type: str

data: Any

metadata: Dict[str, Any]

__init__(timestamp, update_type, data, metadata)

Parameters:
  • timestamp (float)

  • update_type (str)

  • data (Any)

  • metadata (Dict[str, Any])

Return type:

None

class aiecs.domain.task.task_context.TaskContext[source]

Bases: object

Enhanced context manager for task execution, providing:

  • Context history tracking and checkpointing

  • Resource acquisition and release

  • Performance tracking

  • File and model tracking

  • Persistent storage

  • Metadata toggles

  • Enhanced error handling

__init__(data, task_dir='./tasks')[source]
Parameters:
  • data

  • task_dir

add_context_update(update_type, data, metadata=None)[source]

Add a context update (e.g., message, metadata change).

Parameters:
  • update_type

  • data

  • metadata

add_resource(name, resource)[source]

Add a resource that needs cleanup.

Parameters:
  • name

  • resource

Return type:

None

track_file_operation(file_path, operation, source='task')[source]

Track a file operation (e.g., read, edit).

Parameters:
  • file_path (str)

  • operation (str)

  • source (str)

track_model_usage(model_id, provider_id, mode)[source]

Track AI model usage.

Parameters:
  • model_id (str)

  • provider_id (str)

  • mode (str)

optimize_context(max_size=1000)[source]

Optimize context by removing duplicates and old entries.

Parameters:

max_size (int)

Return type:

bool
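A sketch of what "removing duplicates and old entries" could mean. Update is a hypothetical mirror of the four ContextUpdate fields. Treating two updates as duplicates when update_type and data match, keeping the newest copy, and reading the bool return as "something was removed" are all assumptions.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Update:
    # Hypothetical mirror of ContextUpdate's fields.
    timestamp: float
    update_type: str
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)


def optimize_history(history: List[Update], max_size: int = 1000) -> bool:
    """Drop duplicate updates, keep only the newest max_size, and report whether anything was removed."""
    seen = set()
    deduped: List[Update] = []
    # Walk newest first so the most recent copy of a duplicate is the one kept.
    for upd in sorted(history, key=lambda u: u.timestamp, reverse=True):
        key = (upd.update_type, repr(upd.data))  # assumed duplicate criterion
        if key in seen:
            continue
        seen.add(key)
        deduped.append(upd)
    trimmed = sorted(deduped[:max_size], key=lambda u: u.timestamp)  # back to oldest first
    changed = len(trimmed) != len(history)
    history[:] = trimmed
    return changed
```

repr() is used for the key so unhashable payloads such as dicts can still be compared.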

async truncate_context_history(timestamp)[source]

Truncate context history after a given timestamp.

Parameters:

timestamp (float)
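Truncation can be pictured as a rollback in which everything newer than the given timestamp is discarded. truncate_after is a hypothetical synchronous helper (the documented method is async). Keeping updates stamped exactly at timestamp and returning the number removed are choices made for this sketch.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Update:
    # Hypothetical mirror of ContextUpdate's fields.
    timestamp: float
    update_type: str
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)


def truncate_after(history: List[Update], timestamp: float) -> int:
    """Remove every update newer than timestamp, in place, and return how many were removed."""
    kept = [u for u in history if u.timestamp <= timestamp]
    removed = len(history) - len(kept)
    history[:] = kept
    return removed
```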

get_active_metadata()[source]

Return metadata filtered by toggles.

Return type:

Dict[str, Any]
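A minimal sketch of toggle filtering, assuming toggles is a mapping from metadata key to bool and that keys without a toggle stay visible. active_metadata is a hypothetical name.

```python
from typing import Any, Dict


def active_metadata(metadata: Dict[str, Any], toggles: Dict[str, bool]) -> Dict[str, Any]:
    """Return only metadata entries whose toggle is on; keys without a toggle default to on."""
    return {k: v for k, v in metadata.items() if toggles.get(k, True)}
```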

to_dict()[source]

Convert context to dictionary.

Return type:

Dict[str, Any]

__enter__()[source]

Synchronous context entry.

__exit__(exc_type, exc_val, exc_tb)[source]

Synchronous context exit with cleanup.

async __aenter__()[source]

Asynchronous context entry.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Asynchronous context exit with cleanup.
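The cleanup contract of add_resource together with the async exit can be sketched like this. ResourceScope, SyncConn, and AsyncConn are hypothetical. The sketch assumes resources expose a close() method (sync or async), releases them in reverse registration order, and returns False from __aexit__ so exceptions raised in the body still propagate.

```python
import asyncio
import inspect
from typing import Any, Dict, Tuple


class ResourceScope:
    """Hypothetical sketch: register named resources and release them on async exit."""

    def __init__(self) -> None:
        self.resources: Dict[str, Any] = {}

    def add_resource(self, name: str, resource: Any) -> None:
        self.resources[name] = resource

    async def __aenter__(self) -> "ResourceScope":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Release in reverse registration order; close() may be sync or async.
        for name in reversed(list(self.resources)):
            close = getattr(self.resources[name], "close", None)
            if close is None:
                continue
            result = close()
            if inspect.isawaitable(result):
                await result
        self.resources.clear()
        return False  # do not suppress exceptions raised inside the block


class SyncConn:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class AsyncConn:
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def demo() -> Tuple[bool, bool]:
    db, http = SyncConn(), AsyncConn()
    async with ResourceScope() as scope:
        scope.add_resource("db", db)
        scope.add_resource("http", http)
    return db.closed, http.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```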

aiecs.domain.task.task_context.build_context(data)[source]

Build a simple context dictionary (for backward compatibility).

Parameters:

data (dict)

Return type:

dict

aiecs.domain.task.task_context.task_context(data, task_dir='./tasks')[source]

Async context manager for task execution.

Usage:

async with task_context(request_data, task_dir="/path/to/tasks") as context:
    context.add_context_update("message", "User input", {"source": "user"})
    context.track_file_operation("example.py", "read", "tool")
    result = await service_instance.run(request_data, context)

Parameters:
  • data

  • task_dir

Return type:

AsyncGenerator[TaskContext, None]
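The documented AsyncGenerator[TaskContext, None] return type suggests an async generator wrapped with contextlib.asynccontextmanager. The sketch below shows that pattern with a hypothetical MiniContext in place of the real TaskContext; its only cleanup is marking the context closed, which runs in a finally block whether the body succeeds or raises.

```python
import asyncio
import contextlib
from typing import Any, AsyncGenerator, Dict, List, Optional


class MiniContext:
    """Hypothetical stand-in for TaskContext with just enough state for the demo."""

    def __init__(self, data: Dict[str, Any], task_dir: str) -> None:
        self.data = data
        self.task_dir = task_dir
        self.updates: List[tuple] = []
        self.closed = False

    def add_context_update(self, update_type: str, data: Any,
                           metadata: Optional[Dict[str, Any]] = None) -> None:
        self.updates.append((update_type, data, metadata or {}))


@contextlib.asynccontextmanager
async def mini_task_context(data: Dict[str, Any], task_dir: str = "./tasks"
                            ) -> AsyncGenerator[MiniContext, None]:
    ctx = MiniContext(data, task_dir)
    try:
        yield ctx
    finally:
        # Cleanup runs whether the body succeeded or raised.
        ctx.closed = True


async def demo() -> MiniContext:
    async with mini_task_context({"user_id": "u1"}) as ctx:
        ctx.add_context_update("message", "User input", {"source": "user"})
    return ctx


if __name__ == "__main__":
    print(asyncio.run(demo()).updates)
```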

Task Models

class aiecs.domain.task.model.TaskContext[source]

Bases: object

Task context model

__init__(user_id, task_id, session_id=None, metadata=None)[source]
Parameters:
  • user_id

  • task_id

  • session_id

  • metadata

set_variable(key, value)[source]

Set a task variable.

Parameters:
  • key

  • value

get_variable(key, default=None)[source]

Get a task variable.

Parameters:
  • key

  • default

Return type:

Any

dict()[source]
Return type:

Dict[str, Any]
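A sketch of the variable store, assuming variables live in a plain dict separate from metadata and that get_variable falls back to default for missing keys. TaskContextModel and the key names returned by its dict() are hypothetical.

```python
from typing import Any, Dict, Optional


class TaskContextModel:
    """Hypothetical sketch of a task context holding ids, metadata, and variables."""

    def __init__(self, user_id: str, task_id: str, session_id: Optional[str] = None,
                 metadata: Optional[Dict[str, Any]] = None) -> None:
        self.user_id = user_id
        self.task_id = task_id
        self.session_id = session_id
        self.metadata = metadata or {}
        self.variables: Dict[str, Any] = {}

    def set_variable(self, key: str, value: Any) -> None:
        self.variables[key] = value

    def get_variable(self, key: str, default: Any = None) -> Any:
        return self.variables.get(key, default)

    def dict(self) -> Dict[str, Any]:
        # Assumed key names; copies keep callers from mutating internal state.
        return {
            "user_id": self.user_id,
            "task_id": self.task_id,
            "session_id": self.session_id,
            "metadata": self.metadata.copy(),
            "variables": self.variables.copy(),
        }
```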

class aiecs.domain.task.model.DSLStep[source]

Bases: object

DSL step model

__init__(step_type, condition=None, description='', params=None)[source]
Parameters:
  • step_type

  • condition

  • description

  • params

dict()[source]
Return type:

Dict[str, Any]
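A sketch of DSLStep's constructor and dict(), assuming dict() simply echoes the four constructor arguments and that condition is a string. DSLStepSketch and the example values are hypothetical. Defaulting params to a fresh dict per instance avoids shared mutable state between steps.

```python
from typing import Any, Dict, Optional


class DSLStepSketch:
    """Hypothetical sketch of a DSL step built from the four documented constructor arguments."""

    def __init__(self, step_type: str, condition: Optional[str] = None,
                 description: str = "", params: Optional[Dict[str, Any]] = None) -> None:
        self.step_type = step_type
        self.condition = condition
        self.description = description
        # A fresh dict per instance so steps never share a params object.
        self.params = params if params is not None else {}

    def dict(self) -> Dict[str, Any]:
        return {
            "step_type": self.step_type,
            "condition": self.condition,
            "description": self.description,
            "params": self.params.copy(),
        }
```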