Execution Interfaces Technical Documentation

Overview

Design Motivation and Problem Background

When building large-scale AI application systems, the execution layer faces the following core challenges:

1. Lack of Interface Standardization

  • Different execution components (tool executors, operation executors, cache providers) lack unified interface specifications

  • Dependency relationships between components are unclear, leading to tight coupling and difficulty in testing

  • Lack of abstraction layer makes component replacement and extension difficult

2. Violation of Dependency Inversion Principle

  • High-level modules directly depend on specific implementations of low-level modules

  • Lack of abstract interfaces makes the system difficult to extend and maintain

  • Testing is difficult, unable to perform effective unit tests and integration tests

3. Insufficient Plugin Architecture Support

  • Adding new execution engines requires modifying existing code

  • Lack of unified execution interface, unable to support multiple execution strategies

  • Registration and discovery mechanisms for execution components are incomplete

4. Type Safety and Contract Management

  • Lack of clear interface contract definitions

  • Parameter types and return value types are unclear

  • Lack of backward compatibility guarantees when interfaces change

Execution Interface System Solution:

  • Interface Segregation Principle: Separate interfaces with different responsibilities to improve cohesion

  • Dependency Inversion: High-level modules depend on abstract interfaces, low-level modules implement interfaces

  • Plugin Support: Support multiple execution engines through unified interface

  • Type Safety: Interface definitions based on Python type system

  • Clear Contracts: Define clear interface contracts through abstract methods

Component Positioning

execution_interface.py is the core interface definition of the AIECS system, located in the Domain Layer, defining all abstract interfaces related to execution. As the contract layer of the system, it provides type-safe, clearly-responsible interface specifications.

Component Type and Positioning

Component Type

Domain Interface Component - Located in the Domain Layer, belongs to system contract definitions

Architecture Layers

┌─────────────────────────────────────────┐
│         Application Layer               │  ← Components using interfaces
│  (OperationExecutor, ServiceExecutor)   │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         Domain Layer                    │  ← Execution interfaces layer
│  (Execution Interfaces, Contracts)      │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│       Infrastructure Layer              │  ← Components implementing interfaces
│  (ToolExecutor, CacheProvider, etc.)    │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         External Services               │  ← External dependencies
│  (Tools, Cache, Database, etc.)         │
└─────────────────────────────────────────┘

Upstream Components (Consumers)

1. Application Layer Executors

  • OperationExecutor (application/executors/operation_executor.py)

  • ServiceExecutor (if exists)

  • TaskExecutor (if exists)

2. Infrastructure Layer Implementations

  • ToolExecutor (tools/tool_executor/tool_executor.py)

  • ExecutionUtils (utils/execution_utils.py)

  • CacheProvider (if exists)

3. Domain Services

  • TaskService (if exists)

  • ExecutionService (if exists)

Downstream Components (Dependencies)

1. Python ABC System

  • Purpose: Provide abstract base class support

  • Functionality: Interface definition, abstract method declaration

  • Dependency Type: Language feature dependency

2. Domain Models

  • TaskStepResult (domain/execution/model.py)

  • TaskStatus (domain/execution/model.py)

  • ErrorCode (domain/execution/model.py)

3. Type System

  • Purpose: Provide type checking and type safety

  • Functionality: Parameter type validation, return value type checking

  • Dependency Type: Python type system

Core Interfaces Explained

1. IToolProvider - Tool Provider Interface

class IToolProvider(ABC):
    """Tool provider interface - Domain layer abstraction"""
    
    @abstractmethod
    def get_tool(self, tool_name: str) -> Any:
        """Get tool instance"""
        pass
    
    @abstractmethod
    def has_tool(self, tool_name: str) -> bool:
        """Check if tool exists"""
        pass

Responsibilities:

  • Tool Discovery: Get tool instance based on tool name

  • Tool Checking: Verify if tool is available

  • Tool Management: Manage tool lifecycle

Implementation Requirements:

  • Must support getting tool instances by name

  • Must provide tool existence checking

  • Should support dynamic tool registration and deregistration

2. IToolExecutor - Tool Executor Interface

class IToolExecutor(ABC):
    """Tool executor interface - Domain layer abstraction"""
    
    @abstractmethod
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        """Synchronously execute tool operation"""
        pass
    
    @abstractmethod
    async def execute_async(self, tool: Any, operation_name: str, **params) -> Any:
        """Asynchronously execute tool operation"""
        pass

Responsibilities:

  • Synchronous Execution: Support synchronous tool operation execution

  • Asynchronous Execution: Support asynchronous tool operation execution

  • Parameter Passing: Handle parameter passing for tool operations

  • Result Return: Unify return format for tool operations

Implementation Requirements:

  • Must support both synchronous and asynchronous execution modes

  • Must handle parameter validation and type conversion

  • Should provide error handling and retry mechanisms

3. ICacheProvider - Cache Provider Interface

class ICacheProvider(ABC):
    """Cache provider interface - Domain layer abstraction"""
    
    @abstractmethod
    def generate_cache_key(self, operation_type: str, user_id: str, task_id: str,
                          args: tuple, kwargs: Dict[str, Any]) -> str:
        """Generate cache key"""
        pass
    
    @abstractmethod
    def get_from_cache(self, cache_key: str) -> Optional[Any]:
        """Get data from cache"""
        pass
    
    @abstractmethod
    def add_to_cache(self, cache_key: str, value: Any) -> None:
        """Add data to cache"""
        pass

Responsibilities:

  • Cache Key Generation: Generate unique cache keys based on operation context

  • Cache Reading: Get data from cache

  • Cache Writing: Write data to cache

  • Cache Management: Manage cache expiration and cleanup

Implementation Requirements:

  • Must support context-aware cache key generation

  • Must provide thread-safe cache operations

  • Should support cache expiration and cleanup strategies

4. IOperationExecutor - Operation Executor Interface

class IOperationExecutor(ABC):
    """Operation executor interface - Domain layer abstraction"""
    
    @abstractmethod
    async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any:
        """Execute single operation"""
        pass
    
    @abstractmethod
    async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Batch execute operations"""
        pass
    
    @abstractmethod
    async def execute_operations_sequence(self, operations: List[Dict[str, Any]],
                                        user_id: str, task_id: str,
                                        stop_on_failure: bool = False,
                                        save_callback: Optional[Callable] = None) -> List[TaskStepResult]:
        """Sequentially execute operation sequence"""
        pass
    
    @abstractmethod
    async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]:
        """Execute operations in parallel"""
        pass

Responsibilities:

  • Single Operation Execution: Execute single tool operation

  • Batch Execution: Batch execute multiple operations

  • Sequential Execution: Execute operation sequence in order

  • Parallel Execution: Execute multiple operations in parallel

Implementation Requirements:

  • Must support multiple execution modes

  • Must handle dependencies between operations

  • Should provide error handling and recovery mechanisms

5. ExecutionInterface - Unified Execution Interface

class ExecutionInterface(ABC):
    """Unified execution interface - Support plugin execution engines"""
    
    @abstractmethod
    async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any:
        """Execute single operation"""
        pass
    
    @abstractmethod
    async def execute_task(self, task_name: str, input_data: Dict[str, Any], context: Dict[str, Any]) -> Any:
        """Execute single task"""
        pass
    
    @abstractmethod
    async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Batch execute operations"""
        pass
    
    @abstractmethod
    async def batch_execute_tasks(self, tasks: List[Dict[str, Any]]) -> List[Any]:
        """Batch execute tasks"""
        pass
    
    def register_executor(self, executor_type: str, executor_instance: Any) -> None:
        """Register new executor type"""
        raise NotImplementedError("Executor registration is not implemented in this interface")

Responsibilities:

  • Unified Interface: Provide unified execution interface

  • Plugin Support: Support multiple execution engines

  • Task Execution: Support both task and operation execution modes

  • Batch Processing: Support batch execution of operations and tasks

Design Patterns Explained

1. Interface Segregation Principle

# Separate interfaces with different responsibilities
class IToolProvider(ABC):      # Tool provision
class IToolExecutor(ABC):      # Tool execution
class ICacheProvider(ABC):     # Cache management
class IOperationExecutor(ABC): # Operation execution

Advantages:

  • Single Responsibility: Each interface is responsible for only one specific function

  • Easy to Implement: Implementation classes only need to implement relevant interfaces

  • Easy to Test: Can test each interface independently

2. Dependency Inversion Principle

# High-level modules depend on abstract interfaces
class OperationExecutor:
    def __init__(self, tool_executor: IToolExecutor, cache_provider: ICacheProvider):
        self.tool_executor = tool_executor
        self.cache_provider = cache_provider

Advantages:

  • Loose Coupling: High-level modules don’t depend on specific implementations

  • Extensible: Can easily replace implementations

  • Testable: Can use mock objects for testing

3. Strategy Pattern

# Support multiple execution strategies
class ExecutionInterface(ABC):
    def register_executor(self, executor_type: str, executor_instance: Any):
        # Support registering different executors
        pass

Advantages:

  • Algorithm Encapsulation: Encapsulate execution algorithms in specific implementations

  • Dynamic Switching: Can switch execution strategies at runtime

  • Easy to Extend: Adding new execution strategies doesn’t require modifying existing code

Interface Implementation Standards

1. Interface Implementation Requirements

Required Methods

class ToolExecutorImpl(IToolExecutor):
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        """Must implement synchronous execution method"""
        # Implementation logic
        pass
    
    async def execute_async(self, tool: Any, operation_name: str, **params) -> Any:
        """Must implement asynchronous execution method"""
        # Implementation logic
        pass

Type Safety Requirements

from typing import TypeVar, Generic

T = TypeVar('T')

class TypedToolExecutor(IToolExecutor, Generic[T]):
    def execute(self, tool: Any, operation_name: str, **params) -> T:
        """Type-safe execution method"""
        # Implementation logic
        pass

2. Error Handling Standards

Exception Type Definitions

class ExecutionError(Exception):
    """Execution error base class"""
    pass

class ToolNotFoundError(ExecutionError):
    """Tool not found error"""
    pass

class OperationFailedError(ExecutionError):
    """Operation execution failed error"""
    pass

Error Handling Implementation

class RobustToolExecutor(IToolExecutor):
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        try:
            # Execution logic
            return result
        except ToolNotFoundError:
            # Handle tool not found
            raise
        except Exception as e:
            # Handle other errors
            raise OperationFailedError(f"Operation failed: {e}") from e

3. Logging and Monitoring Standards

Logging

import logging

class LoggingToolExecutor(IToolExecutor):
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        self.logger.info(f"Executing operation: {operation_name}")
        try:
            result = self._do_execute(tool, operation_name, **params)
            self.logger.info(f"Operation completed: {operation_name}")
            return result
        except Exception as e:
            self.logger.error(f"Operation failed: {operation_name}, error: {e}")
            raise

Performance Monitoring

import time
from functools import wraps

def monitor_execution(func):
    """Execution monitoring decorator"""
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        start_time = time.time()
        try:
            result = await func(self, *args, **kwargs)
            execution_time = time.time() - start_time
            self.logger.info(f"Execution completed in {execution_time:.3f}s")
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            self.logger.error(f"Execution failed after {execution_time:.3f}s: {e}")
            raise
    return wrapper

Usage Examples

1. Basic Interface Implementation

Tool Provider Implementation

from aiecs.core.interface.execution_interface import IToolProvider

class ToolProviderImpl(IToolProvider):
    def __init__(self):
        self._tools = {}
    
    def register_tool(self, name: str, tool: Any):
        """Register tool"""
        self._tools[name] = tool
    
    def get_tool(self, tool_name: str) -> Any:
        """Get tool instance"""
        if tool_name not in self._tools:
            raise ValueError(f"Tool '{tool_name}' not found")
        return self._tools[tool_name]
    
    def has_tool(self, tool_name: str) -> bool:
        """Check if tool exists"""
        return tool_name in self._tools

Tool Executor Implementation

from aiecs.core.interface.execution_interface import IToolExecutor

class ToolExecutorImpl(IToolExecutor):
    def __init__(self, cache_provider: ICacheProvider = None):
        self.cache_provider = cache_provider
    
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        """Synchronously execute tool operation"""
        # Check cache
        if self.cache_provider:
            cache_key = self.cache_provider.generate_cache_key(
                operation_name, params.get('user_id', ''), 
                params.get('task_id', ''), (), params
            )
            cached_result = self.cache_provider.get_from_cache(cache_key)
            if cached_result is not None:
                return cached_result
        
        # Execute operation
        method = getattr(tool, operation_name)
        result = method(**params)
        
        # Cache result
        if self.cache_provider:
            self.cache_provider.add_to_cache(cache_key, result)
        
        return result
    
    async def execute_async(self, tool: Any, operation_name: str, **params) -> Any:
        """Asynchronously execute tool operation"""
        # Async implementation logic
        method = getattr(tool, operation_name)
        if asyncio.iscoroutinefunction(method):
            return await method(**params)
        else:
            return method(**params)

2. Cache Provider Implementation

from aiecs.core.interface.execution_interface import ICacheProvider
import hashlib
import json
from typing import Optional, Any

class CacheProviderImpl(ICacheProvider):
    def __init__(self, cache_size: int = 1000):
        self._cache = {}
        self._cache_size = cache_size
    
    def generate_cache_key(self, operation_type: str, user_id: str, task_id: str,
                          args: tuple, kwargs: Dict[str, Any]) -> str:
        """Generate cache key"""
        key_data = {
            'operation_type': operation_type,
            'user_id': user_id,
            'task_id': task_id,
            'args': args,
            'kwargs': kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_str.encode()).hexdigest()
    
    def get_from_cache(self, cache_key: str) -> Optional[Any]:
        """Get data from cache"""
        return self._cache.get(cache_key)
    
    def add_to_cache(self, cache_key: str, value: Any) -> None:
        """Add data to cache"""
        if len(self._cache) >= self._cache_size:
            # Simple LRU implementation
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]
        
        self._cache[cache_key] = value

3. Operation Executor Implementation

from aiecs.core.interface.execution_interface import IOperationExecutor
from aiecs.domain.execution.model import TaskStepResult, TaskStatus, ErrorCode

