CeleryTaskManager Technical Documentation
1. Overview
Purpose
CeleryTaskManager is a component for distributed task scheduling and execution, built on the Celery framework. It provides asynchronous task execution, queue management, task monitoring, and batch processing, and serves as the core infrastructure of the task execution layer in the AIECS system.
Core Value
Distributed Task Execution: Supports task distribution and execution across multiple worker nodes
Queue Management: Provides separate queue management for fast tasks and heavy tasks
Asynchronous Processing: Asynchronous task execution based on asyncio, improving system throughput
Task Monitoring: Provides task status tracking, result retrieval, and cancellation functionality
Fault Tolerance: Built-in timeout handling, error retry, and exception recovery mechanisms
2. Problem Background & Design Motivation
Problem Background
The AIECS system needs to handle a large number of compute-intensive tasks, including:
AI Model Inference: Requires significant computational resources with uncertain execution times
Data Processing Tasks: Involves heavy operations such as large file processing and data transformation
Batch Task Processing: Needs to process multiple related tasks simultaneously
Long-Running Tasks: Some tasks may take minutes to hours to complete
Design Motivation
Performance Isolation: Separates heavy tasks from fast tasks to avoid mutual interference
Resource Optimization: Achieves reasonable resource allocation through queue management
Scalability: Supports horizontal scaling, dynamically adjusting worker nodes based on load
Reliability: Provides task status tracking and error recovery mechanisms
Monitoring Capability: Provides detailed task execution statistics and queue status information
3. Architecture Positioning & Context
System Architecture Location
┌─────────────────────────────────────────────────────────────┐
│ AIECS System Architecture │
├─────────────────────────────────────────────────────────────┤
│ API Layer (FastAPI) │
├─────────────────────────────────────────────────────────────┤
│ Domain Layer (Task, Execution) │
├─────────────────────────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ CeleryTaskManager│ │ Other Infrastructure│ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Message Broker (Redis) │
├─────────────────────────────────────────────────────────────┤
│ Worker Nodes (Celery Workers) │
└─────────────────────────────────────────────────────────────┘
Upstream Callers
TaskService: Task management service responsible for task creation and scheduling
DSLProcessor: DSL processor that executes complex task workflows
API Controllers: REST API controllers that handle user requests
Downstream Dependencies
Redis: Serves as message broker and result storage
Celery Workers: Worker nodes that actually execute tasks
TaskStatus/ErrorCode: Task status and error code definitions
4. Core Features & Use Cases
4.1 Task Execution Functionality
Basic Task Execution
# Create task manager instance
config = {
    'broker_url': 'redis://localhost:6379/0',
    'backend_url': 'redis://localhost:6379/0',
    'task_timeout_seconds': 300
}
task_manager = CeleryTaskManager(config)
# Execute a single task
result = await task_manager.execute_task(
    task_name="data_processing",
    input_data={
        "file_path": "/data/input.csv",
        "task_id": "task_123",
        "step": 1
    },
    context={
        "user_id": "user_456",
        "mode": "batch"
    }
)
Heavy Task Execution
# Execute compute-intensive task
heavy_result = await task_manager.execute_heavy_task(
    task_name="ml_model_training",
    input_data={
        "dataset_path": "/data/training_data.parquet",
        "model_config": {"epochs": 100, "batch_size": 32}
    },
    context={"user_id": "user_789"}
)
4.2 DSL Task Step Execution
Complex Workflow Execution
# Execute DSL-defined task step
dsl_step = {
    "task": "data_validation",
    "parameters": {
        "validation_rules": ["not_null", "data_type_check"],
        "error_threshold": 0.05
    }
}
step_result = await task_manager.execute_dsl_task_step(
    step=dsl_step,
    input_data={"data_source": "database_table"},
    context={
        "user_id": "user_123",
        "task_id": "workflow_456",
        "step": 2
    }
)
4.3 Batch Task Processing
Batch Execution of Multiple Tasks
# Prepare batch tasks
tasks = [
    {
        "task_name": "image_resize",
        "input_data": {"image_path": f"/images/img_{i}.jpg"},
        "context": {"user_id": "user_123"}
    }
    for i in range(100)
]
# Batch execution
batch_results = await task_manager.batch_execute_tasks(tasks)
4.4 Task Monitoring and Management
Task Status Query
# Get task result
task_result = task_manager.get_task_result("task_123")
print(f"Task status: {task_result['status']}")
print(f"Execution result: {task_result['result']}")
# Cancel task
success = task_manager.cancel_task("task_123")
Queue Status Monitoring
# Get queue information
queue_info = task_manager.get_queue_info()
print(f"Active tasks: {queue_info['active_tasks']}")
print(f"Scheduled tasks: {queue_info['scheduled_tasks']}")
# Get worker statistics
worker_stats = task_manager.get_worker_stats()
print(f"Worker statistics: {worker_stats}")
5. API Reference
5.1 Class Definition
CeleryTaskManager
class CeleryTaskManager:
    """Celery distributed task scheduling and execution manager"""

    def __init__(self, config: Dict[str, Any]) -> None:
        """Initialize task manager
        Args:
            config: Configuration dictionary containing Celery-related configuration
        """
5.2 Public Methods
execute_task
async def execute_task(
self,
task_name: str,
input_data: Dict[str, Any],
context: Dict[str, Any]
) -> Any
Function: Execute a single asynchronous task
Parameters:
task_name(str): Task name
input_data(Dict[str, Any]): Task input data
context(Dict[str, Any]): Task context information
Returns:
Any: Task execution result
Exceptions:
CeleryTimeoutError: Task execution timeout
Exception: Other execution exceptions
execute_heavy_task
async def execute_heavy_task(
self,
task_name: str,
input_data: Dict[str, Any],
context: Dict[str, Any]
) -> Any
Function: Execute a heavy task (automatically assigned to heavy task queue)
Parameters: Same as execute_task
Returns: Same as execute_task
execute_dsl_task_step
async def execute_dsl_task_step(
self,
step: Dict[str, Any],
input_data: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]
Function: Execute a DSL-defined task step
Parameters:
step(Dict[str, Any]): DSL step definition
input_data(Dict[str, Any]): Input data
context(Dict[str, Any]): Context information
Returns:
Dict[str, Any]: Dictionary containing step execution result
batch_execute_tasks
async def batch_execute_tasks(
self,
tasks: List[Dict[str, Any]]
) -> List[Any]
Function: Batch execute multiple tasks
Parameters:
tasks(List[Dict[str, Any]]): Task list
Returns:
List[Any]: Task execution result list
get_task_result
def get_task_result(self, task_id: str) -> Dict[str, Any]
Function: Get task execution result
Parameters:
task_id(str): Task ID
Returns:
Dict[str, Any]: Task status and result information
cancel_task
def cancel_task(self, task_id: str) -> bool
Function: Cancel specified task
Parameters:
task_id(str): Task ID
Returns:
bool: Whether cancellation operation succeeded
get_queue_info
def get_queue_info(self) -> Dict[str, Any]
Function: Get queue status information
Returns:
Dict[str, Any]: Queue status statistics
get_worker_stats
def get_worker_stats(self) -> Dict[str, Any]
Function: Get worker node statistics
Returns:
Dict[str, Any]: Worker node statistics
6. Technical Implementation Details
6.1 Queue Management Strategy
Dual Queue Architecture
task_queues = {
    'fast_tasks': {
        'exchange': 'fast_tasks',
        'routing_key': 'fast_tasks'
    },
    'heavy_tasks': {
        'exchange': 'heavy_tasks',
        'routing_key': 'heavy_tasks'
    }
}
fast_tasks: Handles lightweight tasks such as data validation, simple calculations, etc.
heavy_tasks: Handles compute-intensive tasks such as machine learning training, big data processing, etc.
Worker Node Configuration
worker_concurrency = {
    'fast_worker': 10,  # Fast task worker node concurrency
    'heavy_worker': 2   # Heavy task worker node concurrency
}
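As an illustration of how the concurrency settings relate to worker processes, the sketch below derives one worker command per entry. `build_worker_commands` and the `WORKER_QUEUES` mapping are hypothetical names introduced here, and the pairing of `fast_worker` with `fast_tasks` (and `heavy_worker` with `heavy_tasks`) is an assumption; the command shape follows the worker commands used in the Docker deployment section.

```python
from typing import Dict, List

# Assumed pairing of worker names with the queues they consume.
WORKER_QUEUES = {"fast_worker": "fast_tasks", "heavy_worker": "heavy_tasks"}


def build_worker_commands(
    worker_concurrency: Dict[str, int],
    app: str = "aiecs.tasks.worker",
) -> List[str]:
    """Return one `celery worker` command line per configured worker (hypothetical helper)."""
    commands = []
    for worker, concurrency in worker_concurrency.items():
        queue = WORKER_QUEUES[worker]
        commands.append(
            f"celery -A {app} worker --loglevel=info "
            f"--queues={queue} --concurrency={concurrency}"
        )
    return commands


commands = build_worker_commands({"fast_worker": 10, "heavy_worker": 2})
```

Keeping heavy workers at a low concurrency limits how many compute-intensive tasks compete for CPU and memory on a single node.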
6.2 Task Type Auto-Detection
The system automatically determines task type through the following logic:
# 1. Try to get task type
task_type_result = await self.execute_task(task_name, {"get_task_type": True}, context)
# 2. Determine queue based on return result
if isinstance(task_type_result, dict) and "task_type" in task_type_result:
    task_type = task_type_result["task_type"]
else:
    task_type = "fast"  # Default type
# 3. Select corresponding queue
queue = "heavy_tasks" if task_type == "heavy" else "fast_tasks"
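The selection logic above can be isolated into a small pure function, which makes it easy to test without a broker. This is a sketch only: `select_queue` is a hypothetical helper, not part of the documented API, and it assumes the probe result is a dict with a `task_type` key, as described above.

```python
from typing import Any

FAST_QUEUE = "fast_tasks"
HEAVY_QUEUE = "heavy_tasks"


def select_queue(task_type_result: Any) -> str:
    """Map a task-type probe result to a queue name (hypothetical helper)."""
    # Fall back to "fast" when the probe result is missing or malformed.
    if isinstance(task_type_result, dict) and "task_type" in task_type_result:
        task_type = task_type_result["task_type"]
    else:
        task_type = "fast"
    # Only the literal value "heavy" routes to the heavy queue.
    return HEAVY_QUEUE if task_type == "heavy" else FAST_QUEUE
```

Any value other than `"heavy"`, including an unrecognized type, falls through to the fast queue, which mirrors the default in the snippet above.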
6.3 Timeout Handling Mechanism
Multi-Layer Timeout Control
# 1. Configuration-level timeout
timeout_seconds = self.config.get('task_timeout_seconds', 300)
# 2. Task-level timeout check: poll without blocking the event loop
start_time = time.time()
while not celery_task.ready():
    if time.time() - start_time > timeout_seconds:
        raise AsyncioTimeoutError(f"Task {task_name} timed out after {timeout_seconds} seconds")
    await asyncio.sleep(0.5)
# 3. Celery native timeout
result = celery_task.get(timeout=timeout_seconds)
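The polling pattern can be tried without a broker by substituting a stand-in for the Celery result object. In the sketch below, `FakeAsyncResult` and `wait_for_result` are hypothetical names; the stand-in only imitates the `ready()` and `get()` calls used above and is not Celery's `AsyncResult`.

```python
import asyncio
import time
from typing import Any, Optional


class FakeAsyncResult:
    """Stand-in for a Celery result handle; becomes ready after `delay` seconds."""

    def __init__(self, delay: float, value: Any) -> None:
        self._ready_at = time.monotonic() + delay
        self._value = value

    def ready(self) -> bool:
        return time.monotonic() >= self._ready_at

    def get(self, timeout: Optional[float] = None) -> Any:
        return self._value


async def wait_for_result(task: Any, timeout_seconds: float, poll_interval: float = 0.01) -> Any:
    """Poll `task.ready()` while yielding control to the event loop between checks."""
    start_time = time.monotonic()
    while not task.ready():
        if time.monotonic() - start_time > timeout_seconds:
            raise asyncio.TimeoutError(f"Task timed out after {timeout_seconds} seconds")
        await asyncio.sleep(poll_interval)
    # The task is already ready here, so this call returns without waiting.
    return task.get(timeout=timeout_seconds)


result = asyncio.run(wait_for_result(FakeAsyncResult(0.02, "done"), timeout_seconds=1.0))
```

Sleeping with `asyncio.sleep` rather than blocking in `get()` lets other coroutines make progress while the task runs.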
6.4 Error Handling Strategy
Exception Classification Handling
try:
    # Task execution logic
    result = await self.execute_task(...)
except CeleryTimeoutError as e:
    # Timeout error
    return {
        "status": TaskStatus.TIMED_OUT,
        "error_code": ErrorCode.TIMEOUT_ERROR,
        "error_message": str(e)
    }
except Exception as e:
    # Other execution errors
    return {
        "status": TaskStatus.FAILED,
        "error_code": ErrorCode.EXECUTION_ERROR,
        "error_message": str(e)
    }
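A self-contained version of this classification might look like the following sketch. The `TaskStatus` and `ErrorCode` enums here are stand-ins for the project's own definitions, the `COMPLETED` member and the `run_classified` helper are assumptions made for illustration, and `asyncio.TimeoutError` is used in place of `CeleryTimeoutError` so the example runs without Celery installed.

```python
import asyncio
from enum import Enum
from typing import Any, Awaitable, Callable, Dict


class TaskStatus(str, Enum):  # stand-in for the project's TaskStatus
    COMPLETED = "completed"  # assumed member, not shown in this document
    TIMED_OUT = "timed_out"
    FAILED = "failed"


class ErrorCode(str, Enum):  # stand-in for the project's ErrorCode
    TIMEOUT_ERROR = "timeout_error"
    EXECUTION_ERROR = "execution_error"


async def run_classified(call: Callable[[], Awaitable[Any]]) -> Dict[str, Any]:
    """Await `call()` and convert any exception into a status dictionary."""
    try:
        result = await call()
    except asyncio.TimeoutError as exc:
        # The more specific handler must come before the generic one.
        return {
            "status": TaskStatus.TIMED_OUT,
            "error_code": ErrorCode.TIMEOUT_ERROR,
            "error_message": str(exc),
        }
    except Exception as exc:
        return {
            "status": TaskStatus.FAILED,
            "error_code": ErrorCode.EXECUTION_ERROR,
            "error_message": str(exc),
        }
    return {"status": TaskStatus.COMPLETED, "result": result}
```

Returning a dictionary instead of re-raising lets callers such as a DSL workflow inspect `status` and decide whether to continue, retry, or abort.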
6.5 Batch Processing Optimization
Batch Processing Strategy
# 1. Batch processing
batch_size = self.config.get('batch_size', 10)
for i in range(0, len(tasks), batch_size):
    batch = tasks[i:i + batch_size]
    # 2. Concurrent execution
    batch_results = await asyncio.gather(
        *[self.execute_task(...) for task in batch],
        return_exceptions=True
    )
    # 3. Rate limiting
    rate_limit = self.config.get('rate_limit_requests_per_second', 5)
    await asyncio.sleep(1.0 / rate_limit)
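The same strategy can be sketched as a standalone coroutine that accepts any awaitable executor. `run_in_batches` and its `execute` parameter are hypothetical names used for illustration; the pause is skipped after the final batch, which is a small variation on the snippet above rather than documented behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_in_batches(
    tasks: List[Dict[str, Any]],
    execute: Callable[[Dict[str, Any]], Awaitable[Any]],
    batch_size: int = 10,
    rate_limit_per_second: float = 5.0,
) -> List[Any]:
    """Run `execute` over `tasks` in fixed-size concurrent batches."""
    results: List[Any] = []
    for start in range(0, len(tasks), batch_size):
        batch = tasks[start:start + batch_size]
        # return_exceptions=True keeps one failure from aborting the batch;
        # failed entries appear in the result list as exception objects.
        batch_results = await asyncio.gather(
            *(execute(task) for task in batch),
            return_exceptions=True,
        )
        results.extend(batch_results)
        # Pause between batches, but not after the final one.
        if start + batch_size < len(tasks):
            await asyncio.sleep(1.0 / rate_limit_per_second)
    return results
```

Because `asyncio.gather` preserves input order, each entry in the returned list lines up with the task at the same index, so callers can check `isinstance(entry, Exception)` to find failures.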
7. Configuration & Deployment
7.1 Configuration Parameters
Required Configuration
config = {
    # Message broker configuration
    'broker_url': 'redis://localhost:6379/0',
    'backend_url': 'redis://localhost:6379/0',
    # Task timeout configuration
    'task_timeout_seconds': 300,
    # Batch processing configuration
    'batch_size': 10,
    'rate_limit_requests_per_second': 5
}
Optional Configuration
config = {
    # Serialization configuration
    'task_serializer': 'json',
    'accept_content': ['json'],
    'result_serializer': 'json',
    # Timezone configuration
    'timezone': 'UTC',
    'enable_utc': True,
    # Queue configuration
    'task_queues': {
        'fast_tasks': {'exchange': 'fast_tasks', 'routing_key': 'fast_tasks'},
        'heavy_tasks': {'exchange': 'heavy_tasks', 'routing_key': 'heavy_tasks'}
    },
    # Worker node configuration
    'worker_concurrency': {
        'fast_worker': 10,
        'heavy_worker': 2
    }
}
7.2 Environment Variable Support
# Redis configuration
export CELERY_BROKER_URL="redis://localhost:6379/0"
export CELERY_BACKEND_URL="redis://localhost:6379/0"
# Task configuration
export CELERY_TASK_TIMEOUT="300"
export CELERY_BATCH_SIZE="10"
export CELERY_RATE_LIMIT="5"
# Queue configuration
export CELERY_FAST_WORKER_CONCURRENCY="10"
export CELERY_HEAVY_WORKER_CONCURRENCY="2"
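One way to turn these variables into a configuration dictionary is sketched below. `load_config_from_env` is a hypothetical helper, and the mapping from each variable to a configuration key is an assumption based on the names used in this document; the defaults repeat the values from section 7.1.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_config_from_env(environ: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build a config dict from CELERY_* variables, falling back to documented defaults."""
    env = os.environ if environ is None else environ
    return {
        "broker_url": env.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
        "backend_url": env.get("CELERY_BACKEND_URL", "redis://localhost:6379/0"),
        # Environment values are strings, so numeric settings are converted explicitly.
        "task_timeout_seconds": int(env.get("CELERY_TASK_TIMEOUT", "300")),
        "batch_size": int(env.get("CELERY_BATCH_SIZE", "10")),
        "rate_limit_requests_per_second": int(env.get("CELERY_RATE_LIMIT", "5")),
        "worker_concurrency": {
            "fast_worker": int(env.get("CELERY_FAST_WORKER_CONCURRENCY", "10")),
            "heavy_worker": int(env.get("CELERY_HEAVY_WORKER_CONCURRENCY", "2")),
        },
    }


config = load_config_from_env({"CELERY_TASK_TIMEOUT": "600"})
```

Passing a mapping instead of reading `os.environ` directly keeps the function easy to test and avoids mutating process-wide state.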
7.3 Docker Deployment
Docker Compose Configuration
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  celery-worker-fast:
    build: .
    command: celery -A aiecs.tasks.worker worker --loglevel=info --queues=fast_tasks --concurrency=10
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_BACKEND_URL=redis://redis:6379/0
    depends_on:
      - redis
  celery-worker-heavy:
    build: .
    command: celery -A aiecs.tasks.worker worker --loglevel=info --queues=heavy_tasks --concurrency=2
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_BACKEND_URL=redis://redis:6379/0
    depends_on:
      - redis
7.4 Production Environment Configuration
High Availability Configuration
# Production environment configuration example
production_config = {
    'broker_url': 'redis://redis-cluster:6379/0',
    'backend_url': 'redis://redis-cluster:6379/0',
    'task_timeout_seconds': 1800,  # 30 minutes
    'batch_size': 5,
    'rate_limit_requests_per_second': 2,
    'worker_concurrency': {
        'fast_worker': 20,
        'heavy_worker': 4
    },
    # Expire stored results after one hour (value in seconds)
    'result_expires': 3600,
    # Acknowledge tasks only after they finish, so a task can be redelivered if its worker dies
    'task_acks_late': True,
    # Reserve one task per worker process at a time
    'worker_prefetch_multiplier': 1
}
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics
Task Execution Success Rate: (Successful tasks / Total tasks) * 100%
Average Task Execution Time: Average processing time for each queue
Queue Length: Number of pending tasks
Worker Node Status: Number of active worker nodes and health status
Error Rate: Error statistics categorized by error type
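The success rate and per-type error counts can be computed from a list of task outcome records, as in the sketch below. `summarize_outcomes` is a hypothetical helper, and the record shape (a mapping whose `error_code` is absent or `None` on success) is an assumption modeled on the error dictionaries in section 6.4.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping


def summarize_outcomes(outcomes: Iterable[Mapping[str, Any]]) -> Dict[str, Any]:
    """Compute the success rate and error counts by type from outcome records."""
    total = 0
    succeeded = 0
    errors: Counter = Counter()
    for outcome in outcomes:
        total += 1
        error_code = outcome.get("error_code")
        if error_code is None:
            succeeded += 1
        else:
            errors[error_code] += 1
    # Avoid dividing by zero when no tasks have been recorded yet.
    success_rate = (succeeded / total * 100.0) if total else 0.0
    return {
        "total": total,
        "success_rate_percent": success_rate,
        "errors_by_type": dict(errors),
    }
```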
Monitoring Implementation
# Get system status
def get_system_health(self) -> Dict[str, Any]:
    """Get system health status"""
    try:
        inspect = self.celery_app.control.inspect()
        # Worker node status
        stats = inspect.stats()
        active_tasks = inspect.active()
        # Calculate key metrics
        total_workers = len(stats) if stats else 0
        total_active_tasks = sum(len(tasks) for tasks in active_tasks.values()) if active_tasks else 0
        return {
            "status": "healthy" if total_workers > 0 else "unhealthy",
            "total_workers": total_workers,
            "active_tasks": total_active_tasks,
            "worker_stats": stats,
            "timestamp": time.time()
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "timestamp": time.time()
        }
8.2 Common Issues & Solutions
Issue 1: Task Execution Timeout
Symptoms: Task remains in PENDING state for a long time, eventually returns timeout error
Possible Causes:
Worker node overload
Task complexity exceeds expectations
Network latency causing message delivery failure
Solutions:
# 1. Check worker node status
worker_stats = task_manager.get_worker_stats()
print(f"Worker node status: {worker_stats}")
# 2. Adjust timeout configuration
config['task_timeout_seconds'] = 600 # Increase to 10 minutes
# 3. Check queue status
queue_info = task_manager.get_queue_info()
print(f"Queue status: {queue_info}")
# 4. Restart worker nodes
# celery -A aiecs.tasks.worker worker --loglevel=info --queues=fast_tasks
Issue 2: Redis Connection Failure
Symptoms: A "Failed to initialize Celery" error is raised at startup
Possible Causes:
Redis service not started
Network connection issues
Authentication configuration error
Solutions:
# 1. Check Redis service status
redis-cli ping
# 2. Check network connection
telnet redis-host 6379
# 3. Verify configuration
python -c "
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping())
"
Issue 3: Task Result Loss
Symptoms: Task executes successfully but result cannot be retrieved
Possible Causes:
Redis memory insufficient causing results to be cleaned up
Result expiration time set too short
Serialization/deserialization error
Solutions:
# 1. Check Redis memory usage
redis-cli info memory
# 2. Adjust result expiration time
config['result_expires'] = 7200 # 2 hours
# 3. Check serialization configuration
config['result_serializer'] = 'json'
config['task_serializer'] = 'json'
Issue 4: Worker Node Memory Leak
Symptoms: Worker node memory usage continuously increases
Possible Causes:
Memory not properly released during task processing
Large accumulation of task results
Third-party library memory leak
Solutions:
# 1. Monitor memory usage
import psutil
process = psutil.Process()
memory_info = process.memory_info()
print(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")
# 2. Periodically clean results
config['result_expires'] = 3600 # 1 hour expiration
# 3. Limit worker node concurrency
config['worker_concurrency'] = {
    'fast_worker': 5,  # Reduce concurrency
    'heavy_worker': 1
}
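Another common mitigation for leaking workers is to recycle child processes periodically. The fragment below uses two standard Celery worker settings; whether CeleryTaskManager forwards them to the underlying Celery app is an assumption, and the numeric values are illustrative only.

```python
# Standard Celery settings that replace worker child processes.
# Assumption: the task manager passes these keys through to the Celery app.
recycle_config = {
    # Replace a child process after it has executed this many tasks.
    "worker_max_tasks_per_child": 100,
    # Replace a child process once its resident memory exceeds this many KiB (about 200 MB).
    "worker_max_memory_per_child": 200_000,
}
```

Recycling bounds the impact of a leak without fixing its cause, so it works best alongside the memory monitoring shown above.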
8.3 Performance Optimization Recommendations
Queue Optimization
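# 1. Adjust queue configuration based on task characteristics
# Note: x-max-priority and x-message-ttl are RabbitMQ queue arguments and
# may have no effect with the Redis broker used elsewhere in this document
optimized_config = {
    'task_queues': {
        'fast_tasks': {
            'exchange': 'fast_tasks',
            'routing_key': 'fast_tasks',
            'queue_arguments': {
                'x-max-priority': 10,      # Support priority levels 0-10
                'x-message-ttl': 300000    # Message TTL in milliseconds (5 minutes)
            }
        }
    }
}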
Batch Processing Optimization
# 2. Dynamically adjust batch size
def calculate_optimal_batch_size(queue_length: int, worker_count: int) -> int:
    """Calculate optimal batch size based on queue length and worker count"""
    if queue_length < 10:
        return 1
    # Guard against zero workers and never return a batch size below 1
    per_worker = max(1, queue_length // max(1, worker_count))
    if queue_length < 100:
        return min(5, per_worker)
    return min(10, per_worker)
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "API Layer"
API[FastAPI Controllers]
DSL[DSL Processor]
TS[Task Service]
end
subgraph "Infrastructure Layer"
CTM[CeleryTaskManager]
Redis[(Redis Broker)]
end
subgraph "Worker Layer"
FW[Fast Worker<br/>Concurrency: 10]
HW[Heavy Worker<br/>Concurrency: 2]
end
subgraph "Task Queues"
FQ[fast_tasks]
HQ[heavy_tasks]
end
API --> TS
DSL --> CTM
TS --> CTM
CTM --> Redis
Redis --> FQ
Redis --> HQ
FQ --> FW
HQ --> HW
9.2 Task Execution Flow Diagram
sequenceDiagram
participant Client
participant CTM as CeleryTaskManager
participant Redis
participant Worker
Client->>CTM: execute_task(task_name, input_data, context)
CTM->>CTM: determine_task_type()
CTM->>CTM: select_queue()
CTM->>Redis: send_task(celery_task_name, kwargs, queue)
Redis->>Worker: dispatch_task()
Worker->>Worker: process_task()
Worker->>Redis: store_result()
CTM->>Redis: get_result(timeout)
Redis->>CTM: return_result()
CTM->>Client: return_result()
9.3 Error Handling Flow Diagram
flowchart TD
Start([Task Start]) --> Execute[Execute Task]
Execute --> Check{Task Complete?}
Check -->|Yes| Success[Return Success Result]
Check -->|No| Timeout{Timeout?}
Timeout -->|Yes| TimeoutError[Return Timeout Error]
Timeout -->|No| Exception{Exception?}
Exception -->|Yes| ExecutionError[Return Execution Error]
Exception -->|No| Check
Success --> End([End])
TimeoutError --> End
ExecutionError --> End
9.4 Queue Management Architecture Diagram
graph LR
subgraph "Task Distribution"
TM[TaskManager]
Router[Queue Router]
end
subgraph "Queue System"
FQ[fast_tasks<br/>Lightweight Tasks]
HQ[heavy_tasks<br/>Heavy Tasks]
end
subgraph "Worker Nodes"
FW1[Fast Worker 1]
FW2[Fast Worker 2]
FW3[Fast Worker N]
HW1[Heavy Worker 1]
HW2[Heavy Worker 2]
end
TM --> Router
Router -->|task_type=fast| FQ
Router -->|task_type=heavy| HQ
FQ --> FW1
FQ --> FW2
FQ --> FW3
HQ --> HW1
HQ --> HW2
10. Version History
v1.0.0 (2024-01-15)
New Features:
Basic Celery task manager implementation
Support for fast task and heavy task dual queue architecture
Implement asynchronous task execution interface
Add task status monitoring and result retrieval functionality
Technical Features:
Built on Celery 4.x
Support Redis as message broker
Implement task timeout and error handling mechanisms
Provide batch task processing capability
v1.1.0 (2024-02-01)
Feature Enhancements:
Add DSL task step execution support
Implement task type auto-detection mechanism
Enhance error handling and exception classification
Add worker node statistics and queue monitoring
Performance Optimizations:
Optimize batch processing algorithm
Add rate limiting mechanism
Improve memory usage efficiency
v1.2.0 (2024-03-01)
New Features:
Support task priority management
Add task cancellation and recovery functionality
Implement health check and monitoring metrics
Support dynamic configuration updates
Stability Improvements:
Enhance timeout handling mechanism
Improve error recovery strategy
Optimize connection pool management
v1.3.0 (2024-04-01)
Architecture Upgrades:
Upgrade to Celery 5.x
Support asynchronous task execution
Add task result caching mechanism
Implement distributed lock support
Monitoring Enhancements:
Add Prometheus metrics export
Implement task execution trace tracking
Support custom monitoring metrics
Appendix
B. External Dependencies
C. Contact Information
Technical Lead: AIECS Development Team
Issue Reporting: Through project Issue system
Documentation Updates: Regular maintenance, version synchronization