# Source code for aiecs.infrastructure.monitoring.executor_metrics

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
import logging
import functools
from typing import Dict, Optional, Any
from prometheus_client import Counter, Histogram, start_http_server

logger = logging.getLogger(__name__)


class ExecutorMetrics:
    """
    Specialized handler for executor performance monitoring and metrics collection.

    Metrics are stored in ``self.metrics`` under keys of the form
    ``"<operation>_latency"``, ``"<operation>_success"`` and
    ``"<operation>_retries"`` for the ``intent``, ``plan`` and ``execute``
    operations. The ``execute_*`` metrics are declared with a ``task_type``
    label name, so callers must supply ``labels={"task_type": ...}`` when
    recording against them.
    """

    def __init__(self, enable_metrics: bool = True, metrics_port: int = 8001):
        """
        Create the handler and, when enabled, register the Prometheus metrics.

        Args:
            enable_metrics: When False every ``record_*`` method is a no-op.
            metrics_port: Port passed to ``start_http_server``.
        """
        self.enable_metrics = enable_metrics
        self.metrics_port = metrics_port
        # Stays empty when metrics are disabled or initialization failed.
        self.metrics: Dict[str, Any] = {}
        if self.enable_metrics:
            self._init_prometheus_metrics()

    def _init_prometheus_metrics(self):
        """Initialize Prometheus metrics.

        Starts the HTTP exporter and builds the metric table. Any exception
        is logged as a warning and leaves ``self.metrics`` empty, which turns
        the recording methods into no-ops.
        """
        try:
            start_http_server(self.metrics_port)
            self.metrics = {
                "intent_latency": Histogram("intent_latency_seconds", "Latency of intent parsing"),
                "intent_success": Counter(
                    "intent_success_total",
                    "Number of successful intent parsings",
                ),
                "intent_retries": Counter("intent_retries_total", "Number of intent parsing retries"),
                "plan_latency": Histogram("plan_latency_seconds", "Latency of task planning"),
                "plan_success": Counter("plan_success_total", "Number of successful plans"),
                "plan_retries": Counter("plan_retries_total", "Number of plan retries"),
                "execute_latency": Histogram(
                    "execute_latency_seconds",
                    "Latency of task execution",
                    ["task_type"],
                ),
                "execute_success": Counter(
                    "execute_success_total",
                    "Number of successful executions",
                    ["task_type"],
                ),
                "execute_retries": Counter(
                    "execute_retries_total",
                    "Number of execution retries",
                    ["task_type"],
                ),
            }
            logger.info(f"Prometheus metrics server started on port {self.metrics_port}")
        except Exception as e:
            logger.warning(f"Failed to start metrics server: {e}")
            self.metrics = {}

    @staticmethod
    def _apply_labels(metric: Any, labels: Optional[Dict[str, str]]) -> Any:
        """Return the labelled child of ``metric``, or ``metric`` itself when no labels are given."""
        return metric.labels(**labels) if labels else metric

    def record_operation_latency(
        self,
        operation: str,
        duration: float,
        labels: Optional[Dict[str, str]] = None,
    ):
        """Record operation latency.

        Args:
            operation: Metric prefix; ``"<operation>_latency"`` must exist.
            duration: Value passed to the metric's ``observe``.
            labels: Label values for metrics declared with label names.
        """
        key = f"{operation}_latency"
        if not self.enable_metrics or key not in self.metrics:
            return
        self._apply_labels(self.metrics[key], labels).observe(duration)

    def record_operation_success(self, operation: str, labels: Optional[Dict[str, str]] = None):
        """Record operation success.

        Args:
            operation: Metric prefix; ``"<operation>_success"`` must exist.
            labels: Label values for metrics declared with label names.
        """
        key = f"{operation}_success"
        if not self.enable_metrics or key not in self.metrics:
            return
        self._apply_labels(self.metrics[key], labels).inc()

    def record_operation_failure(
        self,
        operation: str,
        error_type: str,
        labels: Optional[Dict[str, str]] = None,
    ):
        """Record operation failure.

        No failure metric is registered; the failure is only logged.
        ``labels`` is accepted for signature parity and is not used.
        """
        if not self.enable_metrics:
            return
        # Failure metrics can be added
        logger.error(f"Operation {operation} failed with error type: {error_type}")

    def record_retry(
        self,
        operation: str,
        attempt_number: int,
        labels: Optional[Dict[str, str]] = None,
    ):
        """Record retry.

        The first attempt is not a retry, so the counter is only incremented
        when ``attempt_number`` is greater than 1.

        Args:
            operation: Metric prefix; ``"<operation>_retries"`` must exist.
            attempt_number: Attempt counter as tracked by the caller.
            labels: Label values for metrics declared with label names.
        """
        key = f"{operation}_retries"
        if not self.enable_metrics or key not in self.metrics:
            return
        if attempt_number > 1:
            self._apply_labels(self.metrics[key], labels).inc()

    def with_metrics(self, metric_name: str, labels: Optional[Dict[str, str]] = None):
        """Monitoring decorator.

        Wraps an async function so that its run time is observed on
        ``"<metric_name>_latency"`` and, on success, ``"<metric_name>_success"``
        is incremented when that counter exists. If the latency metric is not
        available the function is awaited unchanged. Exceptions are logged
        and re-raised.
        """
        latency_key = f"{metric_name}_latency"
        success_key = f"{metric_name}_success"

        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                # Looked up per call so metrics registered later are honoured.
                if not self.metrics or latency_key not in self.metrics:
                    return await func(*args, **kwargs)

                timer_metric = self._apply_labels(self.metrics[latency_key], labels)
                with timer_metric.time():
                    try:
                        result = await func(*args, **kwargs)
                        if success_key in self.metrics:
                            self._apply_labels(self.metrics[success_key], labels).inc()
                        return result
                    except Exception as e:
                        logger.error(f"Error in {func.__name__}: {e}")
                        raise

            return wrapper

        return decorator

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Get metrics summary.

        Returns:
            ``{"metrics_enabled": False}`` when disabled; otherwise the
            enabled flag, the configured port and the registered metric keys.
        """
        if not self.enable_metrics:
            return {"metrics_enabled": False}
        return {
            "metrics_enabled": True,
            "metrics_port": self.metrics_port,
            "available_metrics": list(self.metrics.keys()),
        }

    def record_operation(
        self,
        operation_type: str,
        success: bool = True,
        duration: Optional[float] = None,
        **kwargs,
    ):
        """Record a general operation for metrics tracking.

        Args:
            operation_type: Metric prefix (e.g. ``"execute"``).
            success: Selects the success counter or the failure log.
            duration: Recorded as latency when not None.
            **kwargs: ``labels`` (label values applied to both the success
                and latency metrics) and ``error_type`` (defaults to
                ``"unknown"``) are read; anything else is ignored.

        Recording errors are logged as warnings and never propagated.
        """
        if not self.enable_metrics:
            return

        labels = kwargs.get("labels")
        try:
            # Record operation success/failure
            if success:
                self.record_operation_success(operation_type, labels)
            else:
                error_type = kwargs.get("error_type", "unknown")
                self.record_operation_failure(operation_type, error_type, labels)

            # Record operation latency if provided; labels must be forwarded
            # or labelled histograms cannot be observed.
            if duration is not None:
                self.record_operation_latency(operation_type, duration, labels)
        except Exception as e:
            logger.warning(f"Failed to record operation metrics: {e}")

    def record_duration(
        self,
        operation: str,
        duration: float,
        labels: Optional[Dict[str, str]] = None,
    ):
        """Record operation duration for metrics tracking.

        Same as ``record_operation_latency`` but any recording error is
        logged as a warning instead of being raised.
        """
        if not self.enable_metrics:
            return

        try:
            self.record_operation_latency(operation, duration, labels)
        except Exception as e:
            logger.warning(f"Failed to record duration metrics: {e}")