ContextEngine Technical Documentation

Overview

Design Motivation and Problem Background

When building large-scale AI application systems, context and session management face the following core challenges:

1. Multi-Session Management Complexity

  • Need to support concurrent management of multiple user sessions

  • Session state needs to maintain consistency across multiple requests

  • Lack of unified session lifecycle management mechanism

2. Conversation History Management Difficulties

  • Large amounts of conversation messages require efficient storage and retrieval

  • Conversation context needs to maintain continuity over long periods

  • Lack of conversation history optimization and compression mechanisms

3. Checkpoint and State Persistence

  • Complex workflows need to support checkpoints and state recovery

  • State data needs to remain available after system restarts

  • Lack of unified checkpoint management mechanism

4. Performance and Scalability Challenges

  • Large numbers of concurrent sessions require high-performance storage support

  • Memory usage needs optimization to avoid resource exhaustion

  • Lack of effective caching and cleanup mechanisms

ContextEngine’s Solution:

  • Unified Session Management: Provides complete session lifecycle management

  • Multi-Backend Storage Support: Supports automatic switching between Redis and memory storage

  • Dual-Write Architecture: Optional ClickHouse for permanent cold storage (Redis hot + ClickHouse cold)

  • Conversation History Optimization: Intelligent conversation history management and compression

  • Checkpoint Integration: Deep integration with LangGraph checkpoint system

  • Performance Monitoring: Provides detailed performance metrics and health checks

Component Positioning

context_engine.py is a core domain service of the AIECS system, located in the Domain Layer, implementing storage interfaces and checkpoint backend interfaces. As the system’s context management core, it provides advanced session management, conversation tracking, and persistent storage capabilities.

Component Type and Positioning

Component Type

Domain Service Component - Located in the Domain Layer, belongs to the business logic layer

Architecture Layers

┌─────────────────────────────────────────┐
│         Application Layer               │  ← Components using ContextEngine
│  (BaseAIService, ServiceLayer)          │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         Domain Layer                    │  ← ContextEngine layer
│  (ContextEngine, Business Logic)        │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│       Infrastructure Layer              │  ← Components ContextEngine depends on
│  (RedisClient, ClickHousePermanentBackend) │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│         External Services               │  ← External storage systems
│  (Redis, ClickHouse, FileSystem)        │
└─────────────────────────────────────────┘

Upstream Components (Consumers)

1. AI Service Layer

  • BaseAIService (if exists)

  • ServiceLayer (if exists)

  • AgentService (if exists)

2. Application Layer Services

  • TaskService (if exists)

  • ExecutionService (if exists)

  • ConversationService (if exists)

3. Workflow Engine

  • LangGraph Workflows (via checkpoint interface)

  • BaseServiceCheckpointer (if exists)

  • Workflow Executors (if exists)

Downstream Components (Dependencies)

1. Storage Interfaces

  • IStorageBackend (core/interface/storage_interface.py)

  • ICheckpointerBackend (core/interface/storage_interface.py)

2. Infrastructure Layer

  • RedisClient (infrastructure/persistence/redis_client.py)

  • ClickHousePermanentBackend (infrastructure/persistence/clickhouse_permanent_backend.py) - Optional dual-write for permanent storage

  • ClickHouseClient (infrastructure/persistence/clickhouse_client.py)

  • ContextEngine Client (infrastructure/persistence/context_engine_client.py) - Global singleton initialization

3. Domain Models

  • TaskContext (domain/task/task_context.py)

  • SessionMetrics (internally defined)

  • ConversationMessage (internally defined)

4. Conversation Models

  • ConversationParticipant (domain/context/conversation_models.py)

  • ConversationSession (domain/context/conversation_models.py)

  • AgentCommunicationMessage (domain/context/conversation_models.py)

Core Functionality

1. Session Management

Session Creation and Lifecycle

async def create_session(
    self,
    session_id: str,
    user_id: str,
    metadata: Dict[str, Any] = None
) -> SessionMetrics:
    """Create a new session"""
    now = datetime.utcnow()
    session = SessionMetrics(
        session_id=session_id,
        user_id=user_id,
        created_at=now,
        last_activity=now,
        status="active"
    )
    await self._store_session(session)
    return session

