Graph Storage Performance Guide
Complete guide to optimizing graph storage performance in AIECS Knowledge Graph.
Quick Start
Enable All Optimizations
from aiecs.infrastructure.graph_storage import PostgresGraphStore
from aiecs.infrastructure.graph_storage.cache import GraphStoreCache, GraphStoreCacheConfig
from aiecs.infrastructure.graph_storage.batch_operations import BatchOperationsMixin
from aiecs.infrastructure.graph_storage.performance_monitoring import PerformanceMonitor
class OptimizedGraphStore(PostgresGraphStore, BatchOperationsMixin):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Enable caching
        self.cache = GraphStoreCache(GraphStoreCacheConfig(
            redis_url="redis://localhost:6379/0",
            ttl=300
        ))
        # Enable monitoring
        self.monitor = PerformanceMonitor(
            slow_query_threshold_ms=100.0
        )

    async def initialize(self):
        await super().initialize()
        await self.cache.initialize()
        await self.monitor.initialize()

    async def close(self):
        await self.cache.close()
        await super().close()
# Usage
store = OptimizedGraphStore()
await store.initialize()
# Benefit from all optimizations
await store.batch_add_entities(entities) # Batch operations
entity = await store.get_entity("id") # Cached
1. Caching
Redis Configuration
from aiecs.infrastructure.graph_storage.cache import GraphStoreCacheConfig
# Production configuration
config = GraphStoreCacheConfig(
enabled=True,
ttl=300, # 5 minutes
max_cache_size_mb=1000, # In-memory fallback size
redis_url="redis://localhost:6379/0",
key_prefix="graph:"
)
Usage Patterns
Basic Caching:
cache = GraphStoreCache(config)
await cache.initialize()
# Cache a query result
result = await cache.get_or_set(
"entity:person_1",
lambda: store.get_entity("person_1"),
ttl=300
)
Cache Invalidation:
# Invalidate entity cache
await cache.invalidate_entity("person_1")
# Invalidate relation cache
await cache.invalidate_relation("rel_1")
# Clear all cache
await cache.clear()
Decorator Pattern:
from aiecs.infrastructure.graph_storage.cache import cached_method
class MyStore(GraphStore):
    @cached_method(lambda self, entity_id: f"entity:{entity_id}", ttl=300)
    async def get_entity(self, entity_id: str):
        # Your implementation
        pass
Performance Impact
Cache hit: ~0.1ms (vs ~10-100ms database query)
Recommended for: Frequently accessed entities, hot paths
Not recommended for: Rapidly changing data, write-heavy workloads
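The get-or-set pattern used above can be illustrated without Redis. The following is a minimal in-memory sketch, not the GraphStoreCache implementation: the TTLCache class, its clock parameter, and the demo function are hypothetical names introduced only for illustration.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


class TTLCache:
    """Tiny in-memory cache illustrating get-or-set with a TTL (hypothetical helper)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (expiry timestamp, cached value)
        self._entries: Dict[str, Tuple[float, Any]] = {}

    async def get_or_set(
        self, key: str, loader: Callable[[], Awaitable[Any]], ttl: float
    ) -> Any:
        entry = self._entries.get(key)
        if entry is not None and entry[0] > self._clock():
            return entry[1]  # cache hit: the loader is never called
        value = await loader()  # miss or expired: go to the backing store
        self._entries[key] = (self._clock() + ttl, value)
        return value

    def invalidate(self, key: str) -> None:
        self._entries.pop(key, None)


async def demo() -> int:
    """Return how many times the loader actually ran."""
    calls = 0

    async def load_entity() -> dict:
        nonlocal calls
        calls += 1
        return {"id": "person_1"}

    cache = TTLCache()
    await cache.get_or_set("entity:person_1", load_entity, ttl=300)  # miss
    await cache.get_or_set("entity:person_1", load_entity, ttl=300)  # hit
    cache.invalidate("entity:person_1")
    await cache.get_or_set("entity:person_1", load_entity, ttl=300)  # miss again
    return calls
```

The second lookup never reaches the loader, which is where the latency saving comes from. Invalidation forces the next read back to the store, which is why write-heavy data benefits less.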
2. Batch Operations
Bulk Insertion
Entities:
# Generate test data
entities = [
Entity(id=f"e{i}", entity_type="Person", properties={"name": f"Person {i}"})
for i in range(10000)
]
# Batch insert (100x faster than individual inserts)
count = await store.batch_add_entities(
entities,
batch_size=1000,
use_copy=True # Use PostgreSQL COPY for max performance
)
print(f"Inserted {count} entities")
Relations:
# Generate test relations
relations = [
Relation(id=f"r{i}", source_id=f"e{i}", target_id=f"e{i+1}",
relation_type="KNOWS", properties={})
for i in range(9999)
]
# Batch insert
count = await store.batch_add_relations(relations, batch_size=1000)
Bulk Deletion
# Delete multiple entities
entity_ids = [f"e{i}" for i in range(10000)]
count = await store.batch_delete_entities(entity_ids, batch_size=1000)
# Delete multiple relations
relation_ids = [f"r{i}" for i in range(10000)]
count = await store.batch_delete_relations(relation_ids, batch_size=1000)
Optimal Batch Size
from aiecs.infrastructure.graph_storage.batch_operations import estimate_batch_size
# For entities averaging 1KB each, target 10MB batches
batch_size = estimate_batch_size(avg_item_size_bytes=1024, target_batch_size_mb=10)
# Returns ~10,000
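The arithmetic behind the "~10,000" figure is a simple division. The sketch below assumes the estimate is the target batch size in bytes divided by the average item size. The actual formula inside estimate_batch_size is not shown in this guide, so the function name and the clamping to a minimum of 1 are assumptions.

```python
def estimate_batch_size_sketch(avg_item_size_bytes: int, target_batch_size_mb: float) -> int:
    """Items per batch so one batch is roughly target_batch_size_mb (assumed formula)."""
    if avg_item_size_bytes <= 0:
        raise ValueError("avg_item_size_bytes must be positive")
    target_bytes = int(target_batch_size_mb * 1024 * 1024)
    # Never return 0, even when a single item exceeds the target size.
    return max(1, target_bytes // avg_item_size_bytes)


# 10 MiB / 1 KiB per item = 10240 items per batch
print(estimate_batch_size_sketch(1024, 10))
```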
Performance Impact
COPY method: ~100x faster than individual inserts
Multi-row INSERT: ~10-20x faster than individual inserts
Example: 10,000 entities in ~300 ms with COPY vs ~30 s with individual inserts
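Most of the speedup comes from replacing one round trip per row with one per batch. As a rough sketch of how a batch_size parameter might split the input (the chunked helper is hypothetical and not part of BatchOperationsMixin):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


# 10,000 items with batch_size=1000 means 10 statements instead of 10,000.
batches = list(chunked(list(range(10000)), 1000))
print(len(batches))
```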
3. Index Optimization
Analyze Indexes
from aiecs.infrastructure.graph_storage.index_optimization import IndexOptimizer
optimizer = IndexOptimizer(store.pool)
# Get all indexes
indexes = await optimizer.analyze_indexes()
for idx in indexes:
    print(f"{idx.index_name}: {idx.usage_count} uses, {idx.size_mb}MB")
# Find unused indexes
unused = await optimizer.get_unused_indexes(min_usage_threshold=10)
print(f"Found {len(unused)} unused indexes")
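How IndexOptimizer gathers usage counts is not shown here. PostgreSQL itself exposes per-index scan counts in the pg_stat_user_indexes view, so one plausible approach is sketched below. The query constant, the row shape, and the unused_indexes helper are illustrative assumptions, not the library's code.

```python
from typing import Any, Dict, List

# Query against PostgreSQL's statistics view; $1 is the usage threshold.
UNUSED_INDEX_SQL = """
SELECT schemaname,
       relname      AS table_name,
       indexrelname AS index_name,
       idx_scan,
       pg_relation_size(indexrelid) AS size_bytes
FROM pg_stat_user_indexes
WHERE idx_scan < $1
ORDER BY pg_relation_size(indexrelid) DESC
"""


def unused_indexes(rows: List[Dict[str, Any]], min_usage_threshold: int = 10) -> List[str]:
    """Filter already-fetched statistics rows down to rarely used index names."""
    return [r["index_name"] for r in rows if r["idx_scan"] < min_usage_threshold]


# Made-up sample rows standing in for a query result.
sample_rows = [
    {"index_name": "idx_entities_type", "idx_scan": 5230},
    {"index_name": "idx_entities_legacy", "idx_scan": 0},
    {"index_name": "idx_relations_tmp", "idx_scan": 3},
]
print(unused_indexes(sample_rows))
```

Scan counters reset when statistics are reset, so a low count on a freshly restored database does not by itself mean an index is safe to drop.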
Get Recommendations
# Get missing index recommendations
recommendations = await optimizer.get_missing_index_recommendations()
for rec in recommendations:
    print(f"[{rec.estimated_benefit}] {rec.reason}")
    print(f"  SQL: {rec.create_sql}")
Apply Optimizations
# Dry run (show what would be done)
results = await optimizer.apply_recommendations(recommendations, dry_run=True)
# Apply recommendations
results = await optimizer.apply_recommendations(recommendations, dry_run=False)
print(f"Applied: {len(results['applied'])}")
print(f"Failed: {len(results['failed'])}")
Full Optimization Report
report = await optimizer.get_optimization_report()
print(f"Total indexes: {report['indexes']['total_count']}")
print(f"Unused indexes: {report['indexes']['unused_count']}")
print(f"Recommendations: {report['summary']['total_recommendations']}")
print(f" High priority: {report['summary']['high_priority']}")
print(f" Medium priority: {report['summary']['medium_priority']}")
Maintenance
# Run VACUUM ANALYZE for better query plans
await optimizer.vacuum_analyze() # All tables
await optimizer.vacuum_analyze('graph_entities') # Specific table
4. Performance Monitoring
Setup
from aiecs.infrastructure.graph_storage.performance_monitoring import PerformanceMonitor
monitor = PerformanceMonitor(
enabled=True,
slow_query_threshold_ms=100.0,
log_slow_queries=True
)
await monitor.initialize()
Track Queries
# Using context manager
async with monitor.track_query("get_entity", query):
    result = await conn.fetch(query, entity_id)
# Manual recording
duration_ms = 123.45
await monitor.record_query("get_entity", query, duration_ms, row_count=1)
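The context-manager form is a timing wrapper around the manual call. A self-contained sketch of that idea follows. QueryTimer and its attributes are hypothetical stand-ins, and PerformanceMonitor may work differently internally.

```python
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, List


class QueryTimer:
    """Records durations per query type and flags slow ones (hypothetical helper)."""

    def __init__(self, slow_query_threshold_ms: float = 100.0) -> None:
        self.slow_query_threshold_ms = slow_query_threshold_ms
        self.durations_ms: Dict[str, List[float]] = {}
        self.slow_queries: List[str] = []

    def record(self, query_type: str, duration_ms: float) -> None:
        self.durations_ms.setdefault(query_type, []).append(duration_ms)
        if duration_ms >= self.slow_query_threshold_ms:
            self.slow_queries.append(query_type)

    @asynccontextmanager
    async def track(self, query_type: str) -> AsyncIterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record even if the wrapped query raised.
            self.record(query_type, (time.perf_counter() - start) * 1000.0)


async def demo() -> QueryTimer:
    timer = QueryTimer(slow_query_threshold_ms=5.0)
    async with timer.track("get_entity"):
        await asyncio.sleep(0.02)  # stand-in for a database call
    return timer
```

Recording in a finally block means failed queries are still counted, which matters because timeouts are often the slowest calls.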
Analyze Query Plans
# Get query execution plan
plan = await monitor.analyze_query_plan(
conn,
"SELECT * FROM graph_entities WHERE entity_type = $1",
("Person",)
)
# Check for warnings
warnings = plan.get_warnings()
for warning in warnings:
    print(f"⚠️ {warning}")
# Example warnings:
# - "Sequential scan detected - consider adding index"
# - "Inefficient nested loop - consider optimizing join"
# - "High query cost (12345) - consider optimization"
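Warnings like these can be derived by walking the plan tree that PostgreSQL returns for EXPLAIN (FORMAT JSON). The sketch below shows one way to do it. The collect_plan_warnings function, its cost threshold, and the sample plan are assumptions for illustration; only the JSON keys ("Plan", "Node Type", "Relation Name", "Total Cost", "Plans") come from PostgreSQL's output format.

```python
from typing import Any, Dict, List


def collect_plan_warnings(
    explain_json: List[Dict[str, Any]], cost_threshold: float = 10000.0
) -> List[str]:
    """Walk an EXPLAIN (FORMAT JSON) result and report costly or unindexed nodes."""
    root = explain_json[0]["Plan"]
    warnings: List[str] = []
    if root.get("Total Cost", 0.0) > cost_threshold:
        warnings.append(f"High query cost ({root['Total Cost']:.0f})")
    stack = [root]
    while stack:
        node = stack.pop()
        if node.get("Node Type") == "Seq Scan":
            warnings.append(f"Sequential scan on {node.get('Relation Name', '?')}")
        stack.extend(node.get("Plans", []))  # child nodes, if any
    return warnings


# Hand-written sample in the shape PostgreSQL emits; values are made up.
sample_plan = [{
    "Plan": {
        "Node Type": "Sort",
        "Total Cost": 25000.0,
        "Plans": [{
            "Node Type": "Seq Scan",
            "Relation Name": "graph_entities",
            "Total Cost": 24000.0,
        }],
    }
}]
print(collect_plan_warnings(sample_plan))
```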
Performance Reports
# Get comprehensive report
report = monitor.get_performance_report()
print(f"Total queries: {report['total_queries']}")
print(f"Average time: {report['avg_query_time_ms']:.2f}ms")
print(f"Slow queries: {report['slow_query_count']}")
# Top slow queries
for query in report['top_slow_queries']:
    print(f"{query['query_type']}: {query['avg_time_ms']:.2f}ms "
          f"(p95: {query['p95_ms']:.2f}ms)")
# Get stats for specific query type
entity_stats = monitor.get_query_stats(query_type="get_entity")
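The report exposes a p95 figure per query type. This guide does not say which percentile definition the monitor uses. The nearest-rank method below is one common choice and is shown only to make the number concrete, and percentile_nearest_rank is a hypothetical helper.

```python
import math
from typing import Sequence


def percentile_nearest_rank(samples: Sequence[float], pct: float) -> float:
    """Smallest sample such that at least pct percent of samples are <= it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


durations_ms = [float(i) for i in range(1, 101)]  # 1.0 .. 100.0
p95 = percentile_nearest_rank(durations_ms, 95)
print(p95)
```

A p95 far above the average usually points at a small number of pathological queries rather than uniformly slow ones.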
Prepared Statement Caching
from aiecs.infrastructure.graph_storage.performance_monitoring import PreparedStatementCache
cache = PreparedStatementCache(max_size=100)
# Get or create prepared statement
stmt = await cache.get_or_prepare(
conn,
"get_entity",
"SELECT * FROM graph_entities WHERE id = $1"
)
# Use prepared statement (faster query planning)
result = await conn.fetch(stmt, entity_id)
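A bounded statement cache needs an eviction policy once max_size is reached. Whether PreparedStatementCache evicts least-recently-used entries is not stated in this guide. The sketch below assumes LRU, and the BoundedLRU class is hypothetical.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class BoundedLRU:
    """Get-or-create cache that evicts the least recently used key (assumed policy)."""

    def __init__(self, max_size: int) -> None:
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self.max_size = max_size
        self._items: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get_or_create(self, key: Hashable, factory: Callable[[], Any]) -> Any:
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
            return self._items[key]
        value = factory()
        self._items[key] = value
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # drop the oldest entry
        return value

    def __contains__(self, key: Hashable) -> bool:
        return key in self._items

    def __len__(self) -> int:
        return len(self._items)


cache = BoundedLRU(max_size=2)
cache.get_or_create("get_entity", lambda: "stmt-1")
cache.get_or_create("get_relation", lambda: "stmt-2")
cache.get_or_create("get_entity", lambda: "unused")    # hit, refreshes recency
cache.get_or_create("get_neighbors", lambda: "stmt-3")  # evicts get_relation
```

With a real driver, evicted statements may also need to be released on the connection that prepared them, since prepared statements are tied to a single connection.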
5. Benchmarking
Run Benchmarks
# Run all benchmarks
pytest test/performance/test_graph_storage_benchmarks.py -v -m benchmark
# Run specific benchmark
pytest test/performance/test_graph_storage_benchmarks.py::TestGraphStorageBenchmarks::test_bulk_entity_insertion -v
# Compare backends
pytest test/performance/test_graph_storage_benchmarks.py::TestGraphStorageBenchmarks::test_neighbor_queries -v
Custom Benchmarks
import time
# Benchmark custom operation
async def benchmark_operation():
    store = OptimizedGraphStore()
    await store.initialize()
    # Prepare test data
    entities = generate_test_entities(10000)
    # Benchmark (perf_counter is monotonic and better suited to timing than time.time)
    start = time.perf_counter()
    await store.batch_add_entities(entities)
    duration = time.perf_counter() - start
    print(f"Inserted 10,000 entities in {duration:.2f}s")
    print(f"Rate: {10000 / duration:.0f} entities/second")
    await store.close()
Best Practices
1. Choose the Right Backend
InMemoryGraphStore:
✅ Development, testing
✅ Small graphs (< 100K nodes)
✅ Temporary data
❌ Production, persistence needed
SQLiteGraphStore:
✅ Small to medium graphs (< 1M nodes)
✅ Single-user applications
✅ Embedded systems
❌ High concurrency, large scale
PostgresGraphStore:
✅ Production deployments
✅ Large graphs (> 1M nodes)
✅ High concurrency
✅ Advanced features (JSONB, pgvector)
2. Optimize for Your Workload
Read-Heavy Workload:
Enable caching with high TTL
Optimize indexes for common queries
Use read replicas if available
Write-Heavy Workload:
Use batch operations
Reduce index overhead
Lower cache TTL
Consider async writes
Mixed Workload:
Moderate caching (5-10 min TTL)
Selective indexing
Monitor query patterns
Adjust based on metrics
3. Monitor and Tune
# Regular monitoring
async def monitor_performance():
    report = monitor.get_performance_report()
    # Alert on high average query time
    if report['avg_query_time_ms'] > 50:
        print(f"⚠️ High avg query time: {report['avg_query_time_ms']:.2f}ms")
    # Alert on many slow queries
    if report['slow_query_count'] > 100:
        print(f"⚠️ {report['slow_query_count']} slow queries detected")
    # Check index usage
    optimizer_report = await optimizer.get_optimization_report()
    if optimizer_report['indexes']['unused_count'] > 5:
        print(f"⚠️ {optimizer_report['indexes']['unused_count']} unused indexes")
4. Optimize Indexes Regularly
# Weekly maintenance
async def weekly_maintenance():
    optimizer = IndexOptimizer(pool)
    # VACUUM ANALYZE for updated statistics
    await optimizer.vacuum_analyze()
    # Check for new recommendations
    recommendations = await optimizer.get_missing_index_recommendations()
    if recommendations:
        print(f"Found {len(recommendations)} index recommendations")
        # Review and apply manually
Performance Checklist
Initial Setup
Choose appropriate backend for use case
Enable connection pooling
Create optimal indexes
Enable performance monitoring
For Production
Enable Redis caching
Configure cache TTL appropriately
Use batch operations for bulk writes
Monitor slow queries
Regular VACUUM ANALYZE
Review index usage monthly
For Development
Use InMemoryGraphStore for tests
Enable performance monitoring
Profile critical operations
Test with realistic data volumes
Troubleshooting
Slow Queries
Enable monitoring:
monitor = PerformanceMonitor(slow_query_threshold_ms=50.0)
Analyze query plan:
plan = await monitor.analyze_query_plan(conn, slow_query)
print(plan.get_warnings())
Check indexes:
optimizer = IndexOptimizer(pool)
recommendations = await optimizer.get_missing_index_recommendations()
Cache Not Working
Check cache status:
print(cache._initialized)  # Should be True
print(cache.backend)  # Should not be None
Test cache manually:
await cache.backend.set("test", "value", 60)
value = await cache.backend.get("test")
print(value)  # Should print "value"
Check Redis connection:
redis-cli ping # Should return "PONG"
High Memory Usage
Check cache size:
# Reduce cache size
config = GraphStoreCacheConfig(max_cache_size_mb=500)
Reduce TTL:
config = GraphStoreCacheConfig(ttl=60) # 1 minute
Clear cache:
await cache.clear()
Summary
Performance Gains
| Optimization | Speedup | Use Case |
|---|---|---|
| Redis Caching | 100-1000x | Frequently accessed data |
| Batch INSERT (COPY) | 100x | Bulk data loading |
| Batch INSERT (Multi-row) | 10-20x | Medium bulk operations |
| Composite Indexes | 10-50x | Filtered queries |
| GIN Indexes | 100-1000x | JSONB property searches |
| Prepared Statements | 2-5x | Repeated queries |
Quick Wins
Enable batch operations for bulk writes (+100x)
Add composite indexes for common filters (+10-50x)
Enable Redis caching for hot data (+100-1000x)
Use prepared statements for repeated queries (+2-5x)
Next Steps
Profile your specific workload
Apply relevant optimizations
Monitor performance metrics
Iterate based on results
For more information, see:
docs/knowledge_graph/PHASE6_TASK6.3_COMPLETE.md - Complete implementation details
test/performance/test_graph_storage_benchmarks.py - Benchmark examples
aiecs/infrastructure/graph_storage/ - Implementation code