class OperationExecutorImpl(IOperationExecutor):
    def __init__(self, tool_executor: IToolExecutor, tool_provider: IToolProvider):
        self.tool_executor = tool_executor
        self.tool_provider = tool_provider
    
    async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any:
        """Execute single operation"""
        tool_name, operation_name = operation_spec.split('.', 1)
        tool = self.tool_provider.get_tool(tool_name)
        return await self.tool_executor.execute_async(tool, operation_name, **params)
    
    async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Batch execute operations"""
        tasks = []
        for op in operations:
            task = self.execute_operation(op['operation'], op.get('params', {}))
            tasks.append(task)
        
        return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def execute_operations_sequence(self, operations: List[Dict[str, Any]],
                                        user_id: str, task_id: str,
                                        stop_on_failure: bool = False,
                                        save_callback: Optional[Callable] = None) -> List[TaskStepResult]:
        """Sequentially execute operation sequence"""
        results = []
        
        for i, op in enumerate(operations):
            try:
                result = await self.execute_operation(op['operation'], op.get('params', {}))
                step_result = TaskStepResult(
                    step=op['operation'],
                    result=result,
                    completed=True,
                    message=f"Operation {op['operation']} completed",
                    status=TaskStatus.COMPLETED.value
                )
            except Exception as e:
                step_result = TaskStepResult(
                    step=op['operation'],
                    result=None,
                    completed=False,
                    message=f"Operation {op['operation']} failed: {str(e)}",
                    status=TaskStatus.FAILED.value,
                    error_code=ErrorCode.EXECUTION_ERROR.value,
                    error_message=str(e)
                )
                
                if stop_on_failure:
                    results.append(step_result)
                    break
            
            if save_callback:
                await save_callback(user_id, task_id, i, step_result)
            
            results.append(step_result)
        
        return results
    
    async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]:
        """Execute operations in parallel"""
        tasks = []
        for i, op in enumerate(operations):
            task = self._execute_single_operation(op, i)
            tasks.append(task)
        
        return await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _execute_single_operation(self, op: Dict[str, Any], index: int) -> TaskStepResult:
        """Execute single operation and return result"""
        try:
            result = await self.execute_operation(op['operation'], op.get('params', {}))
            return TaskStepResult(
                step=op['operation'],
                result=result,
                completed=True,
                message=f"Operation {op['operation']} completed",
                status=TaskStatus.COMPLETED.value
            )
        except Exception as e:
            return TaskStepResult(
                step=op['operation'],
                result=None,
                completed=False,
                message=f"Operation {op['operation']} failed: {str(e)}",
                status=TaskStatus.FAILED.value,
                error_code=ErrorCode.EXECUTION_ERROR.value,
                error_message=str(e)
            )

Maintenance Guide

1. Daily Maintenance

Interface Compatibility Check

def check_interface_compatibility(implementation_class, interface_class):
    """Check if implementation class fully implements interface"""
    interface_methods = {
        name for name, method in interface_class.__dict__.items()
        if getattr(method, '__isabstractmethod__', False)
    }
    
    implementation_methods = {
        name for name, method in implementation_class.__dict__.items()
        if callable(method) and not name.startswith('_')
    }
    
    missing_methods = interface_methods - implementation_methods
    if missing_methods:
        raise TypeError(f"Missing methods: {missing_methods}")
    
    return True

Interface Version Management

from typing import Protocol, runtime_checkable

@runtime_checkable
class VersionedInterface(Protocol):
    """Versioned interface protocol"""
    version: str
    
    def get_version(self) -> str:
        """Get interface version"""
        return self.version

def check_interface_version(implementation, required_version: str):
    """Check interface version compatibility"""
    if hasattr(implementation, 'get_version'):
        version = implementation.get_version()
        if version < required_version:
            raise ValueError(f"Interface version {version} is lower than required {required_version}")

2. Troubleshooting

Common Issue Diagnosis

Issue 1: Incomplete Interface Implementation

# Error message
TypeError: Can't instantiate abstract class ToolExecutorImpl with abstract methods execute_async

# Diagnosis steps
def diagnose_incomplete_implementation(cls):
    """Diagnose incomplete interface implementation issue"""
    abstract_methods = []
    for name, method in cls.__dict__.items():
        if getattr(method, '__isabstractmethod__', False):
            abstract_methods.append(name)
    
    if abstract_methods:
        print(f"Unimplemented abstract methods: {abstract_methods}")
        return False
    return True

Issue 2: Type Mismatch

# Error message
TypeError: execute_operation() missing 1 required positional argument: 'params'

# Diagnosis steps
def diagnose_type_mismatch(implementation, interface):
    """Diagnose type mismatch issue"""
    import inspect
    
    for method_name in dir(interface):
        if method_name.startswith('_'):
            continue
            
        interface_method = getattr(interface, method_name)
        if not callable(interface_method):
            continue
            
        if hasattr(implementation, method_name):
            impl_method = getattr(implementation, method_name)
            interface_sig = inspect.signature(interface_method)
            impl_sig = inspect.signature(impl_method)
            
            if interface_sig != impl_sig:
                print(f"Method {method_name} signature mismatch:")
                print(f"  Interface: {interface_sig}")
                print(f"  Implementation: {impl_sig}")

3. Interface Updates

Adding New Methods

# 1. Add new method to interface
class IToolExecutor(ABC):
    # Existing methods...
    
    @abstractmethod
    async def execute_with_retry(self, tool: Any, operation_name: str, 
                                max_retries: int = 3, **params) -> Any:
        """Execute method with retry"""
        pass

# 2. Update implementation class
class ToolExecutorImpl(IToolExecutor):
    # Existing methods...
    
    async def execute_with_retry(self, tool: Any, operation_name: str, 
                                max_retries: int = 3, **params) -> Any:
        """Execute method with retry implementation"""
        for attempt in range(max_retries):
            try:
                return await self.execute_async(tool, operation_name, **params)
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

Interface Version Control

from abc import ABC, abstractmethod
from typing import Union

class IToolExecutorV1(ABC):
    """Tool executor interface V1"""
    
    @abstractmethod
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        pass

class IToolExecutorV2(IToolExecutorV1):
    """Tool executor interface V2 - Inherits V1 and adds new features"""
    
    @abstractmethod
    async def execute_with_metrics(self, tool: Any, operation_name: str, **params) -> Any:
        """Execute method with metrics"""
        pass

# Support multi-version interfaces
ToolExecutorInterface = Union[IToolExecutorV1, IToolExecutorV2]

4. Interface Extension

Support Generic Interfaces

from typing import TypeVar, Generic, Protocol

T = TypeVar('T')
R = TypeVar('R')

class ITypedToolExecutor(Protocol[T, R]):
    """Typed tool executor interface"""
    
    def execute(self, tool: T, operation_name: str, **params) -> R:
        """Type-safe execution method"""
        ...

class ITypedOperationExecutor(Protocol[T]):
    """Typed operation executor interface"""
    
    async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> T:
        """Type-safe operation execution"""
        ...

Support Async Context Management

from typing import AsyncContextManager

class IAsyncContextExecutor(ABC):
    """Executor interface supporting async context"""
    
    @abstractmethod
    async def __aenter__(self):
        """Async context entry"""
        pass
    
    @abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context exit"""
        pass
    
    @abstractmethod
    async def execute_in_context(self, operation: str, **params) -> Any:
        """Execute operation in context"""
        pass

# Usage example
async def use_async_context_executor():
    async with IAsyncContextExecutor() as executor:
        result = await executor.execute_in_context("some_operation", param1="value1")

Performance Optimization

1. Interface Method Caching

from functools import lru_cache

class CachedToolExecutor(IToolExecutor):
    @lru_cache(maxsize=128)
    def _get_tool_method(self, tool_class: type, operation_name: str):
        """Cache tool method retrieval"""
        return getattr(tool_class, operation_name)
    
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        method = self._get_tool_method(type(tool), operation_name)
        return method(**params)

2. Async Interface Optimization

import asyncio
from concurrent.futures import ThreadPoolExecutor

class OptimizedToolExecutor(IToolExecutor):
    def __init__(self, max_workers: int = 4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    async def execute_async(self, tool: Any, operation_name: str, **params) -> Any:
        """Optimized async execution"""
        method = getattr(tool, operation_name)
        
        if asyncio.iscoroutinefunction(method):
            return await method(**params)
        else:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                self.executor, 
                lambda: method(**params)
            )

3. Interface Performance Monitoring

import time
from typing import Dict, List
from collections import defaultdict

class MonitoredToolExecutor(IToolExecutor):
    def __init__(self):
        self.metrics = defaultdict(list)
    
    def execute(self, tool: Any, operation_name: str, **params) -> Any:
        start_time = time.time()
        try:
            result = self._do_execute(tool, operation_name, **params)
            execution_time = time.time() - start_time
            self.metrics[operation_name].append(execution_time)
            return result
        except Exception as e:
            execution_time = time.time() - start_time
            self.metrics[f"{operation_name}_error"].append(execution_time)
            raise
    
    def get_performance_metrics(self) -> Dict[str, Dict[str, float]]:
        """Get performance metrics"""
        metrics = {}
        for operation, times in self.metrics.items():
            if times:
                metrics[operation] = {
                    "avg_time": sum(times) / len(times),
                    "min_time": min(times),
                    "max_time": max(times),
                    "count": len(times)
                }
        return metrics

Monitoring and Logging

Interface Usage Monitoring

import logging
from typing import Dict, Any

class InterfaceMonitor:
    """Interface usage monitor"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.usage_stats = defaultdict(int)
        self.error_stats = defaultdict(int)
    
    def record_interface_usage(self, interface_name: str, method_name: str, success: bool):
        """Record interface usage"""
        key = f"{interface_name}.{method_name}"
        self.usage_stats[key] += 1
        
        if not success:
            self.error_stats[key] += 1
        
        self.logger.info(f"Interface usage: {key}, success: {success}")
    
    def get_usage_report(self) -> Dict[str, Any]:
        """Get usage report"""
        return {
            "total_usage": sum(self.usage_stats.values()),
            "total_errors": sum(self.error_stats.values()),
            "error_rate": sum(self.error_stats.values()) / sum(self.usage_stats.values()) if self.usage_stats else 0,
            "usage_by_interface": dict(self.usage_stats),
            "errors_by_interface": dict(self.error_stats)
        }

Version History

  • v1.0.0: Initial version, basic interface definitions

  • v1.1.0: Added cache provider interface

  • v1.2.0: Added operation executor interface

  • v1.3.0: Added unified execution interface

  • v1.4.0: Support plugin execution engines

  • v1.5.0: Added type safety and performance optimization