Storage Interfaces Technical Documentation

Overview

Design Motivation and Problem Background

When building large-scale AI application systems, data storage faces the following core challenges:

1. Diverse Storage Requirements

  • Different types of data: session data, conversation history, checkpoints, task context, etc.

  • Each data type has its own storage characteristics: lifetime (temporary or persistent) and query requirements

  • Without a unified storage abstraction layer, storage logic ends up scattered across the codebase

2. Storage Backend Heterogeneity

  • The system must support multiple storage backends (Redis, PostgreSQL, file system, cloud storage)

  • Different storage backends have different APIs and characteristics

  • Switching storage backends requires extensive code modifications

3. Data Consistency and Reliability

  • Data consistency guarantees in distributed environments

  • Data backup and recovery mechanisms

  • Degradation and recovery strategies when storage fails

4. Performance and Scalability Challenges

  • Performance optimization under heavy concurrent access

  • Data sharding and load balancing

  • Caching strategies and query optimization

Storage Interface System Solution:

  • Interface Segregation Principle: Separate different storage responsibilities into independent interfaces

  • Unified Abstraction Layer: Provide unified storage operation interfaces

  • Dependency Inversion: High-level modules depend on abstract interfaces, low-level modules implement interfaces

  • Pluggable Backends: Storage backends can be swapped without changing the code that consumes them

  • Type Safety: Interfaces carry Python type annotations that static type checkers can verify

Component Positioning

storage_interface.py holds the storage interface definitions of the AIECS system. It sits in the Domain Layer and declares all storage-related abstract interfaces. As the system's storage contract layer, it specifies storage operations with type annotations and clearly separated responsibilities.

Component Type and Positioning

Component Type

Domain Interface Component: located in the Domain Layer, it is part of the system's contract definitions

Architecture Layers

┌─────────────────────────────────────────┐
│         Application Layer               │  ← Components using storage interfaces
│  (ContextEngine, ServiceLayer)          │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         Domain Layer                    │  ← Storage interfaces layer
│  (Storage Interfaces, Data Contracts)   │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│       Infrastructure Layer              │  ← Components implementing storage interfaces
│  (Redis, PostgreSQL, FileStorage)       │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         External Storage                │  ← External storage systems
│  (Redis, PostgreSQL, GCS, S3)           │
└─────────────────────────────────────────┘

Upstream Components (Consumers)

1. Domain Services

  • ContextEngine (domain/context/context_engine.py)

  • SessionManager (if present)

  • ConversationManager (if present)

2. Application Layer Services

  • TaskService (if present)

  • ExecutionService (if present)

  • AnalyticsService (if present)

3. Infrastructure Layer Implementations (these implement, rather than call, the interfaces)

  • DatabaseManager (infrastructure/persistence/database_manager.py)

  • RedisClient (infrastructure/persistence/redis_client.py)

  • FileStorage (infrastructure/persistence/file_storage.py)

Downstream Components (Dependencies)

1. Python ABC System

  • Purpose: Provide abstract base class support

  • Functionality: Interface definition, abstract method declaration

  • Dependency Type: Python standard library (abc module)

2. Domain Models

  • TaskContext (domain/task/task_context.py)

  • Session (if present)

  • Conversation (if present)

3. Type System

  • Purpose: Provide static type checking

  • Functionality: Parameter and return-value checking by static type checkers such as mypy (annotations are not enforced at runtime)

  • Dependency Type: Python type system (typing module)

Core Interfaces Explained

1. ISessionStorage - Session Storage Interface

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

class ISessionStorage(ABC):
    """Session storage interface - Domain layer abstraction"""
    
    @abstractmethod
    async def create_session(
        self,
        session_id: str,
        user_id: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Create new session"""
        pass
    
    @abstractmethod
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session by ID"""
        pass
    
    @abstractmethod
    async def update_session(
        self,
        session_id: str,
        updates: Optional[Dict[str, Any]] = None,
        increment_requests: bool = False,
        add_processing_time: float = 0.0,
        mark_error: bool = False
    ) -> bool:
        """Update session information and metrics"""
        pass
    
    @abstractmethod
    async def end_session(self, session_id: str, status: str = "completed") -> bool:
        """End session and update metrics"""
        pass

Responsibilities:

  • Session Lifecycle Management: Create, get, update, end sessions

  • Session Metrics Tracking: Request counting, processing time, error statistics

  • Session Metadata Management: Store and update session-related metadata

Implementation Requirements:

  • Must support complete session lifecycle management

  • Must provide session metrics and statistics functionality

  • Should support concurrent access control for sessions
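The session lifecycle and metrics contract can be illustrated with a minimal in-memory sketch. InMemorySessionStore is hypothetical (not part of AIECS). Its field names follow the session format used in the serialization section of this document. Merging updates into the session metadata is an assumption, because the interface does not say where updates are applied.

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict, Optional


class InMemorySessionStore:
    """Hypothetical in-memory store illustrating ISessionStorage semantics."""

    def __init__(self) -> None:
        self._sessions: Dict[str, Dict[str, Any]] = {}
        # No await occurs inside the locked sections below, so asyncio could not
        # interleave them anyway; the lock marks where a real backend, whose
        # read-modify-write spans awaits, needs serialization.
        self._lock = asyncio.Lock()

    async def create_session(self, session_id: str, user_id: str,
                             metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        now = datetime.now(timezone.utc)
        session = {
            "session_id": session_id, "user_id": user_id,
            "created_at": now, "updated_at": now, "status": "active",
            "metadata": dict(metadata or {}),
            "request_count": 0, "total_processing_time": 0.0, "error_count": 0,
        }
        async with self._lock:
            self._sessions[session_id] = session
        return dict(session)

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        session = self._sessions.get(session_id)
        return dict(session) if session else None

    async def update_session(self, session_id: str,
                             updates: Optional[Dict[str, Any]] = None,
                             increment_requests: bool = False,
                             add_processing_time: float = 0.0,
                             mark_error: bool = False) -> bool:
        async with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                return False
            if updates:
                session["metadata"].update(updates)  # assumption: updates go to metadata
            if increment_requests:
                session["request_count"] += 1
            session["total_processing_time"] += add_processing_time
            if mark_error:
                session["error_count"] += 1
            session["updated_at"] = datetime.now(timezone.utc)
            return True

    async def end_session(self, session_id: str, status: str = "completed") -> bool:
        async with self._lock:
            session = self._sessions.get(session_id)
            if session is None:
                return False
            session["status"] = status
            session["updated_at"] = datetime.now(timezone.utc)
            return True
```

A call such as update_session("s1", increment_requests=True, add_processing_time=0.5) bumps the counters in one step. This keeps per-request bookkeeping to a single storage round trip.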

2. IConversationStorage - Conversation Storage Interface

class IConversationStorage(ABC):
    """Conversation storage interface - Domain layer abstraction"""
    
    @abstractmethod
    async def add_conversation_message(
        self,
        session_id: str,
        role: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Add conversation message"""
        pass
    
    @abstractmethod
    async def get_conversation_history(
        self,
        session_id: str,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Get conversation history"""
        pass

Responsibilities:

  • Conversation Message Management: Add and retrieve conversation messages

  • Conversation History Query: Retrieve recent history, bounded by the limit parameter

  • Conversation Context Maintenance: Maintain conversation continuity and context

Implementation Requirements:

  • Must support chronological storage and retrieval of messages

  • Must provide efficient history query functionality

  • Should support conversation persistence and recovery
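Here is a sketch of the chronological-ordering requirement. Messages are appended in arrival order, and a history query returns the limit most recent messages, oldest first. InMemoryConversationStore is a hypothetical illustration. The "most recent N, oldest first" meaning of limit is an assumption that the interface does not spell out.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


class InMemoryConversationStore:
    """Hypothetical in-memory store illustrating IConversationStorage semantics."""

    def __init__(self) -> None:
        # session_id -> messages in insertion (chronological) order
        self._messages: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    async def add_conversation_message(self, session_id: str, role: str, content: str,
                                       metadata: Optional[Dict[str, Any]] = None) -> bool:
        self._messages[session_id].append({
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata or {},
        })
        return True

    async def get_conversation_history(self, session_id: str,
                                       limit: int = 50) -> List[Dict[str, Any]]:
        if limit <= 0:
            return []  # a slice of [-0:] would return everything, so guard explicitly
        # The last `limit` messages, still oldest first
        return list(self._messages.get(session_id, [])[-limit:])
```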

3. ICheckpointStorage - Checkpoint Storage Interface

class ICheckpointStorage(ABC):
    """Checkpoint storage interface - Domain layer abstraction"""
    
    @abstractmethod
    async def store_checkpoint(
        self,
        thread_id: str,
        checkpoint_id: str,
        checkpoint_data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Store checkpoint data"""
        pass
    
    @abstractmethod
    async def get_checkpoint(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Get checkpoint data"""
        pass
    
    @abstractmethod
    async def list_checkpoints(
        self,
        thread_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """List checkpoints"""
        pass

Responsibilities:

  • Checkpoint Management: Store and retrieve workflow checkpoints

  • State Recovery: Support workflow state recovery from checkpoints

  • Version Control: Manage checkpoint versions and history

Implementation Requirements:

  • Must support atomic checkpoint storage

  • Must provide fast checkpoint retrieval functionality

  • Should support checkpoint version management and cleanup
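One common way to meet the atomic-storage requirement on a local filesystem is write-to-temp-then-rename. On POSIX, os.replace swaps the file in a single step when both paths are on the same filesystem, so readers see either the old checkpoint or the new one, never a partial write. FileCheckpointStore, its directory layout, and the _latest.json pointer are hypothetical. Treating checkpoint_id=None as "latest" is also an assumption. The methods are synchronous for brevity; an async backend could run them through asyncio.to_thread.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional


def _atomic_write_json(path: Path, data: Dict[str, Any]) -> None:
    """Write JSON to a temp file in the target directory, then rename it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file must live on the same filesystem as the target for an atomic rename
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the rename
        os.replace(tmp_name, path)  # atomic on POSIX within one filesystem
    except BaseException:
        os.unlink(tmp_name)  # never leave a half-written temp file behind
        raise


class FileCheckpointStore:
    """Hypothetical file-based checkpoint store (IDs assumed to be safe file names)."""

    def __init__(self, root: Path) -> None:
        self.root = Path(root)

    def store_checkpoint(self, thread_id: str, checkpoint_id: str,
                         checkpoint_data: Dict[str, Any],
                         metadata: Optional[Dict[str, Any]] = None) -> bool:
        thread_dir = self.root / thread_id
        record = {"checkpoint_id": checkpoint_id, "data": checkpoint_data,
                  "metadata": metadata or {}}
        _atomic_write_json(thread_dir / f"{checkpoint_id}.json", record)
        # Move the "latest" pointer only after the checkpoint file is in place
        _atomic_write_json(thread_dir / "_latest.json", {"checkpoint_id": checkpoint_id})
        return True

    def get_checkpoint(self, thread_id: str,
                       checkpoint_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        thread_dir = self.root / thread_id
        if checkpoint_id is None:  # assumption: None means "latest"
            pointer = thread_dir / "_latest.json"
            if not pointer.exists():
                return None
            checkpoint_id = json.loads(pointer.read_text(encoding="utf-8"))["checkpoint_id"]
        path = thread_dir / f"{checkpoint_id}.json"
        if not path.exists():
            return None
        return json.loads(path.read_text(encoding="utf-8"))
```

Updating the pointer last means that a crash between the two writes leaves "latest" pointing at the previous, fully written checkpoint.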

4. ITaskContextStorage - Task Context Storage Interface

class ITaskContextStorage(ABC):
    """Task context storage interface - Domain layer abstraction"""
    
    @abstractmethod
    async def get_task_context(self, session_id: str) -> Optional[Any]:
        """Get task context"""
        pass
    
    @abstractmethod
    async def store_task_context(self, session_id: str, context: Any) -> bool:
        """Store task context"""
        pass

Responsibilities:

  • Task Context Management: Store and retrieve task execution context

  • State Persistence: Support persistent storage of task state

  • Context Sharing: Support cross-session context sharing

Implementation Requirements:

  • Must support context serialization and deserialization

  • Must provide fast context access functionality

  • Should support context version management
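For the serialization and versioning requirements, one approach is to wrap the serialized context in an envelope that records a schema version. Readers can then reject old records or migrate them. TaskContextSnapshot and SCHEMA_VERSION are hypothetical stand-ins. The real TaskContext (domain/task/task_context.py) may need its own conversion logic.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List

SCHEMA_VERSION = 1  # hypothetical schema version for stored contexts


@dataclass
class TaskContextSnapshot:
    """Hypothetical serializable stand-in for TaskContext."""
    session_id: str
    user_id: str
    history: List[Dict[str, Any]] = field(default_factory=list)
    variables: Dict[str, Any] = field(default_factory=dict)


def dump_context(ctx: TaskContextSnapshot) -> str:
    # Wrap the payload with a version so future readers can migrate old records
    return json.dumps({"version": SCHEMA_VERSION, "context": asdict(ctx)})


def load_context(raw: str) -> TaskContextSnapshot:
    envelope = json.loads(raw)
    version = envelope.get("version", 0)
    if version != SCHEMA_VERSION:
        # A real implementation might upgrade older versions here instead of failing
        raise ValueError(f"Unsupported task context version: {version}")
    return TaskContextSnapshot(**envelope["context"])
```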

5. IStorageBackend - Unified Storage Backend Interface

class IStorageBackend(
    ISessionStorage,
    IConversationStorage,
    ICheckpointStorage,
    ITaskContextStorage
):
    """Unified storage backend interface - Domain layer abstraction"""
    
    @abstractmethod
    async def initialize(self) -> bool:
        """Initialize storage backend"""
        pass
    
    @abstractmethod
    async def close(self):
        """Close storage backend"""
        pass
    
    @abstractmethod
    async def health_check(self) -> Dict[str, Any]:
        """Perform health check"""
        pass
    
    @abstractmethod
    async def get_metrics(self) -> Dict[str, Any]:
        """Get comprehensive metrics"""
        pass
    
    @abstractmethod
    async def cleanup_expired_sessions(self, max_idle_hours: int = 24) -> int:
        """Cleanup expired sessions"""
        pass

Responsibilities:

  • Unified Storage Interface: Integrate all storage functionality

  • Lifecycle Management: Initialize and close storage backend

  • Health Monitoring: Provide health checks and metrics monitoring

  • Data Cleanup: Support automatic cleanup of expired data
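cleanup_expired_sessions can be read as "remove sessions whose last update is older than max_idle_hours". The sketch below applies that interpretation to a plain dictionary. Using updated_at as the idle marker is an assumption. The injectable now parameter exists only to make the function testable.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def cleanup_expired_sessions(sessions: Dict[str, Dict[str, Any]],
                             max_idle_hours: int = 24,
                             now: Optional[datetime] = None) -> int:
    """Remove sessions idle for longer than max_idle_hours; return how many were removed."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=max_idle_hours)
    expired = [sid for sid, s in sessions.items() if s["updated_at"] < cutoff]
    # Collect first, then delete, so the dict is not mutated while being iterated
    for sid in expired:
        del sessions[sid]
    return len(expired)
```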

6. ICheckpointerBackend - Checkpoint Backend Interface

class ICheckpointerBackend(ABC):
    """Checkpoint backend interface - LangGraph integration"""
    
    @abstractmethod
    async def put_checkpoint(
        self,
        thread_id: str,
        checkpoint_id: str,
        checkpoint_data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None
    ) -> bool:
        """Store LangGraph workflow checkpoint"""
        pass
    
    @abstractmethod
    async def get_checkpoint(
        self,
        thread_id: str,
        checkpoint_id: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """Retrieve LangGraph workflow checkpoint"""
        pass
    
    @abstractmethod
    async def list_checkpoints(
        self,
        thread_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """List LangGraph workflow checkpoints"""
        pass
    
    @abstractmethod
    async def put_writes(
        self,
        thread_id: str,
        checkpoint_id: str,
        task_id: str,
        writes_data: List[tuple]
    ) -> bool:
        """Store intermediate writes for checkpoint"""
        pass
    
    @abstractmethod
    async def get_writes(
        self,
        thread_id: str,
        checkpoint_id: str
    ) -> List[tuple]:
        """Get intermediate writes for checkpoint"""
        pass

Responsibilities:

  • LangGraph Integration: Support checkpoint functionality for LangGraph workflows

  • Intermediate State Management: Store and retrieve workflow intermediate states

  • Workflow Recovery: Support workflow recovery from any checkpoint
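Here is a minimal sketch of put_writes/get_writes semantics, keyed by (thread_id, checkpoint_id). It assumes that each entry in writes_data is a (channel, value) pair and that a retried task's writes replace its earlier ones. Both are illustrative assumptions, not guarantees of this interface. InMemoryWriteLog is hypothetical.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple


class InMemoryWriteLog:
    """Hypothetical in-memory store for intermediate checkpoint writes."""

    def __init__(self) -> None:
        # (thread_id, checkpoint_id) -> list of (task_id, channel, value)
        self._writes: DefaultDict[Tuple[str, str], List[tuple]] = defaultdict(list)

    async def put_writes(self, thread_id: str, checkpoint_id: str,
                         task_id: str, writes_data: List[tuple]) -> bool:
        key = (thread_id, checkpoint_id)
        # Drop earlier writes from the same task so a retried task does not duplicate them
        kept = [w for w in self._writes[key] if w[0] != task_id]
        self._writes[key] = kept + [(task_id, channel, value) for channel, value in writes_data]
        return True

    async def get_writes(self, thread_id: str, checkpoint_id: str) -> List[tuple]:
        # .get avoids creating empty entries for unknown keys
        return list(self._writes.get((thread_id, checkpoint_id), []))
```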

Design Patterns Explained

1. Interface Segregation Principle

# Separate different storage responsibilities into independent interfaces
class ISessionStorage(ABC): ...        # Session storage
class IConversationStorage(ABC): ...   # Conversation storage
class ICheckpointStorage(ABC): ...     # Checkpoint storage
class ITaskContextStorage(ABC): ...    # Task context storage

Advantages:

  • Single Responsibility: Each interface is responsible for exactly one type of storage

  • Easy to Implement: Implementation classes can selectively implement relevant interfaces

  • Easy to Test: Can test each storage functionality independently

2. Dependency Inversion Principle

# High-level modules depend on abstract interfaces
class ContextEngine:
    def __init__(self, storage_backend: IStorageBackend):
        self.storage = storage_backend

Advantages:

  • Loose Coupling: High-level modules don’t depend on specific storage implementations

  • Extensible: Can easily replace storage backends

  • Testable: Can use mock objects for testing
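ContextEngine depends only on the abstract interface. As a result, a test can substitute unittest.mock.AsyncMock for a real backend and assert on the awaited calls. The ContextEngine below is a hypothetical, stripped-down stand-in. Its start_session method and the metadata it passes are invented for the example; it is not the real class in domain/context/context_engine.py.

```python
import asyncio
from typing import Any, Dict
from unittest.mock import AsyncMock


class ContextEngine:
    """Hypothetical consumer that depends only on the storage abstraction."""

    def __init__(self, storage_backend: Any) -> None:
        self.storage = storage_backend

    async def start_session(self, session_id: str, user_id: str) -> Dict[str, Any]:
        return await self.storage.create_session(session_id, user_id, {"source": "api"})


async def demo() -> None:
    storage = AsyncMock()  # attributes of an AsyncMock are awaitable AsyncMocks too
    storage.create_session.return_value = {"session_id": "s1", "status": "active"}

    engine = ContextEngine(storage)
    result = await engine.start_session("s1", "u1")

    assert result["status"] == "active"
    storage.create_session.assert_awaited_once_with("s1", "u1", {"source": "api"})


if __name__ == "__main__":
    asyncio.run(demo())
```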

3. Interface Composition (Multiple Inheritance)

# Form a unified interface by inheriting from multiple interfaces
class IStorageBackend(
    ISessionStorage,
    IConversationStorage,
    ICheckpointStorage,
    ITaskContextStorage
):
    """Unified storage interface: inherits every abstract method of its parents"""

Advantages:

  • Integration: All related storage capabilities are reachable through one interface

  • Backward Compatibility: Code that depends on IStorageBackend keeps working, while new code can depend on a narrower interface

  • Easy to Use: Consumers need a single dependency for all storage operations
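Composition through ABC inheritance has a useful consequence. An implementation that misses any abstract method inherited from any parent interface cannot be instantiated. The toy interfaces below (IReadable, IClosable, ICombined) are hypothetical stand-ins for the real ones, kept small so the effect is easy to see.

```python
from abc import ABC, abstractmethod


class IReadable(ABC):
    @abstractmethod
    async def get_session(self, session_id: str): ...


class IClosable(ABC):
    @abstractmethod
    async def close(self): ...


class ICombined(IReadable, IClosable):
    """Composed interface: inherits every abstract method of its parents."""


class Incomplete(ICombined):
    async def get_session(self, session_id: str):
        return None
    # close() is missing, so this class stays abstract


class Complete(Incomplete):
    async def close(self):
        pass


try:
    Incomplete()
except TypeError as exc:
    print(f"Rejected: {exc}")

Complete()  # instantiates fine: every abstract method is implemented
```

The error is raised at construction time, before any storage call is made. A backend that forgets, for example, list_checkpoints therefore fails fast at startup.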

Interface Implementation Standards

1. Basic Implementation Requirements

Async Operation Support

class StorageBackendImpl(IStorageBackend):
    async def create_session(self, session_id: str, user_id: str,
                             metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Async create session"""
        # Implementation logic
        pass
    
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Async get session"""
        # Implementation logic
        pass

Error Handling Standards

class StorageError(Exception):
    """Storage error base class"""
    pass

class SessionNotFoundError(StorageError):
    """Session not found error"""
    pass

class StorageConnectionError(StorageError):
    """Storage connection error"""
    pass

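class StorageBackendImpl(IStorageBackend):
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        try:
            # _fetch_session is a placeholder for the backend-specific read
            return await self._fetch_session(session_id)
        except ConnectionError as e:
            # Wrap low-level errors in the domain exception, keeping the original cause
            raise StorageConnectionError(f"Failed to connect to storage: {e}") from e
        except KeyError as e:
            # Alternatively return None, as the Optional return type allows
            raise SessionNotFoundError(f"Session {session_id} not found") from e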
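The error hierarchy above lets callers separate transient failures from permanent ones. As one possible use, the sketch below retries only StorageConnectionError with exponential backoff. Everything else, such as SessionNotFoundError, propagates immediately. with_retry is a hypothetical helper, and the exception classes are repeated so the block runs on its own. Retrying is only safe for idempotent operations such as reads.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class StorageError(Exception):
    """Storage error base class (same hierarchy as above)"""


class StorageConnectionError(StorageError):
    """Storage connection error"""


async def with_retry(operation: Callable[[], Awaitable[T]],
                     attempts: int = 3,
                     base_delay: float = 0.1) -> T:
    """Await operation(), retrying on StorageConnectionError with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except StorageConnectionError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delays grow as base_delay, 2*base_delay, 4*base_delay, ... plus up to 10% jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise AssertionError("unreachable")
```

A read would then look like: session = await with_retry(lambda: storage.get_session(session_id)).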

2. Data Serialization Standards

Session Data Format

from datetime import datetime
from typing import Any, Dict

def serialize_session(session_data: Dict[str, Any]) -> Dict[str, Any]:
    """Serialize session data"""
    return {
        "session_id": session_data["session_id"],
        "user_id": session_data["user_id"],
        "created_at": session_data["created_at"].isoformat(),
        "updated_at": session_data["updated_at"].isoformat(),
        "status": session_data["status"],
        "metadata": session_data.get("metadata", {}),
        "metrics": {
            "request_count": session_data.get("request_count", 0),
            "total_processing_time": session_data.get("total_processing_time", 0.0),
            "error_count": session_data.get("error_count", 0)
        }
    }

def deserialize_session(data: Dict[str, Any]) -> Dict[str, Any]:
    """Deserialize session data"""
    return {
        "session_id": data["session_id"],
        "user_id": data["user_id"],
        "created_at": datetime.fromisoformat(data["created_at"]),
        "updated_at": datetime.fromisoformat(data["updated_at"]),
        "status": data["status"],
        "metadata": data.get("metadata", {}),
        "request_count": data.get("metrics", {}).get("request_count", 0),
        "total_processing_time": data.get("metrics", {}).get("total_processing_time", 0.0),
        "error_count": data.get("metrics", {}).get("error_count", 0)
    }

Conversation Message Format

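from datetime import datetime, timezone
from typing import Any, Dict, Optional

def serialize_message(role: str, content: str, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Serialize conversation message"""
    return {
        "role": role,
        "content": content,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "metadata": metadata or {}
    }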

3. Performance Optimization Standards

Batch Operation Support

class OptimizedStorageBackend(IStorageBackend):
    async def batch_create_sessions(self, sessions: List[Dict[str, Any]]) -> List[bool]:
        """Batch create sessions"""
        # Implement batch operation logic
        pass
    
    async def batch_get_sessions(self, session_ids: List[str]) -> List[Optional[Dict[str, Any]]]:
        """Batch get sessions"""
        # Implement batch query logic
        pass
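When a backend has no native batch read, batch_get_sessions can be approximated by issuing single reads concurrently with a cap on in-flight requests. The sketch accepts any async get_session callable. The function name and the max_concurrency parameter are assumptions. asyncio.gather returns results in the same order as session_ids, regardless of which read finishes first.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

SessionGetter = Callable[[str], Awaitable[Optional[Dict[str, Any]]]]


async def batch_get_sessions(get_session: SessionGetter,
                             session_ids: List[str],
                             max_concurrency: int = 10) -> List[Optional[Dict[str, Any]]]:
    """Fetch many sessions concurrently; results keep the order of session_ids."""
    semaphore = asyncio.Semaphore(max_concurrency)  # cap in-flight requests

    async def fetch(session_id: str) -> Optional[Dict[str, Any]]:
        async with semaphore:
            return await get_session(session_id)

    # gather returns results in argument order, regardless of completion order
    return await asyncio.gather(*(fetch(sid) for sid in session_ids))
```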

Caching Strategy

from collections import OrderedDict
from typing import Any, Dict, Optional

class CachedStorageBackend(IStorageBackend):
    def __init__(self, underlying_storage: IStorageBackend, cache_size: int = 1000):
        self.storage = underlying_storage
        # OrderedDict keeps entries ordered from least to most recently used
        self._cache: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()
        self._cache_size = cache_size
    
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session with LRU caching"""
        if session_id in self._cache:
            # A cache hit marks the entry as most recently used
            self._cache.move_to_end(session_id)
            return self._cache[session_id]
        
        session = await self.storage.get_session(session_id)
        if session:
            self._cache[session_id] = session
            if len(self._cache) > self._cache_size:
                # Evict the least recently used entry
                self._cache.popitem(last=False)
        
        return session

Usage Examples

1. Basic Storage Implementation

Redis Storage Implementation

import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import redis.asyncio as redis
from aiecs.core.interface.storage_interface import IStorageBackend

SESSION_TTL_SECONDS = 86400  # 24 hour expiration

class RedisStorageBackend(IStorageBackend):
    # Remaining IStorageBackend methods are omitted for brevity; all of them must
    # be implemented before this class can be instantiated.

    def __init__(self, redis_url: str):
        # decode_responses=True makes the client return str instead of bytes
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.prefix = "aiecs:"
    
    async def initialize(self) -> bool:
        """Initialize Redis connection"""
        try:
            await self.redis.ping()
            return True
        except Exception as e:
            print(f"Failed to initialize Redis: {e}")
            return False
    
    async def create_session(self, session_id: str, user_id: str,
                             metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Create session"""
        now = datetime.now(timezone.utc).isoformat()
        session_data = {
            "session_id": session_id,
            "user_id": user_id,
            "created_at": now,
            "updated_at": now,
            "status": "active",
            "metadata": metadata or {},
            "request_count": 0,
            "total_processing_time": 0.0,
            "error_count": 0
        }
        
        key = f"{self.prefix}session:{session_id}"
        # redis-py only accepts str, bytes, int, or float hash values,
        # so the nested metadata dict is stored as a JSON string
        mapping = {**session_data, "metadata": json.dumps(session_data["metadata"])}
        await self.redis.hset(key, mapping=mapping)
        await self.redis.expire(key, SESSION_TTL_SECONDS)
        
        return session_data
    
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session"""
        key = f"{self.prefix}session:{session_id}"
        data = await self.redis.hgetall(key)
        
        if not data:
            return None
        
        # Redis returns every hash field as a string; restore the original types
        data["metadata"] = json.loads(data.get("metadata", "{}"))
        data["request_count"] = int(data.get("request_count", 0))
        data["total_processing_time"] = float(data.get("total_processing_time", 0.0))
        data["error_count"] = int(data.get("error_count", 0))
        
        return data
    
    async def add_conversation_message(self, session_id: str, role: str,
                                       content: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Add conversation message"""
        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata or {}
        }
        
        key = f"{self.prefix}conversation:{session_id}"
        # RPUSH appends to the tail, keeping the list in chronological order
        await self.redis.rpush(key, json.dumps(message))
        await self.redis.expire(key, SESSION_TTL_SECONDS)
        
        return True
    
    async def get_conversation_history(self, session_id: str, limit: int = 50) -> List[Dict[str, Any]]:
        """Get the most recent `limit` messages, oldest first"""
        if limit <= 0:
            return []
        key = f"{self.prefix}conversation:{session_id}"
        # Negative indexes count from the tail: this selects the last `limit` entries
        messages = await self.redis.lrange(key, -limit, -1)
        
        return [json.loads(msg) for msg in messages]
    
    async def close(self):
        """Close connection"""
        # redis-py 5.0.1 and later also provide aclose(), which supersedes close()
        await self.redis.close()

PostgreSQL Storage Implementation

import json
from typing import Any, Dict, Optional

import asyncpg
from aiecs.core.interface.storage_interface import IStorageBackend

class PostgreSQLStorageBackend(IStorageBackend):
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None
    
    async def initialize(self) -> bool:
        """Initialize PostgreSQL connection pool"""
        try:
            self.pool = await asyncpg.create_pool(self.connection_string)
            
            # Create table structure
            async with self.pool.acquire() as conn:
                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS sessions (
                        session_id VARCHAR(255) PRIMARY KEY,
                        user_id VARCHAR(255) NOT NULL,
                        created_at TIMESTAMP DEFAULT NOW(),
                        updated_at TIMESTAMP DEFAULT NOW(),
                        status VARCHAR(50) DEFAULT 'active',
                        metadata JSONB,
                        request_count INTEGER DEFAULT 0,
                        total_processing_time FLOAT DEFAULT 0.0,
                        error_count INTEGER DEFAULT 0
                    )
                """)
                
                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS conversations (
                        id SERIAL PRIMARY KEY,
                        session_id VARCHAR(255) NOT NULL,
                        role VARCHAR(50) NOT NULL,
                        content TEXT NOT NULL,
                        timestamp TIMESTAMP DEFAULT NOW(),
                        metadata JSONB,
                        FOREIGN KEY (session_id) REFERENCES sessions(session_id)
                    )
                """)
            
            return True
        except Exception as e:
            print(f"Failed to initialize PostgreSQL: {e}")
            return False
    
    async def create_session(self, session_id: str, user_id: str,
                             metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Create session (upserts if the session already exists)"""
        async with self.pool.acquire() as conn:
            # RETURNING * hands back the stored row in the same round trip
            row = await conn.fetchrow("""
                INSERT INTO sessions (session_id, user_id, metadata)
                VALUES ($1, $2, $3)
                ON CONFLICT (session_id) DO UPDATE SET
                    user_id = EXCLUDED.user_id,
                    metadata = EXCLUDED.metadata,
                    updated_at = NOW()
                RETURNING *
            """, session_id, user_id, json.dumps(metadata or {}))
            
            # Note: asyncpg returns JSONB columns as str unless a JSON codec is registered
            return dict(row)
    
    async def close(self):
        """Close connection pool"""
        if self.pool:
            await self.pool.close()

2. Storage Factory Pattern

from enum import Enum
from typing import Any, Dict

class StorageType(Enum):
    REDIS = "redis"
    POSTGRESQL = "postgresql"
    MEMORY = "memory"

class StorageFactory:
    """Storage backend factory"""
    
    @staticmethod
    def create_storage_backend(
        storage_type: StorageType,
        config: Dict[str, Any]
    ) -> IStorageBackend:
        """Create storage backend instance"""
        if storage_type == StorageType.REDIS:
            return RedisStorageBackend(config["redis_url"])
        elif storage_type == StorageType.POSTGRESQL:
            return PostgreSQLStorageBackend(config["postgresql_url"])
        elif storage_type == StorageType.MEMORY:
            # MemoryStorageBackend: an in-memory implementation (not shown in this document)
            return MemoryStorageBackend()
        else:
            raise ValueError(f"Unsupported storage type: {storage_type}")

# Usage example
config = {
    "redis_url": "redis://localhost:6379/0",
    "postgresql_url": "postgresql://user:password@localhost/aiecs"
}

# Create Redis storage backend
redis_storage = StorageFactory.create_storage_backend(
    StorageType.REDIS, config
)

# Create PostgreSQL storage backend
postgres_storage = StorageFactory.create_storage_backend(
    StorageType.POSTGRESQL, config
)
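Every new backend requires an edit to the if/elif chain in StorageFactory. A registry is one alternative that fits the plugin goal stated in the overview: backends register themselves under a name, and the factory only looks that name up. register_backend, create_backend, and DemoMemoryBackend are hypothetical names for this sketch.

```python
from typing import Any, Callable, Dict, Type

_BACKENDS: Dict[str, Type[Any]] = {}


def register_backend(name: str) -> Callable[[Type[Any]], Type[Any]]:
    """Class decorator that registers a storage backend under `name`."""
    def decorator(cls: Type[Any]) -> Type[Any]:
        if name in _BACKENDS:
            raise ValueError(f"Backend already registered: {name}")
        _BACKENDS[name] = cls
        return cls
    return decorator


def create_backend(name: str, **config: Any) -> Any:
    """Instantiate a registered backend, passing config as keyword arguments."""
    try:
        backend_cls = _BACKENDS[name]
    except KeyError:
        raise ValueError(f"Unsupported storage type: {name}") from None
    return backend_cls(**config)


@register_backend("memory")
class DemoMemoryBackend:
    """Hypothetical placeholder backend used only to demonstrate registration."""

    def __init__(self, namespace: str = "default") -> None:
        self.namespace = namespace
```

A third-party package can then add a backend just by importing register_backend and decorating its class. The factory itself stays untouched.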

3. Storage Adapter Pattern

import logging
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

class StorageAdapter(IStorageBackend):
    """Storage adapter - Support multiple storage backends"""
    
    def __init__(self, primary_storage: IStorageBackend,
                 fallback_storage: Optional[IStorageBackend] = None):
        self.primary = primary_storage
        self.fallback = fallback_storage
    
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session - Support failover"""
        try:
            return await self.primary.get_session(session_id)
        except Exception as e:
            if self.fallback is not None:
                logger.warning("Primary storage failed, using fallback: %s", e)
                return await self.fallback.get_session(session_id)
            raise
    
    async def create_session(self, session_id: str, user_id: str,
                             metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Create session - Dual-write strategy (primary must succeed, fallback is best-effort)"""
        result = await self.primary.create_session(session_id, user_id, metadata)
        
        if self.fallback is not None:
            try:
                await self.fallback.create_session(session_id, user_id, metadata)
            except Exception as e:
                # A failed fallback write leaves the two stores out of sync
                logger.warning("Fallback storage write failed: %s", e)
        
        return result

Maintenance Guide

1. Daily Maintenance

Storage Health Check

import time
from datetime import datetime, timezone
from typing import Any, Dict

class StorageHealthChecker:
    """Storage health checker"""
    
    def __init__(self, storage_backend: IStorageBackend):
        self.storage = storage_backend
    
    async def check_health(self) -> Dict[str, Any]:
        """Perform health check"""
        health_status = {
            "status": "healthy",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "checks": {}
        }
        
        # Check storage connection. This only detects failures that raise; if
        # health_check() reports problems in its returned dict, inspect that too.
        try:
            await self.storage.health_check()
            health_status["checks"]["connection"] = "ok"
        except Exception as e:
            health_status["checks"]["connection"] = f"error: {e}"
            health_status["status"] = "unhealthy"
        
        # Check storage performance
        try:
            start_time = time.perf_counter()  # monotonic clock, suited to measuring durations
            await self.storage.get_metrics()
            response_time = time.perf_counter() - start_time
            health_status["checks"]["performance"] = f"ok ({response_time:.3f}s)"
        except Exception as e:
            health_status["checks"]["performance"] = f"error: {e}"
            health_status["status"] = "unhealthy"
        
        return health_status

Data Migration Tool

class StorageMigrator:
    """Storage migration tool"""
    
    def __init__(self, source_storage: IStorageBackend, 
                 target_storage: IStorageBackend):
        self.source = source_storage
        self.target = target_storage
    
    async def migrate_sessions(self, batch_size: int = 100) -> int:
        """Migrate session data"""
        migrated_count = 0
        
        # Get all session IDs (need to implement list_sessions method)
        session_ids = await self._get_all_session_ids()
        
        for i in range(0, len(session_ids), batch_size):
            batch = session_ids[i:i + batch_size]
            
            for session_id in batch:
                try:
                    # Get data from source storage
                    session_data = await self.source.get_session(session_id)
                    if session_data:
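                        # Write to target storage. Note: create_session only copies
                        # session_id, user_id, and metadata; status, timestamps, and
                        # metrics are not carried over by this interface.
                        await self.target.create_session(
                            session_data["session_id"],
                            session_data["user_id"],
                            session_data.get("metadata")
                        )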
                        migrated_count += 1
                except Exception as e:
                    print(f"Failed to migrate session {session_id}: {e}")
        
        return migrated_count
    
    async def _get_all_session_ids(self) -> List[str]:
        """Get all session IDs (must be adapted to the specific storage implementation)"""
        # For example, iterate Redis keys with SCAN (avoid KEYS in production, since it
        # blocks the server) or run a SELECT session_id query in PostgreSQL
        raise NotImplementedError("Implement session ID listing for your storage backend")

2. Troubleshooting

Common Issue Diagnosis

Issue 1: Storage Connection Failed

# Error message
StorageConnectionError: Failed to connect to storage: Connection refused

# Diagnosis steps
async def diagnose_connection_issue(storage_backend: IStorageBackend):
    """Diagnose storage connection issue"""
    try:
        # Check storage initialization
        initialized = await storage_backend.initialize()
        print(f"Storage initialized: {initialized}")
        
        # Check health status
        health = await storage_backend.health_check()
        print(f"Health check: {health}")
        
        # Check metrics
        metrics = await storage_backend.get_metrics()
        print(f"Metrics: {metrics}")
        
    except Exception as e:
        print(f"Connection diagnosis failed: {e}")
        
        # Check network connectivity (host and port assume a local Redis; adjust as needed)
        import socket
        try:
            # A timeout keeps the check from hanging on an unreachable host
            with socket.create_connection(("localhost", 6379), timeout=3):
                print("Network connectivity: ok")
        except OSError as net_e:
            print(f"Network connectivity: failed ({net_e})")

Issue 2: Data Consistency Issue

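import uuid

async def diagnose_data_consistency(storage_backend: IStorageBackend):
    """Diagnose data consistency issue"""
    # Create a test session with a unique ID so it cannot collide with existing data
    test_session_id = f"test_consistency_{uuid.uuid4().hex}"
    test_user_id = "test_user"
    
    try:
        # Create session
        session = await storage_backend.create_session(
            test_session_id, test_user_id
        )
        print(f"Session created: {session}")
        
        # Immediately read session
        retrieved_session = await storage_backend.get_session(test_session_id)
        print(f"Session retrieved: {retrieved_session}")
        
        # Compare key fields only: backends may change value types on a round trip
        # (e.g. Redis returns every hash field as a string), so comparing whole
        # dicts would report false failures
        key_fields = ("session_id", "user_id", "status")
        if retrieved_session and all(
            session.get(f) == retrieved_session.get(f) for f in key_fields
        ):
            print("Data consistency: OK")
        else:
            print("Data consistency: FAILED")
            print(f"Created: {session}")
            print(f"Retrieved: {retrieved_session}")
        
        # Mark the test session as ended (this does not necessarily delete it)
        await storage_backend.end_session(test_session_id)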
        
    except Exception as e:
        print(f"Consistency check failed: {e}")

3. Performance Optimization

Connection Pool Management

import asyncpg
from typing import Any, Dict, Optional

class OptimizedStorageBackend(IStorageBackend):
    def __init__(self, connection_config: Dict[str, Any]):
        self.connection_config = connection_config
        self.pool = None
        self.pool_size = connection_config.get("pool_size", 10)  # connections kept open
        self.max_overflow = connection_config.get("max_overflow", 20)  # extra connections under load
    
    async def initialize(self) -> bool:
        """Initialize connection pool"""
        try:
            self.pool = await asyncpg.create_pool(
                self.connection_config["url"],
                min_size=self.pool_size,
                max_size=self.pool_size + self.max_overflow
            )
            return True
        except Exception as e:
            print(f"Failed to initialize connection pool: {e}")
            return False
    
    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session using connection pool"""
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM sessions WHERE session_id = $1", 
                session_id
            )
            return dict(row) if row else None
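
`OptimizedStorageBackend` delegates pooling to asyncpg. To make the acquire/release behavior concrete without a database, here is a minimal sketch of a bounded pool: idle connections are reused, and once `max_size` connections are checked out, further `acquire()` calls wait. `SimplePool` and the `connect` factory are hypothetical, and this is not how asyncpg implements its pool:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable, List


class SimplePool:
    """Illustrative bounded pool: reuses idle connections, caps the total at max_size."""

    def __init__(self, connect: Callable[[], Awaitable[Any]], max_size: int) -> None:
        self._connect = connect
        self._idle: List[Any] = []
        # One permit per connection that may be checked out at the same time
        self._slots = asyncio.Semaphore(max_size)
        self.created = 0  # number of connections opened so far

    @asynccontextmanager
    async def acquire(self) -> AsyncIterator[Any]:
        # Wait here if max_size connections are already in use
        await self._slots.acquire()
        conn = self._idle.pop() if self._idle else None
        try:
            if conn is None:
                conn = await self._connect()
                self.created += 1
            yield conn
        finally:
            if conn is not None:
                # Return the connection for reuse before freeing the slot
                self._idle.append(conn)
            self._slots.release()
```

Unlike this sketch, asyncpg's pool opens `min_size` connections when it is created, which is what `pool_size` controls in the class above.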

Batch Operation Optimization

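import asyncio
import time
from typing import Any, Dict, List, Optional


class BatchOptimizedStorageBackend(IStorageBackend):
    def __init__(self, underlying_storage: IStorageBackend):
        self.storage = underlying_storage
        self.batch_size = 100
        self.batch_timeout = 1.0  # seconds
        self._batch_queue: List[Dict[str, Any]] = []
        self._batch_lock = asyncio.Lock()
        self._last_flush = time.monotonic()
    
    async def add_conversation_message(self, session_id: str, role: str, 
                                     content: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
        """Queue a conversation message for batched writing.

        Returns True once the message is queued. It is written when the batch
        is full, on the next write after batch_timeout has elapsed, or when
        flush() is called.
        """
        message = {
            "session_id": session_id,
            "role": role,
            "content": content,
            "metadata": metadata or {}
        }
        
        async with self._batch_lock:
            self._batch_queue.append(message)
            timed_out = time.monotonic() - self._last_flush >= self.batch_timeout
            
            if len(self._batch_queue) >= self.batch_size or timed_out:
                await self._flush_batch()
        
        return True
    
    async def flush(self) -> None:
        """Write any queued messages now (call this on shutdown)"""
        async with self._batch_lock:
            await self._flush_batch()
    
    async def _flush_batch(self):
        """Flush the batch queue; the caller must hold self._batch_lock"""
        if self._batch_queue:
            # Messages stay queued if the write raises
            await self._execute_batch_insert(list(self._batch_queue))
            self._batch_queue.clear()
        self._last_flush = time.monotonic()
    
    async def _execute_batch_insert(self, messages: List[Dict[str, Any]]):
        """Execute the batch insert.

        Fallback: forward each message to the wrapped backend. Replace this
        with a single bulk write for the target store.
        """
        for msg in messages:
            await self.storage.add_conversation_message(
                msg["session_id"], msg["role"], msg["content"], msg["metadata"]
            )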
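
`_execute_batch_insert` is where batching pays off: the queued messages should reach the store in one round trip rather than one write per message. A minimal sketch of such a bulk write, using the standard-library `sqlite3` module as a stand-in store; the `conversation_messages` table, its columns, and `bulk_insert_messages` are hypothetical, and a production backend would use its own driver's bulk API:

```python
import json
import sqlite3
from typing import Any, Dict, List


def bulk_insert_messages(conn: sqlite3.Connection, messages: List[Dict[str, Any]]) -> int:
    """Insert all messages in a single transaction and return the row count."""
    rows = [
        (m["session_id"], m["role"], m["content"], json.dumps(m.get("metadata") or {}))
        for m in messages
    ]
    # The connection context manager commits on success and rolls back
    # the whole batch if any row fails.
    with conn:
        conn.executemany(
            "INSERT INTO conversation_messages (session_id, role, content, metadata) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
    return len(rows)
```

`sqlite3` is blocking, so in an async backend the equivalent call should go through an async driver or be kept off the event loop.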

Monitoring and Logging

Storage Monitoring Metrics

from collections import defaultdict, deque
from typing import Any, Dict


class StorageMonitor:
    """Storage monitor"""
    
    def __init__(self, storage_backend: IStorageBackend, max_samples: int = 1000):
        self.storage = storage_backend
        self.metrics = {
            "operations": defaultdict(int),
            "errors": defaultdict(int),
            # Keep only the most recent latency samples per operation so memory stays bounded
            "latencies": defaultdict(lambda: deque(maxlen=max_samples))
        }
    
    async def record_operation(self, operation: str, latency: float, success: bool):
        """Record operation metrics"""
        self.metrics["operations"][operation] += 1
        self.metrics["latencies"][operation].append(latency)
        
        if not success:
            self.metrics["errors"][operation] += 1
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Get a performance report; latency figures cover the retained samples only"""
        report = {}
        
        for operation in self.metrics["operations"]:
            latencies = self.metrics["latencies"][operation]
            errors = self.metrics["errors"][operation]
            operations = self.metrics["operations"][operation]
            
            report[operation] = {
                "total_operations": operations,
                "error_count": errors,
                "error_rate": errors / operations if operations > 0 else 0,
                "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
                "min_latency": min(latencies) if latencies else 0,
                "max_latency": max(latencies) if latencies else 0
            }
        
        return report
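
`StorageMonitor` expects callers to supply each operation's latency and outcome. One way to collect them is to wrap every backend call, as in this sketch; `timed` and `RecordingMonitor` are hypothetical names, and the stand-in monitor only mirrors the `record_operation(operation, latency, success)` signature so the example runs on its own:

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List, Tuple


class RecordingMonitor:
    """Stand-in with the same record_operation signature as StorageMonitor."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, float, bool]] = []

    async def record_operation(self, operation: str, latency: float, success: bool) -> None:
        self.records.append((operation, latency, success))


async def timed(monitor: Any, operation: str,
                call: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any) -> Any:
    """Await call(*args, **kwargs), record latency and outcome, and re-raise failures."""
    start = time.perf_counter()  # monotonic clock suited to measuring durations
    success = False
    try:
        result = await call(*args, **kwargs)
        success = True
        return result
    finally:
        # Runs for both successful and failed calls
        await monitor.record_operation(operation, time.perf_counter() - start, success)
```

With the real monitor, a call would look like `await timed(monitor, "get_session", backend.get_session, session_id)`; latencies are recorded in seconds.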

Storage Logging

import logging
from datetime import datetime, timezone
from typing import Any, Dict, Optional

class StorageLogger:
    """Storage logger"""
    
    def __init__(self, storage_backend: IStorageBackend):
        self.storage = storage_backend
        self.logger = logging.getLogger(__name__)
    
    async def log_operation(self, operation: str, session_id: str, 
                          success: bool, latency: float, error: Optional[str] = None):
        """Log a storage operation"""
        log_data = {
            "operation": operation,
            "session_id": session_id,
            "success": success,
            "latency": latency,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        
        if error:
            log_data["error"] = error
        
        # Pass log_data as an argument so it is only formatted if the record is emitted
        if success:
            self.logger.info("Storage operation completed: %s", log_data)
        else:
            self.logger.error("Storage operation failed: %s", log_data)
    
    async def log_health_check(self, health_status: Dict[str, Any]):
        """Log a health check result"""
        self.logger.info("Storage health check: %s", health_status)
        
        if health_status.get("status") != "healthy":
            self.logger.warning("Storage health issues detected: %s", health_status)
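
`log_operation` builds a dictionary but then formats it into the message text, so log aggregators receive an unstructured string. A sketch, assuming JSON-lines output is wanted, that passes the dict through the standard `extra` argument and serializes it with a custom `logging.Formatter`; the `storage` attribute name and `JsonStorageFormatter` are hypothetical:

```python
import json
import logging
from typing import Any, Dict


class JsonStorageFormatter(logging.Formatter):
    """Render each record as one JSON object, merging a dict passed as extra={"storage": ...}."""

    def format(self, record: logging.LogRecord) -> str:
        payload: Dict[str, Any] = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),  # msg with args already applied
        }
        storage = getattr(record, "storage", None)
        if isinstance(storage, dict):
            payload.update(storage)
        # default=str keeps non-JSON values (e.g. datetime objects) from raising
        return json.dumps(payload, default=str)
```

Attach it with `handler.setFormatter(JsonStorageFormatter())` and log with `logger.info("storage operation", extra={"storage": log_data})`. Keys in `extra` must not collide with built-in `LogRecord` attributes, or `logging` raises a `KeyError`.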

Version History

  • v1.0.0: Initial version, basic storage interface definitions

  • v1.1.0: Added session storage interface

  • v1.2.0: Added conversation storage interface

  • v1.3.0: Added checkpoint storage interface

  • v1.4.0: Added task context storage interface

  • v1.5.0: Added unified storage backend interface

  • v1.6.0: Added LangGraph checkpoint backend interface