Source code for aiecs.core.interface.storage_interface

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
"""
Storage interfaces for the middleware architecture.

This module defines the core storage abstractions following the same pattern
as other core interfaces, enabling dependency inversion and clean architecture.
"""

from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List


[docs] class ISessionStorage(ABC): """Session storage interface - Domain layer abstraction"""
[docs] @abstractmethod async def create_session(self, session_id: str, user_id: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Create a new session."""
[docs] @abstractmethod async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: """Get session by ID."""
[docs] @abstractmethod async def update_session( self, session_id: str, updates: Optional[Dict[str, Any]] = None, increment_requests: bool = False, add_processing_time: float = 0.0, mark_error: bool = False, ) -> bool: """Update session with activity and metrics."""
[docs] @abstractmethod async def end_session(self, session_id: str, status: str = "completed") -> bool: """End a session and update metrics."""
[docs] class IConversationStorage(ABC): """Conversation storage interface - Domain layer abstraction"""
[docs] @abstractmethod async def add_conversation_message( self, session_id: str, role: str, content: str, metadata: Optional[Dict[str, Any]] = None, ) -> bool: """Add message to conversation history."""
[docs] @abstractmethod async def get_conversation_history(self, session_id: str, limit: int = 50) -> List[Dict[str, Any]]: """Get conversation history for a session."""
[docs] class ICheckpointStorage(ABC): """Checkpoint storage interface - Domain layer abstraction"""
[docs] @abstractmethod async def store_checkpoint( self, thread_id: str, checkpoint_id: str, checkpoint_data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, ) -> bool: """Store checkpoint data."""
[docs] @abstractmethod async def get_checkpoint(self, thread_id: str, checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]: """Get checkpoint data. If checkpoint_id is None, get the latest."""
[docs] @abstractmethod async def list_checkpoints(self, thread_id: str, limit: int = 10) -> List[Dict[str, Any]]: """List checkpoints for a thread, ordered by creation time."""
[docs] class ITaskContextStorage(ABC): """Task context storage interface - Domain layer abstraction"""
[docs] @abstractmethod async def get_task_context(self, session_id: str) -> Optional[Any]: """Get TaskContext for a session."""
[docs] @abstractmethod async def store_task_context(self, session_id: str, context: Any) -> bool: """Store TaskContext for a session."""
[docs] class IStorageBackend( ISessionStorage, IConversationStorage, ICheckpointStorage, ITaskContextStorage, ): """ Unified storage backend interface - Domain layer abstraction This interface combines all storage capabilities and follows the same pattern as other core interfaces in the middleware architecture. """
[docs] @abstractmethod async def initialize(self) -> bool: """Initialize the storage backend."""
[docs] @abstractmethod async def close(self): """Close the storage backend."""
[docs] @abstractmethod async def health_check(self) -> Dict[str, Any]: """Perform health check."""
[docs] @abstractmethod async def get_metrics(self) -> Dict[str, Any]: """Get comprehensive metrics."""
[docs] @abstractmethod async def cleanup_expired_sessions(self, max_idle_hours: int = 24) -> int: """Clean up expired sessions and associated data."""
[docs] class IPermanentStorageBackend(ABC): """ Permanent storage backend interface for disk-based cold archive. Used for dual-write alongside Redis (hot cache). Append-only semantics, optimized for analytics and long-term retention. Typical implementation: ClickHouse, PostgreSQL, etc. All methods are fire-and-forget: failures should not block the primary Redis write path. Implementations should handle errors internally. """
[docs] @abstractmethod async def append_session_event( self, session_id: str, user_id: str, event_type: str, payload: Dict[str, Any], created_at: Optional[str] = None, ) -> bool: """Append session create/update/end event for audit/analytics."""
[docs] @abstractmethod async def append_conversation_message( self, session_id: str, role: str, content: str, metadata: Optional[Dict[str, Any]] = None, created_at: Optional[str] = None, ) -> bool: """Append conversation message (append-only)."""
[docs] @abstractmethod async def append_checkpoint( self, thread_id: str, checkpoint_id: str, checkpoint_data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, created_at: Optional[str] = None, ) -> bool: """Append checkpoint data."""
[docs] @abstractmethod async def append_checkpoint_writes( self, thread_id: str, checkpoint_id: str, task_id: str, writes_data: List[tuple], created_at: Optional[str] = None, ) -> bool: """Append checkpoint writes."""
[docs] @abstractmethod async def append_conversation_session( self, session_key: str, session_data: Dict[str, Any], created_at: Optional[str] = None, ) -> bool: """Append conversation session metadata."""
[docs] @abstractmethod async def append_task_context_snapshot( self, session_id: str, context_data: Dict[str, Any], created_at: Optional[str] = None, ) -> bool: """Append task context snapshot (versioned)."""
[docs] @abstractmethod async def initialize(self) -> bool: """Initialize the permanent storage backend."""
[docs] @abstractmethod async def close(self) -> None: """Close the permanent storage backend."""
[docs] class ICheckpointerBackend(ABC): """ Checkpointer backend interface for LangGraph integration. This interface defines the minimal contract needed by BaseServiceCheckpointer to work with any storage backend, following dependency inversion principle. """
[docs] @abstractmethod async def put_checkpoint( self, thread_id: str, checkpoint_id: str, checkpoint_data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, ) -> bool: """Store a checkpoint for LangGraph workflows."""
[docs] @abstractmethod async def get_checkpoint(self, thread_id: str, checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]: """Retrieve a checkpoint for LangGraph workflows."""
[docs] @abstractmethod async def list_checkpoints(self, thread_id: str, limit: int = 10) -> List[Dict[str, Any]]: """List checkpoints for LangGraph workflows."""
[docs] @abstractmethod async def put_writes( self, thread_id: str, checkpoint_id: str, task_id: str, writes_data: List[tuple], ) -> bool: """Store intermediate writes for a checkpoint."""
[docs] @abstractmethod async def get_writes(self, thread_id: str, checkpoint_id: str) -> List[tuple]: """Get intermediate writes for a checkpoint."""