Execution Models Technical Documentation

Overview

Design Motivation and Problem Background

When building complex AI application systems, task execution and state management face the following core challenges:

1. Task State Management Complexity

  • Need to support multiple task states (pending, running, completed, cancelled, timed out, failed)

  • State transitions need to follow specific business rules

  • Lack of unified state definition and validation mechanisms

2. Error Handling Standardization

  • Different types of errors require different handling strategies

  • Error information needs to be structured and traceable

  • Lack of unified error code system and classification mechanism

3. Execution Result Encapsulation

  • Task execution results need to contain complete state information

  • Need to support both success and failure result types

  • Lack of standardized result data model

4. System Integration Requirements

  • Execution models need to integrate with multiple system components

  • Need to support serialization and deserialization

  • Lack of unified data contract definitions

Execution Model System’s Solution:

  • Enum Type Definitions: Type-safe state and error code definitions based on Python Enum

  • Result Model Encapsulation: Structured task step result model

  • Unified Error Handling: Standardized error code system and error information

  • Data Contract Support: Data models supporting serialization and deserialization

  • Type Safety: Type safety guarantees based on Python type system

Component Positioning

execution/model.py is a domain model component of the AIECS system, located in the Domain Layer, defining core data models related to task execution. As the system’s data contract layer, it provides type-safe, structured execution state, error codes, and result models.

Component Type and Positioning

Component Type

Domain Model Component - Located in the Domain Layer, belongs to data contract definitions

Architecture Layers

┌─────────────────────────────────────────┐
│         Application Layer               │  ← Components using execution models
│  (OperationExecutor, TaskManager)       │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         Domain Layer                    │  ← Execution models layer
│  (ExecutionModels, Data Contracts)      │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│       Infrastructure Layer              │  ← Components execution models depend on
│  (Database, WebSocket, Celery)          │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         External Systems                │  ← External systems
│  (Redis, PostgreSQL, MessageQueue)      │
└─────────────────────────────────────────┘

Upstream Components (Consumers)

1. Application Layer Services

  • OperationExecutor (application/executors/operation_executor.py)

  • TaskManager (if exists)

  • ExecutionService (if exists)

2. Infrastructure Layer

  • DatabaseManager (infrastructure/persistence/database_manager.py)

  • WebSocketManager (infrastructure/messaging/websocket_manager.py)

  • CeleryTaskManager (infrastructure/messaging/celery_task_manager.py)

3. Interface Layer

  • ExecutionInterface (core/interface/execution_interface.py)

  • API Layer (via data conversion)

  • Message Queue (via message format)

Downstream Components (Dependencies)

1. Python Standard Library

  • enum - Provides enum type support

  • typing - Provides type annotation support

  • dataclasses - Provides dataclass support (if used)

2. Domain Models

  • TaskContext (if exists)

  • Other Domain Models (via result fields)

3. Utility Functions

  • Serialization Tools (via dict() method)

  • Validation Tools (via type checking)

Core Model Details

1. TaskStatus - Task Status Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
    TIMED_OUT = "timed_out"
    FAILED = "failed"

Status Descriptions:

  • PENDING: Waiting for execution - Task created but not yet started

  • RUNNING: Running - Task is currently executing

  • COMPLETED: Completed - Task completed successfully

  • CANCELLED: Cancelled - Task cancelled by user or system

  • TIMED_OUT: Execution timeout - Task execution exceeded time limit

  • FAILED: Execution failed - Error occurred during task execution

State Transition Rules:

PENDING → RUNNING → COMPLETED
       ↘ RUNNING → FAILED
       ↘ RUNNING → TIMED_OUT
       ↘ RUNNING → CANCELLED

Usage Examples:

from aiecs.domain.execution.model import TaskStatus

# Create task status
status = TaskStatus.PENDING
print(f"Task status: {status.value}")  # "pending"

# Status comparison
if status == TaskStatus.PENDING:
    print("Task is waiting to start")

# State transition
status = TaskStatus.RUNNING
print(f"Task is now: {status.value}")  # "running"

# Get all statuses
all_statuses = [status.value for status in TaskStatus]
print(f"All statuses: {all_statuses}")

2. ErrorCode - Error Code Enum

class ErrorCode(Enum):
    VALIDATION_ERROR = "E001"
    TIMEOUT_ERROR = "E002"
    EXECUTION_ERROR = "E003"
    CANCELLED_ERROR = "E004"
    RETRY_EXHAUSTED = "E005"
    DATABASE_ERROR = "E006"
    DSL_EVALUATION_ERROR = "E007"

Error Code Descriptions:

  • E001 - VALIDATION_ERROR: Parameter validation error - Input parameters do not meet requirements

  • E002 - TIMEOUT_ERROR: Execution timeout error - Task execution exceeded time limit

  • E003 - EXECUTION_ERROR: Execution error - Error occurred during task execution

  • E004 - CANCELLED_ERROR: Cancellation error - Task was cancelled

  • E005 - RETRY_EXHAUSTED: Retry exhausted error - Retry attempts exhausted

  • E006 - DATABASE_ERROR: Database error - Database operation failed

  • E007 - DSL_EVALUATION_ERROR: DSL evaluation error - DSL expression evaluation failed

Error Classification:

  • Client Errors (E001): Parameter validation errors

  • Timeout Errors (E002): Execution timeouts

  • Execution Errors (E003): Business logic errors

  • System Errors (E004, E005, E006, E007): System-level errors

Usage Examples:

from aiecs.domain.execution.model import ErrorCode

# Create error code
error_code = ErrorCode.VALIDATION_ERROR
print(f"Error code: {error_code.value}")  # "E001"

# Error code comparison
if error_code == ErrorCode.VALIDATION_ERROR:
    print("This is a validation error")

# Get error code descriptions
error_descriptions = {
    ErrorCode.VALIDATION_ERROR: "Parameter validation error",
    ErrorCode.TIMEOUT_ERROR: "Execution timeout error",
    ErrorCode.EXECUTION_ERROR: "Execution error",
    ErrorCode.CANCELLED_ERROR: "Cancellation error",
    ErrorCode.RETRY_EXHAUSTED: "Retry exhausted error",
    ErrorCode.DATABASE_ERROR: "Database error",
    ErrorCode.DSL_EVALUATION_ERROR: "DSL evaluation error"
}

print(f"Error description: {error_descriptions[error_code]}")

3. TaskStepResult - Task Step Result Model

class TaskStepResult:
    """Task step result model"""
    def __init__(self, step: str, result: Any, completed: bool = False,
                 message: str = "", status: str = "pending",
                 error_code: Optional[str] = None, error_message: Optional[str] = None):
        self.step = step
        self.result = result
        self.completed = completed
        self.message = message
        self.status = status
        self.error_code = error_code
        self.error_message = error_message

Field Descriptions:

  • step: Operation step identifier (e.g., “pandas_tool.read_csv”)

  • result: Operation execution result

  • completed: Whether completed

  • message: Status message

  • status: Execution status

  • error_code: Error code (optional)

  • error_message: Error message (optional)

Core Methods:

Serialization Method

def dict(self) -> Dict[str, Any]:
    """Convert to dictionary format"""
    return {
        "step": self.step,
        "result": self.result,
        "completed": self.completed,
        "message": self.message,
        "status": self.status,
        "error_code": self.error_code,
        "error_message": self.error_message
    }

String Representation

def __repr__(self) -> str:
    """String representation"""
    return f"TaskStepResult(step='{self.step}', status='{self.status}', completed={self.completed})"

Usage Examples:

from aiecs.domain.execution.model import TaskStepResult, TaskStatus, ErrorCode

# Create success result
success_result = TaskStepResult(
    step="pandas_tool.read_csv",
    result={"rows": 1000, "columns": 5},
    completed=True,
    message="Successfully read CSV file",
    status=TaskStatus.COMPLETED.value
)

print(f"Success result: {success_result}")
print(f"Result data: {success_result.dict()}")

# Create failure result
failure_result = TaskStepResult(
    step="pandas_tool.read_csv",
    result=None,
    completed=False,
    message="Failed to read CSV file",
    status=TaskStatus.FAILED.value,
    error_code=ErrorCode.EXECUTION_ERROR.value,
    error_message="File not found: data.csv"
)

print(f"Failure result: {failure_result}")
print(f"Error code: {failure_result.error_code}")
print(f"Error message: {failure_result.error_message}")

Design Patterns Explained

1. Enum Pattern

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    # ...

Advantages:

  • Type Safety: Compile-time type checking

  • Value Constraints: Limits possible values

  • Readability: Code is easier to read and maintain

  • Extensibility: Easy to add new states

2. Value Object Pattern

class TaskStepResult:
    """Immutable value object"""
    def __init__(self, step: str, result: Any, ...):
        self.step = step
        self.result = result
        # ...

Advantages:

  • Immutability: Object cannot be modified after creation

  • Equality: Value-based rather than reference-based equality

  • Encapsulation: Encapsulates related data and behavior

  • Testability: Easy to unit test

3. Factory Pattern