Features:

  • Unique Session Identifier: Supports custom or auto-generated session IDs

  • User Association: Each session is associated with a specific user

  • Metadata Support: Supports custom session metadata

  • State Tracking: Real-time tracking of session state and activity

Session Updates and Metrics Tracking

async def update_session(
    self,
    session_id: str,
    updates: Dict[str, Any] = None,
    increment_requests: bool = False,
    add_processing_time: float = 0.0,
    mark_error: bool = False
) -> bool:
    """Update session information and metrics"""
    session = await self.get_session(session_id)
    if not session:
        return False
    
    # Update metrics
    if increment_requests:
        session.request_count += 1
    if add_processing_time > 0:
        session.total_processing_time += add_processing_time
    if mark_error:
        session.error_count += 1
    
    session.last_activity = datetime.utcnow()
    await self._store_session(session)
    return True

Features:

  • Request Counting: Automatically tracks session request count

  • Processing Time Statistics: Accumulates session processing time

  • Error Statistics: Tracks session error count

  • Activity Time Updates: Automatically updates last activity time

2. Conversation History Management

Conversation Message Storage

async def add_conversation_message(
    self,
    session_id: str,
    role: str,
    content: str,
    metadata: Dict[str, Any] = None
) -> bool:
    """Add a conversation message"""
    message = ConversationMessage(
        role=role,
        content=content,
        timestamp=datetime.utcnow(),
        metadata=metadata or {}
    )
    await self._store_conversation_message(session_id, message)
    return True

Features:

  • Structured Messages: Supports role, content, timestamp, metadata

  • Sequential Storage: Maintains message chronological order

  • Metadata Support: Supports message-level custom metadata

  • Automatic Timestamps: Automatically records message creation time

Conversation History Retrieval

async def get_conversation_history(
    self,
    session_id: str,
    limit: int = 50
) -> List[ConversationMessage]:
    """Get conversation history"""
    if self.redis_client:
        # Redis implementation
        key = f"conversation:{session_id}"
        messages_data = await self.redis_client.lrange(key, 0, limit - 1)
        return [ConversationMessage.from_dict(json.loads(msg)) for msg in messages_data]
    else:
        # Memory implementation
        messages = self._memory_conversations.get(session_id, [])
        return messages[-limit:] if limit > 0 else messages

Features:

  • Pagination Support: Supports limiting returned message count

  • Sequential Retrieval: Returns messages in chronological order

  • Multi-Backend Storage: Supports Redis and memory storage

  • Data Deserialization: Automatically handles data format conversion

3. Task Context Management

Context Storage and Retrieval

async def store_task_context(self, session_id: str, context: Any) -> bool:
    """Store task context"""
    if isinstance(context, TaskContext):
        await self._store_task_context(session_id, context)
        return True
    return False

async def get_task_context(self, session_id: str) -> Optional[TaskContext]:
    """Get task context"""
    if self.redis_client:
        # Redis implementation
        key = f"context:{session_id}"
        context_data = await self.redis_client.get(key)
        if context_data:
            return TaskContext.from_dict(json.loads(context_data))
    else:
        # Memory implementation
        return self._memory_contexts.get(session_id)
    return None

Features:

  • Type Safety: Supports TaskContext type checking

  • Serialization Support: Automatically handles context serialization and deserialization

  • Persistent Storage: Supports persistent context storage

  • Fast Retrieval: Provides efficient context retrieval

4. Checkpoint Management

LangGraph Checkpoint Integration

async def store_checkpoint(
    self,
    thread_id: str,
    checkpoint_id: str,
    checkpoint_data: Dict[str, Any],
    metadata: Dict[str, Any] = None
) -> bool:
    """Store checkpoint data"""
    checkpoint_key = f"checkpoint:{thread_id}:{checkpoint_id}"
    checkpoint_info = {
        "checkpoint_id": checkpoint_id,
        "thread_id": thread_id,
        "data": checkpoint_data,
        "metadata": metadata or {},
        "created_at": datetime.utcnow().isoformat()
    }
    
    if self.redis_client:
        await self.redis_client.setex(
            checkpoint_key, 
            self.checkpoint_ttl, 
            json.dumps(checkpoint_info)
        )
    else:
        self._memory_checkpoints[checkpoint_key] = checkpoint_info
    
    return True

Features:

  • LangGraph Compatible: Fully compatible with LangGraph checkpoint interface

  • Thread Isolation: Supports checkpoint isolation for multi-threaded workflows

  • Metadata Support: Supports checkpoint-level metadata

  • TTL Support: Supports automatic checkpoint expiration

5. Multi-Backend Storage Support

Storage Backend Automatic Switching

async def initialize(self) -> bool:
    """Initialize storage backend"""
    if not REDIS_AVAILABLE:
        logger.warning("Redis not available, using memory storage")
        return True
    
    try:
        if self.use_existing_redis and get_redis_client:
            # Use existing Redis client
            redis_client_instance = await get_redis_client()
            self.redis_client = await redis_client_instance.get_client()
            await self.redis_client.ping()
            return True
        else:
            # Direct Redis connection
            redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
            self.redis_client = redis.from_url(redis_url)
            await self.redis_client.ping()
            return True
    except Exception as e:
        logger.error(f"Failed to connect to Redis: {e}")
        logger.warning("Falling back to memory storage")
        self.redis_client = None
        return False

Features:

  • Automatic Fallback: Automatically switches to memory storage when Redis is unavailable

  • Connection Reuse: Supports using existing Redis connection pools

  • Flexible Configuration: Supports environment variable configuration for Redis connection

  • Error Handling: Comprehensive error handling and logging

6. Permanent Storage Dual-Write (Redis hot + cold archive)

Select a cold-storage backend with CONTEXT_PERMANENT_BACKEND:

Value

Backend

Notes

none

Disabled (default)

Redis / in-memory only

postgres

PostgreSQL

Reuses POSTGRES_URL or main DB settings; optional CONTEXT_PG_URL

clickhouse

ClickHouse

Legacy CLICKHOUSE_ENABLED=true also selects ClickHouse

Writes to the permanent backend are fire-and-forget; failures do not block the primary Redis path.

Environment Configuration

Variable

Description

Default

CONTEXT_PERMANENT_BACKEND

Cold archive backend: postgres, clickhouse, none

none

CONTEXT_PG_URL

PostgreSQL DSN for context archive (optional)

falls back to POSTGRES_URL / main DB

CONTEXT_PG_DATABASE

Override database name for context archive

main DB_NAME

CLICKHOUSE_ENABLED

Legacy: enable ClickHouse when CONTEXT_PERMANENT_BACKEND unset

false

CLICKHOUSE_HOST

ClickHouse server host

localhost

CLICKHOUSE_PORT

ClickHouse HTTP port

8123

CLICKHOUSE_USER

ClickHouse username

default

CLICKHOUSE_PASSWORD

ClickHouse password

(empty)

CLICKHOUSE_DATABASE

Database name

default

REDIS_HOST

Redis host

localhost

REDIS_PORT

Redis port

6379

REDIS_DB

Redis database index

1

REDIS_PASSWORD

Redis password

(empty)

PostgreSQL Tables (Auto-Created)

When CONTEXT_PERMANENT_BACKEND=postgres, tables are created automatically (or run aiecs/scripts/migrations/postgres/001_context_permanent_tables.sql):

  • context_sessions, context_conversations, context_task_contexts

  • context_checkpoints, context_checkpoint_writes, context_conversation_sessions

ClickHouse Tables (Auto-Created)

When CONTEXT_PERMANENT_BACKEND=clickhouse (or legacy CLICKHOUSE_ENABLED=true):

  • context_sessions - Session create/update/end events

  • context_conversations - Conversation messages

  • context_task_contexts - Task context snapshots

  • context_checkpoints - LangGraph checkpoints

  • context_checkpoint_writes - Checkpoint intermediate writes

  • context_conversation_sessions - Conversation session metadata

7. Initialization via Infrastructure

For application-wide singleton ContextEngine, use the infrastructure persistence module:

from aiecs.infrastructure.persistence import (
    initialize_redis_client,
    initialize_context_engine,
    close_context_engine,
    get_context_engine,
)

# During application startup (e.g., FastAPI lifespan)
await initialize_redis_client()
await initialize_context_engine()

# In any component
engine = get_context_engine()
if engine:
    await engine.add_conversation_message(session_id, "assistant", content)

# During shutdown
await close_context_engine()

Data Model Details

1. SessionMetrics - Session Metrics Model

@dataclass
class SessionMetrics:
    """Session-level performance metrics"""
    session_id: str
    user_id: str
    created_at: datetime
    last_activity: datetime
    request_count: int = 0
    error_count: int = 0
    total_processing_time: float = 0.0
    status: str = "active"  # active, completed, failed, expired
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format"""
        return {
            **asdict(self),
            "created_at": self.created_at.isoformat(),
            "last_activity": self.last_activity.isoformat()
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SessionMetrics':
        """Create instance from dictionary"""
        data = data.copy()
        data["created_at"] = datetime.fromisoformat(data["created_at"])
        data["last_activity"] = datetime.fromisoformat(data["last_activity"])
        return cls(**data)

Field Descriptions:

  • session_id: Unique session identifier

  • user_id: User identifier

  • created_at: Session creation time

  • last_activity: Last activity time

  • request_count: Request count

  • error_count: Error count

  • total_processing_time: Total processing time

  • status: Session status

2. ConversationMessage - Conversation Message Model

@dataclass
class ConversationMessage:
    """Structured conversation message"""
    role: str  # user, assistant, system
    content: str
    timestamp: datetime
    metadata: Dict[str, Any] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format"""
        return {
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata or {}
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConversationMessage':
        """Create instance from dictionary"""
        data = data.copy()
        data["timestamp"] = datetime.fromisoformat(data["timestamp"])
        return cls(**data)

Field Descriptions:

  • role: Message role (user, assistant, system)

  • content: Message content

  • timestamp: Message timestamp

  • metadata: Message metadata

Design Patterns Explained

1. Adapter Pattern

class ContextEngine(IStorageBackend, ICheckpointerBackend):
    """Adapter Pattern - Adapts storage interface and checkpoint interface"""
    
    async def create_session(self, session_id: str, user_id: str, metadata: Dict[str, Any] = None):
        # Implements IStorageBackend interface
        pass
    
    async def put_checkpoint(self, thread_id: str, checkpoint_id: str, checkpoint_data: Dict[str, Any]):
        # Implements ICheckpointerBackend interface
        pass

Advantages:

  • Unified Interface: Unifies implementations of different storage interfaces

  • Backward Compatibility: Maintains compatibility with existing interfaces

  • Function Integration: Integrates related functionality into a unified implementation

2. Strategy Pattern

async def _store_session(self, session: SessionMetrics):
    """Storage Strategy - Selects storage method based on available backend"""
    if self.redis_client:
        # Redis storage strategy
        key = f"session:{session.session_id}"
        await self.redis_client.setex(key, self.session_ttl, json.dumps(session.to_dict()))
    else:
        # Memory storage strategy
        self._memory_sessions[session.session_id] = session

Advantages:

  • Algorithm Encapsulation: Encapsulates storage algorithms in concrete implementations

  • Dynamic Switching: Can switch storage strategies at runtime

  • Easy Extension: Adding new storage strategies doesn’t require modifying existing code

3. Template Method Pattern

async def _store_conversation_message(self, session_id: str, message: ConversationMessage):
    """Template Method - Defines common flow for storing conversation messages"""
    # 1. Serialize message
    message_data = message.to_dict()
    
    # 2. Select storage backend
    if self.redis_client:
        # Redis storage implementation
        key = f"conversation:{session_id}"
        await self.redis_client.lpush(key, json.dumps(message_data))
        await self.redis_client.expire(key, self.session_ttl)
    else:
        # Memory storage implementation
        if session_id not in self._memory_conversations:
            self._memory_conversations[session_id] = []
        self._memory_conversations[session_id].append(message)
    
    # 3. Update metrics
    self._global_metrics["total_messages"] += 1

Advantages:

  • Unified Flow: Defines unified storage flow

  • Step Reuse: Common steps can be reused in subclasses

  • Easy Maintenance: Modifying flow only requires modifying template method

Usage Examples

1. Basic Session Management

from aiecs.domain.context import ContextEngine

# Initialize ContextEngine (creates own RedisClient per instance)
engine = ContextEngine(use_existing_redis=True)
await engine.initialize()

# Create session
session = await engine.create_session(
    session_id="user_123_session_001",
    user_id="user_123",
    metadata={"source": "web", "version": "1.0"}
)

# Update session metrics
await engine.update_session(
    session_id="user_123_session_001",
    increment_requests=True,
    add_processing_time=1.5,
    mark_error=False
)

# End session
await engine.end_session("user_123_session_001", status="completed")

2. Conversation History Management

# Add conversation messages
await engine.add_conversation_message(
    session_id="user_123_session_001",
    role="user",
    content="Hello, I need help with data analysis",
    metadata={"message_type": "query", "priority": "normal"}
)

await engine.add_conversation_message(
    session_id="user_123_session_001",
    role="assistant",
    content="I'd be happy to help you with data analysis. What specific data do you have?",
    metadata={"message_type": "response", "confidence": 0.95}
)

# Get conversation history
history = await engine.get_conversation_history(
    session_id="user_123_session_001",
    limit=10
)

for message in history:
    print(f"{message.role}: {message.content}")

3. Task Context Management

from aiecs.domain.task.task_context import TaskContext

# Create task context
context = TaskContext(
    {"user_id": "user_123", "chat_id": "session_001", "metadata": {"task_type": "analysis"}}
)

# Store context
await engine.store_task_context("user_123_session_001", context)

# Retrieve context
retrieved_context = await engine.get_task_context("user_123_session_001")
if retrieved_context:
    print(f"Task mode: {retrieved_context.mode}")
    print(f"Service: {retrieved_context.service}")

4. Checkpoint Management

# Store checkpoint
checkpoint_data = {
    "workflow_state": "data_processing",
    "current_step": "data_cleaning",
    "processed_records": 1000,
    "errors": []
}

await engine.store_checkpoint(
    thread_id="workflow_001",
    checkpoint_id="checkpoint_001",
    checkpoint_data=checkpoint_data,
    metadata={"workflow_version": "1.0", "created_by": "system"}
)

# Get checkpoint
checkpoint = await engine.get_checkpoint(
    thread_id="workflow_001",
    checkpoint_id="checkpoint_001"
)

if checkpoint:
    print(f"Workflow state: {checkpoint['data']['workflow_state']}")
    print(f"Current step: {checkpoint['data']['current_step']}")

5. Advanced Conversation Session Management

from aiecs.domain.context.conversation_models import ConversationParticipant

# Create conversation session (participants as dict list)
session_key = await engine.create_conversation_session(
    session_id="session_001",
    participants=[
        {"id": "user_123", "type": "user", "role": "user"},
        {"id": "agent_001", "type": "agent", "role": "assistant", "metadata": {}}
    ],
    session_type="user_to_agent",
    metadata={"project": "sales_analysis", "priority": "high"}
)

# Add agent communication message
await engine.add_agent_communication_message(
    session_key=session_key,
    sender_id="agent_001",
    sender_type="agent",
    sender_role="data_analyst",
    content="I've analyzed your sales data and found some interesting patterns.",
    message_type="analysis_result",
    metadata={"analysis_id": "analysis_001", "confidence": 0.92}
)

# Get agent conversation history
agent_history = await engine.get_agent_conversation_history(
    session_key=session_key,
    limit=20
)

Maintenance Guide

1. Daily Maintenance

Health Checks

async def check_engine_health(engine: ContextEngine):
    """Check content engine health status"""
    health_status = await engine.health_check()
    
    print(f"Engine Status: {health_status['status']}")
    print(f"Storage Backend: {health_status['storage_backend']}")
    print(f"Active Sessions: {health_status['active_sessions']}")
    print(f"Total Messages: {health_status['total_messages']}")
    
    if health_status['status'] != 'healthy':
        print(f"Health Issues: {health_status.get('issues', [])}")
    
    return health_status['status'] == 'healthy'

Performance Monitoring

async def monitor_engine_performance(engine: ContextEngine):
    """Monitor content engine performance"""
    metrics = await engine.get_metrics()
    
    print("=== Content Engine Metrics ===")
    print(f"Total Sessions: {metrics['total_sessions']}")
    print(f"Active Sessions: {metrics['active_sessions']}")
    print(f"Total Messages: {metrics['total_messages']}")
    print(f"Total Checkpoints: {metrics['total_checkpoints']}")
    
    # Check memory usage
    if 'memory_usage' in metrics:
        print(f"Memory Usage: {metrics['memory_usage']}")
    
    # Check storage performance
    if 'storage_performance' in metrics:
        perf = metrics['storage_performance']
        print(f"Average Response Time: {perf['avg_response_time']:.3f}s")
        print(f"Cache Hit Rate: {perf['cache_hit_rate']:.2%}")

2. Troubleshooting

Common Issue Diagnosis

Issue 1: Redis Connection Failure

async def diagnose_redis_connection(engine: ContextEngine):
    """Diagnose Redis connection issues"""
    try:
        # Check Redis client status
        if engine.redis_client:
            await engine.redis_client.ping()
            print("✅ Redis connection is healthy")
        else:
            print("⚠️ Redis client is None, using memory storage")
        
        # Check health status
        health = await engine.health_check()
        print(f"Health status: {health['status']}")
        
        if health['status'] != 'healthy':
            print(f"Health issues: {health.get('issues', [])}")
            
    except Exception as e:
        print(f"❌ Redis connection failed: {e}")
        
        # Check environment variables
        import os
        redis_url = os.getenv('REDIS_URL')
        print(f"REDIS_URL: {redis_url}")
        
        # Check network connection
        import socket
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result = sock.connect_ex(('localhost', 6379))
            sock.close()
            print(f"Network connectivity: {'ok' if result == 0 else 'failed'}")
        except Exception as net_e:
            print(f"Network check failed: {net_e}")

Issue 2: Session Data Inconsistency

async def diagnose_session_consistency(engine: ContextEngine, session_id: str):
    """Diagnose session data consistency issues"""
    try:
        # Get session
        session = await engine.get_session(session_id)
        if not session:
            print(f"❌ Session {session_id} not found")
            return
        
        print(f"✅ Session found: {session.session_id}")
        print(f"User ID: {session.user_id}")
        print(f"Status: {session.status}")
        print(f"Request Count: {session.request_count}")
        print(f"Error Count: {session.error_count}")
        
        # Check conversation history
        history = await engine.get_conversation_history(session_id, limit=5)
        print(f"Recent messages: {len(history)}")
        
        # Check task context
        context = await engine.get_task_context(session_id)
        if context:
            print(f"Task context: {context.mode} - {context.service}")
        else:
            print("No task context found")
        
        # Check checkpoints
        checkpoints = await engine.list_checkpoints(session_id, limit=5)
        print(f"Checkpoints: {len(checkpoints)}")
        
    except Exception as e:
        print(f"❌ Session consistency check failed: {e}")

3. Performance Optimization

Memory Usage Optimization

async def optimize_memory_usage(engine: ContextEngine):
    """Optimize memory usage"""
    # Clean up expired sessions
    cleaned_count = await engine.cleanup_expired_sessions(max_idle_hours=24)
    print(f"Cleaned up {cleaned_count} expired sessions")
    
    # Check memory usage
    metrics = await engine.get_metrics()
    if 'memory_usage' in metrics:
        memory_usage = metrics['memory_usage']
        print(f"Current memory usage: {memory_usage}")
        
        # If memory usage is high, perform cleanup
        if memory_usage > 0.8:  # 80% threshold
            print("Memory usage high, performing cleanup...")
            # Perform additional cleanup operations
            await engine.cleanup_expired_sessions(max_idle_hours=12)

Storage Performance Optimization

async def optimize_storage_performance(engine: ContextEngine):
    """Optimize storage performance"""
    # Check storage performance metrics
    metrics = await engine.get_metrics()
    
    if 'storage_performance' in metrics:
        perf = metrics['storage_performance']
        
        # Check response time
        if perf['avg_response_time'] > 0.1:  # 100ms threshold
            print("Storage response time is high, checking connection...")
            health = await engine.health_check()
            print(f"Storage health: {health}")
        
        # Check cache hit rate
        if perf['cache_hit_rate'] < 0.8:  # 80% threshold
            print("Cache hit rate is low, consider increasing cache size")

4. Data Migration

Session Data Migration

async def migrate_session_data(source_engine: ContextEngine, target_engine: ContextEngine):
    """Migrate session data"""
    print("Starting session data migration...")
    
    # Get all sessions (need to implement list_sessions method)
    # Here we assume there's a method to get all sessions
    sessions = await source_engine.list_all_sessions()
    
    migrated_count = 0
    for session in sessions:
        try:
            # Migrate session
            await target_engine.create_session(
                session.session_id,
                session.user_id,
                session.metadata
            )
            
            # Migrate conversation history
            history = await source_engine.get_conversation_history(session.session_id)
            for message in history:
                await target_engine.add_conversation_message(
                    session.session_id,
                    message.role,
                    message.content,
                    message.metadata
                )
            
            # Migrate task context
            context = await source_engine.get_task_context(session.session_id)
            if context:
                await target_engine.store_task_context(session.session_id, context)
            
            migrated_count += 1
            print(f"Migrated session: {session.session_id}")
            
        except Exception as e:
            print(f"Failed to migrate session {session.session_id}: {e}")
    
    print(f"Migration completed: {migrated_count} sessions migrated")

Monitoring and Logging

Performance Monitoring

import time
from typing import Dict, Any

class ContentEngineMonitor:
    """Content Engine Monitor"""
    
    def __init__(self, engine: ContextEngine):
        self.engine = engine
        self.operation_metrics = {
            "session_operations": [],
            "conversation_operations": [],
            "checkpoint_operations": []
        }
    
    async def record_operation(self, operation_type: str, operation: str, 
                             latency: float, success: bool):
        """Record operation metrics"""
        metric = {
            "operation_type": operation_type,
            "operation": operation,
            "latency": latency,
            "success": success,
            "timestamp": time.time()
        }
        
        self.operation_metrics[f"{operation_type}_operations"].append(metric)
        
        # Keep last 1000 records
        if len(self.operation_metrics[f"{operation_type}_operations"]) > 1000:
            self.operation_metrics[f"{operation_type}_operations"] = \
                self.operation_metrics[f"{operation_type}_operations"][-1000:]
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Get performance report"""
        report = {}
        
        for operation_type, metrics in self.operation_metrics.items():
            if not metrics:
                continue
            
            latencies = [m["latency"] for m in metrics]
            successes = [m["success"] for m in metrics]
            
            report[operation_type] = {
                "total_operations": len(metrics),
                "success_rate": sum(successes) / len(successes) if successes else 0,
                "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
                "min_latency": min(latencies) if latencies else 0,
                "max_latency": max(latencies) if latencies else 0
            }
        
        return report

Logging

import logging
from typing import Dict, Any

class ContentEngineLogger:
    """Content Engine Logger"""
    
    def __init__(self, engine: ContextEngine):
        self.engine = engine
        self.logger = logging.getLogger(__name__)
    
    async def log_session_operation(self, operation: str, session_id: str, 
                                  success: bool, latency: float, error: str = None):
        """Log session operation"""
        log_data = {
            "operation": operation,
            "session_id": session_id,
            "success": success,
            "latency": latency,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        if error:
            log_data["error"] = error
        
        if success:
            self.logger.info(f"Session operation completed: {log_data}")
        else:
            self.logger.error(f"Session operation failed: {log_data}")
    
    async def log_conversation_operation(self, operation: str, session_id: str,
                                       message_count: int, success: bool):
        """Log conversation operation"""
        log_data = {
            "operation": operation,
            "session_id": session_id,
            "message_count": message_count,
            "success": success,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        if success:
            self.logger.info(f"Conversation operation completed: {log_data}")
        else:
            self.logger.error(f"Conversation operation failed: {log_data}")

Version History

  • v1.0.0: Initial version, basic session management functionality

  • v1.1.0: Added conversation history management

  • v1.2.0: Added task context management

  • v1.3.0: Added checkpoint management

  • v1.4.0: Added multi-backend storage support

  • v1.5.0: Added advanced conversation session management

  • v1.6.0: Added performance monitoring and logging

  • v1.7.0: Added ClickHouse dual-write for permanent storage; infrastructure persistence init

Integration Tests

ContextEngine ClickHouse integration tests are in test/integration/context/test_context_clickhouse.py

Prerequisites: Redis and ClickHouse running, configured in .env.test:

  • CLICKHOUSE_ENABLED=true

  • REDIS_*, CLICKHOUSE_* environment variables

Run: poetry run pytest test/integration/context/test_context_clickhouse.py -v