# ExecutorMetrics Technical Documentation
## 1. Overview
### Purpose
`ExecutorMetrics` is a component for executor performance monitoring and metrics collection, built on the Prometheus Python client library (`prometheus_client`). It collects, records, and exposes key performance metrics such as task execution latency, success counts, and retry counts, and serves as core monitoring and observability infrastructure in the AIECS system.
### Core Value
- **Performance Monitoring**: Real-time monitoring of key metrics such as task execution latency and success rate
- **Observability**: Provides comprehensive visibility of system operational status
- **Problem Diagnosis**: Quickly locate performance bottlenecks and anomalies through metrics data
- **Capacity Planning**: Predict system capacity requirements based on historical data
- **SLA Assurance**: Ensure system meets service level agreement requirements
## 2. Problem Background & Design Motivation
### Problem Background
The AIECS system handles a large number of complex task executions. Without dedicated metrics, this raises the following challenges:
- **Performance Bottleneck Identification**: Unable to quickly identify which task types or operations cause performance issues
- **System Health Monitoring**: Lack of real-time monitoring of overall system operational status
- **Capacity Planning Difficulties**: Unable to predict system load and resource requirements based on historical data
- **Complex Troubleshooting**: Difficult to quickly locate root causes when problems occur
- **Missing SLA Monitoring**: Unable to quantify whether the system meets service level agreements
### Design Motivation
1. **Performance Optimization**: Identify performance bottlenecks through metrics data to guide system optimization
2. **Fault Prevention**: Discover potential issues early through monitoring metrics
3. **Capacity Management**: Develop capacity planning strategies based on historical data
4. **Operational Efficiency**: Provide automated monitoring and alerting capabilities
5. **Business Insights**: Understand business usage patterns through metrics data
## 3. Architecture Positioning & Context
### System Architecture Location
```
┌─────────────────────────────────────────────────────────────┐
│ AIECS System Architecture │
├─────────────────────────────────────────────────────────────┤
│ Monitoring Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ ExecutorMetrics │ │ Prometheus │ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ CeleryTaskManager│ │ WebSocketManager│ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Domain Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TaskService │ │ DSLProcessor │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
### Upstream Callers
- **TaskService**: Task management service that needs to monitor task execution performance
- **DSLProcessor**: DSL processor that needs to monitor plan generation and execution performance
- **CeleryTaskManager**: Task executor that needs to monitor task execution metrics
- **WebSocketManager**: WebSocket manager that needs to monitor connection performance
### Downstream Dependencies
- **Prometheus**: Metrics collection and storage system
- **Grafana**: Metrics visualization and dashboards
- **AlertManager**: Alert system based on metrics
- **prometheus_client**: Python Prometheus client library
## 4. Core Features & Use Cases
### 4.1 Metrics Collection and Recording
#### Latency Metrics Recording
```python
import time

# Create metrics collector
metrics = ExecutorMetrics(enable_metrics=True, metrics_port=8001)
# Record intent parsing latency
start_time = time.time()
intent_result = await parse_user_intent(user_input)
duration = time.time() - start_time
metrics.record_operation_latency("intent", duration)
# Record task planning latency
start_time = time.time()
plan = await generate_task_plan(intent_result)
duration = time.time() - start_time
metrics.record_operation_latency("plan", duration)
# Record task execution latency (record_operation_latency takes no labels argument)
start_time = time.time()
result = await execute_task(task_data)
duration = time.time() - start_time
metrics.record_operation_latency("execute", duration)
```
#### Success Rate Metrics Recording
```python
# Record successful operations
try:
    result = await process_task(task_data)
    metrics.record_operation_success("execute", labels={"task_type": "data_processing"})
except Exception as e:
    metrics.record_operation_failure("execute", "execution_error",
                                     labels={"task_type": "data_processing"})

# Record retry count
for attempt in range(max_retries):
    try:
        result = await retry_operation()
        break
    except Exception:
        metrics.record_retry("execute", attempt + 1)
```
### 4.2 Decorator Monitoring
#### Automatic Monitoring with Decorator
```python
# Use decorator to monitor function performance
@metrics.with_metrics("intent_parsing")
async def parse_user_intent(user_input: str) -> Dict[str, Any]:
    """Parse user intent"""
    # Function execution logic
    return intent_result

@metrics.with_metrics("task_planning", labels={"plan_type": "complex"})
async def generate_complex_plan(intent: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Generate complex task plan"""
    # Plan generation logic
    return plan_steps

# Monitoring with labels
@metrics.with_metrics("task_execution", labels={"task_type": "ml_training"})
async def execute_ml_training(data: Dict[str, Any]) -> Dict[str, Any]:
    """Execute machine learning training task"""
    # Training logic
    return training_result
```
### 4.3 Comprehensive Operation Recording
#### Record Complete Operation Metrics
```python
# Record complete operation metrics
async def process_user_request(user_input: str, user_id: str):
    """Complete process for handling user request"""
    # Track the stage in progress so a failure is attributed to the right operation
    stage, stage_start = "intent", time.time()
    try:
        # 1. Intent parsing
        intent = await parse_user_intent(user_input)
        intent_duration = time.time() - stage_start

        # 2. Task planning
        stage, stage_start = "plan", time.time()
        plan = await generate_task_plan(intent)
        plan_duration = time.time() - stage_start

        # 3. Task execution
        stage, stage_start = "execute", time.time()
        result = await execute_task(plan)
        execute_duration = time.time() - stage_start

        # Record success metrics
        metrics.record_operation("intent", success=True, duration=intent_duration)
        metrics.record_operation("plan", success=True, duration=plan_duration)
        metrics.record_operation("execute", success=True, duration=execute_duration,
                                 labels={"task_type": plan.get("type", "unknown")})
        return result
    except Exception as e:
        # Record failure metrics for the stage that was running when the error occurred
        metrics.record_operation(stage, success=False,
                                 duration=time.time() - stage_start,
                                 error_type=type(e).__name__)
        raise
```
### 4.4 Custom Metrics Recording
#### Business-Specific Metrics
```python
# Record business-specific metrics
class BusinessMetrics:
    def __init__(self, base_metrics: ExecutorMetrics):
        self.base_metrics = base_metrics

    async def record_user_engagement(self, user_id: str, action: str, duration: float):
        """Record user engagement metrics"""
        # Caution: user_id is a high-cardinality label; prefer a bounded
        # value (for example a user tier) when the number of users is large
        self.base_metrics.record_operation(
            "user_engagement",
            success=True,
            duration=duration,
            labels={"action": action, "user_id": user_id}
        )

    async def record_data_processing_volume(self, data_size: int, processing_time: float):
        """Record data processing volume metrics"""
        self.base_metrics.record_operation(
            "data_processing",
            success=True,
            duration=processing_time,
            labels={"data_size_category": self._categorize_data_size(data_size)}
        )

    def _categorize_data_size(self, size: int) -> str:
        """Categorize by data size (bytes)"""
        if size < 1024:
            return "small"
        elif size < 1024 * 1024:
            return "medium"
        else:
            return "large"
```
### 4.5 Metrics Query and Monitoring
#### Get Metrics Summary
```python
# Get metrics summary information
summary = metrics.get_metrics_summary()
print(f"Metrics enabled status: {summary['metrics_enabled']}")
print(f"Metrics port: {summary['metrics_port']}")
print(f"Available metrics: {summary['available_metrics']}")
# Check if specific metric is available
if "intent_latency" in metrics.metrics:
    print("Intent parsing latency metric enabled")
```
#### Prometheus Metrics Query Examples
```promql
# Query intent parsing average latency
rate(intent_latency_seconds_sum[5m]) / rate(intent_latency_seconds_count[5m])
# Query task execution success ratio (the retry counter serves as the failure signal)
rate(execute_success_total[5m]) / (rate(execute_success_total[5m]) + rate(execute_retries_total[5m]))
# Query 95th-percentile execution latency grouped by task type
histogram_quantile(0.95, sum by (le, task_type) (rate(execute_latency_seconds_bucket[5m])))
```
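
To make the `histogram_quantile` query more concrete, the following is a simplified sketch of how a quantile can be estimated from cumulative histogram buckets by linear interpolation inside the bucket that contains the target rank. The function name `estimate_quantile` and the sample bucket counts are illustrative assumptions; Prometheus's own implementation handles further edge cases (such as the `+Inf` bucket) that are omitted here.

```python
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    Buckets must be sorted by upper bound and carry cumulative counts,
    mirroring the `le` label of a histogram. Simplified sketch only.
    """
    total = buckets[-1][1]
    if total == 0:
        return float("nan")
    rank = q * total
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            in_bucket = count - lower_count
            if in_bucket == 0:
                return upper_bound
            # Interpolate linearly inside the bucket that contains the rank
            fraction = (rank - lower_count) / in_bucket
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return buckets[-1][0]


# Hypothetical cumulative counts for execute_latency_seconds_bucket
sample = [(0.5, 50.0), (1.0, 80.0), (2.5, 95.0), (5.0, 100.0)]
p90 = estimate_quantile(0.90, sample)
p95 = estimate_quantile(0.95, sample)
```

Because the estimate is interpolated within a bucket, its accuracy depends on how closely the bucket boundaries bracket the latencies of interest.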
## 5. API Reference
### 5.1 Class Definition
#### `ExecutorMetrics`
```python
class ExecutorMetrics:
    """Executor performance monitoring and metrics collector"""

    def __init__(self, enable_metrics: bool = True, metrics_port: int = 8001) -> None:
        """Initialize metrics collector

        Args:
            enable_metrics: Whether to enable metrics collection
            metrics_port: Prometheus metrics server port
        """
```
### 5.2 Public Methods
#### `record_operation_latency`
```python
def record_operation_latency(self, operation: str, duration: float) -> None
```
**Function**: Record operation latency
**Parameters**:
- `operation` (str): Operation name
- `duration` (float): Operation duration (seconds)
#### `record_operation_success`
```python
def record_operation_success(self, operation: str, labels: Optional[Dict[str, str]] = None) -> None
```
**Function**: Record operation success
**Parameters**:
- `operation` (str): Operation name
- `labels` (Optional[Dict[str, str]]): Labels dictionary
#### `record_operation_failure`
```python
def record_operation_failure(self, operation: str, error_type: str, labels: Optional[Dict[str, str]] = None) -> None
```
**Function**: Record operation failure
**Parameters**:
- `operation` (str): Operation name
- `error_type` (str): Error type
- `labels` (Optional[Dict[str, str]]): Labels dictionary
#### `record_retry`
```python
def record_retry(self, operation: str, attempt_number: int) -> None
```
**Function**: Record retry count
**Parameters**:
- `operation` (str): Operation name
- `attempt_number` (int): Attempt number
#### `with_metrics`
```python
def with_metrics(self, metric_name: str, labels: Optional[Dict[str, str]] = None) -> Callable
```
**Function**: Monitoring decorator
**Parameters**:
- `metric_name` (str): Metric name
- `labels` (Optional[Dict[str, str]]): Labels dictionary
**Returns**:
- `Callable`: Decorator function
#### `get_metrics_summary`
```python
def get_metrics_summary(self) -> Dict[str, Any]
```
**Function**: Get metrics summary
**Returns**:
- `Dict[str, Any]`: Metrics summary information
#### `record_operation`
```python
def record_operation(self, operation_type: str, success: bool = True, duration: Optional[float] = None, **kwargs) -> None
```
**Function**: Record comprehensive operation metrics
**Parameters**:
- `operation_type` (str): Operation type
- `success` (bool): Whether successful
- `duration` (Optional[float]): Operation duration
- `**kwargs`: Other parameters (labels, error_type, etc.)
#### `record_duration`
```python
def record_duration(self, operation: str, duration: float, labels: Optional[Dict[str, str]] = None) -> None
```
**Function**: Record operation duration
**Parameters**:
- `operation` (str): Operation name
- `duration` (float): Duration
- `labels` (Optional[Dict[str, str]]): Labels dictionary
## 6. Technical Implementation Details
### 6.1 Prometheus Metric Types
#### Histogram Metrics
```python
# Latency metrics use Histogram type
"intent_latency": Histogram("intent_latency_seconds", "Latency of intent parsing"),
"plan_latency": Histogram("plan_latency_seconds", "Latency of task planning"),
"execute_latency": Histogram("execute_latency_seconds", "Latency of task execution", ["task_type"])
# Histogram automatically provides the following metrics:
# - intent_latency_seconds_count: Total request count
# - intent_latency_seconds_sum: Total latency time
# - intent_latency_seconds_bucket: Bucket statistics
```
#### Counter Metrics
```python
# Count metrics use Counter type
"intent_success": Counter("intent_success_total", "Number of successful intent parsings"),
"intent_retries": Counter("intent_retries_total", "Number of intent parsing retries"),
"execute_success": Counter("execute_success_total", "Number of successful executions", ["task_type"])
# Counter only increases, suitable for counting successes, retries, etc.
```
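
Because a counter only increases, a drop between two scraped samples indicates that the process restarted and the counter began again from zero. The sketch below shows, under simplifying assumptions, how an increase can be computed across such a reset. The helper `counter_increase` is hypothetical and is not part of `ExecutorMetrics`; Prometheus's `rate()` additionally extrapolates to the window boundaries and divides by the window length, which is omitted here.

```python
from typing import Sequence


def counter_increase(samples: Sequence[float]) -> float:
    """Total increase across samples, treating any drop as a counter reset."""
    increase = 0.0
    for previous, current in zip(samples, samples[1:]):
        if current >= previous:
            increase += current - previous
        else:
            # The counter restarted from zero, so the whole current value is new
            increase += current
    return increase


# Hypothetical scrapes of intent_success_total with a restart after the second sample
total = counter_increase([10, 15, 3, 8])
```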
### 6.2 Decorator Implementation Mechanism
#### Async Function Monitoring
```python
def with_metrics(self, metric_name: str, labels: Optional[Dict[str, str]] = None):
    """Monitoring decorator implementation"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.metrics or f"{metric_name}_latency" not in self.metrics:
                return await func(*args, **kwargs)
            labels_dict = labels or {}
            metric = self.metrics[f"{metric_name}_latency"]
            if labels:
                metric = metric.labels(**labels_dict)
            # Use context manager to automatically record time
            with metric.time():
                try:
                    result = await func(*args, **kwargs)
                    # Record success metric
                    if f"{metric_name}_success" in self.metrics:
                        success_metric = self.metrics[f"{metric_name}_success"]
                        if labels:
                            success_metric = success_metric.labels(**labels_dict)
                        success_metric.inc()
                    return result
                except Exception as e:
                    logger.error(f"Error in {func.__name__}: {e}")
                    raise
        return wrapper
    return decorator
```
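
The wrapper above always awaits the decorated function, so it applies to coroutine functions only. As a sketch of one possible extension, a timing decorator can inspect the target and choose a synchronous or asynchronous wrapper. The names `timed` and `record` are hypothetical and stand in for the metric lookup shown above; they are not part of `ExecutorMetrics`.

```python
import asyncio
import functools
import inspect
import time
from typing import Any, Callable, List, Tuple


def timed(record: Callable[[str, float], None], name: str) -> Callable:
    """Decorator that reports elapsed seconds for sync or async callables."""
    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.perf_counter()
                try:
                    return await func(*args, **kwargs)
                finally:
                    # Report the duration whether the call succeeded or raised
                    record(name, time.perf_counter() - start)
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                record(name, time.perf_counter() - start)
        return sync_wrapper
    return decorator


observed: List[Tuple[str, float]] = []


@timed(lambda op, seconds: observed.append((op, seconds)), "plan")
def build_plan(steps: int) -> List[int]:
    return list(range(steps))


@timed(lambda op, seconds: observed.append((op, seconds)), "intent")
async def parse_intent(text: str) -> str:
    await asyncio.sleep(0)
    return text.upper()


plan = build_plan(3)
intent = asyncio.run(parse_intent("hello"))
```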
### 6.3 Label Management Mechanism
#### Dynamic Label Support
```python
def record_operation_success(self, operation: str, labels: Optional[Dict[str, str]] = None):
    """Record operation success (with label support)"""
    if not self.enable_metrics or f"{operation}_success" not in self.metrics:
        return
    metric = self.metrics[f"{operation}_success"]
    if labels:
        # Create labeled instance for labeled metrics
        metric = metric.labels(**labels)
    metric.inc()
```
#### Label Validation and Cleaning
```python
def _validate_labels(self, labels: Dict[str, str]) -> Dict[str, str]:
    """Validate and clean labels"""
    if not labels:
        return {}
    # Remove empty values and invalid characters
    cleaned_labels = {}
    for key, value in labels.items():
        if value and isinstance(value, str) and len(value.strip()) > 0:
            # Clean label value, remove special characters
            cleaned_value = re.sub(r'[^a-zA-Z0-9_:]', '_', str(value).strip())
            cleaned_labels[key] = cleaned_value
    return cleaned_labels
```
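
To illustrate the effect of this cleaning, here is a standalone sketch that applies the same rules outside the class. The function name `clean_labels` and the sample label values are assumptions made for the example.

```python
import re
from typing import Dict, Optional


def clean_labels(labels: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Drop blank values and replace characters outside [a-zA-Z0-9_:] with '_'."""
    if not labels:
        return {}
    cleaned: Dict[str, str] = {}
    for key, value in labels.items():
        if isinstance(value, str) and value.strip():
            cleaned[key] = re.sub(r"[^a-zA-Z0-9_:]", "_", value.strip())
    return cleaned


example = clean_labels({
    "task_type": "ml training/v2",   # space and slash are replaced
    "region": "   ",                 # blank value is dropped
    "queue": "high:priority",        # colon is allowed and kept
})
```

Prometheus itself accepts arbitrary UTF-8 in label values, so this normalization is a project convention that keeps values uniform rather than a protocol requirement.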
### 6.4 Error Handling Strategy
#### Metrics Recording Fault Tolerance
```python
def record_operation(self, operation_type: str, success: bool = True, duration: Optional[float] = None, **kwargs):
    """Record comprehensive operation metrics (with fault tolerance)"""
    if not self.enable_metrics:
        return
    try:
        # Record operation success/failure
        if success:
            self.record_operation_success(operation_type, kwargs.get('labels'))
        else:
            error_type = kwargs.get('error_type', 'unknown')
            self.record_operation_failure(operation_type, error_type, kwargs.get('labels'))
        # Record operation latency
        if duration is not None:
            self.record_operation_latency(operation_type, duration)
    except Exception as e:
        # Metrics recording failure should not affect business logic
        logger.warning(f"Failed to record operation metrics: {e}")
```
#### Metrics Initialization Fault Tolerance
```python
def _init_prometheus_metrics(self):
    """Initialize Prometheus metrics (with fault tolerance)"""
    try:
        start_http_server(self.metrics_port)
        self.metrics = {
            # Metric definitions...
        }
        logger.info(f"Prometheus metrics server started on port {self.metrics_port}")
    except Exception as e:
        logger.warning(f"Failed to start metrics server: {e}")
        # Degrade to no-metrics mode
        self.metrics = {}
        self.enable_metrics = False
```
### 6.5 Performance Optimization Mechanism
#### Conditional Check Optimization
```python
def record_operation_latency(self, operation: str, duration: float):
    """Record operation latency (optimized version)"""
    # Fast path: if metrics not enabled, return directly
    if not self.enable_metrics:
        return
    # Check if metric exists
    metric_key = f"{operation}_latency"
    if metric_key not in self.metrics:
        return
    # Record metric
    self.metrics[metric_key].observe(duration)
```
#### Batch Metrics Recording
```python
def record_batch_operations(self, operations: List[Dict[str, Any]]):
    """Batch record operation metrics"""
    if not self.enable_metrics:
        return
    for op in operations:
        try:
            if op.get('success', True):
                self.record_operation_success(
                    op['operation'],
                    op.get('labels')
                )
            else:
                self.record_operation_failure(
                    op['operation'],
                    op.get('error_type', 'unknown'),
                    op.get('labels')
                )
            if 'duration' in op:
                self.record_operation_latency(
                    op['operation'],
                    op['duration']
                )
        except Exception as e:
            logger.warning(f"Failed to record batch operation: {e}")
```
## 7. Configuration & Deployment
### 7.1 Basic Configuration
#### Metrics Collector Configuration
```python
# Basic configuration
metrics = ExecutorMetrics(
    enable_metrics=True,
    metrics_port=8001
)
# Production environment configuration
production_metrics = ExecutorMetrics(
    enable_metrics=True,
    metrics_port=8001
)
```
#### Environment Variable Support
```bash
# Metrics configuration
export METRICS_ENABLED="true"
export METRICS_PORT="8001"
export METRICS_HOST="0.0.0.0"
# Prometheus configuration
export PROMETHEUS_ENDPOINT="http://prometheus:9090"
export PROMETHEUS_RETENTION="30d"
```
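
The source does not show how these variables are consumed, so the following is only a sketch of one way to translate `METRICS_ENABLED` and `METRICS_PORT` into the constructor arguments documented above. The helper name `load_metrics_config` and the accepted truthy strings are assumptions.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_metrics_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build ExecutorMetrics keyword arguments from environment variables."""
    source = os.environ if env is None else env
    raw_enabled = source.get("METRICS_ENABLED", "true").strip().lower()
    enabled = raw_enabled in {"1", "true", "yes", "on"}
    port = int(source.get("METRICS_PORT", "8001"))
    if not 1 <= port <= 65535:
        raise ValueError(f"METRICS_PORT out of range: {port}")
    return {"enable_metrics": enabled, "metrics_port": port}


# Passing an explicit mapping keeps the example independent of the real environment
config = load_metrics_config({"METRICS_ENABLED": "false", "METRICS_PORT": "9100"})
```

With such a helper, the collector could be created as `ExecutorMetrics(**load_metrics_config())`.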
### 7.2 Docker Deployment
#### Dockerfile Configuration
```dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Expose metrics port
EXPOSE 8001
# Start command
CMD ["python", "-m", "aiecs.infrastructure.monitoring.executor_metrics"]
```
#### Docker Compose Configuration
```yaml
version: '3.8'
services:
  executor-metrics:
    build: .
    ports:
      - "8001:8001"
    environment:
      - METRICS_ENABLED=true
      - METRICS_PORT=8001
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
```
### 7.3 Prometheus Configuration
#### prometheus.yml Configuration
```yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
scrape_configs:
  - job_name: 'executor-metrics'
    static_configs:
      - targets: ['executor-metrics:8001']
    scrape_interval: 5s
    metrics_path: /metrics
  - job_name: 'aiecs-api'
    static_configs:
      - targets: ['aiecs-api:8000']
    scrape_interval: 10s
```
### 7.4 Grafana Dashboard Configuration
#### Dashboard JSON Configuration
```json
{
"dashboard": {
"title": "AIECS Executor Metrics",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "rate(intent_success_total[5m])",
"legendFormat": "Intent Success Rate"
}
]
},
{
"title": "Latency Percentiles",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(execute_latency_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
}
]
}
]
}
}
```
## 8. Maintenance & Troubleshooting
### 8.1 Monitoring Metrics
#### Key Metrics
- **Request Rate**: `rate(intent_success_total[5m])`
- **Error Rate**: `rate(intent_retries_total[5m]) / rate(intent_success_total[5m])`
- **Latency Percentiles**: `histogram_quantile(0.95, rate(execute_latency_seconds_bucket[5m]))`
- **Success Rate**: `rate(execute_success_total[5m]) / (rate(execute_success_total[5m]) + rate(execute_retries_total[5m]))`
#### Monitoring Implementation
```python
class MetricsMonitor:
    def __init__(self, metrics: ExecutorMetrics):
        self.metrics = metrics
        self.alert_thresholds = {
            "error_rate": 0.05,    # Alert when the error rate exceeds 5%
            "latency_p95": 5.0,    # Alert when p95 latency exceeds 5 seconds
            "success_rate": 0.95   # Alert when the success rate drops below 95%
        }

    def check_health(self) -> Dict[str, Any]:
        """Check system health status"""
        if not self.metrics.enable_metrics:
            return {"status": "disabled", "message": "Metrics disabled"}
        # Add specific health check logic here
        return {
            "status": "healthy",
            "metrics_enabled": True,
            "available_metrics": len(self.metrics.metrics)
        }
```
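
The health check above leaves the threshold comparison as a placeholder. As a sketch of what that logic might look like, the function below compares already-computed values against the same three thresholds. The name `evaluate_thresholds` and the shape of the `observed` dictionary are assumptions; how the observed values are obtained (for example from a Prometheus query) is out of scope here.

```python
from typing import Dict, List, Optional

DEFAULT_THRESHOLDS: Dict[str, float] = {
    "error_rate": 0.05,
    "latency_p95": 5.0,
    "success_rate": 0.95,
}


def evaluate_thresholds(
    observed: Dict[str, float],
    thresholds: Optional[Dict[str, float]] = None,
) -> List[str]:
    """Return the names of thresholds that the observed values violate."""
    limits = DEFAULT_THRESHOLDS if thresholds is None else thresholds
    violations: List[str] = []
    # Error rate and latency are upper bounds; success rate is a lower bound
    if observed.get("error_rate", 0.0) > limits["error_rate"]:
        violations.append("error_rate")
    if observed.get("latency_p95", 0.0) > limits["latency_p95"]:
        violations.append("latency_p95")
    if observed.get("success_rate", 1.0) < limits["success_rate"]:
        violations.append("success_rate")
    return violations


violations = evaluate_thresholds(
    {"error_rate": 0.08, "latency_p95": 2.0, "success_rate": 0.97}
)
```

A non-empty result could then be mapped to a status other than `"healthy"` in `check_health`.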
### 8.2 Common Issues & Solutions
#### Issue 1: Metrics Server Startup Failure
**Symptoms**: `Failed to start metrics server` error
**Possible Causes**:
- Port already in use
- Insufficient permissions
- Network configuration issues
**Solutions**:
```python
# 1. Check port availability
import socket
import time

def check_port_available(port: int) -> bool:
    # connect_ex returns 0 when something is already listening on the port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex(('localhost', port)) != 0

# 2. Use dynamic port
def find_available_port(start_port: int = 8001) -> int:
    for port in range(start_port, start_port + 100):
        if check_port_available(port):
            return port
    raise RuntimeError("No available port found")

# 3. Retry mechanism
def init_metrics_with_retry(max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            port = find_available_port()
            metrics = ExecutorMetrics(metrics_port=port)
            return metrics
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
```
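
As an alternative to scanning a port range, the operating system can be asked for an unused port by binding to port 0. The sketch below uses only the standard `socket` module; the helper name `pick_free_port` is an assumption. Note that the port is released before it is returned, so another process could claim it before the metrics server starts.

```python
import socket


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port on the given host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the OS choose any free port
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = pick_free_port()
```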
#### Issue 2: Metrics Recording Failed
**Symptoms**: Metrics recording not working, data not updating
**Possible Causes**:
- Metrics not properly initialized
- Label format error
- Metric name mismatch
**Solutions**:
```python
# 1. Validate metrics initialization
def validate_metrics_initialization(metrics: ExecutorMetrics):
    if not metrics.enable_metrics:
        print("Warning: Metrics collection disabled")
        return False
    if not metrics.metrics:
        print("Error: Metrics not properly initialized")
        return False
    print(f"Metrics initialized: {list(metrics.metrics.keys())}")
    return True

# 2. Validate metrics recording
def test_metrics_recording(metrics: ExecutorMetrics):
    try:
        metrics.record_operation_success("test_operation")
        metrics.record_operation_latency("test_operation", 1.0)
        print("Metrics recording test successful")
        return True
    except Exception as e:
        print(f"Metrics recording test failed: {e}")
        return False
```
#### Issue 3: Prometheus Unable to Scrape Metrics
**Symptoms**: Prometheus shows target unreachable
**Possible Causes**:
- Network connection issues
- Firewall blocking
- Service not started
**Solutions**:
```bash
# 1. Check service status
curl http://localhost:8001/metrics
# 2. Check network connection
telnet localhost 8001
# 3. Check Prometheus configuration
promtool check config prometheus.yml
# 4. View Prometheus logs
docker logs prometheus
```
#### Issue 4: Metrics Data Inaccurate
**Symptoms**: Metrics data inconsistent with actual situation
**Possible Causes**:
- Incorrect metrics recording timing
- Improper label usage
- Concurrency issues
**Solutions**:
```python
# 1. Ensure correct metrics recording timing
async def correct_metrics_recording():
    start_time = time.time()
    try:
        result = await process_task()
        # Record on success
        metrics.record_operation_success("process_task")
    except Exception as e:
        # Record on failure
        metrics.record_operation_failure("process_task", type(e).__name__)
    finally:
        # Always record latency
        duration = time.time() - start_time
        metrics.record_operation_latency("process_task", duration)

# 2. Use thread-safe metrics recording
import threading
from concurrent.futures import ThreadPoolExecutor

class ThreadSafeMetrics:
    def __init__(self, base_metrics: ExecutorMetrics):
        self.base_metrics = base_metrics
        self.lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=1)

    def record_operation_async(self, operation: str, success: bool, duration: float):
        """Asynchronously record metrics, avoid blocking main thread"""
        self.executor.submit(
            self._record_with_lock, operation, success, duration
        )

    def _record_with_lock(self, operation: str, success: bool, duration: float):
        """Metrics recording with lock"""
        with self.lock:
            if success:
                self.base_metrics.record_operation_success(operation)
            else:
                self.base_metrics.record_operation_failure(operation, "unknown")
            self.base_metrics.record_operation_latency(operation, duration)
```
### 8.3 Performance Optimization Recommendations
#### Metrics Recording Optimization
```python
# 1. Batch metrics recording
import threading
import time

class BatchMetricsRecorder:
    def __init__(self, metrics: ExecutorMetrics, batch_size: int = 100):
        self.metrics = metrics
        self.batch_size = batch_size
        self.batch = []
        self.lock = threading.Lock()

    def add_metric(self, operation: str, success: bool, duration: float):
        """Add metric to batch"""
        with self.lock:
            self.batch.append({
                'operation': operation,
                'success': success,
                'duration': duration,
                'timestamp': time.time()
            })
            if len(self.batch) >= self.batch_size:
                self._flush_batch()

    def _flush_batch(self):
        """Flush batch metrics (caller must hold self.lock)"""
        if not self.batch:
            return
        for metric in self.batch:
            try:
                if metric['success']:
                    self.metrics.record_operation_success(metric['operation'])
                else:
                    self.metrics.record_operation_failure(metric['operation'], 'unknown')
                self.metrics.record_operation_latency(metric['operation'], metric['duration'])
            except Exception as e:
                logger.warning(f"Failed to record batch metric: {e}")
        self.batch.clear()

# 2. Async metrics recording
import asyncio
from asyncio import Queue

class AsyncMetricsRecorder:
    def __init__(self, metrics: ExecutorMetrics):
        self.metrics = metrics
        self.queue = Queue(maxsize=1000)
        self.running = False
        self._worker = None

    async def start(self):
        """Start async metrics recorder"""
        self.running = True
        # Keep a reference so the background task is not garbage-collected
        self._worker = asyncio.create_task(self._process_queue())

    async def record_metric(self, operation: str, success: bool, duration: float):
        """Asynchronously record metric"""
        try:
            # put_nowait raises QueueFull instead of blocking when the queue is full
            self.queue.put_nowait({
                'operation': operation,
                'success': success,
                'duration': duration
            })
        except asyncio.QueueFull:
            logger.warning("Metrics queue is full, dropping metric")

    async def _process_queue(self):
        """Process metrics queue"""
        while self.running:
            try:
                metric = await asyncio.wait_for(self.queue.get(), timeout=1.0)
                # Record metric
                if metric['success']:
                    self.metrics.record_operation_success(metric['operation'])
                else:
                    self.metrics.record_operation_failure(metric['operation'], 'unknown')
                self.metrics.record_operation_latency(metric['operation'], metric['duration'])
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Error processing metrics queue: {e}")
```
## 9. Visualizations
### 9.1 System Architecture Diagram
```mermaid
graph TB
subgraph "Application Layer"
APP[AIECS Application]
TS[TaskService]
DS[DSLProcessor]
end
subgraph "Monitoring Layer"
EM[ExecutorMetrics]
PM[Prometheus]
GF[Grafana]
end
subgraph "Infrastructure Layer"
CTM[CeleryTaskManager]
WSM[WebSocketManager]
end
APP --> EM
TS --> EM
DS --> EM
CTM --> EM
WSM --> EM
EM --> PM
PM --> GF
```
### 9.2 Metrics Collection Flow Diagram
```mermaid
sequenceDiagram
participant App as Application
participant EM as ExecutorMetrics
participant PM as Prometheus
participant GF as Grafana
App->>EM: Record Operation Metrics
EM->>EM: Validate Metrics Configuration
EM->>EM: Record to Memory
EM->>PM: Expose Metrics Endpoint
PM->>PM: Scrape Metrics Data
PM->>GF: Provide Query Interface
GF->>GF: Render Dashboard
```
### 9.3 Metric Types Architecture Diagram
```mermaid
graph LR
subgraph "Metric Types"
H[Histogram<br/>Latency Distribution]
C[Counter<br/>Count Statistics]
G[Gauge<br/>Instantaneous Value]
end
subgraph "Application Scenarios"
L[Latency Monitoring]
S[Success Rate Statistics]
R[Retry Count]
T[Throughput]
end
H --> L
C --> S
C --> R
G --> T
```
### 9.4 Monitoring Data Flow Diagram
```mermaid
flowchart TD
Start([Operation Start]) --> Record[Record Start Time]
Record --> Execute[Execute Operation]
Execute --> Success{Operation Success?}
Success -->|Yes| RecordSuccess[Record Success Metric]
Success -->|No| RecordFailure[Record Failure Metric]
RecordSuccess --> RecordLatency[Record Latency Metric]
RecordFailure --> RecordLatency
RecordLatency --> Expose[Expose to Prometheus]
Expose --> Visualize[Grafana Visualization]
```
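
The flow in this diagram can be sketched as a context manager that records the start time, reports success or failure, and always reports latency. The helper `track_operation` and the `RecordingStub` class are hypothetical; the stub merely mimics the three recording methods documented in the API reference so the example runs without a Prometheus server.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List, Tuple


class RecordingStub:
    """Hypothetical stand-in exposing the recording methods used below."""

    def __init__(self) -> None:
        self.events: List[Tuple] = []

    def record_operation_success(self, operation, labels=None):
        self.events.append(("success", operation))

    def record_operation_failure(self, operation, error_type, labels=None):
        self.events.append(("failure", operation, error_type))

    def record_operation_latency(self, operation, duration):
        self.events.append(("latency", operation, duration))


@contextmanager
def track_operation(metrics, operation: str) -> Iterator[None]:
    """Record success or failure, then latency, for the wrapped block."""
    start = time.perf_counter()
    try:
        yield
        metrics.record_operation_success(operation)
    except Exception as exc:
        metrics.record_operation_failure(operation, type(exc).__name__)
        raise
    finally:
        # Latency is recorded on both the success and the failure path
        metrics.record_operation_latency(operation, time.perf_counter() - start)


ok = RecordingStub()
with track_operation(ok, "execute"):
    pass

failed = RecordingStub()
try:
    with track_operation(failed, "execute"):
        raise ValueError("boom")
except ValueError:
    pass
```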
## 10. Version History
### v1.0.0 (2024-01-15)
**New Features**:
- Basic metrics collection functionality
- Support Histogram and Counter metric types
- Integrate Prometheus client
- Provide decorator monitoring support
**Technical Features**:
- Built on prometheus_client library
- Support async function monitoring
- Provide label support
- Implement basic error handling
### v1.1.0 (2024-02-01)
**Feature Enhancements**:
- Add comprehensive operation recording functionality
- Support batch metrics recording
- Implement metrics summary query
- Enhance error handling mechanism
**Performance Optimizations**:
- Optimize metrics recording performance
- Reduce memory usage
- Improve concurrent processing
### v1.2.0 (2024-03-01)
**New Features**:
- Support custom metric labels
- Add metrics validation mechanism
- Implement metrics recording fault tolerance
- Support dynamic port allocation
**Stability Improvements**:
- Enhance exception handling
- Improve resource management
- Optimize monitoring performance
### v1.3.0 (2024-04-01)
**Architecture Upgrades**:
- Upgrade to prometheus_client 0.17.x
- Support multi-dimensional labels
- Add metrics aggregation functionality
- Implement async metrics recording
**Monitoring Enhancements**:
- Add health check interface
- Support metrics export
- Implement alert integration
- Add performance analysis tools
---
## Appendix
### A. Related Documentation
- [Celery Task Manager Documentation](../INFRASTRUCTURE_MESSAGEING/CELERY_TASK_MANAGER.md)
- [WebSocket Manager Documentation](../INFRASTRUCTURE_MESSAGEING/WEBSOCKET_MANAGER.md)
- [System Configuration Guide](../CONFIG/CONFIG_MANAGEMENT.md)
### B. External Dependencies
- [Prometheus Official Documentation](https://prometheus.io/docs/)
- [Grafana Official Documentation](https://grafana.com/docs/)
- [prometheus_client Documentation](https://prometheus.github.io/client_python/)
### C. Monitoring Best Practices
```python
# 1. Metric naming conventions
# Use descriptive names, include units
"request_duration_seconds"
"http_requests_total"
"memory_usage_bytes"
# 2. Label usage principles
# Use meaningful labels, avoid high cardinality
labels = {
    "method": "POST",            # Low cardinality
    "endpoint": "/api/tasks",    # Low cardinality
    "status_code": "200"         # Low cardinality
}
# 3. Metrics recording timing
# Record immediately after operation completes
start_time = time.time()
try:
    result = await operation()
    metrics.record_operation_success("operation")
except Exception as e:
    metrics.record_operation_failure("operation", type(e).__name__)
finally:
    duration = time.time() - start_time
    metrics.record_operation_latency("operation", duration)
```
### D. Contact Information
- Technical Lead: AIECS Development Team
- Issue Reporting: Through project Issue system
- Documentation Updates: Regular maintenance, version synchronization