DSL Processor Technical Documentation
Overview
Design Motivation and Problem Background
When building complex AI application systems, task execution and flow control face the following core challenges:
1. Complex Task Flow Management
Need to support complex control flows such as conditional branches, parallel execution, and loops
Traditional programming approaches struggle with dynamic task flows
Lack of declarative task flow definition mechanisms
2. Condition Evaluation Complexity
Need to support multiple condition types (intent, context, input, result)
Condition expressions need to support logical operations (AND, OR)
Lack of unified condition evaluation mechanism
3. Task Execution Flexibility
Need to support different types of task steps (if, parallel, sequence, task, loop)
Task execution needs to support asynchronous and concurrent execution
Lack of standardized task execution interface
4. Error Handling and Debugging
Complex DSL expressions are difficult to debug
Error information needs to precisely locate specific steps
Lack of comprehensive error handling and logging
DSL Processor System Solution:
Declarative DSL: JSON-based declarative task flow definition
Condition Evaluation Engine: Supports multiple condition types and logical operations
Step Type Support: Supports step types such as if, parallel, sequence, task, loop
Asynchronous Execution: Supports asynchronous and concurrent task execution
Error Handling: Comprehensive error handling and debugging support
Component Positioning
dsl_processor.py is a domain service component of the AIECS system, located in the Domain Layer, implementing core business logic for DSL parsing and execution. As the system’s flow control core, it provides declarative task flow definition and execution capabilities.
Component Type and Positioning
Component Type
Domain Service Component - Located in the Domain Layer, belongs to the business logic layer
Architecture Layers
┌─────────────────────────────────────────┐
│ Application Layer │ ← Components using DSL processor
│ (OperationExecutor, TaskManager) │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Domain Layer │ ← DSL processor layer
│ (DSLProcessor, Business Logic) │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ Infrastructure Layer │ ← Components DSL processor depends on
│ (Tracing, Logging, AsyncIO) │
└─────────────────┬───────────────────────┘
│
┌─────────────────▼───────────────────────┐
│ External Systems │ ← External systems
│ (Task Execution, Monitoring) │
└─────────────────────────────────────────┘
Upstream Components (Consumers)
1. Application Layer Services
OperationExecutor (
application/executors/operation_executor.py)TaskManager (if exists)
ExecutionService (if exists)
2. Domain Services
TaskContext (
domain/task/task_context.py)Other task-related services (if exists)
3. Infrastructure Layer
Tracing System (via tracer parameter)
Logging System (via logger)
AsyncIO System (via asyncio)
Downstream Components (Dependencies)
1. Execution Models
TaskStepResult (
domain/execution/model.py)TaskStatus (
domain/execution/model.py)ErrorCode (
domain/execution/model.py)
2. Python Standard Library
re - Regular expression support
json - JSON processing
logging - Logging
asyncio - Asynchronous programming support
typing - Type annotation support
3. Task Execution Functions
execute_single_task - Single task execution function
execute_batch_task - Batch task execution function
Core Functionality Details
1. Condition Evaluation Engine
Supported Condition Types
# Intent condition
"intent.includes('data_analysis')"
# Context condition
"context.user_type == 'premium'"
"context.priority > 5"
# Input condition
"input.file_type == 'csv'"
"input.size > 1000"
# Result condition
"result[0].status == 'completed'"
"result[1].count > 100"
Logical Operation Support
# AND operation
"intent.includes('data_analysis') AND context.user_type == 'premium'"
# OR operation
"input.file_type == 'csv' OR input.file_type == 'xlsx'"
# Complex condition
"intent.includes('data_analysis') AND (context.priority > 5 OR input.size > 1000)"
Condition Evaluation Method
def evaluate_condition(self, condition: str, intent_categories: List[str],
context: Dict[str, Any] = None, input_data: Dict[str, Any] = None,
results: List[TaskStepResult] = None) -> bool:
"""Evaluate condition expression, supporting multiple condition types"""
try:
# 1. Complex condition: support AND (highest priority)
if " AND " in condition:
parts = condition.split(" AND ")
return all(self.evaluate_condition(part.strip(), intent_categories, context, input_data, results) for part in parts)
# 2. Complex condition: support OR (second priority)
if " OR " in condition:
parts = condition.split(" OR ")
return any(self.evaluate_condition(part.strip(), intent_categories, context, input_data, results) for part in parts)
# 3. Intent condition: intent.includes('category')
match = re.fullmatch(r"intent\.includes\('([^']+)'\)", condition)
if match:
category = match.group(1)
return category in intent_categories
# 4. Context condition: context.field == value
match = re.fullmatch(r"context\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)", condition)
if match and context:
field, operator, value = match.groups()
return self._evaluate_comparison(context.get(field), operator, self._parse_value(value))
# 5. Input condition: input.field == value
match = re.fullmatch(r"input\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)", condition)
if match and input_data:
field, operator, value = match.groups()
return self._evaluate_comparison(input_data.get(field), operator, self._parse_value(value))
# 6. Result condition: result[0].field == value
match = re.fullmatch(r"result\[(\d+)\]\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)", condition)
if match and results:
index, field, operator, value = match.groups()
index = int(index)
if index < len(results) and results[index].result:
result_value = results[index].result.get(field) if isinstance(results[index].result, dict) else None
return self._evaluate_comparison(result_value, operator, self._parse_value(value))
raise ValueError(f"Unsupported condition format: {condition}")
except Exception as e:
logger.error(f"Failed to evaluate condition '{condition}': {e}")
raise ValueError(f"Failed to evaluate condition '{condition}': {e}")
2. Step Type Support
IF Step - Conditional Branch
{
"if": {
"condition": "intent.includes('data_analysis')",
"then": {
"task": "pandas_tool.read_csv",
"params": {"file_path": "data.csv"}
},
"else": {
"task": "json_tool.read_json",
"params": {"file_path": "data.json"}
}
}
}
PARALLEL Step - Parallel Execution
{
"parallel": {
"tasks": [
{
"task": "pandas_tool.read_csv",
"params": {"file_path": "data1.csv"}
},
{
"task": "pandas_tool.read_csv",
"params": {"file_path": "data2.csv"}
}
]
}
}
SEQUENCE Step - Sequential Execution
{
"sequence": {
"steps": [
{
"task": "pandas_tool.read_csv",
"params": {"file_path": "data.csv"}
},
{
"task": "pandas_tool.clean_data",
"params": {"data": "{{result[0].result}}"}
}
]
}
}
TASK Step - Single Task Execution
{
"task": "pandas_tool.read_csv",
"params": {
"file_path": "data.csv",
"encoding": "utf-8"
}
}
LOOP Step - Loop Execution
{
"loop": {
"condition": "result[0].count > 0",
"steps": [
{
"task": "pandas_tool.process_batch",
"params": {"batch_size": 100}
}
]
}
}
3. Value Parsing and Comparison
Value Parsing Method
def _parse_value(self, value_str: str) -> Any:
"""Parse value string to appropriate type"""
value_str = value_str.strip()
# String value
if value_str.startswith('"') and value_str.endswith('"'):
return value_str[1:-1]
if value_str.startswith("'") and value_str.endswith("'"):
return value_str[1:-1]
# Boolean value
if value_str.lower() == "true":
return True
if value_str.lower() == "false":
return False
# Numeric value
try:
if "." in value_str:
return float(value_str)
else:
return int(value_str)
except ValueError:
pass
# Default return string
return value_str
Comparison Operation Support
def _evaluate_comparison(self, left_value: Any, operator: str, right_value: Any) -> bool:
"""Evaluate comparison operation"""
try:
if operator == "==":
return left_value == right_value
elif operator == "!=":
return left_value != right_value
elif operator == ">":
return left_value > right_value
elif operator == "<":
return left_value < right_value
elif operator == ">=":
return left_value >= right_value
elif operator == "<=":
return left_value <= right_value
else:
raise ValueError(f"Unsupported operator: {operator}")
except TypeError:
# Return False when types don't match
return False
4. Async Execution Support
Main Execution Method
async def execute_dsl_step(self, step: Dict, intent_categories: List[str], input_data: Dict,
context: Dict, execute_single_task: Callable, execute_batch_task: Callable,
results: List[TaskStepResult] = None) -> TaskStepResult:
"""Execute DSL step based on step type"""
span = self.tracer.start_span("execute_dsl_step") if self.tracer else None
if span:
span.set_tag("step", json.dumps(step))
try:
if "if" in step:
return await self._handle_if_step(step, intent_categories, input_data, context,
execute_single_task, execute_batch_task, span, results)
elif "parallel" in step:
return await self._handle_parallel_step(step, input_data, context, execute_batch_task, span)
elif "sequence" in step:
return await self._handle_sequence_step(step, intent_categories, input_data, context,
execute_single_task, execute_batch_task, span, results)
elif "task" in step:
return await self._handle_task_step(step, input_data, context, execute_single_task, span)
elif "loop" in step:
return await self._handle_loop_step(step, intent_categories, input_data, context,
execute_single_task, execute_batch_task, span, results)
else:
if span:
span.set_tag("error", True)
span.log_kv({"error_message": "Invalid DSL step"})
return TaskStepResult(
step="unknown",
result=None,
completed=False,
message="Invalid DSL step",
status=TaskStatus.FAILED.value,
error_code=ErrorCode.EXECUTION_ERROR.value,
error_message="Unknown DSL step type"
)
finally:
if span:
span.finish()
Design Patterns Explained
1. Strategy Pattern
# Different step types use different processing strategies
if "if" in step:
return await self._handle_if_step(...)
elif "parallel" in step:
return await self._handle_parallel_step(...)
elif "sequence" in step:
return await self._handle_sequence_step(...)
Advantages:
Algorithm Encapsulation: Encapsulates processing algorithms for different step types in independent methods
Easy Extension: Adding new step types only requires adding new processing methods
Code Reuse: Common logic can be implemented in base classes
2. Template Method Pattern
async def execute_dsl_step(self, step: Dict, ...):
"""Template method - defines common flow for executing steps"""
span = self.tracer.start_span("execute_dsl_step") if self.tracer else None
try:
# Select specific implementation based on step type
result = await self._execute_specific_step(step, ...)
return result
finally:
if span:
span.finish()
Advantages:
Unified Flow: Defines unified execution flow
Step Reuse: Common steps can be reused in template methods
Easy Maintenance: Modifying flow only requires modifying template method
3. Chain of Responsibility Pattern
def evaluate_condition(self, condition: str, ...):
"""Chain of responsibility - try condition evaluation in priority order"""
# 1. Complex condition: support AND (highest priority)
if " AND " in condition:
return self._handle_and_condition(condition, ...)
# 2. Complex condition: support OR (second priority)
if " OR " in condition:
return self._handle_or_condition(condition, ...)
# 3. Intent condition
if self._is_intent_condition(condition):
return self._handle_intent_condition(condition, ...)
# 4. Context condition
if self._is_context_condition(condition):
return self._handle_context_condition(condition, ...)
# More condition types...
Advantages:
Decoupling: Decouples condition evaluation logic into independent methods
Extensibility: Adding new condition types only requires adding new processing methods
Flexibility: Can dynamically adjust condition evaluation order
Usage Examples
1. Basic Condition Evaluation
from aiecs.domain.task.dsl_processor import DSLProcessor
# Create DSL processor
processor = DSLProcessor()
# Evaluate intent condition
intent_categories = ["data_analysis", "visualization"]
condition = "intent.includes('data_analysis')"
result = processor.evaluate_condition(condition, intent_categories)
print(f"Intent condition result: {result}") # True
# Evaluate context condition
context = {"user_type": "premium", "priority": 8}
condition = "context.user_type == 'premium' AND context.priority > 5"
result = processor.evaluate_condition(condition, [], context)
print(f"Context condition result: {result}") # True
# Evaluate input condition
input_data = {"file_type": "csv", "size": 1500}
condition = "input.file_type == 'csv' OR input.size > 1000"
result = processor.evaluate_condition(condition, [], None, input_data)
print(f"Input condition result: {result}") # True
2. Complex Condition Evaluation
# Complex condition evaluation
condition = "intent.includes('data_analysis') AND (context.user_type == 'premium' OR input.size > 1000)"
intent_categories = ["data_analysis"]
context = {"user_type": "basic"}
input_data = {"size": 1500}
result = processor.evaluate_condition(condition, intent_categories, context, input_data)
print(f"Complex condition result: {result}") # True
# Result condition evaluation
from aiecs.domain.execution.model import TaskStepResult, TaskStatus
results = [
TaskStepResult("step1", {"status": "completed", "count": 100}, True, "Step 1 completed", TaskStatus.COMPLETED.value),
TaskStepResult("step2", {"status": "failed", "count": 0}, False, "Step 2 failed", TaskStatus.FAILED.value)
]
condition = "result[0].status == 'completed' AND result[0].count > 50"
result = processor.evaluate_condition(condition, [], None, None, results)
print(f"Result condition result: {result}") # True
3. DSL Step Execution
import asyncio
async def execute_dsl_example():
"""Execute DSL example"""
# Define execution functions
async def execute_single_task(task_name: str, params: Dict[str, Any]) -> TaskStepResult:
"""Simulate single task execution"""
return TaskStepResult(
step=task_name,
result={"status": "completed", "data": f"Result of {task_name}"},
completed=True,
message=f"Task {task_name} completed",
status=TaskStatus.COMPLETED.value
)
async def execute_batch_task(tasks: List[Dict[str, Any]]) -> List[TaskStepResult]:
"""Simulate batch task execution"""
results = []
for task in tasks:
result = await execute_single_task(task["task"], task.get("params", {}))
results.append(result)
return results
# Create DSL processor
processor = DSLProcessor()
# Execute IF step
if_step = {
"if": {
"condition": "intent.includes('data_analysis')",
"then": {
"task": "pandas_tool.read_csv",
"params": {"file_path": "data.csv"}
},
"else": {
"task": "json_tool.read_json",
"params": {"file_path": "data.json"}
}
}
}
intent_categories = ["data_analysis"]
input_data = {}
context = {}
result = await processor.execute_dsl_step(
if_step, intent_categories, input_data, context,
execute_single_task, execute_batch_task
)
print(f"IF step result: {result}")
# Execute PARALLEL step
parallel_step = {
"parallel": {
"tasks": [
{
"task": "pandas_tool.read_csv",
"params": {"file_path": "data1.csv"}
},
{
"task": "pandas_tool.read_csv",
"params": {"file_path": "data2.csv"}
}
]
}
}
result = await processor.execute_dsl_step(
parallel_step, intent_categories, input_data, context,
execute_single_task, execute_batch_task
)
print(f"Parallel step result: {result}")
# Run example
asyncio.run(execute_dsl_example())
4. Condition Syntax Validation
# Validate condition syntax
def validate_conditions():
"""Validate condition syntax"""
processor = DSLProcessor()
# Valid conditions
valid_conditions = [
"intent.includes('data_analysis')",
"context.user_type == 'premium'",
"input.file_type == 'csv'",
"result[0].status == 'completed'",
"intent.includes('data_analysis') AND context.user_type == 'premium'",
"input.file_type == 'csv' OR input.file_type == 'xlsx'"
]
for condition in valid_conditions:
is_valid = processor.validate_condition_syntax(condition)
print(f"Condition '{condition}' is valid: {is_valid}")
# Invalid conditions
invalid_conditions = [
"",
"invalid_condition",
"context.field == value AND", # Incomplete AND
"result[].field == value" # Invalid array index
]
for condition in invalid_conditions:
is_valid = processor.validate_condition_syntax(condition)
print(f"Condition '{condition}' is valid: {is_valid}")
validate_conditions()
Maintenance Guide
1. Daily Maintenance
Condition Pattern Validation
def validate_condition_patterns():
"""Validate condition patterns"""
processor = DSLProcessor()
# Test supported condition patterns
test_conditions = [
"intent.includes('test')",
"context.field == 'value'",
"input.field > 100",
"result[0].field != 'error'"
]
for condition in test_conditions:
try:
result = processor.evaluate_condition(condition, ["test"], {"field": "value"}, {"field": 100})
print(f"✅ Condition '{condition}' evaluated successfully")
except Exception as e:
print(f"❌ Condition '{condition}' failed: {e}")
Step Type Validation
def validate_step_types():
"""Validate step types"""
processor = DSLProcessor()
# Test supported step types
step_types = ["if", "parallel", "sequence", "task", "loop"]
for step_type in step_types:
step = {step_type: {"test": "value"}}
try:
# This only tests step type recognition, doesn't execute actual tasks
if step_type in step:
print(f"✅ Step type '{step_type}' is supported")
else:
print(f"❌ Step type '{step_type}' is not supported")
except Exception as e:
print(f"❌ Step type '{step_type}' validation failed: {e}")
2. Troubleshooting
Common Issue Diagnosis
Issue 1: Condition Evaluation Failure
def diagnose_condition_evaluation_error():
"""Diagnose condition evaluation errors"""
processor = DSLProcessor()
# Test various condition types
test_cases = [
{
"condition": "intent.includes('test')",
"intent_categories": ["test"],
"expected": True
},
{
"condition": "context.field == 'value'",
"context": {"field": "value"},
"expected": True
},
{
"condition": "input.field > 100",
"input_data": {"field": 150},
"expected": True
}
]
for test_case in test_cases:
try:
result = processor.evaluate_condition(
test_case["condition"],
test_case.get("intent_categories", []),
test_case.get("context"),
test_case.get("input_data")
)
expected = test_case["expected"]
if result == expected:
print(f"✅ Condition '{test_case['condition']}' evaluated correctly")
else:
print(f"❌ Condition '{test_case['condition']}' evaluated incorrectly: {result} != {expected}")
except Exception as e:
print(f"❌ Condition '{test_case['condition']}' failed: {e}")
Issue 2: Step Execution Failure
def diagnose_step_execution_error():
"""Diagnose step execution errors"""
processor = DSLProcessor()
# Test invalid steps
invalid_steps = [
{"unknown": "value"}, # Unknown step type
{"if": "invalid"}, # Invalid if step
{"parallel": []}, # Empty parallel step
]
async def mock_execute_single_task(task_name: str, params: Dict[str, Any]):
return TaskStepResult(task_name, "mock_result", True, "Mock completed", TaskStatus.COMPLETED.value)
async def mock_execute_batch_task(tasks: List[Dict[str, Any]]):
return [TaskStepResult(task["task"], "mock_result", True, "Mock completed", TaskStatus.COMPLETED.value) for task in tasks]
async def test_steps():
for step in invalid_steps:
try:
result = await processor.execute_dsl_step(
step, [], {}, {}, mock_execute_single_task, mock_execute_batch_task
)
if result.completed:
print(f"✅ Step executed successfully: {step}")
else:
print(f"❌ Step execution failed: {result.message}")
except Exception as e:
print(f"❌ Step execution error: {e}")
import asyncio
asyncio.run(test_steps())
3. Performance Optimization
Condition Evaluation Optimization
def optimize_condition_evaluation():
"""Optimize condition evaluation performance"""
import time
processor = DSLProcessor()
# Test condition evaluation performance
condition = "intent.includes('test') AND context.field == 'value'"
intent_categories = ["test"]
context = {"field": "value"}
# Warm-up
for _ in range(100):
processor.evaluate_condition(condition, intent_categories, context)
# Performance test
start_time = time.time()
for _ in range(10000):
processor.evaluate_condition(condition, intent_categories, context)
end_time = time.time()
print(f"Condition evaluation time: {(end_time - start_time) * 1000:.2f}ms for 10000 iterations")
Memory Usage Optimization
def optimize_memory_usage():
"""Optimize memory usage"""
import gc
import sys
processor = DSLProcessor()
# Create many conditions
conditions = []
for i in range(10000):
condition = f"intent.includes('test_{i}')"
conditions.append(condition)
print(f"Memory usage before evaluation: {sys.getsizeof(conditions)} bytes")
# Evaluate conditions
intent_categories = ["test_0"]
for condition in conditions[:1000]: # Only evaluate first 1000
processor.evaluate_condition(condition, intent_categories)
# Cleanup
conditions.clear()
gc.collect()
print(f"Memory usage after cleanup: {sys.getsizeof(conditions)} bytes")
4. Extension Support
Adding New Condition Types
class ExtendedDSLProcessor(DSLProcessor):
"""Extended DSL processor supporting new condition types"""
def __init__(self, tracer=None):
super().__init__(tracer)
# Add new condition patterns
self.supported_conditions.extend([
r"config\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)", # Config condition
r"env\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)" # Environment variable condition
])
def evaluate_condition(self, condition: str, intent_categories: List[str],
context: Dict[str, Any] = None, input_data: Dict[str, Any] = None,
results: List[TaskStepResult] = None, config: Dict[str, Any] = None,
env_vars: Dict[str, str] = None) -> bool:
"""Extended condition evaluation method"""
try:
# Call parent method
result = super().evaluate_condition(condition, intent_categories, context, input_data, results)
if result is not None:
return result
# Handle config condition
match = re.fullmatch(r"config\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)", condition)
if match and config:
field, operator, value = match.groups()
return self._evaluate_comparison(config.get(field), operator, self._parse_value(value))
# Handle environment variable condition
match = re.fullmatch(r"env\.(\w+)\s*(==|!=|>|<|>=|<=)\s*(.+)", condition)
if match and env_vars:
field, operator, value = match.groups()
return self._evaluate_comparison(env_vars.get(field), operator, self._parse_value(value))
raise ValueError(f"Unsupported condition format: {condition}")
except Exception as e:
logger.error(f"Failed to evaluate condition '{condition}': {e}")
raise ValueError(f"Failed to evaluate condition '{condition}': {e}")
Adding New Step Types
class ExtendedDSLProcessor(DSLProcessor):
"""Extended DSL processor supporting new step types"""
async def execute_dsl_step(self, step: Dict, intent_categories: List[str], input_data: Dict,
context: Dict, execute_single_task: Callable, execute_batch_task: Callable,
results: List[TaskStepResult] = None) -> TaskStepResult:
"""Extended step execution method"""
# Check new step types
if "retry" in step:
return await self._handle_retry_step(step, intent_categories, input_data, context,
execute_single_task, execute_batch_task, results)
elif "timeout" in step:
return await self._handle_timeout_step(step, intent_categories, input_data, context,
execute_single_task, execute_batch_task, results)
else:
# Call parent method
return await super().execute_dsl_step(step, intent_categories, input_data, context,
execute_single_task, execute_batch_task, results)
async def _handle_retry_step(self, step: Dict, intent_categories: List[str], input_data: Dict,
context: Dict, execute_single_task: Callable, execute_batch_task: Callable,
results: List[TaskStepResult] = None) -> TaskStepResult:
"""Handle retry step"""
retry_config = step["retry"]
max_retries = retry_config.get("max_retries", 3)
delay = retry_config.get("delay", 1.0)
for attempt in range(max_retries + 1):
try:
result = await execute_single_task(retry_config["task"], retry_config.get("params", {}))
if result.completed:
return result
except Exception as e:
if attempt == max_retries:
return TaskStepResult(
step=retry_config["task"],
result=None,
completed=False,
message=f"Retry exhausted after {max_retries} attempts",
status=TaskStatus.FAILED.value,
error_code=ErrorCode.RETRY_EXHAUSTED.value,
error_message=str(e)
)
await asyncio.sleep(delay)
return TaskStepResult(
step=retry_config["task"],
result=None,
completed=False,
message="Retry step failed",
status=TaskStatus.FAILED.value,
error_code=ErrorCode.EXECUTION_ERROR.value
)
Monitoring and Logging
Performance Monitoring
import time
from typing import Dict, Any
class DSLProcessorMonitor:
"""DSL Processor Monitor"""
def __init__(self):
self.evaluation_metrics = {
"total_evaluations": 0,
"successful_evaluations": 0,
"failed_evaluations": 0
}
self.execution_metrics = {
"total_executions": 0,
"successful_executions": 0,
"failed_executions": 0
}
self.performance_metrics = {
"evaluation_times": [],
"execution_times": []
}
def record_condition_evaluation(self, condition: str, success: bool, evaluation_time: float):
"""Record condition evaluation metrics"""
self.evaluation_metrics["total_evaluations"] += 1
if success:
self.evaluation_metrics["successful_evaluations"] += 1
else:
self.evaluation_metrics["failed_evaluations"] += 1
self.performance_metrics["evaluation_times"].append(evaluation_time)
def record_step_execution(self, step_type: str, success: bool, execution_time: float):
"""Record step execution metrics"""
self.execution_metrics["total_executions"] += 1
if success:
self.execution_metrics["successful_executions"] += 1
else:
self.execution_metrics["failed_executions"] += 1
self.performance_metrics["execution_times"].append(execution_time)
def get_performance_report(self) -> Dict[str, Any]:
"""Get performance report"""
report = {
"evaluation_metrics": self.evaluation_metrics.copy(),
"execution_metrics": self.execution_metrics.copy()
}
# Calculate evaluation performance
if self.performance_metrics["evaluation_times"]:
times = self.performance_metrics["evaluation_times"]
report["evaluation_performance"] = {
"avg_time": sum(times) / len(times),
"min_time": min(times),
"max_time": max(times)
}
# Calculate execution performance
if self.performance_metrics["execution_times"]:
times = self.performance_metrics["execution_times"]
report["execution_performance"] = {
"avg_time": sum(times) / len(times),
"min_time": min(times),
"max_time": max(times)
}
return report
Logging
import logging
from typing import Dict, Any
class DSLProcessorLogger:
"""DSL Processor Logger"""
def __init__(self):
self.logger = logging.getLogger(__name__)
def log_condition_evaluation(self, condition: str, result: bool, evaluation_time: float):
"""Log condition evaluation"""
self.logger.info(f"Condition evaluated: '{condition}' -> {result} ({evaluation_time:.3f}s)")
def log_step_execution(self, step_type: str, step: Dict[str, Any], result: TaskStepResult):
"""Log step execution"""
if result.completed:
self.logger.info(f"Step executed successfully: {step_type} -> {result.step}")
else:
self.logger.error(f"Step execution failed: {step_type} -> {result.step}: {result.error_message}")
def log_validation_error(self, condition: str, error: Exception):
"""Log validation error"""
self.logger.error(f"Condition validation failed: '{condition}' - {error}")
def log_execution_error(self, step: Dict[str, Any], error: Exception):
"""Log execution error"""
self.logger.error(f"Step execution error: {step} - {error}")
Version History
v1.0.0: Initial version, basic condition evaluation and step execution
v1.1.0: Added parallel and sequential step support
v1.2.0: Added loop step support
v1.3.0: Added condition syntax validation
v1.4.0: Added performance monitoring and logging
v1.5.0: Added extension support