# Execution Interfaces Technical Documentation ## Overview ### Design Motivation and Problem Background When building large-scale AI application systems, the execution layer faces the following core challenges: **1. Lack of Interface Standardization** - Different execution components (tool executors, operation executors, cache providers) lack unified interface specifications - Dependency relationships between components are unclear, leading to tight coupling and difficulty in testing - Lack of abstraction layer makes component replacement and extension difficult **2. Violation of Dependency Inversion Principle** - High-level modules directly depend on specific implementations of low-level modules - Lack of abstract interfaces makes the system difficult to extend and maintain - Testing is difficult, unable to perform effective unit tests and integration tests **3. Insufficient Plugin Architecture Support** - Adding new execution engines requires modifying existing code - Lack of unified execution interface, unable to support multiple execution strategies - Registration and discovery mechanisms for execution components are incomplete **4. Type Safety and Contract Management** - Lack of clear interface contract definitions - Parameter types and return value types are unclear - Lack of backward compatibility guarantees when interfaces change **Execution Interface System Solution**: - **Interface Segregation Principle**: Separate interfaces with different responsibilities to improve cohesion - **Dependency Inversion**: High-level modules depend on abstract interfaces, low-level modules implement interfaces - **Plugin Support**: Support multiple execution engines through unified interface - **Type Safety**: Interface definitions based on Python type system - **Clear Contracts**: Define clear interface contracts through abstract methods ### Component Positioning `execution_interface.py` is the core interface definition of the AIECS system, located in the Domain Layer, defining all abstract interfaces related to execution. As the contract layer of the system, it provides type-safe, clearly-responsible interface specifications. ## Component Type and Positioning ### Component Type **Domain Interface Component** - Located in the Domain Layer, belongs to system contract definitions ### Architecture Layers ``` ┌─────────────────────────────────────────┐ │ Application Layer │ ← Components using interfaces │ (OperationExecutor, ServiceExecutor) │ └─────────────────┬───────────────────────┘ │ ┌─────────────────▼───────────────────────┐ │ Domain Layer │ ← Execution interfaces layer │ (Execution Interfaces, Contracts) │ └─────────────────┬───────────────────────┘ │ ┌─────────────────▼───────────────────────┐ │ Infrastructure Layer │ ← Components implementing interfaces │ (ToolExecutor, CacheProvider, etc.) │ └─────────────────┬───────────────────────┘ │ ┌─────────────────▼───────────────────────┐ │ External Services │ ← External dependencies │ (Tools, Cache, Database, etc.) │ └─────────────────────────────────────────┘ ``` ## Upstream Components (Consumers) ### 1. Application Layer Executors - **OperationExecutor** (`application/executors/operation_executor.py`) - **ServiceExecutor** (if exists) - **TaskExecutor** (if exists) ### 2. Infrastructure Layer Implementations - **ToolExecutor** (`tools/tool_executor/tool_executor.py`) - **ExecutionUtils** (`utils/execution_utils.py`) - **CacheProvider** (if exists) ### 3. Domain Services - **TaskService** (if exists) - **ExecutionService** (if exists) ## Downstream Components (Dependencies) ### 1. Python ABC System - **Purpose**: Provide abstract base class support - **Functionality**: Interface definition, abstract method declaration - **Dependency Type**: Language feature dependency ### 2. Domain Models - **TaskStepResult** (`domain/execution/model.py`) - **TaskStatus** (`domain/execution/model.py`) - **ErrorCode** (`domain/execution/model.py`) ### 3. Type System - **Purpose**: Provide type checking and type safety - **Functionality**: Parameter type validation, return value type checking - **Dependency Type**: Python type system ## Core Interfaces Explained ### 1. IToolProvider - Tool Provider Interface ```python class IToolProvider(ABC): """Tool provider interface - Domain layer abstraction""" @abstractmethod def get_tool(self, tool_name: str) -> Any: """Get tool instance""" pass @abstractmethod def has_tool(self, tool_name: str) -> bool: """Check if tool exists""" pass ``` **Responsibilities**: - **Tool Discovery**: Get tool instance based on tool name - **Tool Checking**: Verify if tool is available - **Tool Management**: Manage tool lifecycle **Implementation Requirements**: - Must support getting tool instances by name - Must provide tool existence checking - Should support dynamic tool registration and deregistration ### 2. IToolExecutor - Tool Executor Interface ```python class IToolExecutor(ABC): """Tool executor interface - Domain layer abstraction""" @abstractmethod def execute(self, tool: Any, operation_name: str, **params) -> Any: """Synchronously execute tool operation""" pass @abstractmethod async def execute_async(self, tool: Any, operation_name: str, **params) -> Any: """Asynchronously execute tool operation""" pass ``` **Responsibilities**: - **Synchronous Execution**: Support synchronous tool operation execution - **Asynchronous Execution**: Support asynchronous tool operation execution - **Parameter Passing**: Handle parameter passing for tool operations - **Result Return**: Unify return format for tool operations **Implementation Requirements**: - Must support both synchronous and asynchronous execution modes - Must handle parameter validation and type conversion - Should provide error handling and retry mechanisms ### 3. ICacheProvider - Cache Provider Interface ```python class ICacheProvider(ABC): """Cache provider interface - Domain layer abstraction""" @abstractmethod def generate_cache_key(self, operation_type: str, user_id: str, task_id: str, args: tuple, kwargs: Dict[str, Any]) -> str: """Generate cache key""" pass @abstractmethod def get_from_cache(self, cache_key: str) -> Optional[Any]: """Get data from cache""" pass @abstractmethod def add_to_cache(self, cache_key: str, value: Any) -> None: """Add data to cache""" pass ``` **Responsibilities**: - **Cache Key Generation**: Generate unique cache keys based on operation context - **Cache Reading**: Get data from cache - **Cache Writing**: Write data to cache - **Cache Management**: Manage cache expiration and cleanup **Implementation Requirements**: - Must support context-aware cache key generation - Must provide thread-safe cache operations - Should support cache expiration and cleanup strategies ### 4. IOperationExecutor - Operation Executor Interface ```python class IOperationExecutor(ABC): """Operation executor interface - Domain layer abstraction""" @abstractmethod async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any: """Execute single operation""" pass @abstractmethod async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]: """Batch execute operations""" pass @abstractmethod async def execute_operations_sequence(self, operations: List[Dict[str, Any]], user_id: str, task_id: str, stop_on_failure: bool = False, save_callback: Optional[Callable] = None) -> List[TaskStepResult]: """Sequentially execute operation sequence""" pass @abstractmethod async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]: """Execute operations in parallel""" pass ``` **Responsibilities**: - **Single Operation Execution**: Execute single tool operation - **Batch Execution**: Batch execute multiple operations - **Sequential Execution**: Execute operation sequence in order - **Parallel Execution**: Execute multiple operations in parallel **Implementation Requirements**: - Must support multiple execution modes - Must handle dependencies between operations - Should provide error handling and recovery mechanisms ### 5. ExecutionInterface - Unified Execution Interface ```python class ExecutionInterface(ABC): """Unified execution interface - Support plugin execution engines""" @abstractmethod async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any: """Execute single operation""" pass @abstractmethod async def execute_task(self, task_name: str, input_data: Dict[str, Any], context: Dict[str, Any]) -> Any: """Execute single task""" pass @abstractmethod async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]: """Batch execute operations""" pass @abstractmethod async def batch_execute_tasks(self, tasks: List[Dict[str, Any]]) -> List[Any]: """Batch execute tasks""" pass def register_executor(self, executor_type: str, executor_instance: Any) -> None: """Register new executor type""" raise NotImplementedError("Executor registration is not implemented in this interface") ``` **Responsibilities**: - **Unified Interface**: Provide unified execution interface - **Plugin Support**: Support multiple execution engines - **Task Execution**: Support both task and operation execution modes - **Batch Processing**: Support batch execution of operations and tasks ## Design Patterns Explained ### 1. Interface Segregation Principle ```python # Separate interfaces with different responsibilities class IToolProvider(ABC): # Tool provision class IToolExecutor(ABC): # Tool execution class ICacheProvider(ABC): # Cache management class IOperationExecutor(ABC): # Operation execution ``` **Advantages**: - **Single Responsibility**: Each interface is responsible for only one specific function - **Easy to Implement**: Implementation classes only need to implement relevant interfaces - **Easy to Test**: Can test each interface independently ### 2. Dependency Inversion Principle ```python # High-level modules depend on abstract interfaces class OperationExecutor: def __init__(self, tool_executor: IToolExecutor, cache_provider: ICacheProvider): self.tool_executor = tool_executor self.cache_provider = cache_provider ``` **Advantages**: - **Loose Coupling**: High-level modules don't depend on specific implementations - **Extensible**: Can easily replace implementations - **Testable**: Can use mock objects for testing ### 3. Strategy Pattern ```python # Support multiple execution strategies class ExecutionInterface(ABC): def register_executor(self, executor_type: str, executor_instance: Any): # Support registering different executors pass ``` **Advantages**: - **Algorithm Encapsulation**: Encapsulate execution algorithms in specific implementations - **Dynamic Switching**: Can switch execution strategies at runtime - **Easy to Extend**: Adding new execution strategies doesn't require modifying existing code ## Interface Implementation Standards ### 1. Interface Implementation Requirements #### Required Methods ```python class ToolExecutorImpl(IToolExecutor): def execute(self, tool: Any, operation_name: str, **params) -> Any: """Must implement synchronous execution method""" # Implementation logic pass async def execute_async(self, tool: Any, operation_name: str, **params) -> Any: """Must implement asynchronous execution method""" # Implementation logic pass ``` #### Type Safety Requirements ```python from typing import TypeVar, Generic T = TypeVar('T') class TypedToolExecutor(IToolExecutor, Generic[T]): def execute(self, tool: Any, operation_name: str, **params) -> T: """Type-safe execution method""" # Implementation logic pass ``` ### 2. Error Handling Standards #### Exception Type Definitions ```python class ExecutionError(Exception): """Execution error base class""" pass class ToolNotFoundError(ExecutionError): """Tool not found error""" pass class OperationFailedError(ExecutionError): """Operation execution failed error""" pass ``` #### Error Handling Implementation ```python class RobustToolExecutor(IToolExecutor): def execute(self, tool: Any, operation_name: str, **params) -> Any: try: # Execution logic return result except ToolNotFoundError: # Handle tool not found raise except Exception as e: # Handle other errors raise OperationFailedError(f"Operation failed: {e}") from e ``` ### 3. Logging and Monitoring Standards #### Logging ```python import logging class LoggingToolExecutor(IToolExecutor): def __init__(self): self.logger = logging.getLogger(__name__) def execute(self, tool: Any, operation_name: str, **params) -> Any: self.logger.info(f"Executing operation: {operation_name}") try: result = self._do_execute(tool, operation_name, **params) self.logger.info(f"Operation completed: {operation_name}") return result except Exception as e: self.logger.error(f"Operation failed: {operation_name}, error: {e}") raise ``` #### Performance Monitoring ```python import time from functools import wraps def monitor_execution(func): """Execution monitoring decorator""" @wraps(func) async def wrapper(self, *args, **kwargs): start_time = time.time() try: result = await func(self, *args, **kwargs) execution_time = time.time() - start_time self.logger.info(f"Execution completed in {execution_time:.3f}s") return result except Exception as e: execution_time = time.time() - start_time self.logger.error(f"Execution failed after {execution_time:.3f}s: {e}") raise return wrapper ``` ## Usage Examples ### 1. Basic Interface Implementation #### Tool Provider Implementation ```python from aiecs.core.interface.execution_interface import IToolProvider class ToolProviderImpl(IToolProvider): def __init__(self): self._tools = {} def register_tool(self, name: str, tool: Any): """Register tool""" self._tools[name] = tool def get_tool(self, tool_name: str) -> Any: """Get tool instance""" if tool_name not in self._tools: raise ValueError(f"Tool '{tool_name}' not found") return self._tools[tool_name] def has_tool(self, tool_name: str) -> bool: """Check if tool exists""" return tool_name in self._tools ``` #### Tool Executor Implementation ```python from aiecs.core.interface.execution_interface import IToolExecutor class ToolExecutorImpl(IToolExecutor): def __init__(self, cache_provider: ICacheProvider = None): self.cache_provider = cache_provider def execute(self, tool: Any, operation_name: str, **params) -> Any: """Synchronously execute tool operation""" # Check cache if self.cache_provider: cache_key = self.cache_provider.generate_cache_key( operation_name, params.get('user_id', ''), params.get('task_id', ''), (), params ) cached_result = self.cache_provider.get_from_cache(cache_key) if cached_result is not None: return cached_result # Execute operation method = getattr(tool, operation_name) result = method(**params) # Cache result if self.cache_provider: self.cache_provider.add_to_cache(cache_key, result) return result async def execute_async(self, tool: Any, operation_name: str, **params) -> Any: """Asynchronously execute tool operation""" # Async implementation logic method = getattr(tool, operation_name) if asyncio.iscoroutinefunction(method): return await method(**params) else: return method(**params) ``` ### 2. Cache Provider Implementation ```python from aiecs.core.interface.execution_interface import ICacheProvider import hashlib import json from typing import Optional, Any class CacheProviderImpl(ICacheProvider): def __init__(self, cache_size: int = 1000): self._cache = {} self._cache_size = cache_size def generate_cache_key(self, operation_type: str, user_id: str, task_id: str, args: tuple, kwargs: Dict[str, Any]) -> str: """Generate cache key""" key_data = { 'operation_type': operation_type, 'user_id': user_id, 'task_id': task_id, 'args': args, 'kwargs': kwargs } key_str = json.dumps(key_data, sort_keys=True) return hashlib.md5(key_str.encode()).hexdigest() def get_from_cache(self, cache_key: str) -> Optional[Any]: """Get data from cache""" return self._cache.get(cache_key) def add_to_cache(self, cache_key: str, value: Any) -> None: """Add data to cache""" if len(self._cache) >= self._cache_size: # Simple LRU implementation oldest_key = next(iter(self._cache)) del self._cache[oldest_key] self._cache[cache_key] = value ``` ### 3. Operation Executor Implementation ```python from aiecs.core.interface.execution_interface import IOperationExecutor from aiecs.domain.execution.model import TaskStepResult, TaskStatus, ErrorCode class OperationExecutorImpl(IOperationExecutor): def __init__(self, tool_executor: IToolExecutor, tool_provider: IToolProvider): self.tool_executor = tool_executor self.tool_provider = tool_provider async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any: """Execute single operation""" tool_name, operation_name = operation_spec.split('.', 1) tool = self.tool_provider.get_tool(tool_name) return await self.tool_executor.execute_async(tool, operation_name, **params) async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]: """Batch execute operations""" tasks = [] for op in operations: task = self.execute_operation(op['operation'], op.get('params', {})) tasks.append(task) return await asyncio.gather(*tasks, return_exceptions=True) async def execute_operations_sequence(self, operations: List[Dict[str, Any]], user_id: str, task_id: str, stop_on_failure: bool = False, save_callback: Optional[Callable] = None) -> List[TaskStepResult]: """Sequentially execute operation sequence""" results = [] for i, op in enumerate(operations): try: result = await self.execute_operation(op['operation'], op.get('params', {})) step_result = TaskStepResult( step=op['operation'], result=result, completed=True, message=f"Operation {op['operation']} completed", status=TaskStatus.COMPLETED.value ) except Exception as e: step_result = TaskStepResult( step=op['operation'], result=None, completed=False, message=f"Operation {op['operation']} failed: {str(e)}", status=TaskStatus.FAILED.value, error_code=ErrorCode.EXECUTION_ERROR.value, error_message=str(e) ) if stop_on_failure: results.append(step_result) break if save_callback: await save_callback(user_id, task_id, i, step_result) results.append(step_result) return results async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]: """Execute operations in parallel""" tasks = [] for i, op in enumerate(operations): task = self._execute_single_operation(op, i) tasks.append(task) return await asyncio.gather(*tasks, return_exceptions=True) async def _execute_single_operation(self, op: Dict[str, Any], index: int) -> TaskStepResult: """Execute single operation and return result""" try: result = await self.execute_operation(op['operation'], op.get('params', {})) return TaskStepResult( step=op['operation'], result=result, completed=True, message=f"Operation {op['operation']} completed", status=TaskStatus.COMPLETED.value ) except Exception as e: return TaskStepResult( step=op['operation'], result=None, completed=False, message=f"Operation {op['operation']} failed: {str(e)}", status=TaskStatus.FAILED.value, error_code=ErrorCode.EXECUTION_ERROR.value, error_message=str(e) ) ``` ## Maintenance Guide ### 1. Daily Maintenance #### Interface Compatibility Check ```python def check_interface_compatibility(implementation_class, interface_class): """Check if implementation class fully implements interface""" interface_methods = { name for name, method in interface_class.__dict__.items() if getattr(method, '__isabstractmethod__', False) } implementation_methods = { name for name, method in implementation_class.__dict__.items() if callable(method) and not name.startswith('_') } missing_methods = interface_methods - implementation_methods if missing_methods: raise TypeError(f"Missing methods: {missing_methods}") return True ``` #### Interface Version Management ```python from typing import Protocol, runtime_checkable @runtime_checkable class VersionedInterface(Protocol): """Versioned interface protocol""" version: str def get_version(self) -> str: """Get interface version""" return self.version def check_interface_version(implementation, required_version: str): """Check interface version compatibility""" if hasattr(implementation, 'get_version'): version = implementation.get_version() if version < required_version: raise ValueError(f"Interface version {version} is lower than required {required_version}") ``` ### 2. Troubleshooting #### Common Issue Diagnosis **Issue 1: Incomplete Interface Implementation** ```python # Error message TypeError: Can't instantiate abstract class ToolExecutorImpl with abstract methods execute_async # Diagnosis steps def diagnose_incomplete_implementation(cls): """Diagnose incomplete interface implementation issue""" abstract_methods = [] for name, method in cls.__dict__.items(): if getattr(method, '__isabstractmethod__', False): abstract_methods.append(name) if abstract_methods: print(f"Unimplemented abstract methods: {abstract_methods}") return False return True ``` **Issue 2: Type Mismatch** ```python # Error message TypeError: execute_operation() missing 1 required positional argument: 'params' # Diagnosis steps def diagnose_type_mismatch(implementation, interface): """Diagnose type mismatch issue""" import inspect for method_name in dir(interface): if method_name.startswith('_'): continue interface_method = getattr(interface, method_name) if not callable(interface_method): continue if hasattr(implementation, method_name): impl_method = getattr(implementation, method_name) interface_sig = inspect.signature(interface_method) impl_sig = inspect.signature(impl_method) if interface_sig != impl_sig: print(f"Method {method_name} signature mismatch:") print(f" Interface: {interface_sig}") print(f" Implementation: {impl_sig}") ``` ### 3. Interface Updates #### Adding New Methods ```python # 1. Add new method to interface class IToolExecutor(ABC): # Existing methods... @abstractmethod async def execute_with_retry(self, tool: Any, operation_name: str, max_retries: int = 3, **params) -> Any: """Execute method with retry""" pass # 2. Update implementation class class ToolExecutorImpl(IToolExecutor): # Existing methods... async def execute_with_retry(self, tool: Any, operation_name: str, max_retries: int = 3, **params) -> Any: """Execute method with retry implementation""" for attempt in range(max_retries): try: return await self.execute_async(tool, operation_name, **params) except Exception as e: if attempt == max_retries - 1: raise await asyncio.sleep(2 ** attempt) # Exponential backoff ``` #### Interface Version Control ```python from abc import ABC, abstractmethod from typing import Union class IToolExecutorV1(ABC): """Tool executor interface V1""" @abstractmethod def execute(self, tool: Any, operation_name: str, **params) -> Any: pass class IToolExecutorV2(IToolExecutorV1): """Tool executor interface V2 - Inherits V1 and adds new features""" @abstractmethod async def execute_with_metrics(self, tool: Any, operation_name: str, **params) -> Any: """Execute method with metrics""" pass # Support multi-version interfaces ToolExecutorInterface = Union[IToolExecutorV1, IToolExecutorV2] ``` ### 4. Interface Extension #### Support Generic Interfaces ```python from typing import TypeVar, Generic, Protocol T = TypeVar('T') R = TypeVar('R') class ITypedToolExecutor(Protocol[T, R]): """Typed tool executor interface""" def execute(self, tool: T, operation_name: str, **params) -> R: """Type-safe execution method""" ... class ITypedOperationExecutor(Protocol[T]): """Typed operation executor interface""" async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> T: """Type-safe operation execution""" ... ``` #### Support Async Context Management ```python from typing import AsyncContextManager class IAsyncContextExecutor(ABC): """Executor interface supporting async context""" @abstractmethod async def __aenter__(self): """Async context entry""" pass @abstractmethod async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context exit""" pass @abstractmethod async def execute_in_context(self, operation: str, **params) -> Any: """Execute operation in context""" pass # Usage example async def use_async_context_executor(): async with IAsyncContextExecutor() as executor: result = await executor.execute_in_context("some_operation", param1="value1") ``` ## Performance Optimization ### 1. Interface Method Caching ```python from functools import lru_cache class CachedToolExecutor(IToolExecutor): @lru_cache(maxsize=128) def _get_tool_method(self, tool_class: type, operation_name: str): """Cache tool method retrieval""" return getattr(tool_class, operation_name) def execute(self, tool: Any, operation_name: str, **params) -> Any: method = self._get_tool_method(type(tool), operation_name) return method(**params) ``` ### 2. Async Interface Optimization ```python import asyncio from concurrent.futures import ThreadPoolExecutor class OptimizedToolExecutor(IToolExecutor): def __init__(self, max_workers: int = 4): self.executor = ThreadPoolExecutor(max_workers=max_workers) async def execute_async(self, tool: Any, operation_name: str, **params) -> Any: """Optimized async execution""" method = getattr(tool, operation_name) if asyncio.iscoroutinefunction(method): return await method(**params) else: loop = asyncio.get_event_loop() return await loop.run_in_executor( self.executor, lambda: method(**params) ) ``` ### 3. Interface Performance Monitoring ```python import time from typing import Dict, List from collections import defaultdict class MonitoredToolExecutor(IToolExecutor): def __init__(self): self.metrics = defaultdict(list) def execute(self, tool: Any, operation_name: str, **params) -> Any: start_time = time.time() try: result = self._do_execute(tool, operation_name, **params) execution_time = time.time() - start_time self.metrics[operation_name].append(execution_time) return result except Exception as e: execution_time = time.time() - start_time self.metrics[f"{operation_name}_error"].append(execution_time) raise def get_performance_metrics(self) -> Dict[str, Dict[str, float]]: """Get performance metrics""" metrics = {} for operation, times in self.metrics.items(): if times: metrics[operation] = { "avg_time": sum(times) / len(times), "min_time": min(times), "max_time": max(times), "count": len(times) } return metrics ``` ## Monitoring and Logging ### Interface Usage Monitoring ```python import logging from typing import Dict, Any class InterfaceMonitor: """Interface usage monitor""" def __init__(self): self.logger = logging.getLogger(__name__) self.usage_stats = defaultdict(int) self.error_stats = defaultdict(int) def record_interface_usage(self, interface_name: str, method_name: str, success: bool): """Record interface usage""" key = f"{interface_name}.{method_name}" self.usage_stats[key] += 1 if not success: self.error_stats[key] += 1 self.logger.info(f"Interface usage: {key}, success: {success}") def get_usage_report(self) -> Dict[str, Any]: """Get usage report""" return { "total_usage": sum(self.usage_stats.values()), "total_errors": sum(self.error_stats.values()), "error_rate": sum(self.error_stats.values()) / sum(self.usage_stats.values()) if self.usage_stats else 0, "usage_by_interface": dict(self.usage_stats), "errors_by_interface": dict(self.error_stats) } ``` ## Version History - **v1.0.0**: Initial version, basic interface definitions - **v1.1.0**: Added cache provider interface - **v1.2.0**: Added operation executor interface - **v1.3.0**: Added unified execution interface - **v1.4.0**: Support plugin execution engines - **v1.5.0**: Added type safety and performance optimization ## Related Documentation - [AIECS Project Overview](../PROJECT_SUMMARY.md) - [Operation Executor Documentation](../APPLICATION/OPERATION_EXECUTOR.md) - [Configuration Management Documentation](../CONFIG/CONFIG_MANAGEMENT.md) - [Service Registry Documentation](../CONFIG/SERVICE_REGISTRY.md)