OperationExecutor Component Technical Documentation
Overview
Design Motivation & Problem Background
When building AI-driven application systems, developers face the following core challenges:
1. Tool Operation Fragmentation
Different tools (data processing, document analysis, image processing, etc.) have their own calling interfaces
Lack of unified execution standards and error handling mechanisms
Difficult to manage and orchestrate dependencies between tools
2. Complex Business Process Orchestration Difficulties
Need to chain multiple tool operations into complex business pipelines
Lack of standardized ways for parameter passing and referencing between operations
Failure recovery and error handling strategies are inconsistent
3. Performance and Resource Management Challenges
Large numbers of tool operations need concurrent execution to improve efficiency
Lack of unified caching, rate limiting, and resource management mechanisms
Long-running tasks lack progress tracking and state management
4. High Development and Maintenance Costs
Each tool needs separate handling of async/sync execution
Lack of unified monitoring, logging, and debugging mechanisms
New tool integration requires repetitive writing of similar execution logic
OperationExecutor’s Solution:
Unified Execution Interface: Provides consistent calling methods for all tool operations
Intelligent Orchestration Capabilities: Supports sequential, parallel, batch processing, and other execution modes
Parameter Reference System: Implements data passing between operations through $result[index] syntax
Enterprise Features: Built-in caching, rate limiting, retry, monitoring, and other production-essential features
Developer Friendly: Simplifies tool integration, provides rich error information and debugging support
Component Positioning
OperationExecutor is the core application component in the AIECS (AI Execute Services) system, responsible for unified management and execution of various tool operations. As an application-layer executor, it provides advanced operation orchestration, batch processing, concurrency control, and error handling capabilities.
Component Type & Positioning
Component Type
Application Component - Located in the Application Layer, belongs to the business logic layer
Architecture Layers
┌─────────────────────────────────────────┐
│              API Layer                  │  ← Upstream callers
│   (FastAPI, WebSocket, AIECS Client)    │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│            Application Layer            │  ← OperationExecutor layer
│           (OperationExecutor)           │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│              Domain Layer               │
│ (TaskStepResult, TaskStatus, ErrorCode) │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│          Infrastructure Layer           │  ← Downstream dependencies
│     (ToolExecutor, ExecutionUtils)      │
└─────────────────────────────────────────┘
Upstream Components (Callers)
1. AIECS Client (aiecs_client.py)
Purpose: Main entry point for programmatic use of AIECS services
Calling Method: Initialize and use OperationExecutor in full mode
Dependency Relationship: Direct dependency, obtains instance through dependency injection
Calling Scenarios:
Programmatic tool calls and batch processing
Data analysis and processing pipelines
Automation scripts and task orchestration
2. FastAPI Application (main.py)
Purpose: Web API service, handles HTTP requests
Calling Method: Indirectly called through task execution API
Dependency Relationship: Indirect dependency, called through service layer
Calling Scenarios:
RESTful API interface calls
Task submission and status queries
Tool list and metadata retrieval
3. WebSocket Service (socket_server.py)
Purpose: Real-time communication, handles WebSocket connections
Calling Method: Indirectly called through message processing
Dependency Relationship: Indirect dependency, called through message routing
Calling Scenarios:
Real-time task progress pushing
Client real-time interaction
Real-time task status updates
4. Celery Task Manager (infrastructure/messaging/celery_task_manager.py)
Purpose: Distributed task scheduling and execution
Calling Method: Indirectly called through task queue
Dependency Relationship: Indirect dependency, called through task executor
Calling Scenarios:
Asynchronous task execution
Distributed computing tasks
Long-running task processing
5. Task Executor (tasks/worker.py)
Purpose: Specific task execution logic
Calling Method: Indirectly called through Celery tasks
Dependency Relationship: Indirect dependency, called through service instance
Calling Scenarios:
Fast task execution (fast_tasks queue)
Heavy task execution (heavy_tasks queue)
Task status management and progress pushing
Downstream Components (Dependencies)
1. ToolExecutor (tools/tool_executor/tool_executor.py)
Purpose: Low-level tool execution engine
Functionality: Provides sync/async tool execution, caching, retry, security validation
Dependency Type: Direct dependency, injected through constructor
2. ExecutionUtils (utils/execution_utils.py)
Purpose: Execution utility class, provides caching and retry mechanisms
Functionality: Cache management, retry strategies, timeout control
Dependency Type: Direct dependency, injected through constructor
3. Tool Registry (tools/__init__.py)
Purpose: Tool discovery and instance management
Functionality: Tool registration, obtaining tool instances
Dependency Type: Direct dependency, called through the get_tool() function
4. Domain Models (domain/execution/model.py)
Purpose: Data model definitions
Functionality: TaskStepResult, TaskStatus, ErrorCode, and other models
Dependency Type: Direct dependency, used for result encapsulation
Key Domain Models
TaskStepResult
Represents the execution result of a single operation step, containing complete execution state and result information.
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class TaskStepResult:
    step: str                     # Operation step identifier (e.g., "pandas_tool.read_csv")
    result: Any                   # Operation execution result
    completed: bool               # Whether successfully completed
    message: str                  # Execution message description
    status: str                   # Status code (pending/running/completed/failed)
    error_code: Optional[str]     # Error code (e.g., "E003")
    error_message: Optional[str]  # Detailed error information

    def dict(self) -> Dict[str, Any]:
        """Convert to dictionary format for serialization"""
        return {
            "step": self.step,
            "result": self.result,
            "completed": self.completed,
            "message": self.message,
            "status": self.status,
            "error_code": self.error_code,
            "error_message": self.error_message
        }
Example Data:
{
"step": "pandas_tool.read_csv",
"result": {"rows": 1000, "columns": 5, "data": [...]},
"completed": true,
"message": "Successfully loaded CSV file",
"status": "completed",
"error_code": null,
"error_message": null
}
TaskStatus
Defines various states of task execution.
class TaskStatus(Enum):
    PENDING = "pending"      # Waiting for execution
    RUNNING = "running"      # Executing
    COMPLETED = "completed"  # Execution completed
    CANCELLED = "cancelled"  # Cancelled
    TIMED_OUT = "timed_out"  # Execution timeout
    FAILED = "failed"        # Execution failed
ErrorCode
Unified error code system for easy error classification and handling.
class ErrorCode(Enum):
    VALIDATION_ERROR = "E001"      # Parameter validation error
    TIMEOUT_ERROR = "E002"         # Execution timeout
    EXECUTION_ERROR = "E003"       # Execution error
    CANCELLED_ERROR = "E004"       # Cancellation error
    RETRY_EXHAUSTED = "E005"       # Retry exhausted
    DATABASE_ERROR = "E006"        # Database error
    DSL_EVALUATION_ERROR = "E007"  # DSL evaluation error
Core Features
1. Single Operation Execution
async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any
Execute a single tool operation
Supports operation specifications in tool_name.operation_name format
Automatically handles sync/async operations
Parameter filtering and validation
Parameters Dictionary Contract
The keys and values of the params dictionary are defined by the target tool operation; each operation has its own parameter requirements.
Parameter Sources:
Tool Definition: Each tool class’s method signature defines required parameters
Tool Documentation: Obtain parameter information through list_tools() or tool metadata
Type Hints: Tool methods use Python type hints to declare parameter types
Parameter Passing Rules:
# Example: pandas_tool.read_csv operation
params = {
"file_path": "data.csv", # Required parameter
"encoding": "utf-8", # Optional parameter
"delimiter": ",", # Optional parameter
"user_id": "user123", # System parameter (will be filtered)
"task_id": "task456" # System parameter (will be filtered)
}
# System parameters will be automatically filtered, only tool-required parameters are passed
# Actual parameters passed to tool:
# {
# "file_path": "data.csv",
# "encoding": "utf-8",
# "delimiter": ","
# }
How to Find Tool Parameters:
Use list_tools() to get the tool list and basic information
Check method signatures and docstrings in the tool source code
Refer to the Tool Development Guide for parameter definition specifications
Runtime errors name any missing parameters
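The filtering rule described above can be sketched as follows. This is only an illustration under stated assumptions, not the component's actual code: `SYSTEM_PARAMS` and `filter_tool_params` are hypothetical names, and the real set of system parameters may contain more than the two keys this document mentions.

```python
from typing import Any, Dict

# Assumption: only the two system parameters named in this document.
SYSTEM_PARAMS = frozenset({"user_id", "task_id"})


def filter_tool_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of params with system-level keys removed."""
    return {key: value for key, value in params.items() if key not in SYSTEM_PARAMS}


params = {
    "file_path": "data.csv",
    "encoding": "utf-8",
    "delimiter": ",",
    "user_id": "user123",
    "task_id": "task456",
}
tool_params = filter_tool_params(params)
```

Returning a new dictionary leaves the caller's `params` untouched, so the system keys remain available for logging or cache-key generation.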
2. Batch Operation Execution
async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]
Batch execute multiple operations
Supports rate limiting and batch size control
Concurrent execution improves performance
3. Sequential Operation Execution
async def execute_operations_sequence(self, operations: List[Dict[str, Any]],
user_id: str, task_id: str,
stop_on_failure: bool = False,
save_callback=None) -> List[TaskStepResult]
Execute operation sequence in order
Supports stopping execution on failure
Supports step result save callback
Parameter reference handling (e.g., $result[0].data.field)
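The interaction between stop_on_failure and save_callback can be sketched with a simplified loop. This is a hypothetical model, not the real method: `run_sequence` takes plain coroutine functions instead of operation dictionaries, and it records each outcome as a dict rather than a TaskStepResult.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Step = Callable[[List[Dict[str, Any]]], Awaitable[Any]]


async def run_sequence(
    steps: List[Step],
    stop_on_failure: bool = False,
    save_callback: Optional[Callable[[int, Dict[str, Any]], None]] = None,
) -> List[Dict[str, Any]]:
    """Run steps in order; each step receives the outcomes recorded so far."""
    results: List[Dict[str, Any]] = []
    for index, step in enumerate(steps):
        try:
            value = await step(results)
            outcome = {"step": index, "result": value, "completed": True, "error_message": None}
        except Exception as exc:
            # A failing step is recorded instead of propagating to the caller.
            outcome = {"step": index, "result": None, "completed": False, "error_message": str(exc)}
        results.append(outcome)
        if save_callback is not None:
            save_callback(index, outcome)
        if stop_on_failure and not outcome["completed"]:
            break  # Later steps are skipped entirely.
    return results
```

With `stop_on_failure=True` the returned list is shorter than the input list when a step fails, which is why callers should not assume one result per operation.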
4. Parallel Operation Execution
async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]
Execute multiple operations in parallel
Exception handling and result aggregation
Suitable for operations without dependencies
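One common way to get this behavior in asyncio is `asyncio.gather(..., return_exceptions=True)`, which lets every operation finish and reports failures as values. The sketch below assumes that approach; `StepOutcome` is a hypothetical stand-in for TaskStepResult, and `gather_outcomes` is not part of the documented API.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Dict, List, Optional


@dataclass
class StepOutcome:
    """Hypothetical, reduced stand-in for TaskStepResult."""
    step: str
    result: Any
    completed: bool
    error_message: Optional[str] = None


async def gather_outcomes(named: Dict[str, Awaitable[Any]]) -> List[StepOutcome]:
    """Await all operations concurrently and aggregate successes and failures."""
    raw = await asyncio.gather(*named.values(), return_exceptions=True)
    outcomes: List[StepOutcome] = []
    for name, value in zip(named.keys(), raw):
        if isinstance(value, BaseException):
            # One failure does not cancel or hide the other results.
            outcomes.append(StepOutcome(name, None, False, str(value)))
        else:
            outcomes.append(StepOutcome(name, value, True))
    return outcomes
```

Results come back in the same order as the submitted operations, so index-based checks on the result list stay meaningful.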
5. Tool Call Batch Processing
async def batch_tool_calls(self, tool_calls: List[Dict], tool_executor_func=None) -> List[Any]
Batch execute tool calls
Supports custom executor function
Rate limiting and cache support
Technical Features
1. Parameter Processing
System Parameter Filtering: Automatically filters system parameters like user_id, task_id
Parameter Reference Parsing: Supports parameter references in $result[index] format
Nested Attribute Access: Supports attribute access in $result[0].data.field format
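Reference resolution of this kind can be sketched with a regular expression. The helper below is hypothetical (`resolve_reference` is not a documented method), and it assumes that nested names may be either dictionary keys or object attributes; the out-of-range message mirrors the one shown in the error examples of this document.

```python
import re
from typing import Any, List

# Matches "$result[3]" optionally followed by ".name" segments.
_REF_PATTERN = re.compile(r"^\$result\[(\d+)\]((?:\.\w+)*)$")


def resolve_reference(value: Any, results: List[Any]) -> Any:
    """Resolve '$result[i].a.b' against earlier results; return other values unchanged."""
    if not isinstance(value, str):
        return value
    match = _REF_PATTERN.match(value)
    if match is None:
        return value
    index = int(match.group(1))
    if index >= len(results):
        raise ValueError(f"Referenced result index {index} out of range")
    current = results[index]
    for name in filter(None, match.group(2).split(".")):
        # Assumption: accept both dict keys and object attributes.
        current = current[name] if isinstance(current, dict) else getattr(current, name)
    return current
```

Strings that do not match the pattern pass through untouched, so ordinary parameter values such as file paths are never misread as references.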
2. Caching Mechanism
Context-Aware Caching: Cache keys based on user ID, task ID, and parameters
TTL Support: Configurable cache time-to-live
Cache Invalidation: Automatically handles expired cache
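A context-aware key with TTL-based expiry might look like the sketch below. It is an assumption-laden illustration, not the ExecutionUtils implementation: `make_cache_key` and `TTLCache` are hypothetical names, the key is a SHA-256 hash of a JSON payload, and expired entries are dropped lazily on read.

```python
import hashlib
import json
import time
from typing import Any, Dict, Optional, Tuple


def make_cache_key(operation: str, params: Dict[str, Any], user_id: str, task_id: str) -> str:
    """Build a deterministic key from the operation, its parameters, and the caller context."""
    payload = json.dumps(
        {"op": operation, "params": params, "user_id": user_id, "task_id": task_id},
        sort_keys=True,   # Key order in params must not change the hash.
        default=str,      # Fall back to str() for non-JSON values.
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class TTLCache:
    """Minimal in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._store: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (time.monotonic() + self._ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # Expired entries are removed on access.
            return None
        return value
```

Including the user and task identifiers in the key keeps one caller's cached result from being served to another, at the cost of fewer cache hits across tasks.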
3. Concurrency Control
Semaphore Limiting: Uses asyncio.Semaphore to control concurrency
Rate Limiting: Configurable requests per second limit
Batch Size: Configurable batch processing size
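The three controls above can be combined as in the following sketch. It is a simplified model rather than the executor's actual algorithm: `run_limited` is a hypothetical helper, and the pause between batches is a crude pacing approximation of a requests-per-second limit.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_limited(
    items: Sequence[Any],
    worker: Callable[[Any], Awaitable[Any]],
    max_concurrency: int = 3,
    batch_size: int = 10,
    requests_per_second: float = 5.0,
) -> List[Any]:
    """Process items batch by batch, never running more than max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: Any) -> Any:
        async with semaphore:  # Blocks while max_concurrency workers are active.
            return await worker(item)

    results: List[Any] = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        results.extend(await asyncio.gather(*(guarded(item) for item in batch)))
        if start + batch_size < len(items):
            # Pause so that, on average, the configured rate is not exceeded.
            await asyncio.sleep(len(batch) / requests_per_second)
    return results
```

The semaphore bounds how many operations run at the same moment, while the batch pause bounds how many start per second; the two limits are independent and both may need tuning.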
4. Error Handling
Exception Catching: Comprehensive exception catching and handling
Error Code Mapping: Unified error code system
Failure Recovery: Supports stopping or continuing execution on failure
5. Tool Management
Lazy Loading: Tool instances created on demand
Instance Caching: Caches created tool instances
Tool Discovery: Supports dynamic tool discovery and registration
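Lazy loading with instance caching can be sketched as a small wrapper around a factory function. `ToolInstanceCache` is a hypothetical class; in the real component the factory role is presumably played by the registry's get_tool() function, and the statistics keys below are borrowed from the get_stats() sample shown in this document.

```python
from typing import Any, Callable, Dict


class ToolInstanceCache:
    """Create tool instances on first use and reuse them afterwards."""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        self._factory = factory
        self._instances: Dict[str, Any] = {}

    def get(self, tool_name: str) -> Any:
        if tool_name not in self._instances:
            # The factory runs only on the first request for this tool.
            self._instances[tool_name] = self._factory(tool_name)
        return self._instances[tool_name]

    def clear(self) -> None:
        self._instances.clear()

    def stats(self) -> Dict[str, Any]:
        return {
            "cached_tools": len(self._instances),
            "tool_names": sorted(self._instances),
        }
```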
API Interface
Constructor
def __init__(self, tool_executor: ToolExecutor, execution_utils: ExecutionUtils, config: Dict[str, Any])
Parameters:
tool_executor: Tool executor instance
execution_utils: Execution utility class instance
config: Configuration dictionary
Configuration Options:
rate_limit_requests_per_second: Requests per second limit (default: 5)
batch_size: Batch processing size (default: 10)
enable_cache: Enable cache (default: True)
Main Methods
1. execute_operation
async def execute_operation(self, operation_spec: str, params: Dict[str, Any]) -> Any
Execute a single operation
2. batch_execute_operations
async def batch_execute_operations(self, operations: List[Dict[str, Any]]) -> List[Any]
Batch execute operations
3. execute_operations_sequence
async def execute_operations_sequence(self, operations: List[Dict[str, Any]],
user_id: str, task_id: str,
stop_on_failure: bool = False,
save_callback=None) -> List[TaskStepResult]
Sequentially execute operation sequence
4. execute_parallel_operations
async def execute_parallel_operations(self, operations: List[Dict[str, Any]]) -> List[TaskStepResult]
Execute operations in parallel
5. batch_tool_calls
async def batch_tool_calls(self, tool_calls: List[Dict], tool_executor_func=None) -> List[Any]
Batch tool calls
Utility Methods
1. get_tool_instance
def get_tool_instance(self, tool_name: str)
Get tool instance
2. clear_tool_cache
def clear_tool_cache(self)
Clear tool cache
3. get_stats
def get_stats(self) -> Dict[str, Any]
Get executor statistics
Configuration Management
Default Configuration
{
"rate_limit_requests_per_second": 5,
"batch_size": 10,
"enable_cache": True
}
Environment Variable Support
Environment variable configuration is supported through ExecutionUtils and ToolExecutor:
TOOL_EXECUTOR_CACHE_SIZE: Cache size
TOOL_EXECUTOR_CACHE_TTL: Cache TTL
TOOL_EXECUTOR_MAX_WORKERS: Maximum worker threads
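Reading these variables could look like the sketch below. It assumes that all three values are integers and that an unset variable should map to None; the document does not state the units or defaults, and `read_tool_executor_env` is a hypothetical helper rather than part of ExecutionUtils or ToolExecutor.

```python
import os
from typing import Dict, Mapping, Optional

ENV_KEYS = (
    "TOOL_EXECUTOR_CACHE_SIZE",
    "TOOL_EXECUTOR_CACHE_TTL",
    "TOOL_EXECUTOR_MAX_WORKERS",
)


def read_tool_executor_env(environ: Mapping[str, str] = os.environ) -> Dict[str, Optional[int]]:
    """Read the three variables as integers; unset variables map to None."""
    settings: Dict[str, Optional[int]] = {}
    for key in ENV_KEYS:
        raw = environ.get(key)
        # Assumption: values are plain integers; int() raises ValueError otherwise.
        settings[key] = int(raw) if raw is not None else None
    return settings
```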
Error Handling
Exception Types & Examples
1. ValueError - Parameter Validation Error
Trigger Condition: Operation specification format error or parameter validation failure
# Example 1: Operation specification format error
try:
    result = await executor.execute_operation("invalid_format", {})
except ValueError as e:
    print(f"ValueError: {e}")
    # Output: "Invalid operation spec: invalid_format, expected 'tool_name.operation_name'"
# Example 2: Operation specification contains extra parts
try:
    result = await executor.execute_operation("tool.op.extra", {})
except ValueError as e:
    print(f"ValueError: {e}")
    # Output: "Invalid operation spec: tool.op.extra, expected 'tool_name.operation_name'"
# Example 3: Parameter reference index out of range
operations = [
    {"operation": "pandas_tool.read_csv", "params": {"file_path": "data.csv"}},
    {"operation": "pandas_tool.describe", "params": {"df": "$result[5]"}},  # Index 5 doesn't exist
]
try:
    result = await executor.execute_operations_sequence(operations, "user123", "task456")
except ValueError as e:
    print(f"ValueError: {e}")
    # Output: "Referenced result index 5 out of range"
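The format check behind the first two examples can be sketched as a small parser. `parse_operation_spec` is a hypothetical helper, not the executor's real code; it reuses the error message quoted in the examples above and additionally rejects empty tool or operation names, which is an assumption.

```python
from typing import Tuple


def parse_operation_spec(operation_spec: str) -> Tuple[str, str]:
    """Split 'tool_name.operation_name' into its two parts or raise ValueError."""
    parts = operation_spec.split(".")
    # Exactly two non-empty parts are required.
    if len(parts) != 2 or not all(parts):
        raise ValueError(
            f"Invalid operation spec: {operation_spec}, "
            "expected 'tool_name.operation_name'"
        )
    return parts[0], parts[1]
```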
2. AttributeError - Tool or Operation Not Found
Trigger Condition: Tool not registered or operation doesn’t exist
# Example 1: Tool doesn't exist
try:
    result = await executor.execute_operation("nonexistent_tool.read_data", {})
except AttributeError as e:
    print(f"AttributeError: {e}")
    # Output: "Tool 'nonexistent_tool' is not registered"
# Example 2: Tool exists but operation doesn't exist
try:
    result = await executor.execute_operation("pandas_tool.nonexistent_method", {})
except AttributeError as e:
    print(f"AttributeError: {e}")
    # Output: "Operation 'nonexistent_method' not found in tool 'pandas_tool'"
3. Exception - General Execution Error
Trigger Condition: Various errors during tool execution
# Example 1: File doesn't exist
try:
    result = await executor.execute_operation(
        "pandas_tool.read_csv",
        {"file_path": "nonexistent_file.csv"}
    )
except Exception as e:
    print(f"Execution Error: {e}")
    # Output: "Error executing read_csv: [Errno 2] No such file or directory: 'nonexistent_file.csv'"
# Example 2: Parameter type error
try:
    result = await executor.execute_operation(
        "pandas_tool.read_csv",
        {"file_path": 123}  # Should be string
    )
except Exception as e:
    print(f"Execution Error: {e}")
    # Output: "Error executing read_csv: expected str, bytes or os.PathLike object, not int"
Error Code Mapping
| Error Type | Error Code | Description | Example Scenario |
|---|---|---|---|
| ValueError | E001 | Parameter validation error | Operation spec format error, invalid parameter reference |
| TimeoutError | E002 | Execution timeout | Operation execution time exceeds limit |
| Exception | E003 | Execution error | Various exceptions during tool execution |
| CancelledError | E004 | Cancellation error | Task actively cancelled |
| RetryExhausted | E005 | Retry exhausted | Retry attempts exhausted but still failed |
| DatabaseError | E006 | Database error | Database operation failed |
| DSLEvaluationError | E007 | DSL evaluation error | Parameter reference parsing failed |
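The first four rows of the table can be expressed as a lookup on built-in exception types, as sketched below. `to_error_code` is a hypothetical helper; the RetryExhausted, DatabaseError, and DSLEvaluationError rows are left out because this document does not show where those exception classes are defined.

```python
import asyncio
from enum import Enum


class ErrorCode(Enum):
    VALIDATION_ERROR = "E001"
    TIMEOUT_ERROR = "E002"
    EXECUTION_ERROR = "E003"
    CANCELLED_ERROR = "E004"


def to_error_code(exc: BaseException) -> ErrorCode:
    """Map an exception to an error code, checking specific types before the fallback."""
    if isinstance(exc, asyncio.CancelledError):
        return ErrorCode.CANCELLED_ERROR
    # asyncio.TimeoutError is a separate class before Python 3.11.
    if isinstance(exc, (TimeoutError, asyncio.TimeoutError)):
        return ErrorCode.TIMEOUT_ERROR
    if isinstance(exc, ValueError):
        return ErrorCode.VALIDATION_ERROR
    return ErrorCode.EXECUTION_ERROR
```

The order of the checks matters: the generic E003 fallback must come last, otherwise every error would be classified as an execution error.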
Error Recovery Strategies
1. Single Operation Error Handling
# A failed single operation raises an exception; catching it lets other operations proceed
try:
    # "some_tool.risky_operation" is a placeholder in tool_name.operation_name format
    result = await executor.execute_operation("some_tool.risky_operation", params)
    print(f"Success: {result}")
except Exception as e:
    print(f"Operation failed: {e}")
    # Can continue executing other operations
2. Sequence Operation Error Handling
# Configure to stop execution on failure
results = await executor.execute_operations_sequence(
    operations,
    user_id,
    task_id,
    stop_on_failure=True  # Stop immediately on error
)
# Check results
for i, result in enumerate(results):
    if not result.completed:
        print(f"Step {i} failed: {result.error_message}")
        break  # Due to stop_on_failure=True, subsequent steps won't execute
3. Parallel Operation Error Handling
# Single failure in parallel operations doesn't affect other operations
results = await executor.execute_parallel_operations(operations)
# Check each result
for i, result in enumerate(results):
    if result.completed:
        print(f"Operation {i} succeeded: {result.result}")
    else:
        print(f"Operation {i} failed: {result.error_message}")
Error Monitoring and Debugging
# Get executor statistics, including error statistics
stats = executor.get_stats()
print(f"Total operations: {stats.get('total_operations', 0)}")
print(f"Failed operations: {stats.get('failed_operations', 0)}")
print(f"Success rate: {stats.get('success_rate', 0):.2%}")
# Detailed error logging
import logging
logging.basicConfig(level=logging.DEBUG)
# Error information will be logged with complete stack traces
Performance Optimization
1. Caching Strategy
LRU cache algorithm
Context-aware cache keys
Configurable TTL
2. Concurrency Control
Semaphore limiting concurrency
Batch processing reduces overhead
Async execution improves throughput
3. Resource Management
Tool instance reuse
Memory usage optimization
Connection pool management
Monitoring & Logging
Logging
Operation execution logs
Error and exception logs
Performance metrics logs
Statistics
{
"cached_tools": 5,
"tool_names": ["chart_tool", "pandas_tool", ...],
"semaphore_value": 3,
"config": {
"batch_size": 10,
"rate_limit": 5,
"enable_cache": True
}
}
Maintenance Guide
1. Daily Maintenance
Monitor Cache Hit Rate: Monitor cache performance through get_stats()
Check Tool Registration: Ensure required tools are properly registered
Performance Tuning: Adjust batch size and rate limits based on load
2. Troubleshooting
Tool Execution Failure: Check tool registration and parameter passing
Cache Issues: Check cache configuration and TTL settings
Concurrency Issues: Check semaphore configuration and resource limits
3. Extension Development
Add New Operation Types: Extend the execute_operation method
Custom Tool Executor: Through the tool_executor_func parameter
New Batch Processing Strategies: Extend batch processing methods
4. Configuration Tuning
# High concurrency configuration
config = {
"rate_limit_requests_per_second": 20,
"batch_size": 50,
"enable_cache": True
}
# Low latency configuration
config = {
"rate_limit_requests_per_second": 1,
"batch_size": 1,
"enable_cache": False
}
Dependency Diagram
Component Architecture Diagram
graph TB
subgraph "API Layer"
A[AIECS Client]
B[FastAPI App]
C[WebSocket Server]
end
subgraph "Application Layer"
D[OperationExecutor]
end
subgraph "Domain Layer"
E[TaskStepResult]
F[TaskStatus]
G[ErrorCode]
end
subgraph "Infrastructure Layer"
H[ToolExecutor]
I[ExecutionUtils]
J[Tool Registry]
end
subgraph "Tool Layer"
K[Tool Instances]
L[BaseTool]
end
A --> D
B --> D
C --> D
D --> H
D --> I
D --> J
D --> E
D --> F
D --> G
H --> K
I --> M[Cache System]
I --> N[Retry Logic]
J --> L
Data Flow Diagram
sequenceDiagram
participant Client as AIECS Client
participant OE as OperationExecutor
participant TE as ToolExecutor
participant EU as ExecutionUtils
participant TR as Tool Registry
Client->>OE: execute_operation(spec, params)
OE->>TR: get_tool(tool_name)
TR-->>OE: tool_instance
OE->>EU: generate_cache_key()
EU-->>OE: cache_key
OE->>EU: get_from_cache(cache_key)
alt Cache Hit
EU-->>OE: cached_result
OE-->>Client: result
else Cache Miss
OE->>TE: execute_async(tool, operation, params)
TE-->>OE: result
OE->>EU: add_to_cache(cache_key, result)
OE-->>Client: result
end
Calling Scenario Flow Diagram
graph TB
subgraph "API Calling Scenarios"
A1[Web API Request] --> B1[FastAPI App]
A2[WebSocket Connection] --> B2[Socket Server]
A3[Programmatic Call] --> B3[AIECS Client]
end
subgraph "Task Scheduling Scenarios"
C1[Celery Task] --> D1[Task Manager]
C2[Scheduled Task] --> D1
C3[Queue Task] --> D1
end
subgraph "Execution Layer"
B1 --> E[OperationExecutor]
B2 --> E
B3 --> E
D1 --> E
end
subgraph "Tool Execution"
E --> F1[Single Operation Execution]
E --> F2[Batch Operation Execution]
E --> F3[Sequential Operation Execution]
E --> F4[Parallel Operation Execution]
end
subgraph "Application Scenarios"
F1 --> G1[Data Analysis Pipeline]
F2 --> G2[Batch File Processing]
F3 --> G3[Workflow Automation]
F4 --> G4[Real-time Data Processing]
end
Concurrency Control Diagram
graph LR
subgraph "Concurrency Control"
A[Request 1] --> B[Semaphore]
C[Request 2] --> B
D[Request 3] --> B
E[Request N] --> B
end
B --> F[Rate Limiter]
F --> G[Batch Processor]
G --> H[Tool Executor]
H --> I[Result 1]
H --> J[Result 2]
H --> K[Result 3]
H --> L[Result N]
Software Function Scenarios
1. Data Analysis & Processing Pipeline
Scenario Description: Build complete data analysis pipeline through OperationExecutor
Applicable Scenarios: Data science, business intelligence, report generation
Implementation:
# Data reading → Cleaning → Analysis → Visualization → Report generation
operations = [
{"operation": "pandas_tool.read_csv", "params": {"file_path": "sales_data.csv"}},
{"operation": "pandas_tool.clean_data", "params": {"df": "$result[0]", "drop_na": True}},
{"operation": "pandas_tool.analyze_sales", "params": {"df": "$result[1]"}},
{"operation": "chart_tool.create_dashboard", "params": {"data": "$result[2]"}},
{"operation": "report_tool.generate_pdf", "params": {"charts": "$result[3]"}}
]
results = await executor.execute_operations_sequence(operations, user_id, task_id)
2. Document Processing & Content Extraction
Scenario Description: Batch process various format documents, extract key information
Applicable Scenarios: Document management, content analysis, information extraction
Implementation:
# Document reading → OCR recognition → Content analysis → Keyword extraction → Classification
operations = [
{"operation": "office_tool.read_document", "params": {"file_path": "document.pdf"}},
{"operation": "image_tool.ocr_text", "params": {"image": "$result[0].images"}},
{"operation": "classfire_tool.extract_keywords", "params": {"text": "$result[1]"}},
{"operation": "classfire_tool.classify_text", "params": {"text": "$result[1]"}}
]
# The $result[...] references make each step depend on earlier ones, so run them in sequence
results = await executor.execute_operations_sequence(operations, user_id, task_id)
3. Real-time Data Processing & Monitoring
Scenario Description: Real-time processing of streaming data, monitoring and alerting
Applicable Scenarios: System monitoring, real-time analysis, anomaly detection
Implementation:
# Data collection → Real-time analysis → Anomaly detection → Alert pushing
operations = [
{"operation": "scraper_tool.collect_metrics", "params": {"endpoint": "api/metrics"}},
{"operation": "stats_tool.calculate_trends", "params": {"data": "$result[0]"}},
{"operation": "stats_tool.detect_anomalies", "params": {"data": "$result[1]"}},
{"operation": "search_api.send_alert", "params": {"anomalies": "$result[2]"}}
]
# Each step consumes the previous step's result, so the chain must run sequentially;
# parallel execution is only suitable for operations without dependencies
results = await executor.execute_operations_sequence(operations, user_id, task_id)
4. Intelligent Content Generation & Optimization
Scenario Description: AI-based content generation, optimization, and personalized recommendations
Applicable Scenarios: Content creation, marketing automation, personalized recommendations
Implementation:
# Content analysis → Generate suggestions → Optimize content → Personalized recommendations
operations = [
{"operation": "research_tool.analyze_topic", "params": {"topic": "AI trends"}},
{"operation": "research_tool.generate_outline", "params": {"analysis": "$result[0]"}},
{"operation": "classfire_tool.optimize_content", "params": {"content": "$result[1]"}},
{"operation": "search_api.personalize_recommendation", "params": {"content": "$result[2]"}}
]
results = await executor.execute_operations_sequence(operations, user_id, task_id)
5. Batch File Processing & Conversion
Scenario Description: Batch process large numbers of files, format conversion and content extraction
Applicable Scenarios: File management, format conversion, batch processing
Implementation:
# File discovery → Batch conversion → Content extraction → Result aggregation
file_operations = [
{"operation": "office_tool.convert_to_pdf", "params": {"file_path": f"doc_{i}.docx"}}
for i in range(100)
]
results = await executor.batch_execute_operations(file_operations)
6. Multimodal Data Processing
Scenario Description: Process text, image, audio, and other multimodal data
Applicable Scenarios: Multimedia analysis, content understanding, cross-modal search
Implementation:
# Parallel processing of multimodal data
operations = [
{"operation": "image_tool.analyze_image", "params": {"image_path": "photo.jpg"}},
{"operation": "classfire_tool.analyze_text", "params": {"text": "description.txt"}},
{"operation": "research_tool.search_related", "params": {"query": "AI applications"}}
]
results = await executor.execute_parallel_operations(operations)
7. Workflow Automation & Orchestration
Scenario Description: Automate complex workflows, reduce manual intervention
Applicable Scenarios: Business process automation, task orchestration, workflow management
Implementation:
# Conditional branching → Parallel processing → Result merging → Subsequent processing
operations = [
{"operation": "pandas_tool.check_data_quality", "params": {"df": "input_data"}},
# Decide subsequent processing based on data quality
{"operation": "pandas_tool.clean_data", "params": {"df": "$result[0]", "if_quality_low": True}},
{"operation": "stats_tool.generate_report", "params": {"df": "$result[1]"}}
]
results = await executor.execute_operations_sequence(
operations, user_id, task_id, stop_on_failure=True
)
8. Real-time Collaboration & Sharing
Scenario Description: Support multi-user real-time collaboration and resource sharing
Applicable Scenarios: Team collaboration, shared workspace, real-time editing
Implementation:
import asyncio

# Push processing progress in real-time through WebSocket
async def process_with_progress(operations, user_id, task_id):
    def progress_callback(step, result):
        # Push progress to client in real-time
        # (push_progress is an application-defined coroutine, not shown here)
        asyncio.create_task(push_progress(user_id, {
            "step": step,
            "result": result,
            "status": "processing"
        }))
    return await executor.execute_operations_sequence(
        operations, user_id, task_id, save_callback=progress_callback
    )
Real-world Use Cases
Case 1: E-commerce Data Analysis Platform
Business Background: An e-commerce company needs real-time analysis of sales data, generate daily reports
Technical Implementation:
# Daily sales data analysis pipeline
daily_analysis_operations = [
# 1. Data collection
{"operation": "pandas_tool.read_csv", "params": {"file_path": f"sales_{date}.csv"}},
{"operation": "scraper_tool.collect_external_data", "params": {"api_endpoint": "market_api"}},
# 2. Data cleaning and preprocessing
{"operation": "pandas_tool.clean_data", "params": {"df": "$result[0]", "remove_duplicates": True}},
{"operation": "pandas_tool.merge_data", "params": {"df1": "$result[2]", "df2": "$result[1]"}},
# 3. Data analysis
{"operation": "stats_tool.calculate_metrics", "params": {"df": "$result[3]"}},
{"operation": "pandas_tool.group_analysis", "params": {"df": "$result[3]", "group_by": "category"}},
# 4. Visualization generation
{"operation": "chart_tool.create_sales_chart", "params": {"data": "$result[4]"}},
{"operation": "chart_tool.create_trend_analysis", "params": {"data": "$result[5]"}},
# 5. Report generation
{"operation": "report_tool.generate_daily_report", "params": {
"charts": ["$result[6]", "$result[7]"],
"metrics": "$result[4]",
"template": "daily_sales_template"
}}
]
# Use sequential execution to ensure data dependencies
results = await executor.execute_operations_sequence(
daily_analysis_operations,
user_id="analyst_001",
task_id=f"daily_analysis_{date}",
stop_on_failure=True
)
Case 2: Intelligent Document Processing System
Business Background: Law firm needs batch processing of contract documents, extract key clauses
Technical Implementation:
# Contract document intelligent analysis
contract_analysis_operations = [
# 1. Document reading and OCR
{"operation": "office_tool.read_document", "params": {"file_path": "contract.pdf"}},
{"operation": "image_tool.ocr_text", "params": {"image": "$result[0].scanned_pages"}},
# 2. Text analysis and key information extraction
{"operation": "classfire_tool.extract_entities", "params": {"text": "$result[1]"}},
{"operation": "classfire_tool.find_contract_terms", "params": {"text": "$result[1]"}},
{"operation": "research_tool.search_legal_precedents", "params": {"terms": "$result[3]"}},
# 3. Risk assessment and classification
{"operation": "classfire_tool.assess_risk_level", "params": {
"entities": "$result[2]",
"terms": "$result[3]",
"precedents": "$result[4]"
}},
{"operation": "classfire_tool.classify_contract_type", "params": {"text": "$result[1]"}},
# 4. Generate analysis report
{"operation": "report_tool.generate_legal_analysis", "params": {
"contract_text": "$result[1]",
"entities": "$result[2]",
"terms": "$result[3]",
"risk_assessment": "$result[5]",
"contract_type": "$result[6]"
}}
]
# Use batch processing for multiple contracts
contract_files = ["contract_001.pdf", "contract_002.pdf", "contract_003.pdf"]
all_results = []
for contract_file in contract_files:
    # Create an independent task for each contract; only operations that
    # already take a file_path have it replaced with the current contract
    contract_ops = [
        {**op, "params": {**op["params"], "file_path": contract_file}}
        if "file_path" in op["params"] else op
        for op in contract_analysis_operations
    ]
    result = await executor.execute_operations_sequence(
        contract_ops,
        user_id="lawyer_001",
        task_id=f"contract_analysis_{contract_file}",
        stop_on_failure=False  # A failed step does not stop the remaining steps
    )
    all_results.append(result)
Case 3: Real-time Monitoring & Alerting System
Business Background: Cloud service provider needs real-time monitoring of system performance, timely problem detection
Technical Implementation:
import asyncio
import time

# Real-time monitoring and alerting processing
async def monitor_system_health():
    while True:
        # Parallel collection of various monitoring data
        monitoring_operations = [
            {"operation": "scraper_tool.collect_cpu_metrics", "params": {"endpoint": "metrics_api"}},
            {"operation": "scraper_tool.collect_memory_metrics", "params": {"endpoint": "metrics_api"}},
            {"operation": "scraper_tool.collect_network_metrics", "params": {"endpoint": "metrics_api"}},
            {"operation": "scraper_tool.collect_disk_metrics", "params": {"endpoint": "metrics_api"}}
        ]
        # Parallel execution of monitoring data collection
        metrics_results = await executor.execute_parallel_operations(monitoring_operations)
        # Analyze monitoring data. $result[...] references only apply within a
        # sequence, so the collected results are passed to each analysis directly.
        analysis_operations = [
            {"operation": "stats_tool.analyze_cpu_trends", "params": {"data": metrics_results[0].result}},
            {"operation": "stats_tool.analyze_memory_usage", "params": {"data": metrics_results[1].result}},
            {"operation": "stats_tool.detect_anomalies", "params": {"data": metrics_results[2].result}},
            {"operation": "stats_tool.predict_failures", "params": {"data": metrics_results[3].result}}
        ]
        analysis_results = await executor.execute_parallel_operations(analysis_operations)
        # Check if alerting is needed
        alert_operations = [
            {"operation": "search_api.check_alert_conditions", "params": {"analysis": analysis_results}},
            {"operation": "search_api.send_notifications", "params": {"alerts": "$result[0]"}},
            {"operation": "search_api.update_dashboard", "params": {"metrics": metrics_results}}
        ]
        await executor.execute_operations_sequence(
            alert_operations,
            user_id="system_monitor",
            task_id=f"monitoring_{int(time.time())}"
        )
        # Wait for next monitoring cycle
        await asyncio.sleep(60)  # Monitor every minute
Usage Examples
Basic Usage
from aiecs.application.executors.operation_executor import OperationExecutor
from aiecs.tools.tool_executor import ToolExecutor
from aiecs.utils.execution_utils import ExecutionUtils
# Initialize
tool_executor = ToolExecutor()
execution_utils = ExecutionUtils()
config = {"rate_limit_requests_per_second": 10}
executor = OperationExecutor(tool_executor, execution_utils, config)
# Execute single operation
result = await executor.execute_operation(
"pandas_tool.read_csv",
{"file_path": "data.csv"}
)
# Batch execution
operations = [
{"operation": "pandas_tool.read_csv", "params": {"file_path": "data1.csv"}},
{"operation": "pandas_tool.read_csv", "params": {"file_path": "data2.csv"}}
]
results = await executor.batch_execute_operations(operations)
Advanced Usage
# Sequential execution with parameter references
operations = [
{"operation": "pandas_tool.read_csv", "params": {"file_path": "data.csv"}},
{"operation": "pandas_tool.describe", "params": {"df": "$result[0]"}}
]
results = await executor.execute_operations_sequence(
operations,
user_id="user123",
task_id="task456",
stop_on_failure=True
)
Version History
v1.0.0: Initial version, basic operation execution functionality
v1.1.0: Added batch processing and concurrency control
v1.2.0: Added parameter references and cache support
v1.3.0: Added tool management and statistics functionality