Infrastructure API

This section documents the infrastructure layer components.

Persistence

Database Manager

class aiecs.infrastructure.persistence.database_manager.DatabaseManager

Bases: object

Specialized handler for database connections, operations, and task history management

__init__(db_config=None)
Parameters:

db_config (Dict[str, Any] | None)

async connect(min_size=10, max_size=20)

Connect to database and initialize connection pool

Parameters:
  • min_size (int)

  • max_size (int)

async init_connection_pool(min_size=10, max_size=20)

Initialize database connection pool

Parameters:
  • min_size (int)

  • max_size (int)

async init_database_schema()

Initialize database table structure

async save_task_history(user_id, task_id, step, step_result)

Save task execution history

Parameters:
  • user_id (str)

  • task_id (str)

  • step (int)

  • step_result (TaskStepResult)

async load_task_history(user_id, task_id)

Load task execution history

Parameters:
  • user_id (str)

  • task_id (str)

Return type:

List[Dict]

async mark_task_as_cancelled(user_id, task_id)

Mark task as cancelled

Parameters:
  • user_id (str)

  • task_id (str)

async check_task_status(user_id, task_id)

Check task status

Parameters:
  • user_id (str)

  • task_id (str)

Return type:

TaskStatus

async get_user_tasks(user_id, limit=100)

Get user task list

Parameters:
  • user_id (str)

  • limit (int)

Return type:

List[Dict]

async cleanup_old_tasks(days_old=30)

Clean up old task records

Parameters:

days_old (int)

async disconnect()

Disconnect from database and close connection pool

async close()

Close database connection pool
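DatabaseManager needs a live database pool, which makes code that depends on it awkward to unit-test. The following is a minimal sketch of an in-memory test double, assuming only the method names and parameters listed above. InMemoryTaskHistory, its TaskStatus values, and the cleanup_old_tasks return value are hypothetical, and step results are plain dicts rather than TaskStepResult models.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Tuple


class TaskStatus(str, Enum):
    # Hypothetical values; the real TaskStatus enum is not documented here.
    RUNNING = "running"
    CANCELLED = "cancelled"
    UNKNOWN = "unknown"


@dataclass
class InMemoryTaskHistory:
    """Hypothetical test double that mirrors some DatabaseManager method names."""

    rows: List[Dict[str, Any]] = field(default_factory=list)
    statuses: Dict[Tuple[str, str], TaskStatus] = field(default_factory=dict)

    async def save_task_history(self, user_id: str, task_id: str, step: int,
                                step_result: Dict[str, Any]) -> None:
        # step_result is a plain dict here instead of a TaskStepResult model.
        self.rows.append({"user_id": user_id, "task_id": task_id, "step": step,
                          "result": step_result,
                          "created_at": datetime.now(timezone.utc)})
        self.statuses.setdefault((user_id, task_id), TaskStatus.RUNNING)

    async def load_task_history(self, user_id: str, task_id: str) -> List[Dict]:
        # Return one task's rows ordered by step number.
        rows = [r for r in self.rows
                if r["user_id"] == user_id and r["task_id"] == task_id]
        return sorted(rows, key=lambda r: r["step"])

    async def mark_task_as_cancelled(self, user_id: str, task_id: str) -> None:
        self.statuses[(user_id, task_id)] = TaskStatus.CANCELLED

    async def check_task_status(self, user_id: str, task_id: str) -> TaskStatus:
        return self.statuses.get((user_id, task_id), TaskStatus.UNKNOWN)

    async def cleanup_old_tasks(self, days_old: int = 30) -> int:
        # Drop rows older than the cutoff; returning the count is an assumption.
        cutoff = datetime.now(timezone.utc) - timedelta(days=days_old)
        before = len(self.rows)
        self.rows = [r for r in self.rows if r["created_at"] >= cutoff]
        return before - len(self.rows)


async def main():
    db = InMemoryTaskHistory()
    await db.save_task_history("u1", "t1", 2, {"status": "completed"})
    await db.save_task_history("u1", "t1", 1, {"status": "completed"})
    steps = [r["step"] for r in await db.load_task_history("u1", "t1")]
    await db.mark_task_as_cancelled("u1", "t1")
    return steps, await db.check_task_status("u1", "t1")


print(asyncio.run(main()))
```

Because the double exposes the same coroutine names, code under test can receive it in place of a DatabaseManager without opening a connection pool.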

File Storage

File Storage Implementation with Google Cloud Storage

Provides file storage capabilities using Google Cloud Storage as the backend, with support for local fallback and caching.

exception aiecs.infrastructure.persistence.file_storage.FileStorageError

Bases: Exception

Base exception for file storage operations.

class aiecs.infrastructure.persistence.file_storage.FileStorageConfig

Bases: object

Configuration for file storage.

__init__(config)
Parameters:

config (Dict[str, Any])

class aiecs.infrastructure.persistence.file_storage.FileStorage

Bases: object

File storage implementation with Google Cloud Storage backend.

Features:
  • Google Cloud Storage as primary backend

  • Local filesystem fallback

  • In-memory caching with TTL

  • Automatic compression for large files

  • Retry logic with exponential backoff

  • Metrics collection
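FileStorage lists retry logic with exponential backoff among its features but does not document the policy. The following is a generic sketch, assuming capped doubling delays with "full jitter" (a random sleep up to the current cap). retry_with_backoff and its parameters are hypothetical and are not part of aiecs.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
) -> T:
    """Retry an async operation, doubling the delay cap after each failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return await op()
        except Exception:
            if attempt >= max_attempts:
                raise  # out of attempts: surface the last error
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter: sleep a random duration up to the current cap.
            await asyncio.sleep(random.uniform(0, cap))
            attempt += 1


attempts = 0


async def flaky_upload() -> bool:
    # Hypothetical operation that fails twice, then succeeds.
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient network error")
    return True


print(asyncio.run(retry_with_backoff(flaky_upload, base_delay=0.01)), attempts)  # True 3
```

Jitter spreads out retries from many clients that failed at the same moment, so they do not all hit the backend again in lockstep.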

__init__(config)
Parameters:

config (Dict[str, Any])

async initialize()

Initialize the file storage system.

Returns:

True if initialization was successful

Return type:

bool

async store(key, data, metadata=None)

Store data with the given key.

Parameters:
  • key (str) – Storage key

  • data – Data to store

  • metadata – Optional metadata

Returns:

True if storage was successful

Return type:

bool

async retrieve(key)

Retrieve data by key.

Parameters:

key (str) – Storage key

Returns:

The stored data if found, None otherwise

Return type:

str | bytes | Dict[str, Any] | None
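retrieve returns None when nothing is stored under a key, and the class advertises in-memory caching with TTL. The following is a sketch of a cache with the same None-on-miss behavior, assuming entries are expired lazily when read. TTLCache and its injectable clock are hypothetical.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """Hypothetical in-memory cache whose entries expire ttl seconds after being set."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self._clock = clock  # injectable so tests can control time
        self._data: Dict[str, Tuple[float, Any]] = {}

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock() + self.ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None  # miss
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: evict lazily on read
            return None
        return value


now = [0.0]  # fake clock for the demo
cache = TTLCache(ttl=60, clock=lambda: now[0])
cache.set("report.json", b"payload")
print(cache.get("report.json"))  # b'payload'
now[0] = 61.0
print(cache.get("report.json"))  # None
```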

async delete(key)

Delete data by key.

Parameters:

key (str) – Storage key

Returns:

True if deletion was successful

Return type:

bool

async exists(key)

Check if data exists for the given key.

Parameters:

key (str) – Storage key

Returns:

True if data exists

Return type:

bool

async list_keys(prefix=None, limit=None)

List storage keys with optional prefix filtering.

Parameters:
  • prefix (str | None) – Optional key prefix filter

  • limit (int | None) – Maximum number of keys to return

Returns:

List of storage keys

Return type:

List[str]

get_stats()

Get storage statistics.

Return type:

Dict[str, Any]

aiecs.infrastructure.persistence.file_storage.get_file_storage(config=None)

Get the global file storage instance.

Parameters:

config (Dict[str, Any] | None)

Return type:

FileStorage

async aiecs.infrastructure.persistence.file_storage.initialize_file_storage(config=None)

Initialize and return the file storage instance.

Parameters:

config (Dict[str, Any] | None)

Return type:

FileStorage

Redis Client

class aiecs.infrastructure.persistence.redis_client.RedisClient

Bases: object

Redis client singleton for sharing across different caching strategies

__init__()
Return type:

None

async initialize()

Initialize Redis client

async get_client()

Get Redis client instance

Return type:

Redis

async close()

Close Redis connection

async hincrby(name, key, amount=1)

Atomically increment hash field

Parameters:
  • name (str)

  • key (str)

  • amount (int)

Return type:

int

async hget(name, key)

Get hash field value

Parameters:
  • name (str)

  • key (str)

Return type:

str | None

async hgetall(name)

Get all hash fields

Parameters:

name (str)

Return type:

Dict[Any, Any]

async hset(name, key=None, value=None, mapping=None)

Set hash fields

Supports three calling patterns:

  1. hset(name, key, value) – Set a single field (positional)

  2. hset(name, key=key, value=value) – Set a single field (keyword)

  3. hset(name, mapping={…}) – Set multiple fields

Parameters:
  • name (str) – Redis hash key name

  • key (str | None) – Field name (for single field set)

  • value (str | None) – Field value (for single field set)

  • mapping (dict | None) – Dictionary of field-value pairs (for multiple fields)

Returns:

Number of fields that were added

Raises:

ValueError – If neither (key, value) nor mapping is provided

Return type:

int

Examples

    # Single field with positional args
    await redis_client.hset("myhash", "field1", "value1")

    # Single field with keyword args
    await redis_client.hset("myhash", key="field1", value="value1")

    # Multiple fields with mapping
    await redis_client.hset("myhash", mapping={"field1": "value1", "field2": "value2"})
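The three calling patterns reduce to one rule: a (key, value) pair, a mapping, or both must be supplied, otherwise ValueError is raised. The following is a sketch of that argument check, assuming that a pair and a mapping passed together are merged. normalize_hset_args is a hypothetical helper and is not part of RedisClient.

```python
from typing import Dict, Optional


def normalize_hset_args(
    key: Optional[str] = None,
    value: Optional[str] = None,
    mapping: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Collapse the documented hset call patterns into one field -> value mapping."""
    fields: Dict[str, str] = dict(mapping or {})
    if key is not None and value is not None:
        fields[key] = value  # a single pair is merged on top of any mapping
    if not fields:
        raise ValueError("Either (key, value) or mapping must be provided")
    return fields


print(normalize_hset_args("field1", "value1"))            # {'field1': 'value1'}
print(normalize_hset_args(key="field1", value="value1"))  # {'field1': 'value1'}
print(normalize_hset_args(mapping={"field1": "value1", "field2": "value2"}))
```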

async expire(name, time)

Set expiration time

Parameters:
  • name (str)

  • time – Expiration time

Return type:

bool

async exists(name)

Check if key exists

Parameters:

name (str)

Return type:

bool

async ping()

Test Redis connection

Return type:

bool

async info(section=None)

Get Redis server information

Parameters:

section (str | None)

Return type:

Dict[Any, Any]

async delete(*keys)

Delete one or more keys

Parameters:

keys (str)

Return type:

int

async set(key, value, ex=None)

Set a key-value pair with optional expiration

Parameters:
  • key (str)

  • value

  • ex – Optional expiration time

Return type:

bool

async get(key)

Get value by key

Parameters:

key (str)

Return type:

str | None

async aiecs.infrastructure.persistence.redis_client.initialize_redis_client()

Create and initialize global Redis client instance at application startup.

async aiecs.infrastructure.persistence.redis_client.close_redis_client()

Close global Redis client instance at application shutdown.

async aiecs.infrastructure.persistence.redis_client.get_redis_client()

Get global Redis client instance

Return type:

RedisClient
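initialize_redis_client, get_redis_client, and close_redis_client manage one shared RedisClient for the whole application. The following is a sketch of that module-level singleton lifecycle, assuming the getter raises when startup has not run (the documented getter's behavior in that case is not specified). The function names and FakeRedisClient are hypothetical; the stand-in lets the sketch run without a Redis server.

```python
import asyncio
from typing import Optional


class FakeRedisClient:
    """Hypothetical stand-in for RedisClient so the sketch runs without a server."""

    def __init__(self) -> None:
        self.connected = False

    async def initialize(self) -> None:
        self.connected = True

    async def close(self) -> None:
        self.connected = False


_client: Optional[FakeRedisClient] = None


async def initialize_client() -> None:
    # Startup hook; idempotent, so a second call reuses the existing instance.
    global _client
    if _client is None:
        _client = FakeRedisClient()
        await _client.initialize()


async def get_client() -> FakeRedisClient:
    # Raising here is an assumption made for this sketch.
    if _client is None:
        raise RuntimeError("client not initialized; call initialize_client() at startup")
    return _client


async def close_client() -> None:
    # Shutdown hook; resets the global so a later startup creates a fresh client.
    global _client
    if _client is not None:
        await _client.close()
        _client = None


async def main() -> bool:
    await initialize_client()    # application startup
    first = await get_client()   # one component
    second = await get_client()  # another component
    await close_client()         # application shutdown
    return first is second and not first.connected


print(asyncio.run(main()))  # True
```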

Monitoring

Executor Metrics

class aiecs.infrastructure.monitoring.executor_metrics.ExecutorMetrics

Bases: object

Specialized handler for executor performance monitoring and metrics collection

__init__(enable_metrics=True, metrics_port=8001)
Parameters:
  • enable_metrics (bool)

  • metrics_port (int)

record_operation_latency(operation, duration)

Record operation latency

Parameters:
  • operation (str)

  • duration (float)

record_operation_success(operation, labels=None)

Record operation success

Parameters:
  • operation (str)

  • labels

record_operation_failure(operation, error_type, labels=None)

Record operation failure

Parameters:
  • operation (str)

  • error_type

  • labels

record_retry(operation, attempt_number)

Record retry

Parameters:
  • operation (str)

  • attempt_number (int)

with_metrics(metric_name, labels=None)

Monitoring decorator

Parameters:
  • metric_name (str)

  • labels

get_metrics_summary()

Get metrics summary

Return type:

Dict[str, Any]
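with_metrics is documented only as a monitoring decorator that takes a metric name and optional labels. The following is a sketch of what such a decorator commonly does, assuming it wraps coroutine functions and records latency plus success or failure, in the spirit of record_operation_latency, record_operation_success, and record_operation_failure. MiniMetrics is hypothetical and keeps counts in memory rather than exporting them.

```python
import asyncio
import functools
import time
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List, Optional, Tuple


class MiniMetrics:
    """Hypothetical in-process stand-in; not the ExecutorMetrics implementation."""

    def __init__(self) -> None:
        self.successes: DefaultDict[str, int] = defaultdict(int)
        self.failures: DefaultDict[Tuple[str, str], int] = defaultdict(int)
        self.latencies: DefaultDict[str, List[float]] = defaultdict(list)

    def with_metrics(self, metric_name: str, labels: Optional[Dict[str, str]] = None):
        # labels is accepted to mirror the documented signature but unused here.
        def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
            @functools.wraps(func)
            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                start = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as exc:
                    # Count failures per (metric, exception type), then re-raise.
                    self.failures[(metric_name, type(exc).__name__)] += 1
                    raise
                else:
                    self.successes[metric_name] += 1
                    return result
                finally:
                    # Latency is recorded for successes and failures alike.
                    self.latencies[metric_name].append(time.perf_counter() - start)
            return wrapper
        return decorator


metrics = MiniMetrics()


@metrics.with_metrics("parse_document")
async def parse_document(text: str) -> int:
    return len(text.split())


@metrics.with_metrics("fetch")
async def fetch() -> None:
    raise TimeoutError("upstream timed out")


asyncio.run(parse_document("a b c"))
try:
    asyncio.run(fetch())
except TimeoutError:
    pass
print(dict(metrics.successes), dict(metrics.failures))
```

Re-raising after counting keeps the decorator transparent: callers see the same exception they would without monitoring.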

record_operation(operation_type, success=True, duration=None, **kwargs)

Record a general operation for metrics tracking

Parameters:
  • operation_type (str)

  • success (bool)

  • duration (float | None)

record_duration(operation, duration, labels=None)

Record operation duration for metrics tracking

Parameters:
  • operation (str)

  • duration (float)

  • labels

Global Metrics Manager


This module provides a singleton ExecutorMetrics instance that can be shared across all components in the application. It follows the same pattern as other global managers in the infrastructure layer.

Usage:

    # In main.py startup:
    await initialize_global_metrics()

    # In any component:
    from aiecs.infrastructure.monitoring.global_metrics_manager import get_global_metrics
    metrics = get_global_metrics()

async aiecs.infrastructure.monitoring.global_metrics_manager.initialize_global_metrics(enable_metrics=True, metrics_port=None, config=None)

Initialize the global ExecutorMetrics instance.

This should be called once during application startup (in main.py lifespan).

Parameters:
  • enable_metrics (bool) – Whether to enable metrics collection (default: True)

  • metrics_port (int | None) – Port for metrics server (default: from env or 8001)

  • config (Dict[str, Any] | None) – Additional configuration options

Returns:

The initialized ExecutorMetrics instance or None if initialization fails

Return type:

ExecutorMetrics | None

Example

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup
        await initialize_global_metrics()
        yield
        # Shutdown
        await close_global_metrics()

aiecs.infrastructure.monitoring.global_metrics_manager.get_global_metrics()

Get the global ExecutorMetrics instance.

Returns:

The global ExecutorMetrics instance or None if not initialized

Raises:

RuntimeError – If metrics are requested but not initialized

Return type:

ExecutorMetrics | None

Example

    metrics = get_global_metrics()
    if metrics:
        metrics.record_operation('my_operation', success=True)

async aiecs.infrastructure.monitoring.global_metrics_manager.close_global_metrics()

Close the global metrics instance.

This should be called during application shutdown.

aiecs.infrastructure.monitoring.global_metrics_manager.is_metrics_initialized()

Check if global metrics are initialized.

Returns:

True if metrics are initialized, False otherwise

Return type:

bool

aiecs.infrastructure.monitoring.global_metrics_manager.get_metrics_summary()

Get a summary of the global metrics status.

Returns:

Dictionary containing metrics status information

Return type:

Dict[str, Any]

aiecs.infrastructure.monitoring.global_metrics_manager.record_operation(operation_type, success=True, duration=None, **kwargs)

Record an operation using global metrics.

Parameters:
  • operation_type (str)

  • success (bool)

  • duration (float | None)

aiecs.infrastructure.monitoring.global_metrics_manager.record_duration(operation, duration, labels=None)

Record operation duration using global metrics.

Parameters:
  • operation (str)

  • duration (float)

  • labels

aiecs.infrastructure.monitoring.global_metrics_manager.record_operation_success(operation, labels=None)

Record operation success using global metrics.

Parameters:
  • operation (str)

  • labels

aiecs.infrastructure.monitoring.global_metrics_manager.record_operation_failure(operation, error_type, labels=None)

Record operation failure using global metrics.

Parameters:
  • operation (str)

  • error_type

  • labels

aiecs.infrastructure.monitoring.global_metrics_manager.record_retry(operation, attempt_number)

Record retry using global metrics.

Parameters:
  • operation (str)

  • attempt_number (int)

Tracing Manager

class aiecs.infrastructure.monitoring.tracing_manager.TracingManager

Bases: object

Specialized handler for distributed tracing and call-chain tracking

__init__(service_name='service_executor', jaeger_host=None, jaeger_port=None, enable_tracing=None)
Parameters:
  • service_name (str)

  • jaeger_host (str | None)

  • jaeger_port (int | None)

  • enable_tracing (bool | None)

start_span(operation_name, parent_span=None, tags=None)

Start a tracing span

Parameters:
  • operation_name (str) – Operation name

  • parent_span (Span | None) – Parent span

  • tags (Dict[str, Any] | None) – Initial tags

Returns:

Span object or None (if tracing is not enabled)

Return type:

Span | None

finish_span(span, tags=None, logs=None, error=None)

Finish tracing span

Parameters:
  • span (Span | None) – Span to finish

  • tags (Dict[str, Any] | None) – Additional tags

  • logs (Dict[str, Any] | None) – Log information

  • error (Exception | None) – Error information
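start_span returns None when tracing is disabled, and finish_span accepts Span | None, so callers can pair the two unconditionally. The following is a sketch of that pairing with try/finally, using a hypothetical in-process stand-in. FakeTracingManager, FakeSpan, and traced_divide are illustrative, not aiecs APIs, and treating a None span as a no-op in finish_span is assumed from its parameter type.

```python
from typing import Any, Dict, List, Optional


class FakeSpan:
    def __init__(self, name: str) -> None:
        self.name = name
        self.tags: Dict[str, Any] = {}
        self.finished = False


class FakeTracingManager:
    """Hypothetical stand-in that follows the documented start_span/finish_span types."""

    def __init__(self, enabled: bool) -> None:
        self.enabled = enabled
        self.spans: List[FakeSpan] = []

    def start_span(self, operation_name: str, parent_span: Optional[FakeSpan] = None,
                   tags: Optional[Dict[str, Any]] = None) -> Optional[FakeSpan]:
        if not self.enabled:
            return None  # documented: None when tracing is not enabled
        span = FakeSpan(operation_name)
        span.tags.update(tags or {})
        self.spans.append(span)
        return span

    def finish_span(self, span: Optional[FakeSpan], tags: Optional[Dict[str, Any]] = None,
                    logs: Optional[Dict[str, Any]] = None,
                    error: Optional[Exception] = None) -> None:
        if span is None:
            return  # tracing disabled: nothing to finish
        span.tags.update(tags or {})
        if error is not None:
            span.tags["error"] = True
            span.tags["error.message"] = str(error)
        span.finished = True


def traced_divide(tracer: FakeTracingManager, a: float, b: float) -> float:
    span = tracer.start_span("divide", tags={"a": a, "b": b})
    error: Optional[Exception] = None
    try:
        return a / b
    except Exception as exc:
        error = exc
        raise
    finally:
        # Always finish the span, attaching the error if one occurred.
        tracer.finish_span(span, error=error)


print(traced_divide(FakeTracingManager(enabled=True), 6, 3))   # 2.0
print(traced_divide(FakeTracingManager(enabled=False), 1, 2))  # 0.5
```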

with_tracing(operation_name, tags=None)

Tracing decorator

Parameters:
  • operation_name (str) – Operation name

  • tags (Dict[str, Any] | None) – Initial tags

trace_database_operation(operation, table=None, query=None)

Database operation tracing decorator

Parameters:
  • operation (str)

  • table (str | None)

  • query (str | None)

trace_external_call(service_name, endpoint=None)

External service call tracing decorator

Parameters:
  • service_name (str)

  • endpoint (str | None)

trace_tool_execution(tool_name, operation)

Tool execution tracing decorator

Parameters:
  • tool_name (str)

  • operation (str)

create_child_span(parent_span, operation_name, tags=None)

Create child span

Parameters:
  • parent_span (Span | None)

  • operation_name (str)

  • tags (Dict[str, Any] | None)

Return type:

Span | None

inject_span_context(span, carrier)

Inject span context into carrier (for cross-service propagation)

Parameters:
  • span (Span | None)

  • carrier (Dict[str, str])

extract_span_context(carrier)

Extract span context from carrier

Parameters:

carrier (Dict[str, str])

Return type:

Any | None
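inject_span_context and extract_span_context move a span context across service boundaries through a string-keyed carrier such as HTTP headers. The following is a sketch of the round trip, assuming Jaeger's uber-trace-id header format ({trace-id}:{span-id}:{parent-span-id}:{flags}, hex-encoded). SpanContext, inject, and extract are hypothetical, and the header TracingManager actually uses depends on its tracer configuration.

```python
import secrets
from dataclasses import dataclass
from typing import Dict, Optional

HEADER = "uber-trace-id"  # Jaeger's propagation header (assumed here)


@dataclass
class SpanContext:
    trace_id: int
    span_id: int
    parent_id: int = 0
    flags: int = 1  # 1 = sampled


def inject(ctx: SpanContext, carrier: Dict[str, str]) -> None:
    # Serialize as {trace-id}:{span-id}:{parent-span-id}:{flags}, hex-encoded.
    carrier[HEADER] = f"{ctx.trace_id:x}:{ctx.span_id:x}:{ctx.parent_id:x}:{ctx.flags:x}"


def extract(carrier: Dict[str, str]) -> Optional[SpanContext]:
    raw = carrier.get(HEADER)
    if raw is None:
        return None  # no incoming context: the callee starts a new trace
    try:
        trace_id, span_id, parent_id, flags = (int(part, 16) for part in raw.split(":"))
    except ValueError:
        return None  # malformed header: treat as absent
    return SpanContext(trace_id, span_id, parent_id, flags)


outgoing = SpanContext(trace_id=secrets.randbits(64), span_id=secrets.randbits(64))
headers: Dict[str, str] = {}
inject(outgoing, headers)    # caller side, before sending the request
incoming = extract(headers)  # callee side, on receipt
print(incoming == outgoing)  # True
```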

get_active_span()

Get current active span

Return type:

Span | None

close_tracer()

Close tracer

get_tracer_info()

Get tracer information

Return type:

Dict[str, Any]

Messaging

Celery Task Manager

class aiecs.infrastructure.messaging.celery_task_manager.CeleryTaskManager

Bases: object

Specialized handler for Celery distributed task scheduling and execution

__init__(config)
Parameters:

config (Dict[str, Any])

execute_celery_task(task_name, queue, user_id, task_id, step, mode, service, input_data, context)

Execute Celery task

Parameters:
  • task_name (str) – Task name

  • queue (str) – Queue name ('fast_tasks' or 'heavy_tasks')

  • user_id (str) – User ID

  • task_id (str) – Task ID

  • step (int) – Step number

  • mode (str) – Service mode

  • service (str) – Service name

  • input_data (Dict[str, Any]) – Input data

  • context (Dict[str, Any]) – Context information

Returns:

Celery AsyncResult object

async execute_task(task_name, input_data, context)

Execute a single task using Celery for asynchronous processing

Parameters:
  • task_name (str)

  • input_data (Dict[str, Any])

  • context (Dict[str, Any])

Return type:

Any
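execute_task is a coroutine, while waiting on a Celery result through AsyncResult.get() blocks the calling thread. How aiecs bridges the two is not documented. One common approach, sketched here under that assumption, offloads the blocking call with asyncio.to_thread (Python 3.9+). FakeAsyncResult is a hypothetical stand-in for celery.result.AsyncResult so the sketch runs without a broker.

```python
import asyncio
import time
from typing import Any, List, Optional


class FakeAsyncResult:
    """Stand-in for celery.result.AsyncResult; its get() blocks like the real one."""

    def __init__(self, value: Any, delay: float) -> None:
        self._value = value
        self._delay = delay

    def get(self, timeout: Optional[float] = None) -> Any:
        time.sleep(self._delay)  # simulate waiting on the result backend
        return self._value


async def await_result(result: FakeAsyncResult, timeout: float = 30.0) -> Any:
    # Run the blocking get() in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(result.get, timeout=timeout)


async def main() -> List[Any]:
    # The two waits overlap in separate threads instead of running back to back.
    return await asyncio.gather(
        await_result(FakeAsyncResult("a", 0.1)),
        await_result(FakeAsyncResult("b", 0.1)),
    )


print(asyncio.run(main()))  # ['a', 'b']
```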

async execute_heavy_task(task_name, input_data, context)

Execute heavy task

Parameters:
  • task_name (str)

  • input_data (Dict[str, Any])

  • context (Dict[str, Any])

Return type:

Any

async execute_dsl_task_step(step, input_data, context)

Execute DSL task step

Parameters:
  • step

  • input_data (Dict[str, Any])

  • context (Dict[str, Any])

Return type:

Dict[str, Any]

get_task_result(task_id)

Get task result

Parameters:

task_id (str)

cancel_task(task_id)

Cancel task

Parameters:

task_id (str)

async batch_execute_tasks(tasks)

Batch execute tasks

Parameters:

tasks (List[Dict[str, Any]])

Return type:

List[Any]
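batch_execute_tasks takes a list of task dicts and returns a list of results. The following is a sketch of running such a batch concurrently with asyncio.gather, assuming each dict carries task_name and input_data keys and that one failing task should not cancel the rest. batch_execute and run_one are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def batch_execute(
    tasks: List[Dict[str, Any]],
    run_one: Callable[[Dict[str, Any]], Awaitable[Any]],
) -> List[Any]:
    # Run every task concurrently. Results keep the input order, and a failing
    # task contributes its exception object instead of cancelling the batch.
    return await asyncio.gather(*(run_one(t) for t in tasks), return_exceptions=True)


async def run_one(task: Dict[str, Any]) -> Any:
    # Hypothetical per-task executor; a real one might await execute_task.
    if task["task_name"] == "bad":
        raise ValueError("unsupported task")
    await asyncio.sleep(0)
    return task["input_data"]["x"] * 2


results = asyncio.run(batch_execute(
    [
        {"task_name": "double", "input_data": {"x": 1}},
        {"task_name": "bad", "input_data": {}},
        {"task_name": "double", "input_data": {"x": 5}},
    ],
    run_one,
))
print(results)  # [2, ValueError('unsupported task'), 10]
```

With return_exceptions=True, callers must check each entry with isinstance(entry, Exception) before using it as a result.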

get_queue_info()

Get queue information

Return type:

Dict[str, Any]

get_worker_stats()

Get worker statistics

Return type:

Dict[str, Any]

WebSocket Manager

class aiecs.infrastructure.messaging.websocket_manager.UserConfirmation

Bases: BaseModel

proceed: bool
feedback: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aiecs.infrastructure.messaging.websocket_manager.TaskStepResult

Bases: BaseModel

step: str
result: Any
completed: bool
message: str
status: str
error_code: str | None
error_message: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
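TaskStepResult carries one step's outcome: notify_user sends it to the client and save_task_history persists it. The following is a dependency-free dataclass mirror of its documented fields, serialized to JSON. The None defaults for the error fields are assumptions, and the field values are illustrative. On the real pydantic v2 model, model_dump_json() produces the same fields as JSON.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class TaskStepResultMirror:
    """Dependency-free mirror of TaskStepResult's documented fields."""
    step: str
    result: Any
    completed: bool
    message: str
    status: str
    error_code: Optional[str] = None     # default is an assumption
    error_message: Optional[str] = None  # default is an assumption


payload = TaskStepResultMirror(
    step="collect_data",
    result={"rows": 42},
    completed=True,
    message="Collected 42 rows",
    status="completed",
)
wire = json.dumps(asdict(payload))  # e.g. the body of one WebSocket text frame
print(wire)
```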

class aiecs.infrastructure.messaging.websocket_manager.WebSocketManager

Bases: object

Specialized handler for WebSocket server and client communication

__init__(host='python-middleware-api', port=8765)
Parameters:
  • host (str)

  • port (int)

async start_server()

Start WebSocket server

async stop_server()

Stop WebSocket server

async notify_user(step_result, user_id, task_id, step)

Notify user of task step result

Parameters:
  • step_result (TaskStepResult)

  • user_id (str)

  • task_id (str)

  • step (int)

Return type:

UserConfirmation
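notify_user returns a UserConfirmation, that is, the user's decision after seeing a step result. The wire format of the user's reply is not documented. Assuming it is a JSON object carrying the model's two fields, the following sketches how it might be validated. UserConfirmationMirror and parse_confirmation are hypothetical. With pydantic v2, UserConfirmation.model_validate_json(frame) performs comparable validation, although pydantic's default lax mode also coerces some values (such as the string "true") that this stricter sketch rejects.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class UserConfirmationMirror:
    """Dependency-free mirror of UserConfirmation's documented fields."""
    proceed: bool
    feedback: Optional[str] = None  # default is an assumption


def parse_confirmation(frame: str) -> UserConfirmationMirror:
    data = json.loads(frame)  # raises json.JSONDecodeError (a ValueError) on bad JSON
    if not isinstance(data, dict):
        raise ValueError("reply must be a JSON object")
    if not isinstance(data.get("proceed"), bool):
        raise ValueError("'proceed' must be a boolean")
    feedback = data.get("feedback")
    if feedback is not None and not isinstance(feedback, str):
        raise ValueError("'feedback' must be a string or null")
    return UserConfirmationMirror(proceed=data["proceed"], feedback=feedback)


print(parse_confirmation('{"proceed": true}'))
print(parse_confirmation('{"proceed": false, "feedback": "Use last quarter instead"}'))
```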

async send_heartbeat(user_id, task_id, interval=30)

Send heartbeat message

Parameters:
  • user_id (str)

  • task_id (str)

  • interval (int)

async broadcast_message(message)

Broadcast message to all connected clients

Parameters:

message (Dict[str, Any])

async send_to_user(user_id, message)

Send a message to a specific user (requires a user-to-connection mapping to be implemented)

Parameters:
  • user_id (str)

  • message (Dict[str, Any])

get_connection_count()

Get active connection count

Return type:

int

get_status()

Get WebSocket manager status

Return type:

Dict[str, Any]