# Create instance via constructor
result = TaskStepResult(
    step="operation_name",
    result=operation_result,
    completed=True,
    status=TaskStatus.COMPLETED.value
)

Advantages:

  • Unified Creation: Unified object creation interface

  • Parameter Validation: Parameter validation at creation time

  • Type Safety: Ensures created objects are of correct type

Usage Examples

1. Basic State Management

from aiecs.domain.execution.model import TaskStatus, ErrorCode, TaskStepResult

# Task state management
def manage_task_lifecycle():
    """Manage task lifecycle"""
    # Initial state
    current_status = TaskStatus.PENDING
    print(f"Task started with status: {current_status.value}")
    
    # State transition
    current_status = TaskStatus.RUNNING
    print(f"Task is now: {current_status.value}")
    
    # Check state
    if current_status == TaskStatus.RUNNING:
        print("Task is currently running")
    
    # Completion state
    current_status = TaskStatus.COMPLETED
    print(f"Task finished with status: {current_status.value}")

# Error handling
def handle_task_errors():
    """Handle task errors"""
    error_codes = [
        ErrorCode.VALIDATION_ERROR,
        ErrorCode.TIMEOUT_ERROR,
        ErrorCode.EXECUTION_ERROR
    ]
    
    for error_code in error_codes:
        print(f"Error code: {error_code.value}")
        
        if error_code == ErrorCode.VALIDATION_ERROR:
            print("This is a validation error - check input parameters")
        elif error_code == ErrorCode.TIMEOUT_ERROR:
            print("This is a timeout error - task took too long")
        elif error_code == ErrorCode.EXECUTION_ERROR:
            print("This is an execution error - check business logic")

2. Result Model Usage

# Create success result
def create_success_result():
    """Create success result"""
    result = TaskStepResult(
        step="data_processing",
        result={"processed_rows": 1000, "output_file": "result.csv"},
        completed=True,
        message="Data processing completed successfully",
        status=TaskStatus.COMPLETED.value
    )
    
    print(f"Success result: {result}")
    print(f"Result data: {result.dict()}")
    return result

# Create failure result
def create_failure_result():
    """Create failure result"""
    result = TaskStepResult(
        step="data_processing",
        result=None,
        completed=False,
        message="Data processing failed",
        status=TaskStatus.FAILED.value,
        error_code=ErrorCode.EXECUTION_ERROR.value,
        error_message="Invalid data format detected"
    )
    
    print(f"Failure result: {result}")
    print(f"Error details: {result.error_code} - {result.error_message}")
    return result

# Result processing
def process_results(results: List[TaskStepResult]):
    """Process result list"""
    success_count = 0
    failure_count = 0
    
    for result in results:
        if result.completed:
            success_count += 1
            print(f"✅ {result.step}: {result.message}")
        else:
            failure_count += 1
            print(f"❌ {result.step}: {result.message}")
            if result.error_code:
                print(f"   Error: {result.error_code} - {result.error_message}")
    
    print(f"Summary: {success_count} successful, {failure_count} failed")

3. System Integration

# Database integration
def save_task_result_to_database(result: TaskStepResult, user_id: str, task_id: str):
    """Save task result to database"""
    from aiecs.infrastructure.persistence.database_manager import DatabaseManager
    
    db_manager = DatabaseManager()
    
    # Save to database
    db_manager.save_task_history(user_id, task_id, 1, result)
    
    print(f"Saved result for task {task_id}: {result.step}")

# WebSocket integration
def send_result_via_websocket(result: TaskStepResult, user_id: str, task_id: str):
    """Send result via WebSocket"""
    from aiecs.infrastructure.messaging.websocket_manager import WebSocketManager
    
    ws_manager = WebSocketManager()
    
    # Send result
    ws_manager.notify_user(result, user_id, task_id, 1)
    
    print(f"Sent result via WebSocket: {result.step}")

# Celery integration
def create_celery_task_result(status: TaskStatus, error_code: ErrorCode = None):
    """Create Celery task result"""
    result = {
        "status": status.value,
        "completed": status == TaskStatus.COMPLETED,
        "message": f"Task {status.value}"
    }
    
    if error_code:
        result["error_code"] = error_code.value
        result["error_message"] = f"Task failed with {error_code.value}"
    
    return result

4. Advanced Usage

# Result validation
def validate_result(result: TaskStepResult) -> bool:
    """Validate result validity"""
    if not result.step:
        print("❌ Result missing step identifier")
        return False
    
    if result.completed and result.result is None:
        print("❌ Completed result missing result data")
        return False
    
    if not result.completed and result.error_code is None:
        print("❌ Failed result missing error code")
        return False
    
    print("✅ Result validation passed")
    return True

# Result conversion
def convert_result_to_dict(result: TaskStepResult) -> Dict[str, Any]:
    """Convert result to dictionary format"""
    return result.dict()

def convert_dict_to_result(data: Dict[str, Any]) -> TaskStepResult:
    """Create result object from dictionary"""
    return TaskStepResult(
        step=data["step"],
        result=data["result"],
        completed=data["completed"],
        message=data["message"],
        status=data["status"],
        error_code=data.get("error_code"),
        error_message=data.get("error_message")
    )

# Result comparison
def compare_results(result1: TaskStepResult, result2: TaskStepResult) -> bool:
    """Compare if two results are equal"""
    return (result1.step == result2.step and
            result1.completed == result2.completed and
            result1.status == result2.status)

Maintenance Guide

1. Daily Maintenance

Model Validation

def validate_models_health():
    """Validate model health status"""
    try:
        # Test status enum
        status = TaskStatus.PENDING
        assert status.value == "pending"
        print("✅ TaskStatus validation passed")
        
        # Test error code enum
        error_code = ErrorCode.VALIDATION_ERROR
        assert error_code.value == "E001"
        print("✅ ErrorCode validation passed")
        
        # Test result model
        result = TaskStepResult(
            step="test_step",
            result="test_result",
            completed=True,
            status=TaskStatus.COMPLETED.value
        )
        assert result.step == "test_step"
        assert result.completed == True
        print("✅ TaskStepResult validation passed")
        
        return True
        
    except Exception as e:
        print(f"❌ Model validation failed: {e}")
        return False

Data Consistency Check

def check_data_consistency(result: TaskStepResult):
    """Check result data consistency"""
    try:
        # Check basic fields
        if not result.step:
            print("❌ Missing step identifier")
            return False
        
        # Check status consistency
        if result.completed and result.status != TaskStatus.COMPLETED.value:
            print("❌ Completed result has wrong status")
            return False
        
        if not result.completed and result.status == TaskStatus.COMPLETED.value:
            print("❌ Incomplete result has completed status")
            return False
        
        # Check error information consistency
        if result.error_code and not result.error_message:
            print("❌ Error code without error message")
            return False
        
        if result.error_message and not result.error_code:
            print("❌ Error message without error code")
            return False
        
        print("✅ Data consistency check passed")
        return True
        
    except Exception as e:
        print(f"❌ Data consistency check failed: {e}")
        return False

2. Troubleshooting

Common Issue Diagnosis

Issue 1: State Transition Error

def diagnose_status_transition_error():
    """Diagnose state transition errors"""
    try:
        # Test invalid state transition
        current_status = TaskStatus.PENDING
        next_status = TaskStatus.COMPLETED  # Skipping RUNNING
        
        if current_status == TaskStatus.PENDING and next_status == TaskStatus.COMPLETED:
            print("❌ Invalid status transition: PENDING → COMPLETED")
            print("   Valid transitions from PENDING: RUNNING")
        
        # Test valid state transition
        current_status = TaskStatus.PENDING
        next_status = TaskStatus.RUNNING
        
        if current_status == TaskStatus.PENDING and next_status == TaskStatus.RUNNING:
            print("✅ Valid status transition: PENDING → RUNNING")
        
    except Exception as e:
        print(f"❌ Status transition diagnosis failed: {e}")

Issue 2: Error Code Mapping Error

def diagnose_error_code_mapping_error():
    """Diagnose error code mapping errors"""
    try:
        # Test error code mapping
        error_mappings = {
            "validation_error": ErrorCode.VALIDATION_ERROR,
            "timeout_error": ErrorCode.TIMEOUT_ERROR,
            "execution_error": ErrorCode.EXECUTION_ERROR,
            "cancelled_error": ErrorCode.CANCELLED_ERROR,
            "retry_exhausted": ErrorCode.RETRY_EXHAUSTED,
            "database_error": ErrorCode.DATABASE_ERROR,
            "dsl_evaluation_error": ErrorCode.DSL_EVALUATION_ERROR
        }
        
        for error_name, error_code in error_mappings.items():
            if error_code.value != f"E{error_code.value[1:].zfill(3)}":
                print(f"❌ Invalid error code format: {error_code.value}")
            else:
                print(f"✅ Valid error code: {error_code.value}")
        
    except Exception as e:
        print(f"❌ Error code mapping diagnosis failed: {e}")

3. Performance Optimization

Object Creation Optimization

def optimize_object_creation():
    """Optimize object creation performance"""
    import time
    
    # Test enum creation performance
    start_time = time.time()
    for i in range(10000):
        status = TaskStatus.PENDING
        error_code = ErrorCode.VALIDATION_ERROR
    enum_time = time.time() - start_time
    
    # Test result object creation performance
    start_time = time.time()
    for i in range(10000):
        result = TaskStepResult(
            step=f"step_{i}",
            result=f"result_{i}",
            completed=True,
            status=TaskStatus.COMPLETED.value
        )
    result_time = time.time() - start_time
    
    print(f"Enum creation time: {enum_time:.4f}s")
    print(f"Result creation time: {result_time:.4f}s")

Memory Usage Optimization

def optimize_memory_usage():
    """Optimize memory usage"""
    import gc
    import sys
    
    # Create many objects
    results = []
    for i in range(10000):
        result = TaskStepResult(
            step=f"step_{i}",
            result=f"result_{i}",
            completed=True,
            status=TaskStatus.COMPLETED.value
        )
        results.append(result)
    
    print(f"Memory usage before cleanup: {sys.getsizeof(results)} bytes")
    
    # Clean up objects
    results.clear()
    gc.collect()
    
    print(f"Memory usage after cleanup: {sys.getsizeof(results)} bytes")

4. Data Migration

Model Version Upgrade

def migrate_models_to_new_version(old_data: Dict[str, Any]) -> Dict[str, Any]:
    """Migrate model data to new version"""
    # Check version
    version = old_data.get("version", "1.0")
    
    if version == "1.0":
        # Upgrade from 1.0 to 1.1
        if "status" in old_data:
            # Update status values
            if old_data["status"] == "success":
                old_data["status"] = "completed"
            elif old_data["status"] == "error":
                old_data["status"] = "failed"
        
        old_data["version"] = "1.1"
    
    return old_data

Data Format Conversion

def convert_data_formats():
    """Convert data formats"""
    # Convert from old format to new format
    old_result = {
        "operation": "data_processing",
        "output": {"rows": 1000},
        "success": True,
        "message": "Processing completed"
    }
    
    # Convert to new format
    new_result = TaskStepResult(
        step=old_result["operation"],
        result=old_result["output"],
        completed=old_result["success"],
        message=old_result["message"],
        status=TaskStatus.COMPLETED.value if old_result["success"] else TaskStatus.FAILED.value
    )
    
    print(f"Converted result: {new_result}")

Monitoring and Logging

Model Usage Monitoring

import time
from typing import Dict, Any

class ExecutionModelsMonitor:
    """Execution Models Monitor"""
    
    def __init__(self):
        self.creation_metrics = {
            "status_objects": 0,
            "error_code_objects": 0,
            "result_objects": 0
        }
        self.performance_metrics = {
            "status_creation_time": [],
            "error_code_creation_time": [],
            "result_creation_time": []
        }
    
    def record_status_creation(self, creation_time: float):
        """Record status object creation metrics"""
        self.creation_metrics["status_objects"] += 1
        self.performance_metrics["status_creation_time"].append(creation_time)
    
    def record_error_code_creation(self, creation_time: float):
        """Record error code object creation metrics"""
        self.creation_metrics["error_code_objects"] += 1
        self.performance_metrics["error_code_creation_time"].append(creation_time)
    
    def record_result_creation(self, creation_time: float):
        """Record result object creation metrics"""
        self.creation_metrics["result_objects"] += 1
        self.performance_metrics["result_creation_time"].append(creation_time)
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Get performance report"""
        report = {}
        
        for metric_name, times in self.performance_metrics.items():
            if times:
                report[metric_name] = {
                    "count": len(times),
                    "avg_time": sum(times) / len(times),
                    "min_time": min(times),
                    "max_time": max(times)
                }
        
        return report

Logging

import logging
from typing import Dict, Any

class ExecutionModelsLogger:
    """Execution Models Logger"""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def log_status_creation(self, status: TaskStatus):
        """Log status creation"""
        self.logger.info(f"TaskStatus created: {status.value}")
    
    def log_error_code_creation(self, error_code: ErrorCode):
        """Log error code creation"""
        self.logger.info(f"ErrorCode created: {error_code.value}")
    
    def log_result_creation(self, result: TaskStepResult):
        """Log result creation"""
        self.logger.info(f"TaskStepResult created: {result.step} - {result.status}")
    
    def log_validation_error(self, error: Exception, context: str):
        """Log validation error"""
        self.logger.error(f"Validation error in {context}: {error}")

Version History

  • v1.0.0: Initial version, basic state and error code definitions

  • v1.1.0: Added task step result model

  • v1.2.0: Added serialization support

  • v1.3.0: Added performance monitoring and logging

  • v1.4.0: Added data migration support