ContextEngine Integration Patterns
This guide covers comprehensive patterns for integrating ContextEngine with AIECS agents for persistent conversation history, session management, and context storage.
Table of Contents
Overview
ContextEngine provides persistent storage and management for:
Conversation History: Multi-turn conversations that persist across agent restarts
Session Management: Track user sessions with metrics and lifecycle
Context Storage: Store and retrieve arbitrary context data
Compression: Automatic conversation compression to manage token limits
Key Benefits
✅ Persistence: Conversations survive agent restarts
✅ Scalability: Redis backend for distributed systems
✅ Session Tracking: Monitor user sessions with metrics
✅ Context Management: Store and retrieve context data
✅ Compression: Automatic conversation compression
Basic Integration
Pattern 1: Simple Agent Integration
Basic integration with ContextEngine for persistent memory.
from aiecs.domain.agent import HybridAgent, AgentConfiguration
from aiecs.domain.context import ContextEngine
from aiecs.llm import OpenAIClient
# Initialize ContextEngine
context_engine = ContextEngine()
await context_engine.initialize()
# Create agent with ContextEngine
agent = HybridAgent(
agent_id="agent-1",
name="My Agent",
llm_client=OpenAIClient(),
tools=["search"],
config=AgentConfiguration(goal="Help users"),
context_engine=context_engine # Enable persistent memory
)
await agent.initialize()
# Conversation history persists automatically
result = await agent.execute_task(
{"description": "Hello"},
{"session_id": "user-123"}
)
# Agent restarts...
# Previous conversation still available!
Pattern 2: ContextEngine with Redis
Use Redis backend for distributed systems.
from aiecs.domain.context import ContextEngine
from aiecs.infrastructure.persistence.redis_client import get_redis_client
# Get Redis client
redis_client = await get_redis_client()
# Initialize ContextEngine with Redis
context_engine = ContextEngine(redis_client=redis_client)
await context_engine.initialize()
# Use with agent
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
Pattern 3: In-Memory Fallback
ContextEngine falls back to in-memory storage if Redis unavailable.
# ContextEngine automatically falls back to memory if Redis unavailable
context_engine = ContextEngine()
await context_engine.initialize()
# Works with or without Redis
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
Session Management Patterns
Pattern 1: Session Lifecycle
Complete session lifecycle management.
from aiecs.domain.context import ContextEngine
context_engine = ContextEngine()
await context_engine.initialize()
# Create session
session_metrics = await context_engine.create_session(
session_id="session-123",
user_id="user-456",
metadata={"source": "web", "device": "mobile"}
)
# Update session metrics
await context_engine.update_session(
session_id="session-123",
increment_requests=True,
add_processing_time=1.5,
mark_error=False
)
# Get session
session = await context_engine.get_session("session-123")
print(f"Requests: {session.request_count}")
print(f"Errors: {session.error_count}")
print(f"Avg time: {session.total_processing_time / session.request_count}s")
# End session
await context_engine.end_session("session-123", status="completed")
Pattern 2: Session Tracking with Agents
Track sessions automatically with agent execution.
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
# Agent automatically tracks sessions
result = await agent.execute_task(
{"description": "Hello"},
{"session_id": "user-123"} # Session tracked automatically
)
# Get session metrics
session = await context_engine.get_session("user-123")
print(f"Session requests: {session.request_count}")
Pattern 3: Session Cleanup
Clean up inactive sessions automatically.
# Clean up sessions inactive for more than 30 minutes
cleaned_count = await context_engine.cleanup_inactive_sessions(
max_idle_seconds=1800
)
print(f"Cleaned up {cleaned_count} inactive sessions")
Pattern 4: Session Metrics Aggregation
Aggregate metrics across multiple sessions.
# Get all active sessions
active_sessions = await context_engine.list_sessions(status="active")
# Aggregate metrics
total_requests = sum(s.request_count for s in active_sessions)
total_errors = sum(s.error_count for s in active_sessions)
total_time = sum(s.total_processing_time for s in active_sessions)
print(f"Total requests: {total_requests}")
print(f"Total errors: {total_errors}")
print(f"Average time: {total_time / total_requests}s")
Conversation History Patterns
Pattern 1: Persistent Conversation History
Conversations persist across agent restarts.
# First run: Create conversation
context_engine = ContextEngine()
await context_engine.initialize()
agent1 = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
await agent1.initialize()
# Add conversation
result1 = await agent1.execute_task(
{"description": "What's the weather?"},
{"session_id": "user-123"}
)
result2 = await agent1.execute_task(
{"description": "What about tomorrow?"},
{"session_id": "user-123"}
)
# Agent restarts...
# Second run: Conversation persists!
agent2 = HybridAgent(
agent_id="agent-1", # Same agent ID
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
await agent2.initialize()
# Previous conversation still available!
result3 = await agent2.execute_task(
{"description": "What did I ask about?"},
{"session_id": "user-123"} # Same session ID
)
# Agent remembers previous conversation!
Pattern 2: Conversation History Retrieval
Retrieve and format conversation history.
# Get conversation history
history = await context_engine.get_conversation_history(
session_id="user-123",
limit=50 # Last 50 messages
)
# Format for LLM prompts
formatted = await context_engine.format_conversation_history(
session_id="user-123",
format="messages" # or "string", "dict"
)
# Use in agent
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
# Agent automatically uses conversation history
result = await agent.execute_task(
{"description": "Continue conversation"},
{"session_id": "user-123"}
)
Pattern 3: Conversation History with Compression
Use compression to manage long conversations.
from aiecs.domain.context import CompressionConfig
# Configure compression
compression_config = CompressionConfig(
strategy="summarize",
keep_recent=10,
auto_compress_enabled=True,
auto_compress_threshold=50
)
context_engine = ContextEngine(compression_config=compression_config)
await context_engine.initialize()
# Compression happens automatically when threshold exceeded
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
# Long conversation automatically compressed
for i in range(60):
result = await agent.execute_task(
{"description": f"Message {i}"},
{"session_id": "user-123"}
)
# Compression triggered at 50 messages
Context Storage Patterns
Pattern 1: Store and Retrieve Context
Store arbitrary context data.
# Store context
await context_engine.set_context(
session_id="user-123",
key="user_preferences",
value={"theme": "dark", "language": "en"}
)
# Retrieve context
preferences = await context_engine.get_context(
session_id="user-123",
key="user_preferences"
)
# Use in agent
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
# Agent can access context
result = await agent.execute_task(
{"description": "What are my preferences?"},
{"session_id": "user-123"}
)
Pattern 2: Context Updates
Update context incrementally.
# Set initial context
await context_engine.set_context(
session_id="user-123",
key="shopping_cart",
value={"items": [], "total": 0}
)
# Update context
cart = await context_engine.get_context(
session_id="user-123",
key="shopping_cart"
)
cart["items"].append("product-1")
cart["total"] += 10.99
await context_engine.set_context(
session_id="user-123",
key="shopping_cart",
value=cart
)
Pattern 3: Context Listing
List all context keys for a session.
# List all context keys
keys = await context_engine.list_contexts(
session_id="user-123",
limit=100
)
# Retrieve multiple contexts
contexts = {}
for key in keys:
contexts[key] = await context_engine.get_context(
session_id="user-123",
key=key
)
Multi-Agent Patterns
Pattern 2: Agent-Specific Context
Each agent has its own ContextEngine instance.
# Separate ContextEngine for each agent
context_engine1 = ContextEngine()
await context_engine1.initialize()
context_engine2 = ContextEngine()
await context_engine2.initialize()
agent1 = HybridAgent(
agent_id="agent-1",
llm_client=llm_client1,
tools=["search"],
config=config1,
context_engine=context_engine1 # Separate
)
agent2 = HybridAgent(
agent_id="agent-2",
llm_client=llm_client2,
tools=["calculator"],
config=config2,
context_engine=context_engine2 # Separate
)
# Each agent has isolated conversation history
Production Patterns
Pattern 1: Redis Configuration
Configure Redis for production.
from aiecs.infrastructure.persistence.redis_client import get_redis_client
# Get Redis client with configuration
redis_client = await get_redis_client(
host="redis.example.com",
port=6379,
db=0,
password="your-password",
ssl=True
)
context_engine = ContextEngine(redis_client=redis_client)
await context_engine.initialize()
# Use with agent
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine
)
Pattern 2: Compression Configuration
Configure compression for production.
from aiecs.domain.context import CompressionConfig
# Production compression config
compression_config = CompressionConfig(
strategy="hybrid",
hybrid_strategies=["truncate", "summarize"],
keep_recent=20,
auto_compress_enabled=True,
auto_compress_threshold=100,
auto_compress_target=50,
summary_max_tokens=500,
compression_timeout=30
)
context_engine = ContextEngine(compression_config=compression_config)
await context_engine.initialize()
Pattern 3: Error Handling
Handle ContextEngine errors gracefully.
try:
context_engine = ContextEngine()
await context_engine.initialize()
except Exception as e:
logger.error(f"Failed to initialize ContextEngine: {e}")
# Fall back to in-memory mode
context_engine = None
agent = HybridAgent(
agent_id="agent-1",
llm_client=llm_client,
tools=["search"],
config=config,
context_engine=context_engine # None = in-memory fallback
)
# Agent works with or without ContextEngine
Pattern 4: Monitoring
Monitor ContextEngine health and metrics.
# Get global metrics
metrics = context_engine.get_global_metrics()
print(f"Total sessions: {metrics['total_sessions']}")
print(f"Active sessions: {metrics['active_sessions']}")
print(f"Total messages: {metrics['total_messages']}")
print(f"Total context operations: {metrics['total_context_operations']}")
# Monitor session health
active_sessions = await context_engine.list_sessions(status="active")
for session in active_sessions:
if session.error_count > 10:
logger.warning(f"Session {session.session_id} has high error count")
Best Practices
1. Always Initialize ContextEngine
Always call initialize() before use:
context_engine = ContextEngine()
await context_engine.initialize() # Don't forget this!
2. Use Consistent Session IDs
Use consistent session IDs across requests:
# Good: Consistent session ID
session_id = f"user-{user_id}"
# Bad: Random session IDs
session_id = str(uuid.uuid4()) # Don't do this!
3. Clean Up Inactive Sessions
Regularly clean up inactive sessions:
# Clean up sessions inactive for 30 minutes
await context_engine.cleanup_inactive_sessions(max_idle_seconds=1800)
4. Configure Compression
Configure compression for long conversations:
compression_config = CompressionConfig(
auto_compress_enabled=True,
auto_compress_threshold=50
)
5. Handle Errors Gracefully
Always handle ContextEngine errors:
try:
await context_engine.add_conversation_message(...)
except Exception as e:
logger.error(f"Failed to add message: {e}")
# Fall back to in-memory or retry
6. Monitor Performance
Monitor ContextEngine performance:
# Track operation times
start = time.time()
await context_engine.get_conversation_history(...)
duration = time.time() - start
if duration > 1.0:
logger.warning(f"Slow ContextEngine operation: {duration}s")
7. Use Redis for Production
Use Redis backend for production deployments:
# Production: Use Redis
redis_client = await get_redis_client()
context_engine = ContextEngine(redis_client=redis_client)
# Development: Can use in-memory
context_engine = ContextEngine() # Falls back to memory
Summary
ContextEngine integration provides:
✅ Persistent conversation history
✅ Session management with metrics
✅ Context storage and retrieval
✅ Automatic compression
✅ Redis backend for scalability
✅ Graceful fallback to in-memory
For more details, see: