ExecutorMetrics Technical Documentation
1. Overview
Purpose
ExecutorMetrics is a component for monitoring executor performance and collecting metrics, built on the Prometheus client library. It collects, records, and exposes key performance metrics such as task execution latency, success rate, and retry count, and serves as core monitoring and observability infrastructure for the AIECS system.
Core Value
Performance Monitoring: Real-time monitoring of key metrics such as task execution latency and success rate
Observability: Provides comprehensive visibility of system operational status
Problem Diagnosis: Quickly locate performance bottlenecks and anomalies through metrics data
Capacity Planning: Predict system capacity requirements based on historical data
SLA Assurance: Ensure system meets service level agreement requirements
2. Problem Background & Design Motivation
Problem Background
The AIECS system handles a large number of complex task executions. Without dedicated metrics collection, this leads to the following problems:
Performance Bottleneck Identification: Unable to quickly identify which task types or operations cause performance issues
System Health Monitoring: Lack of real-time monitoring of overall system operational status
Capacity Planning Difficulties: Unable to predict system load and resource requirements based on historical data
Complex Troubleshooting: Difficult to quickly locate root causes when problems occur
Missing SLA Monitoring: Unable to quantify whether the system meets service level agreements
Design Motivation
Performance Optimization: Identify performance bottlenecks through metrics data to guide system optimization
Fault Prevention: Discover potential issues early through monitoring metrics
Capacity Management: Develop capacity planning strategies based on historical data
Operational Efficiency: Provide automated monitoring and alerting capabilities
Business Insights: Understand business usage patterns through metrics data
3. Architecture Positioning & Context
System Architecture Location
┌─────────────────────────────────────────────────────────────┐
│ AIECS System Architecture │
├─────────────────────────────────────────────────────────────┤
│ Monitoring Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ ExecutorMetrics │ │ Prometheus │ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ CeleryTaskManager│ │ WebSocketManager│ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Domain Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TaskService │ │ DSLProcessor │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Upstream Callers
TaskService: Task management service that needs to monitor task execution performance
DSLProcessor: DSL processor that needs to monitor plan generation and execution performance
CeleryTaskManager: Task executor that needs to monitor task execution metrics
WebSocketManager: WebSocket manager that needs to monitor connection performance
Downstream Dependencies
Prometheus: Metrics collection and storage system
Grafana: Metrics visualization and dashboards
AlertManager: Alert system based on metrics
prometheus_client: Python Prometheus client library
4. Core Features & Use Cases
4.1 Metrics Collection and Recording
Latency Metrics Recording
# Create metrics collector
metrics = ExecutorMetrics(enable_metrics=True, metrics_port=8001)
# Record intent parsing latency
start_time = time.time()
intent_result = await parse_user_intent(user_input)
duration = time.time() - start_time
metrics.record_operation_latency("intent", duration)
# Record task planning latency
start_time = time.time()
plan = await generate_task_plan(intent_result)
duration = time.time() - start_time
metrics.record_operation_latency("plan", duration)
# Record task execution latency (record_operation_latency takes no labels argument)
start_time = time.time()
result = await execute_task(task_data)
duration = time.time() - start_time
metrics.record_operation_latency("execute", duration)
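The start/stop timing pattern above repeats for every operation. One way to reduce the repetition is a small context manager. The sketch below is illustrative only: `timed` and the `record` callback are hypothetical names that are not part of ExecutorMetrics. In real use, `record` would be `metrics.record_operation_latency`.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def timed(operation: str, record: Callable[[str, float], None]) -> Iterator[None]:
    """Measure the wrapped block and pass (operation, seconds) to `record`."""
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    try:
        yield
    finally:
        # Runs on success and on exception, so the latency is always reported
        record(operation, time.perf_counter() - start)


# Stand-in recorder for demonstration (hypothetical); in real use pass
# metrics.record_operation_latency instead of the lambda.
recorded = []
with timed("intent", lambda op, seconds: recorded.append((op, seconds))):
    sum(range(1000))  # placeholder for the real work
```

Because the recording happens in `finally`, a failed operation still contributes to the latency distribution, which matches the timing guidance in section 8.2.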
Success Rate Metrics Recording
# Record successful operations
try:
    result = await process_task(task_data)
    metrics.record_operation_success("execute", labels={"task_type": "data_processing"})
except Exception as e:
    metrics.record_operation_failure("execute", "execution_error",
                                     labels={"task_type": "data_processing"})
# Record retry count
for attempt in range(max_retries):
    try:
        result = await retry_operation()
        break
    except Exception:
        metrics.record_retry("execute", attempt + 1)
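The retry loop above keeps going silently when every attempt fails, which leaves `result` unassigned. A minimal sketch of a helper that reports each failed attempt and re-raises the last error is shown below. `run_with_retries`, `flaky`, and the `on_retry` callback are hypothetical names used for illustration. In real use, `on_retry` could wrap `metrics.record_retry`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int,
    on_retry: Callable[[int], None],
) -> T:
    """Call `operation` up to `max_attempts` times, reporting each failed attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            on_retry(attempt)  # e.g. metrics.record_retry("execute", attempt)
            if attempt == max_attempts:
                raise  # surface the final failure instead of swallowing it
    raise AssertionError("unreachable")  # keeps type checkers satisfied


# Demonstration: an operation that fails twice, then succeeds
calls = {"count": 0}


async def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


retries = []
result = asyncio.run(run_with_retries(flaky, max_attempts=5, on_retry=retries.append))
```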
4.2 Decorator Monitoring
Automatic Monitoring with Decorator
# Use decorator to monitor function performance.
# The decorator looks up f"{metric_name}_latency" and f"{metric_name}_success"
# (see 6.2) and skips recording when a key is not registered, so metric_name
# must match the prefix of a registered metric.
from typing import Any, Dict, List
@metrics.with_metrics("intent_parsing")
async def parse_user_intent(user_input: str) -> Dict[str, Any]:
    """Parse user intent"""
    # Function execution logic
    return intent_result
@metrics.with_metrics("task_planning", labels={"plan_type": "complex"})
async def generate_complex_plan(intent: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Generate complex task plan"""
    # Plan generation logic
    return plan_steps
# Monitoring with labels
@metrics.with_metrics("task_execution", labels={"task_type": "ml_training"})
async def execute_ml_training(data: Dict[str, Any]) -> Dict[str, Any]:
    """Execute machine learning training task"""
    # Training logic
    return training_result
4.3 Comprehensive Operation Recording
Record Complete Operation Metrics
# Record complete operation metrics
async def process_user_request(user_input: str, user_id: str):
    """Complete process for handling user request"""
    stage = "intent"  # stage currently running, used if an exception occurs
    stage_start = time.time()
    try:
        # 1. Intent parsing
        intent = await parse_user_intent(user_input)
        intent_duration = time.time() - stage_start
        # 2. Task planning
        stage, stage_start = "plan", time.time()
        plan = await generate_task_plan(intent)
        plan_duration = time.time() - stage_start
        # 3. Task execution
        stage, stage_start = "execute", time.time()
        result = await execute_task(plan)
        execute_duration = time.time() - stage_start
        # Record success metrics
        metrics.record_operation("intent", success=True, duration=intent_duration)
        metrics.record_operation("plan", success=True, duration=plan_duration)
        metrics.record_operation("execute", success=True, duration=execute_duration,
                                 labels={"task_type": plan.get("type", "unknown")})
        return result
    except Exception as e:
        # Record failure metrics against the stage that raised
        metrics.record_operation(stage, success=False,
                                 duration=time.time() - stage_start,
                                 error_type=type(e).__name__)
        raise
4.4 Custom Metrics Recording
Business-Specific Metrics
# Record business-specific metrics
class BusinessMetrics:
    def __init__(self, base_metrics: ExecutorMetrics):
        self.base_metrics = base_metrics
    async def record_user_engagement(self, user_id: str, action: str, duration: float):
        """Record user engagement metrics"""
        # user_id is deliberately not used as a label: per-user values create
        # high-cardinality series (see Appendix C)
        self.base_metrics.record_operation(
            "user_engagement",
            success=True,
            duration=duration,
            labels={"action": action}
        )
    async def record_data_processing_volume(self, data_size: int, processing_time: float):
        """Record data processing volume metrics"""
        self.base_metrics.record_operation(
            "data_processing",
            success=True,
            duration=processing_time,
            labels={"data_size_category": self._categorize_data_size(data_size)}
        )
    def _categorize_data_size(self, size: int) -> str:
        """Categorize by data size (in bytes)"""
        if size < 1024:
            return "small"
        elif size < 1024 * 1024:
            return "medium"
        else:
            return "large"
4.5 Metrics Query and Monitoring
Get Metrics Summary
# Get metrics summary information
summary = metrics.get_metrics_summary()
print(f"Metrics enabled status: {summary['metrics_enabled']}")
print(f"Metrics port: {summary['metrics_port']}")
print(f"Available metrics: {summary['available_metrics']}")
# Check if specific metric is available
if "intent_latency" in metrics.metrics:
print("Intent parsing latency metric enabled")
Prometheus Metrics Query Examples
# Query intent parsing average latency
rate(intent_latency_seconds_sum[5m]) / rate(intent_latency_seconds_count[5m])
# Query task execution success rate
rate(execute_success_total[5m]) / (rate(execute_success_total[5m]) + rate(execute_retries_total[5m]))
# Query 95th-percentile execution latency grouped by task type
histogram_quantile(0.95, sum by (le, task_type) (rate(execute_latency_seconds_bucket[5m])))
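To make the `histogram_quantile` query above more concrete, the following sketch reproduces the estimate it performs: find the bucket that contains the target rank, then interpolate linearly inside that bucket. This is a simplified model for illustration, not Prometheus source code. `estimate_quantile` and the sample bucket counts are hypothetical.

```python
import math
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    `buckets` must be sorted by upper bound and end with the +Inf bucket.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan  # no observations, nothing to estimate
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Rank falls in the +Inf bucket: report the highest finite bound
                return prev_bound
            # Linear interpolation inside the bucket that contains the rank
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound


# Hypothetical cumulative counts for buckets le=0.5, 1.0, 5.0, +Inf
sample_buckets = [(0.5, 50), (1.0, 90), (5.0, 99), (math.inf, 100)]
p95 = estimate_quantile(0.95, sample_buckets)  # 1.0 + 4.0 * (95 - 90) / (99 - 90)
```

Because the estimate is interpolated, its accuracy depends on how closely the bucket boundaries bracket the latencies of interest.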
5. API Reference
5.1 Class Definition
ExecutorMetrics
class ExecutorMetrics:
    """Executor performance monitoring and metrics collector"""
    def __init__(self, enable_metrics: bool = True, metrics_port: int = 8001) -> None:
        """Initialize metrics collector
        Args:
            enable_metrics: Whether to enable metrics collection
            metrics_port: Prometheus metrics server port
        """
5.2 Public Methods
record_operation_latency
def record_operation_latency(self, operation: str, duration: float) -> None
Function: Record operation latency
Parameters:
operation(str): Operation name
duration(float): Operation duration (seconds)
record_operation_success
def record_operation_success(self, operation: str, labels: Optional[Dict[str, str]] = None) -> None
Function: Record operation success
Parameters:
operation(str): Operation name
labels(Optional[Dict[str, str]]): Labels dictionary
record_operation_failure
def record_operation_failure(self, operation: str, error_type: str, labels: Optional[Dict[str, str]] = None) -> None
Function: Record operation failure
Parameters:
operation(str): Operation name
error_type(str): Error type
labels(Optional[Dict[str, str]]): Labels dictionary
record_retry
def record_retry(self, operation: str, attempt_number: int) -> None
Function: Record retry count
Parameters:
operation(str): Operation name
attempt_number(int): Attempt number
with_metrics
def with_metrics(self, metric_name: str, labels: Optional[Dict[str, str]] = None) -> Callable
Function: Monitoring decorator
Parameters:
metric_name(str): Metric name
labels(Optional[Dict[str, str]]): Labels dictionary
Returns:
Callable: Decorator function
get_metrics_summary
def get_metrics_summary(self) -> Dict[str, Any]
Function: Get metrics summary
Returns:
Dict[str, Any]: Metrics summary information
record_operation
def record_operation(self, operation_type: str, success: bool = True, duration: Optional[float] = None, **kwargs) -> None
Function: Record comprehensive operation metrics
Parameters:
operation_type(str): Operation type
success(bool): Whether successful
duration(Optional[float]): Operation duration
**kwargs: Other parameters (labels, error_type, etc.)
record_duration
def record_duration(self, operation: str, duration: float, labels: Optional[Dict[str, str]] = None) -> None
Function: Record operation duration
Parameters:
operation(str): Operation name
duration(float): Duration
labels(Optional[Dict[str, str]]): Labels dictionary
6. Technical Implementation Details
6.1 Prometheus Metric Types
Histogram Metrics
# Latency metrics use Histogram type
"intent_latency": Histogram("intent_latency_seconds", "Latency of intent parsing"),
"plan_latency": Histogram("plan_latency_seconds", "Latency of task planning"),
"execute_latency": Histogram("execute_latency_seconds", "Latency of task execution", ["task_type"])
# Histogram automatically provides the following metrics:
# - intent_latency_seconds_count: Total request count
# - intent_latency_seconds_sum: Total latency time
# - intent_latency_seconds_bucket: Bucket statistics
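The relationship between the three series a Histogram exports can be modeled in a few lines of standard-library Python. The sketch below is a teaching model only. `MiniHistogram` is a hypothetical class and not how prometheus_client is implemented, but it follows the same convention: buckets are cumulative and keyed by their upper bound (`le`).

```python
import math
from bisect import bisect_left
from typing import List, Tuple


class MiniHistogram:
    """Stdlib-only model of the series a Prometheus histogram exposes."""

    def __init__(self, bounds: List[float]) -> None:
        self.bounds = sorted(bounds) + [math.inf]  # +Inf bucket catches everything
        self.per_bucket = [0] * len(self.bounds)
        self.sum = 0.0   # corresponds to *_sum
        self.count = 0   # corresponds to *_count

    def observe(self, value: float) -> None:
        # Index of the first bucket whose upper bound is >= value
        self.per_bucket[bisect_left(self.bounds, value)] += 1
        self.sum += value
        self.count += 1

    def cumulative_buckets(self) -> List[Tuple[float, int]]:
        """Return (le, count) pairs the way *_bucket series are exposed."""
        running, out = 0, []
        for bound, n in zip(self.bounds, self.per_bucket):
            running += n
            out.append((bound, running))
        return out


h = MiniHistogram([0.1, 0.5, 1.0])
for latency in (0.05, 0.3, 0.5, 2.0):
    h.observe(latency)

mean_latency = h.sum / h.count  # same idea as rate(..._sum) / rate(..._count)
```

The last line mirrors the average-latency query in section 4.5: dividing the sum by the count gives the mean over the observed window.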
Counter Metrics
# Count metrics use Counter type
"intent_success": Counter("intent_success_total", "Number of successful intent parsings"),
"intent_retries": Counter("intent_retries_total", "Number of intent parsing retries"),
"execute_success": Counter("execute_success_total", "Number of successful executions", ["task_type"])
# Counter only increases, suitable for counting successes, retries, etc.
6.2 Decorator Implementation Mechanism
Async Function Monitoring
def with_metrics(self, metric_name: str, labels: Optional[Dict[str, str]] = None):
    """Monitoring decorator implementation"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.metrics or f"{metric_name}_latency" not in self.metrics:
                return await func(*args, **kwargs)
            labels_dict = labels or {}
            metric = self.metrics[f"{metric_name}_latency"]
            if labels:
                metric = metric.labels(**labels_dict)
            # Use context manager to automatically record time
            with metric.time():
                try:
                    result = await func(*args, **kwargs)
                    # Record success metric
                    if f"{metric_name}_success" in self.metrics:
                        success_metric = self.metrics[f"{metric_name}_success"]
                        if labels:
                            success_metric = success_metric.labels(**labels_dict)
                        success_metric.inc()
                    return result
                except Exception as e:
                    logger.error(f"Error in {func.__name__}: {e}")
                    raise
        return wrapper
    return decorator
6.3 Label Management Mechanism
Dynamic Label Support
def record_operation_success(self, operation: str, labels: Optional[Dict[str, str]] = None):
    """Record operation success (with label support)"""
    if not self.enable_metrics or f"{operation}_success" not in self.metrics:
        return
    metric = self.metrics[f"{operation}_success"]
    if labels:
        # Create labeled instance for labeled metrics
        metric = metric.labels(**labels)
    metric.inc()
Label Validation and Cleaning
def _validate_labels(self, labels: Dict[str, str]) -> Dict[str, str]:
    """Validate and clean labels"""
    if not labels:
        return {}
    # Remove empty values and invalid characters
    cleaned_labels = {}
    for key, value in labels.items():
        if value and isinstance(value, str) and len(value.strip()) > 0:
            # Clean label value, remove special characters
            cleaned_value = re.sub(r'[^a-zA-Z0-9_:]', '_', str(value).strip())
            cleaned_labels[key] = cleaned_value
    return cleaned_labels
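A standalone version of the cleaning rule above makes its effect easy to check. The sketch assumes the same regular expression as `_validate_labels`. `clean_labels` and the sample input are hypothetical. Note that this rule is stricter than Prometheus itself, which accepts any Unicode characters in label values.

```python
import re
from typing import Dict, Mapping

_INVALID_CHARS = re.compile(r"[^a-zA-Z0-9_:]")


def clean_labels(labels: Mapping[str, object]) -> Dict[str, str]:
    """Drop empty or non-string values and replace disallowed characters with '_'."""
    cleaned: Dict[str, str] = {}
    for key, value in labels.items():
        if isinstance(value, str) and value.strip():
            cleaned[key] = _INVALID_CHARS.sub("_", value.strip())
    return cleaned


example = clean_labels({
    "task_type": "data processing/v2",  # space and slash are replaced
    "region": "   ",                    # whitespace-only value is dropped
    "owner": None,                      # non-string value is dropped
})
```

Dropping a label changes the label set passed to `metric.labels(...)`, so a metric declared with a fixed list of label names still needs a value for each of them. A fallback such as "unknown" may be preferable to dropping the key.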
6.4 Error Handling Strategy
Metrics Recording Fault Tolerance
def record_operation(self, operation_type: str, success: bool = True, duration: Optional[float] = None, **kwargs):
    """Record comprehensive operation metrics (with fault tolerance)"""
    if not self.enable_metrics:
        return
    try:
        # Record operation success/failure
        if success:
            self.record_operation_success(operation_type, kwargs.get('labels'))
        else:
            error_type = kwargs.get('error_type', 'unknown')
            self.record_operation_failure(operation_type, error_type, kwargs.get('labels'))
        # Record operation latency
        if duration is not None:
            self.record_operation_latency(operation_type, duration)
    except Exception as e:
        # Metrics recording failure should not affect business logic
        logger.warning(f"Failed to record operation metrics: {e}")
Metrics Initialization Fault Tolerance
def _init_prometheus_metrics(self):
    """Initialize Prometheus metrics (with fault tolerance)"""
    try:
        start_http_server(self.metrics_port)
        self.metrics = {
            # Metric definitions...
        }
        logger.info(f"Prometheus metrics server started on port {self.metrics_port}")
    except Exception as e:
        logger.warning(f"Failed to start metrics server: {e}")
        # Degrade to no-metrics mode
        self.metrics = {}
        self.enable_metrics = False
6.5 Performance Optimization Mechanism
Conditional Check Optimization
def record_operation_latency(self, operation: str, duration: float):
    """Record operation latency (optimized version)"""
    # Fast path: if metrics not enabled, return directly
    if not self.enable_metrics:
        return
    # Check if metric exists
    metric_key = f"{operation}_latency"
    if metric_key not in self.metrics:
        return
    # Record metric
    self.metrics[metric_key].observe(duration)
Batch Metrics Recording
def record_batch_operations(self, operations: List[Dict[str, Any]]):
    """Batch record operation metrics"""
    if not self.enable_metrics:
        return
    for op in operations:
        try:
            if op.get('success', True):
                self.record_operation_success(
                    op['operation'],
                    op.get('labels')
                )
            else:
                self.record_operation_failure(
                    op['operation'],
                    op.get('error_type', 'unknown'),
                    op.get('labels')
                )
            if 'duration' in op:
                self.record_operation_latency(
                    op['operation'],
                    op['duration']
                )
        except Exception as e:
            logger.warning(f"Failed to record batch operation: {e}")
7. Configuration & Deployment
7.1 Basic Configuration
Metrics Collector Configuration
# Basic configuration
metrics = ExecutorMetrics(
enable_metrics=True,
metrics_port=8001
)
# Production environment configuration
production_metrics = ExecutorMetrics(
enable_metrics=True,
metrics_port=8001
)
Environment Variable Support
# Metrics configuration
export METRICS_ENABLED="true"
export METRICS_PORT="8001"
export METRICS_HOST="0.0.0.0"
# Prometheus configuration
export PROMETHEUS_ENDPOINT="http://prometheus:9090"
export PROMETHEUS_RETENTION="30d"
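The documented constructor accepts only `enable_metrics` and `metrics_port`, and this guide does not show ExecutorMetrics reading these variables itself. One possible way to bridge the two is to parse the environment at startup, as in the sketch below. `MetricsSettings` and `load_metrics_settings` are hypothetical helpers, and the defaults are assumptions that mirror the values above.

```python
import os
from typing import Mapping, NamedTuple


class MetricsSettings(NamedTuple):
    enabled: bool
    port: int
    host: str


def load_metrics_settings(env: Mapping[str, str] = os.environ) -> MetricsSettings:
    """Parse METRICS_* variables, falling back to the defaults used in this guide."""
    enabled = env.get("METRICS_ENABLED", "true").strip().lower() in {"1", "true", "yes"}
    port = int(env.get("METRICS_PORT", "8001"))  # raises ValueError on a non-numeric port
    host = env.get("METRICS_HOST", "0.0.0.0")
    return MetricsSettings(enabled, port, host)


# Demonstration with an explicit mapping instead of the real environment
settings = load_metrics_settings({"METRICS_ENABLED": "false", "METRICS_PORT": "9101"})
# Possible wiring:
# ExecutorMetrics(enable_metrics=settings.enabled, metrics_port=settings.port)
```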
7.2 Docker Deployment
Dockerfile Configuration
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Expose metrics port
EXPOSE 8001
# Start command
CMD ["python", "-m", "aiecs.infrastructure.monitoring.executor_metrics"]
Docker Compose Configuration
version: '3.8'
services:
  executor-metrics:
    build: .
    ports:
      - "8001:8001"
    environment:
      - METRICS_ENABLED=true
      - METRICS_PORT=8001
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
7.3 Prometheus Configuration
prometheus.yml Configuration
global:
  scrape_interval: 15s
  evaluation_interval: 15s
scrape_configs:
  - job_name: 'executor-metrics'
    static_configs:
      - targets: ['executor-metrics:8001']
    scrape_interval: 5s
    metrics_path: /metrics
  - job_name: 'aiecs-api'
    static_configs:
      - targets: ['aiecs-api:8000']
    scrape_interval: 10s
7.4 Grafana Dashboard Configuration
Dashboard JSON Configuration
{
"dashboard": {
"title": "AIECS Executor Metrics",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "rate(intent_success_total[5m])",
"legendFormat": "Intent Success Rate"
}
]
},
{
"title": "Latency Percentiles",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(execute_latency_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
}
]
}
]
}
}
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics
Request Rate: rate(intent_success_total[5m])
Error Rate (approximated by retries per successful request): rate(intent_retries_total[5m]) / rate(intent_success_total[5m])
Latency Percentiles: histogram_quantile(0.95, rate(execute_latency_seconds_bucket[5m]))
Success Rate: rate(execute_success_total[5m]) / (rate(execute_success_total[5m]) + rate(execute_retries_total[5m]))
Monitoring Implementation
class MetricsMonitor:
    def __init__(self, metrics: ExecutorMetrics):
        self.metrics = metrics
        self.alert_thresholds = {
            "error_rate": 0.05,    # Maximum acceptable error rate: 5%
            "latency_p95": 5.0,    # Maximum acceptable 95th-percentile latency: 5 seconds
            "success_rate": 0.95   # Minimum acceptable success rate: 95%
        }
    def check_health(self) -> Dict[str, Any]:
        """Check system health status"""
        if not self.metrics.enable_metrics:
            return {"status": "disabled", "message": "Metrics disabled"}
        # Add specific health check logic here
        return {
            "status": "healthy",
            "metrics_enabled": True,
            "available_metrics": len(self.metrics.metrics)
        }
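The "specific health check logic" placeholder above could compare observed values against the thresholds. The sketch below shows one way to do that under stated assumptions: `find_violations` is a hypothetical helper, and the observed values would have to come from a Prometheus query, which this guide does not provide.

```python
from typing import Dict, List

# Same threshold values as MetricsMonitor.alert_thresholds
ALERT_THRESHOLDS: Dict[str, float] = {
    "error_rate": 0.05,
    "latency_p95": 5.0,
    "success_rate": 0.95,
}


def find_violations(
    observed: Dict[str, float],
    thresholds: Dict[str, float] = ALERT_THRESHOLDS,
) -> List[str]:
    """Return a description of every threshold the observed values breach."""
    violations = []
    for name, limit in thresholds.items():
        value = observed.get(name)
        if value is None:
            continue  # no data for this metric; skip rather than guess
        # success_rate is a floor; the other thresholds are ceilings
        breached = value < limit if name == "success_rate" else value > limit
        if breached:
            violations.append(f"{name}={value} (threshold {limit})")
    return violations


# Hypothetical observed values
violations = find_violations(
    {"error_rate": 0.02, "latency_p95": 7.5, "success_rate": 0.91}
)
```

`check_health` could then report "degraded" whenever the returned list is non-empty.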
8.2 Common Issues & Solutions
Issue 1: Metrics Server Startup Failure
Symptoms: Failed to start metrics server error
Possible Causes:
Port already in use
Insufficient permissions
Network configuration issues
Solutions:
# 1. Check port availability
import socket
import time
def check_port_available(port: int) -> bool:
    # connect_ex returns 0 when something is already listening on the port
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    result = sock.connect_ex(('localhost', port))
    sock.close()
    return result != 0
# 2. Use dynamic port
def find_available_port(start_port: int = 8001) -> int:
    for port in range(start_port, start_port + 100):
        if check_port_available(port):
            return port
    raise RuntimeError("No available port found")
# 3. Retry mechanism
def init_metrics_with_retry(max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            port = find_available_port()
            metrics = ExecutorMetrics(metrics_port=port)
            return metrics
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
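An alternative to probing ports one by one is to bind to port 0 and let the operating system pick an unused port. The sketch below uses only the standard library. `reserve_free_port` is a hypothetical helper name. The port is released when the socket closes, so another process could still claim it before the metrics server starts.

```python
import socket


def reserve_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))          # port 0 means "any free port"
        return sock.getsockname()[1]  # the port the OS actually assigned


port = reserve_free_port()
# Possible wiring: ExecutorMetrics(metrics_port=port)
```

A dynamically chosen port also has to be communicated to Prometheus, since the static scrape target in section 7.3 assumes port 8001.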
Issue 2: Metrics Recording Failed
Symptoms: Metrics recording not working, data not updating
Possible Causes:
Metrics not properly initialized
Label format error
Metric name mismatch
Solutions:
# 1. Validate metrics initialization
def validate_metrics_initialization(metrics: ExecutorMetrics):
    if not metrics.enable_metrics:
        print("Warning: Metrics collection disabled")
        return False
    if not metrics.metrics:
        print("Error: Metrics not properly initialized")
        return False
    print(f"Metrics initialized: {list(metrics.metrics.keys())}")
    return True
# 2. Validate metrics recording
def test_metrics_recording(metrics: ExecutorMetrics):
    try:
        metrics.record_operation_success("test_operation")
        metrics.record_operation_latency("test_operation", 1.0)
        print("Metrics recording test successful")
        return True
    except Exception as e:
        print(f"Metrics recording test failed: {e}")
        return False
Issue 3: Prometheus Unable to Scrape Metrics
Symptoms: Prometheus shows target unreachable
Possible Causes:
Network connection issues
Firewall blocking
Service not started
Solutions:
# 1. Check service status
curl http://localhost:8001/metrics
# 2. Check network connection
telnet localhost 8001
# 3. Check Prometheus configuration
promtool check config prometheus.yml
# 4. View Prometheus logs
docker logs prometheus
Issue 4: Metrics Data Inaccurate
Symptoms: Metrics data inconsistent with actual situation
Possible Causes:
Incorrect metrics recording timing
Improper label usage
Concurrency issues
Solutions:
# 1. Ensure correct metrics recording timing
async def correct_metrics_recording():
    start_time = time.time()
    try:
        result = await process_task()
        # Record on success
        metrics.record_operation_success("process_task")
    except Exception as e:
        # Record on failure
        metrics.record_operation_failure("process_task", type(e).__name__)
    finally:
        # Always record latency
        duration = time.time() - start_time
        metrics.record_operation_latency("process_task", duration)
# 2. Use thread-safe metrics recording
import threading
from concurrent.futures import ThreadPoolExecutor
class ThreadSafeMetrics:
    def __init__(self, base_metrics: ExecutorMetrics):
        self.base_metrics = base_metrics
        self.lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=1)
    def record_operation_async(self, operation: str, success: bool, duration: float):
        """Asynchronously record metrics, avoid blocking main thread"""
        self.executor.submit(
            self._record_with_lock, operation, success, duration
        )
    def _record_with_lock(self, operation: str, success: bool, duration: float):
        """Metrics recording with lock"""
        with self.lock:
            if success:
                self.base_metrics.record_operation_success(operation)
            else:
                self.base_metrics.record_operation_failure(operation, "unknown")
            self.base_metrics.record_operation_latency(operation, duration)
8.3 Performance Optimization Recommendations
Metrics Recording Optimization
# 1. Batch metrics recording
class BatchMetricsRecorder:
    def __init__(self, metrics: ExecutorMetrics, batch_size: int = 100):
        self.metrics = metrics
        self.batch_size = batch_size
        self.batch = []
        self.lock = threading.Lock()
    def add_metric(self, operation: str, success: bool, duration: float):
        """Add metric to batch"""
        with self.lock:
            self.batch.append({
                'operation': operation,
                'success': success,
                'duration': duration,
                'timestamp': time.time()
            })
            if len(self.batch) >= self.batch_size:
                self._flush_batch()
    def _flush_batch(self):
        """Flush batch metrics"""
        if not self.batch:
            return
        for metric in self.batch:
            try:
                if metric['success']:
                    self.metrics.record_operation_success(metric['operation'])
                else:
                    self.metrics.record_operation_failure(metric['operation'], 'unknown')
                self.metrics.record_operation_latency(metric['operation'], metric['duration'])
            except Exception as e:
                logger.warning(f"Failed to record batch metric: {e}")
        self.batch.clear()
# 2. Async metrics recording
import asyncio
from asyncio import Queue
class AsyncMetricsRecorder:
    def __init__(self, metrics: ExecutorMetrics):
        self.metrics = metrics
        self.queue = Queue(maxsize=1000)
        self.running = False
        self._task = None  # keep a reference so the worker task is not garbage-collected
    async def start(self):
        """Start async metrics recorder"""
        self.running = True
        self._task = asyncio.create_task(self._process_queue())
    async def record_metric(self, operation: str, success: bool, duration: float):
        """Asynchronously record metric"""
        try:
            # put_nowait raises QueueFull immediately; awaiting put() would block instead
            self.queue.put_nowait({
                'operation': operation,
                'success': success,
                'duration': duration
            })
        except asyncio.QueueFull:
            logger.warning("Metrics queue is full, dropping metric")
    async def _process_queue(self):
        """Process metrics queue"""
        while self.running:
            try:
                metric = await asyncio.wait_for(self.queue.get(), timeout=1.0)
                # Record metric
                if metric['success']:
                    self.metrics.record_operation_success(metric['operation'])
                else:
                    self.metrics.record_operation_failure(metric['operation'], 'unknown')
                self.metrics.record_operation_latency(metric['operation'], metric['duration'])
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"Error processing metrics queue: {e}")
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "Application Layer"
APP[AIECS Application]
TS[TaskService]
DS[DSLProcessor]
end
subgraph "Monitoring Layer"
EM[ExecutorMetrics]
PM[Prometheus]
GF[Grafana]
end
subgraph "Infrastructure Layer"
CTM[CeleryTaskManager]
WSM[WebSocketManager]
end
APP --> EM
TS --> EM
DS --> EM
CTM --> EM
WSM --> EM
EM --> PM
PM --> GF
9.2 Metrics Collection Flow Diagram
sequenceDiagram
participant App as Application
participant EM as ExecutorMetrics
participant PM as Prometheus
participant GF as Grafana
App->>EM: Record Operation Metrics
EM->>EM: Validate Metrics Configuration
EM->>EM: Record to Memory
PM->>EM: Scrape /metrics Endpoint
EM-->>PM: Return Current Metric Values
PM->>GF: Provide Query Interface
GF->>GF: Render Dashboard
9.3 Metric Types Architecture Diagram
graph LR
subgraph "Metric Types"
H[Histogram<br/>Latency Distribution]
C[Counter<br/>Count Statistics]
G[Gauge<br/>Instantaneous Value]
end
subgraph "Application Scenarios"
L[Latency Monitoring]
S[Success Rate Statistics]
R[Retry Count]
T[Throughput]
end
H --> L
C --> S
C --> R
G --> T
9.4 Monitoring Data Flow Diagram
flowchart TD
Start([Operation Start]) --> Record[Record Start Time]
Record --> Execute[Execute Operation]
Execute --> Success{Operation Success?}
Success -->|Yes| RecordSuccess[Record Success Metric]
Success -->|No| RecordFailure[Record Failure Metric]
RecordSuccess --> RecordLatency[Record Latency Metric]
RecordFailure --> RecordLatency
RecordLatency --> Expose[Expose to Prometheus]
Expose --> Visualize[Grafana Visualization]
10. Version History
v1.0.0 (2024-01-15)
New Features:
Basic metrics collection functionality
Support Histogram and Counter metric types
Integrate Prometheus client
Provide decorator monitoring support
Technical Features:
Built on prometheus_client library
Support async function monitoring
Provide label support
Implement basic error handling
v1.1.0 (2024-02-01)
Feature Enhancements:
Add comprehensive operation recording functionality
Support batch metrics recording
Implement metrics summary query
Enhance error handling mechanism
Performance Optimizations:
Optimize metrics recording performance
Reduce memory usage
Improve concurrent processing
v1.2.0 (2024-03-01)
New Features:
Support custom metric labels
Add metrics validation mechanism
Implement metrics recording fault tolerance
Support dynamic port allocation
Stability Improvements:
Enhance exception handling
Improve resource management
Optimize monitoring performance
v1.3.0 (2024-04-01)
Architecture Upgrades:
Upgrade to prometheus_client 0.17.x
Support multi-dimensional labels
Add metrics aggregation functionality
Implement async metrics recording
Monitoring Enhancements:
Add health check interface
Support metrics export
Implement alert integration
Add performance analysis tools
Appendix
B. External Dependencies
C. Monitoring Best Practices
# 1. Metric naming conventions
# Use descriptive names, include units
"request_duration_seconds"
"http_requests_total"
"memory_usage_bytes"
# 2. Label usage principles
# Use meaningful labels, avoid high cardinality
labels = {
"method": "POST", # Low cardinality
"endpoint": "/api/tasks", # Low cardinality
"status_code": "200" # Low cardinality
}
# 3. Metrics recording timing
# Record immediately after operation completes
start_time = time.time()
try:
    result = await operation()
    metrics.record_operation_success("operation")
except Exception as e:
    metrics.record_operation_failure("operation", type(e).__name__)
finally:
    duration = time.time() - start_time
    metrics.record_operation_latency("operation", duration)
D. Contact Information
Technical Lead: AIECS Development Team
Issue Reporting: Through project Issue system
Documentation Updates: Regular maintenance, version synchronization