ContextEngine Integration Patterns

This guide covers comprehensive patterns for integrating ContextEngine with AIECS agents for persistent conversation history, session management, and context storage.

Table of Contents

  1. Overview

  2. Basic Integration

  3. Session Management Patterns

  4. Conversation History Patterns

  5. Context Storage Patterns

  6. Multi-Agent Patterns

  7. Production Patterns

  8. Best Practices

Overview

ContextEngine provides persistent storage and management for:

  • Conversation History: Multi-turn conversations that persist across agent restarts

  • Session Management: Track user sessions with metrics and lifecycle

  • Context Storage: Store and retrieve arbitrary context data

  • Compression: Automatic conversation compression to manage token limits

Key Benefits

  • Persistence: Conversations survive agent restarts

  • Scalability: Redis backend for distributed systems

  • Session Tracking: Monitor user sessions with metrics

  • Context Management: Store and retrieve context data

  • Compression: Automatic conversation compression

Basic Integration

Pattern 1: Simple Agent Integration

Basic integration with ContextEngine for persistent memory.

from aiecs.domain.agent import HybridAgent, AgentConfiguration
from aiecs.domain.context import ContextEngine
from aiecs.llm import OpenAIClient

# Initialize ContextEngine
context_engine = ContextEngine()
await context_engine.initialize()

# Create agent with ContextEngine
agent = HybridAgent(
    agent_id="agent-1",
    name="My Agent",
    llm_client=OpenAIClient(),
    tools=["search"],
    config=AgentConfiguration(goal="Help users"),
    context_engine=context_engine  # Enable persistent memory
)

await agent.initialize()

# Conversation history persists automatically
result = await agent.execute_task(
    {"description": "Hello"},
    {"session_id": "user-123"}
)

# Agent restarts...
# Previous conversation still available!

Pattern 2: ContextEngine with Redis

Use Redis backend for distributed systems.

from aiecs.domain.context import ContextEngine
from aiecs.infrastructure.persistence.redis_client import get_redis_client

# Get Redis client
redis_client = await get_redis_client()

# Initialize ContextEngine with Redis
context_engine = ContextEngine(redis_client=redis_client)
await context_engine.initialize()

# Use with agent
agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

Pattern 3: In-Memory Fallback

ContextEngine falls back to in-memory storage if Redis unavailable.

# ContextEngine automatically falls back to memory if Redis unavailable
context_engine = ContextEngine()
await context_engine.initialize()

# Works with or without Redis
agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

Session Management Patterns

Pattern 1: Session Lifecycle

Complete session lifecycle management.

from aiecs.domain.context import ContextEngine

context_engine = ContextEngine()
await context_engine.initialize()

# Create session
session_metrics = await context_engine.create_session(
    session_id="session-123",
    user_id="user-456",
    metadata={"source": "web", "device": "mobile"}
)

# Update session metrics
await context_engine.update_session(
    session_id="session-123",
    increment_requests=True,
    add_processing_time=1.5,
    mark_error=False
)

# Get session
session = await context_engine.get_session("session-123")
print(f"Requests: {session.request_count}")
print(f"Errors: {session.error_count}")
print(f"Avg time: {session.total_processing_time / session.request_count}s")

# End session
await context_engine.end_session("session-123", status="completed")

Pattern 2: Session Tracking with Agents

Track sessions automatically with agent execution.

agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

# Agent automatically tracks sessions
result = await agent.execute_task(
    {"description": "Hello"},
    {"session_id": "user-123"}  # Session tracked automatically
)

# Get session metrics
session = await context_engine.get_session("user-123")
print(f"Session requests: {session.request_count}")

Pattern 3: Session Cleanup

Clean up inactive sessions automatically.

# Clean up sessions inactive for more than 30 minutes
cleaned_count = await context_engine.cleanup_inactive_sessions(
    max_idle_seconds=1800
)

print(f"Cleaned up {cleaned_count} inactive sessions")

Pattern 4: Session Metrics Aggregation

Aggregate metrics across multiple sessions.

# Get all active sessions
active_sessions = await context_engine.list_sessions(status="active")

# Aggregate metrics
total_requests = sum(s.request_count for s in active_sessions)
total_errors = sum(s.error_count for s in active_sessions)
total_time = sum(s.total_processing_time for s in active_sessions)

print(f"Total requests: {total_requests}")
print(f"Total errors: {total_errors}")
print(f"Average time: {total_time / total_requests}s")

Conversation History Patterns

Pattern 1: Persistent Conversation History

Conversations persist across agent restarts.

# First run: Create conversation
context_engine = ContextEngine()
await context_engine.initialize()

agent1 = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

await agent1.initialize()

# Add conversation
result1 = await agent1.execute_task(
    {"description": "What's the weather?"},
    {"session_id": "user-123"}
)

result2 = await agent1.execute_task(
    {"description": "What about tomorrow?"},
    {"session_id": "user-123"}
)

# Agent restarts...

# Second run: Conversation persists!
agent2 = HybridAgent(
    agent_id="agent-1",  # Same agent ID
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

await agent2.initialize()

# Previous conversation still available!
result3 = await agent2.execute_task(
    {"description": "What did I ask about?"},
    {"session_id": "user-123"}  # Same session ID
)
# Agent remembers previous conversation!

Pattern 2: Conversation History Retrieval

Retrieve and format conversation history.

# Get conversation history
history = await context_engine.get_conversation_history(
    session_id="user-123",
    limit=50  # Last 50 messages
)

# Format for LLM prompts
formatted = await context_engine.format_conversation_history(
    session_id="user-123",
    format="messages"  # or "string", "dict"
)

# Use in agent
agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

# Agent automatically uses conversation history
result = await agent.execute_task(
    {"description": "Continue conversation"},
    {"session_id": "user-123"}
)

Pattern 3: Conversation History with Compression

Use compression to manage long conversations.

from aiecs.domain.context import CompressionConfig

# Configure compression
compression_config = CompressionConfig(
    strategy="summarize",
    keep_recent=10,
    auto_compress_enabled=True,
    auto_compress_threshold=50
)

context_engine = ContextEngine(compression_config=compression_config)
await context_engine.initialize()

# Compression happens automatically when threshold exceeded
agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

# Long conversation automatically compressed
for i in range(60):
    result = await agent.execute_task(
        {"description": f"Message {i}"},
        {"session_id": "user-123"}
    )
    # Compression triggered at 50 messages

Context Storage Patterns

Pattern 1: Store and Retrieve Context

Store arbitrary context data.

# Store context
await context_engine.set_context(
    session_id="user-123",
    key="user_preferences",
    value={"theme": "dark", "language": "en"}
)

# Retrieve context
preferences = await context_engine.get_context(
    session_id="user-123",
    key="user_preferences"
)

# Use in agent
agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

# Agent can access context
result = await agent.execute_task(
    {"description": "What are my preferences?"},
    {"session_id": "user-123"}
)

Pattern 2: Context Updates

Update context incrementally.

# Set initial context
await context_engine.set_context(
    session_id="user-123",
    key="shopping_cart",
    value={"items": [], "total": 0}
)

# Update context
cart = await context_engine.get_context(
    session_id="user-123",
    key="shopping_cart"
)
cart["items"].append("product-1")
cart["total"] += 10.99

await context_engine.set_context(
    session_id="user-123",
    key="shopping_cart",
    value=cart
)

Pattern 3: Context Listing

List all context keys for a session.

# List all context keys
keys = await context_engine.list_contexts(
    session_id="user-123",
    limit=100
)

# Retrieve multiple contexts
contexts = {}
for key in keys:
    contexts[key] = await context_engine.get_context(
        session_id="user-123",
        key=key
    )

Multi-Agent Patterns

Pattern 1: Shared ContextEngine

Multiple agents share the same ContextEngine.

context_engine = ContextEngine()
await context_engine.initialize()

# Create multiple agents sharing ContextEngine
agent1 = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client1,
    tools=["search"],
    config=config1,
    context_engine=context_engine  # Shared
)

agent2 = HybridAgent(
    agent_id="agent-2",
    llm_client=llm_client2,
    tools=["calculator"],
    config=config2,
    context_engine=context_engine  # Shared
)

# Both agents share conversation history
result1 = await agent1.execute_task(
    {"description": "Hello"},
    {"session_id": "user-123"}
)

result2 = await agent2.execute_task(
    {"description": "What did I say?"},
    {"session_id": "user-123"}  # Same session
)
# Agent2 can see conversation from agent1!

Pattern 2: Agent-Specific Context

Each agent has its own ContextEngine instance.

