Infrastructure API
This section documents the infrastructure layer components.
Persistence
Database Manager
- class aiecs.infrastructure.persistence.database_manager.DatabaseManager[source]
Bases:
objectSpecialized handler for database connections, operations, and task history management
File Storage
File Storage Implementation with Google Cloud Storage
Provides file storage capabilities using Google Cloud Storage as the backend, with support for local fallback and caching.
- exception aiecs.infrastructure.persistence.file_storage.FileStorageError[source]
Bases:
ExceptionBase exception for file storage operations.
- class aiecs.infrastructure.persistence.file_storage.FileStorageConfig[source]
Bases:
objectConfiguration for file storage.
- class aiecs.infrastructure.persistence.file_storage.FileStorage[source]
Bases:
objectFile storage implementation with Google Cloud Storage backend.
Features: - Google Cloud Storage as primary backend - Local filesystem fallback - In-memory caching with TTL - Automatic compression for large files - Retry logic with exponential backoff - Metrics collection
- async initialize()[source]
Initialize the file storage system.
- Returns:
True if initialization was successful
- Return type:
Redis Client
- class aiecs.infrastructure.persistence.redis_client.RedisClient[source]
Bases:
objectRedis client singleton for sharing across different caching strategies
- async hset(name, key=None, value=None, mapping=None)[source]
Set hash fields
Supports two calling patterns: 1. hset(name, key, value) - Set single field (positional) 2. hset(name, key=key, value=value) - Set single field (keyword) 3. hset(name, mapping={…}) - Set multiple fields
- Parameters:
- Returns:
Number of fields that were added
- Raises:
ValueError – If neither (key, value) nor mapping is provided
- Return type:
Examples
# Single field with positional args await redis_client.hset(“myhash”, “field1”, “value1”)
# Single field with keyword args await redis_client.hset(“myhash”, key=”field1”, value=”value1”)
# Multiple fields with mapping await redis_client.hset(“myhash”, mapping={“field1”: “value1”, “field2”: “value2”})
- async aiecs.infrastructure.persistence.redis_client.initialize_redis_client()[source]
Create and initialize global Redis client instance at application startup.
- async aiecs.infrastructure.persistence.redis_client.close_redis_client()[source]
Close global Redis client instance at application shutdown.
Monitoring
Executor Metrics
- class aiecs.infrastructure.monitoring.executor_metrics.ExecutorMetrics[source]
Bases:
objectSpecialized handler for executor performance monitoring and metrics collection
Global Metrics Manager
Global Metrics Manager
This module provides a singleton ExecutorMetrics instance that can be shared across all components in the application. It follows the same pattern as other global managers in the infrastructure layer.
- Usage:
# In main.py startup: await initialize_global_metrics()
# In any component: from aiecs.infrastructure.monitoring.global_metrics_manager import get_global_metrics metrics = get_global_metrics()
- async aiecs.infrastructure.monitoring.global_metrics_manager.initialize_global_metrics(enable_metrics=True, metrics_port=None, config=None)[source]
Initialize the global ExecutorMetrics instance.
This should be called once during application startup (in main.py lifespan).
- Parameters:
- Returns:
The initialized ExecutorMetrics instance or None if initialization fails
- Return type:
ExecutorMetrics | None
Example
@asynccontextmanager async def lifespan(app: FastAPI):
# Startup await initialize_global_metrics() yield # Shutdown await close_global_metrics()
- aiecs.infrastructure.monitoring.global_metrics_manager.get_global_metrics()[source]
Get the global ExecutorMetrics instance.
- Returns:
The global ExecutorMetrics instance or None if not initialized
- Raises:
RuntimeError – If metrics are requested but not initialized
- Return type:
ExecutorMetrics | None
Example
metrics = get_global_metrics() if metrics:
metrics.record_operation(‘my_operation’, 1)
- async aiecs.infrastructure.monitoring.global_metrics_manager.close_global_metrics()[source]
Close the global metrics instance.
This should be called during application shutdown.
- aiecs.infrastructure.monitoring.global_metrics_manager.is_metrics_initialized()[source]
Check if global metrics are initialized.
- Returns:
True if metrics are initialized, False otherwise
- Return type:
- aiecs.infrastructure.monitoring.global_metrics_manager.get_metrics_summary()[source]
Get a summary of the global metrics status.
- aiecs.infrastructure.monitoring.global_metrics_manager.record_operation(operation_type, success=True, duration=None, **kwargs)[source]
Record an operation using global metrics.
- aiecs.infrastructure.monitoring.global_metrics_manager.record_duration(operation, duration, labels=None)[source]
Record operation duration using global metrics.
- aiecs.infrastructure.monitoring.global_metrics_manager.record_operation_success(operation, labels=None)[source]
Record operation success using global metrics.
Tracing Manager
- class aiecs.infrastructure.monitoring.tracing_manager.TracingManager[source]
Bases:
objectSpecialized handler for distributed tracing and link tracking
- __init__(service_name='service_executor', jaeger_host=None, jaeger_port=None, enable_tracing=None)[source]
- trace_database_operation(operation, table=None, query=None)[source]
Database operation tracing decorator
- inject_span_context(span, carrier)[source]
Inject span context into carrier (for cross-service propagation)
Messaging
Celery Task Manager
- class aiecs.infrastructure.messaging.celery_task_manager.CeleryTaskManager[source]
Bases:
objectSpecialized handler for Celery distributed task scheduling and execution
- execute_celery_task(task_name, queue, user_id, task_id, step, mode, service, input_data, context)[source]
Execute Celery task
- async execute_task(task_name, input_data, context)[source]
Execute a single task using Celery for asynchronous processing
WebSocket Manager
- class aiecs.infrastructure.messaging.websocket_manager.UserConfirmation[source]
Bases:
BaseModel- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.infrastructure.messaging.websocket_manager.TaskStepResult[source]
Bases:
BaseModel- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.infrastructure.messaging.websocket_manager.WebSocketManager[source]
Bases:
objectSpecialized handler for WebSocket server and client communication
- async notify_user(step_result, user_id, task_id, step)[source]
Notify user of task step result
- Parameters:
step_result (TaskStepResult)
user_id (str)
task_id (str)
step (int)
- Return type: