# /*---------------------------------------------------------------------------------------------
# * Copyright (c) IRETBL Corporation. All rights reserved.
# * Licensed under the Apache-2.0. See License.txt in the project root for license information.
# *--------------------------------------------------------------------------------------------*/
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
from aiecs.domain.execution.model import TaskStepResult
[docs]
class ICacheProvider(ABC):
"""Cache provider interface - Domain layer abstraction"""
[docs]
@abstractmethod
def generate_cache_key(
self,
operation_type: str,
user_id: str,
task_id: str,
args: tuple,
kwargs: Dict[str, Any],
) -> str:
"""Generate cache key"""
[docs]
@abstractmethod
def get_from_cache(self, cache_key: str) -> Optional[Any]:
"""Get data from cache"""
[docs]
@abstractmethod
def add_to_cache(self, cache_key: str, value: Any) -> None:
"""Add data to cache"""
[docs]
class IOperationExecutor(ABC):
"""Operation executor interface - Domain layer abstraction"""
[docs]
@abstractmethod
async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any:
"""Execute single operation"""
[docs]
@abstractmethod
async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
"""Batch execute operations"""
[docs]
@abstractmethod
async def execute_operations_sequence(
self,
operations: List[Dict[str, Any]],
user_id: str,
task_id: str,
stop_on_failure: bool = False,
save_callback: Optional[Callable] = None,
) -> List[TaskStepResult]:
"""Execute operations sequence sequentially"""
[docs]
@abstractmethod
async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]:
"""Execute operations in parallel"""
[docs]
class ExecutionInterface(ABC):
"""
Unified execution interface that defines standard methods for service and tool execution.
Supports plugin-based execution engines, allowing future introduction of new executors without modifying upper-level code.
"""
[docs]
@abstractmethod
async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any:
"""
Execute a single operation (e.g., tool operation or service subtask).
Args:
operation_spec (str): Operation specification, format as 'tool_name.operation_name' or other identifier
params (Dict[str, Any]): Operation parameters
Returns:
Any: Operation result
"""
[docs]
@abstractmethod
async def execute_task(
self,
task_name: str,
input_data: Dict[str, Any],
context: Dict[str, Any],
) -> Any:
"""
Execute a single task (e.g., service task).
Args:
task_name (str): Task name
input_data (Dict[str, Any]): Input data
context (Dict[str, Any]): Context information
Returns:
Any: Task result
"""
[docs]
@abstractmethod
async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
"""
Batch execute multiple operations.
Args:
operations (List[Dict[str, Any]]): List of operations, each containing 'operation' and 'params'
Returns:
List[Any]: List of operation results
"""
[docs]
@abstractmethod
async def batch_execute_tasks(self, tasks: List[Dict[str, Any]]) -> List[Any]:
"""
Batch execute multiple tasks.
Args:
tasks (List[Dict[str, Any]]): List of tasks, each containing 'task_name', 'input_data', 'context'
Returns:
List[Any]: List of task results
"""
[docs]
def register_executor(self, executor_type: str, executor_instance: Any) -> None:
"""
Register new executor type, supporting plugin-based extension.
Args:
executor_type (str): Executor type identifier
executor_instance (Any): Executor instance
"""
raise NotImplementedError("Executor registration is not implemented in this interface")