# Separate ContextEngine for each agent
context_engine1 = ContextEngine()
await context_engine1.initialize()

context_engine2 = ContextEngine()
await context_engine2.initialize()

agent1 = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client1,
    tools=["search"],
    config=config1,
    context_engine=context_engine1  # Separate
)

agent2 = HybridAgent(
    agent_id="agent-2",
    llm_client=llm_client2,
    tools=["calculator"],
    config=config2,
    context_engine=context_engine2  # Separate
)

# Each agent has isolated conversation history

Production Patterns

Pattern 1: Redis Configuration

Configure Redis for production.

from aiecs.infrastructure.persistence.redis_client import get_redis_client

# Get Redis client with configuration
redis_client = await get_redis_client(
    host="redis.example.com",
    port=6379,
    db=0,
    password="your-password",
    ssl=True
)

context_engine = ContextEngine(redis_client=redis_client)
await context_engine.initialize()

# Use with agent
agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine
)

Pattern 2: Compression Configuration

Configure compression for production.

from aiecs.domain.context import CompressionConfig

# Production compression config
compression_config = CompressionConfig(
    strategy="hybrid",
    hybrid_strategies=["truncate", "summarize"],
    keep_recent=20,
    auto_compress_enabled=True,
    auto_compress_threshold=100,
    auto_compress_target=50,
    summary_max_tokens=500,
    compression_timeout=30
)

context_engine = ContextEngine(compression_config=compression_config)
await context_engine.initialize()

Pattern 3: Error Handling

Handle ContextEngine errors gracefully.

try:
    context_engine = ContextEngine()
    await context_engine.initialize()
except Exception as e:
    logger.error(f"Failed to initialize ContextEngine: {e}")
    # Fall back to in-memory mode
    context_engine = None

agent = HybridAgent(
    agent_id="agent-1",
    llm_client=llm_client,
    tools=["search"],
    config=config,
    context_engine=context_engine  # None = in-memory fallback
)

# Agent works with or without ContextEngine

Pattern 4: Monitoring

Monitor ContextEngine health and metrics.

# Get global metrics
metrics = context_engine.get_global_metrics()

print(f"Total sessions: {metrics['total_sessions']}")
print(f"Active sessions: {metrics['active_sessions']}")
print(f"Total messages: {metrics['total_messages']}")
print(f"Total context operations: {metrics['total_context_operations']}")

# Monitor session health
active_sessions = await context_engine.list_sessions(status="active")
for session in active_sessions:
    if session.error_count > 10:
        logger.warning(f"Session {session.session_id} has high error count")

Best Practices

1. Always Initialize ContextEngine

Always call initialize() before use:

context_engine = ContextEngine()
await context_engine.initialize()  # Don't forget this!

2. Use Consistent Session IDs

Use consistent session IDs across requests:

# Good: Consistent session ID
session_id = f"user-{user_id}"

# Bad: Random session IDs
session_id = str(uuid.uuid4())  # Don't do this!

3. Clean Up Inactive Sessions

Regularly clean up inactive sessions:

# Clean up sessions inactive for 30 minutes
await context_engine.cleanup_inactive_sessions(max_idle_seconds=1800)

4. Configure Compression

Configure compression for long conversations:

compression_config = CompressionConfig(
    auto_compress_enabled=True,
    auto_compress_threshold=50
)

5. Handle Errors Gracefully

Always handle ContextEngine errors:

try:
    await context_engine.add_conversation_message(...)
except Exception as e:
    logger.error(f"Failed to add message: {e}")
    # Fall back to in-memory or retry

6. Monitor Performance

Monitor ContextEngine performance:

# Track operation times
start = time.time()
await context_engine.get_conversation_history(...)
duration = time.time() - start

if duration > 1.0:
    logger.warning(f"Slow ContextEngine operation: {duration}s")

7. Use Redis for Production

Use Redis backend for production deployments:

# Production: Use Redis
redis_client = await get_redis_client()
context_engine = ContextEngine(redis_client=redis_client)

# Development: Can use in-memory
context_engine = ContextEngine()  # Falls back to memory

Summary

ContextEngine integration provides:

  • ✅ Persistent conversation history

  • ✅ Session management with metrics

  • ✅ Context storage and retrieval

  • ✅ Automatic compression

  • ✅ Redis backend for scalability

  • ✅ Graceful fallback to in-memory

For more details, see: