Celery Task Worker Technical Documentation
1. Overview
Purpose: worker.py is the core task execution engine of the AIECS system, based on Celery distributed task queue for asynchronous task processing. This module is responsible for executing various AI service tasks, including fast tasks and heavy tasks, pushing task progress in real-time through WebSocket, ensuring high availability and scalability of the system.
Core Value:
Asynchronous Task Processing: Support long-running AI tasks, avoid blocking main thread
Distributed Architecture: Based on Celery for horizontal scaling and load balancing
Real-time Progress Pushing: Real-time task execution status through WebSocket
Task Classification Management: Distinguish fast tasks and heavy tasks, optimize resource allocation
Fault Tolerance Mechanism: Complete error handling and state management
2. Problem Background & Design Motivation
2.1 Business Pain Points
In AI application development, the following key challenges are faced:
Synchronous Blocking Issues: AI task execution time is long, synchronous processing causes request timeouts
Resource Competition: Multiple users request AI services simultaneously, uneven resource allocation
Poor User Experience: Long waiting without feedback, users don’t know task status
System Scalability: Single-machine processing capacity is limited, cannot handle high concurrency
Task Management Complexity: Lack of unified task scheduling and monitoring mechanisms
2.2 Design Motivation
Based on the above pain points, designed a distributed task processing system based on Celery:
Asynchronous Decoupling: Separate task execution from request processing, improve system responsiveness
Queue Management: Distinguish task priority and resource requirements through different queues
Real-time Communication: Integrate WebSocket to push task progress in real-time
Service Discovery: Dynamically obtain AI service instances through registry center
State Tracking: Complete task lifecycle management and state updates
3. Architecture Positioning & Context
3.1 System Architecture Diagram
graph TB
subgraph "Client Layer"
A[Web Frontend] --> B[API Gateway]
B --> C[Task Scheduler]
end
subgraph "Task Processing Layer"
C --> D[Celery Worker]
D --> E[Fast Tasks Queue]
D --> F[Heavy Tasks Queue]
end
subgraph "Service Layer"
E --> G[AI Service Registry Center]
F --> G
G --> H[OpenAI Service]
G --> I[Vertex AI Service]
G --> J[Other AI Services]
end
subgraph "Communication Layer"
D --> K[WebSocket Server]
K --> A
end
subgraph "Storage Layer"
L[Redis Message Queue] --> D
M[Task State Storage] --> D
end
3.2 Upstream and Downstream Dependencies
Upstream Callers:
Task scheduler (API layer)
Business service layer
User request processors
Downstream Dependencies:
AI service registry center
WebSocket server
Redis message queue
Task state storage
Peer Components:
Configuration management system
Logging and monitoring system
Error handling mechanisms
3.3 Data Flow
sequenceDiagram
participant U as User
participant API as API Gateway
participant S as Task Scheduler
participant W as Celery Worker
participant AI as AI Service
participant WS as WebSocket
U->>API: Submit Task Request
API->>S: Create Task
S->>W: Distribute to Queue
W->>WS: Push Start State
WS->>U: Real-time Progress Updates
W->>AI: Execute AI Task
AI->>W: Return Result
W->>WS: Push Completion State
WS->>U: Task Completion Notification
4. Core Features & Use Cases
4.1 Fast Task Processing
Function Description: Process lightweight, fast-completing AI tasks, such as text analysis, simple Q&A, etc.
Core Features:
Low latency execution
High concurrency processing
Fast response
Use Cases:
# Fast text analysis task
from aiecs.tasks.worker import execute_task
# Submit fast task
task_result = execute_task.delay(
task_name="analyze_text",
user_id="user_123",
task_id="task_456",
step=1,
mode="chat",
service="text_analyzer",
input_data={"text": "Analyze the sentiment tendency of this text"},
context={"language": "en", "model": "gpt-3.5-turbo"}
)
# Get task result
result = task_result.get(timeout=30)
print(f"Analysis result: {result['result']}")
Real-world Application Cases:
Intelligent Customer Service: Quickly answer user common questions
Content Review: Real-time detection text content compliance
Language Detection: Automatically identify text language type
Sentiment Analysis: Analyze user comment sentiment analysis
4.2 Heavy Task Processing
Function Description: Process complex, time-consuming AI tasks, such as document generation, code analysis, image processing, etc.
Core Features:
Long execution support
Resource-intensive processing
Progress real-time updates
Use Cases:
# Heavy document generation task
from aiecs.tasks.worker import execute_heavy_task
# Submit heavy task
heavy_task = execute_heavy_task.delay(
task_name="generate_document",
user_id="user_123",
task_id="task_789",
step=1,
mode="document",
service="doc_generator",
input_data={
"topic": "AI development trends",
"length": "5000 words",
"style": "academic paper"
},
context={
"template": "academic",
"references": True,
"citations": True
}
)
Real-world Application Cases:
Technical Document Generation: Automatically generate complete technical documents based on requirements
Code Review: Deep analysis code quality and potential problems
Data Analysis Report: Process large amounts of data and generate analysis reports
Multimedia Processing: Image, video AI processing and analysis
4.3 Real-time Progress Pushing
Function Description: Push task execution status and progress information through WebSocket.
Progress Bar Display: Front-end real-time display task execution progress
State Notification: Task completion or failure timely notify users
Real-time Communication: Integrate WebSocket to push task progress in real-time
Service Discovery: Dynamically obtain AI service instances through registry center
State Tracking: Complete task lifecycle management and state updates
5. API Reference
5.1 Celery Application Configuration
celery_app
celery_app = Celery("ai_worker", broker=settings.celery_broker_url)
Configuration: Message broker configuration
Task Classification Management: Distinguish fast tasks and heavy tasks, optimize resource allocation
Real-time Communication Mechanism: Integrate WebSocket to push task progress in real-time
State Management: Use TaskStatus enumeration management task status
Support PENDING, RUNNING, COMPLETED, FAILED, etc. state updates
Error Handling Mechanism: Layered error handling
Error Propagation: Ensure exceptions don’t cause Worker crashes
Record detailed error information
Push error state to clients
Task Context Management: Context building
Resource Tracking: File operation tracking
Model usage records
Performance metrics collection
6. Configuration & Deployment
6.1 Environment Variable Configuration
Required Configuration:
# Celery message broker
CELERY_BROKER_URL=redis://localhost:6379/0
# WebSocket configuration
CORS_ALLOWED_ORIGINS=http://localhost:3000,http://localhost:3001
# AI service configuration
OPENAI_API_KEY=sk-...
VERTEX_PROJECT_ID=your-project-id
XAI_API_KEY=xai-...
Optional Configuration:
- **Celery Advanced Configuration**: Task timeout configuration
CELERY_TASK_SOFT_TIME_LIMIT=300
CELERY_TASK_TIME_LIMIT=600
# Concurrency configuration
CELERY_WORKER_CONCURRENCY=4
CELERY_WORKER_PREFETCH_MULTIPLIER=1
6.2 Dependency Management
Core Dependencies:
# requirements.txt
celery>=5.3.0
redis>=4.5.0
python-socketio>=5.8.0
Development Dependencies:
# requirements-dev.txt
pytest>=7.0.0
pytest-celery>=0.0.0
flower>=2.0.0 # Celery monitoring tools
6.3 Deployment Configuration
Docker Configuration:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Start Celery Worker
CMD ["celery", "-A", "aiecs.tasks.worker", "worker", "--loglevel=info"]
Docker Compose Configuration:
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
celery-worker:
build: .
command: celery -A aiecs.tasks.worker worker --loglevel=info
depends_on:
- redis
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- REDIS_URL=redis://redis:6379/0
celery-beat:
build: .
command: celery -A aiecs.tasks.worker beat --loglevel=info
depends_on:
- redis
environment:
- CELERY_BROKER_URL=redis://localhost:6379/0
Kubernetes Configuration:
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker
spec:
replicas: 3
selector:
matchLabels:
app: celery-worker
template:
metadata:
labels:
app: celery-worker
spec:
containers:
- name: worker
image: aiecs/worker:latest
command: ["celery", "-A", "aiecs.tasks.worker", "worker", "--loglevel=info"]
env:
- name: CELERY_BROKER_URL
value: "redis://redis-service:6379/0"
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
6.4 Monitoring Configuration
Flower Monitoring:
# Start Flower monitoring interface
celery -A aiecs.tasks.worker flower --port=5555
Prometheus Metrics:
from prometheus_client import Counter, Histogram, Gauge
# Define monitoring metrics
task_executions = Counter('celery_task_executions_total', 'Total task executions', ['task_name', 'status'])
task_duration = Histogram('celery_task_duration_seconds', 'Task execution duration', ['task_name'])
active_tasks = Gauge('celery_active_tasks', 'Number of active tasks')
7. Maintenance & Troubleshooting
7.1 Monitoring Metrics
Key Metrics:
Task execution success rate
Task execution time distribution
Queue length and backlog situation
Worker process status
Memory and CPU usage rate
Monitoring Dashboard:
# Grafana query examples
# Task execution success rate
sum(rate(celery_task_executions_total[5m])) by (status)
# Average task execution time
histogram_quantile(0.95, rate(celery_task_duration_seconds_bucket[5m]))
# Active tasks number
celery_active_tasks
7.2 Common Faults & Solutions
7.2.1 Worker Process Crashes
Symptoms:
Tasks long time in PENDING state
Worker process abnormal exit
Logs appear memory errors
Troubleshooting Steps:
Check Worker logs:
tail -f /var/log/celery/worker.logCheck system resources:
htoportopCheck Redis connection:
redis-cli pingView task details:
celery -A aiecs.tasks.worker inspect active
Solutions:
# Restart Worker process
celery -A aiecs.tasks.worker worker --loglevel=info --concurrency=2
# Clean zombie tasks
celery -A aiecs.tasks.worker purge
# Increase memory limit
celery -A aiecs.tasks.worker worker --max-memory-per-child=200000
7.2.2 Task Execution Timeout
Symptoms:
Task status display TIMED_OUT
Long time no response
Resource occupation too high
Troubleshooting Steps:
Check task complexity: Analyze input data size
Check AI service response: Test API calls
Check network connection: ping external services
View task details:
celery -A aiecs.tasks.worker inspect scheduled
Solutions:
# Increase task timeout time
@celery_app.task(bind=True, time_limit=1800) # 30 minutes
def execute_heavy_task(self, ...):
pass
# Optimize task execution
def _execute_service_task(self, ...):
try:
# Add timeout control
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Task execution timeout")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(1800) # 30 minutes timeout
# Execute task
result = method(input_data, context)
signal.alarm(0) # Cancel timeout
return result
except TimeoutError:
logger.error("Task execution timeout")
raise
7.2.3 WebSocket Connection Issues
Symptoms:
Frontend cannot receive progress updates
WebSocket connection disconnected
Task status out of sync
Troubleshooting Steps:
Check WebSocket service status
Verify user registration:
connected_clientsCheck network connection:
telnet localhost 8000View WebSocket logs
Solutions:
# Add connection retry mechanism
async def push_progress_with_retry(user_id: str, data: dict, max_retries=3):
for attempt in range(max_retries):
try:
await push_progress(user_id, data)
break
except Exception as e:
if attempt == max_retries - 1:
logger.error(f"Failed to push progress after {max_retries} attempts: {e}")
else:
await asyncio.sleep(2 ** attempt) # Exponential backoff
# Add heartbeat detection
@sio.event
async def ping(sid):
await sio.emit('pong', to=sid)
7.3 Performance Optimization
Concurrency Optimization:
# Adjust concurrency based on CPU core count
celery -A aiecs.tasks.worker worker --concurrency=4
# Use event loop optimization
celery -A aiecs.tasks.worker worker --pool=eventlet --concurrency=1000
Memory Optimization:
# Limit child process memory usage
celery -A aiecs.tasks.worker worker --max-memory-per-child=200000
# Regularly restart Worker
celery -A aiecs.tasks.worker worker --max-tasks-per-child=1000
Queue Optimization:
# Configure queue priority
task_routes = {
'aiecs.tasks.worker.execute_task': {
'queue': 'fast_tasks',
'priority': 10
},
'aiecs.tasks.worker.execute_heavy_task': {
'queue': 'heavy_tasks',
'priority': 5
}
7.4 Log Analysis
Log Configuration:
import logging
# Configure Celery logs
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/celery/worker.log'),
logging.StreamHandler()
]
)
Key Log Patterns:
# Find error logs
grep "ERROR" /var/log/celery/worker.log | tail -100
# Analyze task execution time
grep "Task completed" /var/log/celery/worker.log | awk '{print $NF}' | sort -n
# Monitor memory usage
grep "Memory usage" /var/log/celery/worker.log
8. Visualizations
8.1 Task Execution Flow Diagram
flowchart TD
A[Task Submission] --> B{Task Type}
B -->|Fast Tasks| C[fast_tasks Queue]
B -->|Heavy Tasks| D[heavy_tasks Queue]
C --> E[Worker Process]
D --> E
E --> F[Get Service Instance]
F --> G[Execute Task Method]
G --> H{Execution Result}
H -->|Success| I[Push Completion State]
H -->|Failure| J[Push Failure State]
I --> K[Return Result]
J --> L[Return Error]
K --> M[WebSocket Notification]
L --> M
8.2 System Architecture Diagram
graph TB
subgraph "Client Layer"
A[Web Frontend] --> B[API Gateway]
end
subgraph "Task Scheduling Layer"
B --> C[Task Scheduler]
C --> D[Task Distributor]
end
subgraph "Queue Layer"
D --> E[Redis Message Queue]
E --> F[fast_tasks Queue]
E --> G[heavy_tasks Queue]
end
subgraph "Execution Layer"
F --> H[Worker Process 1]
G --> I[Worker Process 2]
G --> J[Worker Process 3]
end
subgraph "Service Layer"
H --> K[AI Service Registry Center]
I --> K
J --> K
K --> L[OpenAI Service]
K --> M[Vertex AI Service]
K --> N[Other AI Services]
end
subgraph "Communication Layer"
H --> O[WebSocket Server]
I --> O
J --> O
O --> A
end
8.3 Task State Transition Diagram
stateDiagram-v2
[*] --> PENDING: Task Creation
PENDING --> RUNNING: Start Execution
RUNNING --> COMPLETED: Execution Success
RUNNING --> FAILED: Execution Failure
RUNNING --> TIMED_OUT: Execution Timeout
RUNNING --> CANCELLED: User Cancellation
COMPLETED --> [*]
FAILED --> [*]
TIMED_OUT --> [*]
CANCELLED --> [*]
8.4 Performance Monitoring Diagram
xychart-beta
title "Task Execution Time Trend"
x-axis ["Jan", "Feb", "Mar", "Apr", "May"]
y-axis "Execution Time(seconds)" 0 --> 300
line [120, 95, 110, 85, 100]
bar [150, 120, 140, 110, 130]
9. Version History
v1.0.0 (2024-01-15)
New Features:
Implement basic Celery Worker architecture
Support fast tasks and heavy tasks classification
Integrate WebSocket real-time progress pushing
Add basic error handling mechanism
Technical Features:
Message queue based on Redis
JSON serialization support
Service registry center integration
Task state management
v1.1.0 (2024-02-01)
New Features:
Optimize task execution performance
Add task timeout control
Implement task retry mechanism
Enhance error handling and logging
Performance Optimizations:
Concurrency processing optimization
Memory usage optimization
Queue routing optimization
v1.2.0 (2024-03-01)
New Features:
Add task priority support
Implement task dependency management
Integrate monitoring and metrics collection
Support task batch processing
Monitoring Enhancements:
Prometheus metrics integration
Flower monitoring interface
Detailed performance analysis
v1.3.0 (2024-04-01) [Planned]
Planned Features:
Add task scheduler
Implement task persistence
Support task version management
Add task template system
Architecture Optimizations:
Microservice architecture support
Containerized deployment optimization
Cloud native integration
Appendix
B. Example Code
C. Technical Support
Technical Documentation: https://docs.aiecs.com
Issue Reporting: https://github.com/aiecs/issues
Community Discussion: https://discord.gg/aiecs