DatabaseManager Technical Documentation
1. Overview
Purpose
DatabaseManager is the component that manages database connections, executes database operations, and persists task history. It is built on asyncpg, the asynchronous PostgreSQL driver. It provides connection pool management, task history persistence, and task status tracking, and serves as key infrastructure for the data persistence layer of the AIECS system.
Core Value
Asynchronous Database Operations: High-performance asynchronous database access based on asyncpg
Connection Pool Management: Automatically manages database connection pools, improving concurrent performance
Task History Persistence: Complete recording of task execution history and status changes
Data Consistency: Ensures data consistency of task status and results
Performance Optimization: Improves database performance through indexing and query optimization
2. Problem Background & Design Motivation
Problem Background
The AIECS system handles a large volume of task execution data, which leads to the following requirements:
Task Status Tracking: Task execution status and results must be stored persistently
History Record Management: The complete task execution history must be kept for querying and analysis
Concurrent Access: Multiple services access the database simultaneously and need efficient connection management
Data Consistency: Task status changes must be atomic and consistent
Performance Requirements: Large numbers of concurrent tasks require high-performance database operations
Design Motivation
Data Persistence: Persistently store task execution data, supporting data recovery after system restart
State Management: Provide reliable task status tracking and query mechanisms
Performance Optimization: Improve database access performance through connection pools and asynchronous operations
Data Integrity: Ensure completeness and consistency of task execution history
Operational Support: Provide data cleanup and monitoring functionality to support system operations
3. Architecture Positioning & Context
System Architecture Location
┌─────────────────────────────────────────────────────────────┐
│                  AIECS System Architecture                  │
├─────────────────────────────────────────────────────────────┤
│  Domain Layer                                               │
│  ┌───────────────────┐     ┌───────────────────┐            │
│  │    TaskService    │     │   DSLProcessor    │            │
│  └───────────────────┘     └───────────────────┘            │
├─────────────────────────────────────────────────────────────┤
│  Infrastructure Layer                                       │
│  ┌───────────────────┐     ┌───────────────────┐            │
│  │  DatabaseManager  │     │ CeleryTaskManager │            │
│  └───────────────────┘     └───────────────────┘            │
├─────────────────────────────────────────────────────────────┤
│  Data Layer                                                 │
│  ┌───────────────────┐     ┌───────────────────┐            │
│  │    PostgreSQL     │     │  Connection Pool  │            │
│  └───────────────────┘     └───────────────────┘            │
└─────────────────────────────────────────────────────────────┘
Upstream Callers
TaskService: Task management service that needs to save and query task status
DSLProcessor: DSL processor that needs to record task execution steps
CeleryTaskManager: Task executor that needs to update task status
WebSocketManager: WebSocket manager that needs to query task history
Downstream Dependencies
PostgreSQL: Main database system
asyncpg: Asynchronous PostgreSQL driver
TaskStatus/TaskStepResult: Task status and result models
Configuration System: Database connection configuration
4. Core Features & Use Cases
4.1 Database Connection Management
Connection Pool Initialization
# Create database manager
db_manager = DatabaseManager()
# Initialize connection pool
await db_manager.init_connection_pool(
min_size=10, # Minimum connections
max_size=20 # Maximum connections
)
# Initialize database schema
await db_manager.init_database_schema()
Automatic Connection Management
# The database manager acquires and releases connections internally
async def process_task_with_db(task_data: Dict[str, Any]):
    """Process a task; DatabaseManager handles connection acquisition and release"""
    # Save task history
    step_result = TaskStepResult(
        step="data_processing",
        result={"processed_rows": 1000},
        completed=True,
        message="Data processing completed",
        status=TaskStatus.COMPLETED
    )
    await db_manager.save_task_history(
        user_id="user_123",
        task_id="task_456",
        step=1,
        step_result=step_result
    )
4.2 Task History Management
Save Task Execution History
# Save task step result
async def save_task_step(user_id: str, task_id: str, step: int, result: Any):
    """Save task step execution result"""
    step_result = TaskStepResult(
        step=f"step_{step}",
        result=result,
        completed=True,
        message=f"Step {step} execution completed",
        status=TaskStatus.COMPLETED
    )
    await db_manager.save_task_history(
        user_id=user_id,
        task_id=task_id,
        step=step,
        step_result=step_result
    )
# Save error information
async def save_task_error(user_id: str, task_id: str, step: int, error: Exception):
    """Save task execution error"""
    step_result = TaskStepResult(
        step=f"step_{step}",
        result=None,
        completed=False,
        message=f"Step {step} execution failed: {str(error)}",
        status=TaskStatus.FAILED,
        error_code="E003",
        error_message=str(error)
    )
    await db_manager.save_task_history(
        user_id=user_id,
        task_id=task_id,
        step=step,
        step_result=step_result
    )
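Each `save_task_history` call above ultimately becomes one row of `task_history`. The following is a minimal sketch of that mapping. It mirrors the INSERT shown in section 6.3 and assumes a dataclass stand-in for `TaskStepResult` (the real model may be a Pydantic model with `.dict()`). `StepResultStandIn` and `to_history_row` are hypothetical names used only for illustration.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Optional, Tuple


@dataclass
class StepResultStandIn:
    """Hypothetical stand-in for TaskStepResult, with the fields used above."""
    step: str
    result: Any
    completed: bool
    message: str
    status: str
    error_code: Optional[str] = None
    error_message: Optional[str] = None


def to_history_row(user_id: str, task_id: str, step: int,
                   step_result: StepResultStandIn,
                   now: Optional[datetime] = None) -> Tuple[str, str, int, str, datetime, str]:
    """Return the ($1..$6) arguments for the task_history INSERT."""
    if now is None:
        # task_history.timestamp is TIMESTAMP (no time zone), so store naive UTC
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    # The full step result goes into the JSONB `result` column as JSON text;
    # status is also copied into its own indexed column for filtering
    payload = json.dumps(asdict(step_result))
    return (user_id, task_id, step, payload, now, step_result.status)
```

Storing the status twice, once inside the JSON payload and once in its own column, lets `idx_task_history_status` serve status filters without parsing JSONB.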
Query Task History
# Query complete task history
async def get_task_execution_history(user_id: str, task_id: str):
    """Get task execution history"""
    history = await db_manager.load_task_history(user_id, task_id)
    print(f"Task {task_id} execution history:")
    for step in history:
        print(f"  Step {step['step']}: {step['status']} - {step['result']}")
    return history
# Query all user tasks
async def get_user_task_list(user_id: str):
    """Get user task list"""
    tasks = await db_manager.get_user_tasks(user_id, limit=50)
    print(f"User {user_id} task list:")
    for task in tasks:
        print(f"  Task {task['task_id']}: {task['status']} (Last updated: {task['last_updated']})")
    return tasks
4.3 Task Status Management
Task Status Tracking
# Check task status
async def check_task_status(user_id: str, task_id: str):
    """Check current task status"""
    status = await db_manager.check_task_status(user_id, task_id)
    if status == TaskStatus.PENDING:
        print("Task waiting for execution")
    elif status == TaskStatus.RUNNING:
        print("Task running")
    elif status == TaskStatus.COMPLETED:
        print("Task execution completed")
    elif status == TaskStatus.FAILED:
        print("Task execution failed")
    elif status == TaskStatus.CANCELLED:
        print("Task cancelled")
    return status
# Cancel task
async def cancel_user_task(user_id: str, task_id: str):
    """Cancel user task"""
    success = await db_manager.mark_task_as_cancelled(user_id, task_id)
    if success:
        print(f"Task {task_id} marked as cancelled")
    else:
        print(f"Failed to cancel task {task_id}")
    return success
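The if/elif chain above can also be written as a lookup table, which keeps the status-to-message mapping in one place. The following is a minimal sketch that assumes `TaskStatus` is an enum with the five members used above. The `TaskStatusStandIn` class and its lowercase string values are hypothetical stand-ins.

```python
from enum import Enum


class TaskStatusStandIn(str, Enum):
    """Hypothetical stand-in for TaskStatus with the five states used above."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# One table instead of an if/elif chain; a new status needs only one new entry
STATUS_MESSAGES = {
    TaskStatusStandIn.PENDING: "Task waiting for execution",
    TaskStatusStandIn.RUNNING: "Task running",
    TaskStatusStandIn.COMPLETED: "Task execution completed",
    TaskStatusStandIn.FAILED: "Task execution failed",
    TaskStatusStandIn.CANCELLED: "Task cancelled",
}


def describe_status(status: TaskStatusStandIn) -> str:
    """Return a human-readable message, with a fallback for unknown values."""
    return STATUS_MESSAGES.get(status, f"Unknown task status: {status!r}")
```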
4.4 Data Maintenance and Cleanup
Periodic Data Cleanup
# Clean old task data
async def cleanup_old_data():
    """Clean task data older than 30 days"""
    success = await db_manager.cleanup_old_tasks(days_old=30)
    if success:
        print("Old task data cleanup completed")
    else:
        print("Data cleanup failed")
    return success
# Scheduled maintenance task
import asyncio
import logging
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
async def scheduled_cleanup():
    """Run data cleanup once a day at 2 AM (local time)"""
    while True:
        # Sleep until the next 2:00 AM instead of polling once a minute,
        # which can skip or repeat a run when the loop drifts
        now = datetime.now()
        next_run = now.replace(hour=2, minute=0, second=0, microsecond=0)
        if next_run <= now:
            next_run += timedelta(days=1)
        await asyncio.sleep((next_run - now).total_seconds())
        try:
            await cleanup_old_data()
        except Exception as e:
            # Log and wait for the next daily run
            logger.error(f"Scheduled cleanup task error: {e}")
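The examples above call `cleanup_old_tasks(days_old=30)` without showing the query behind it. The following is a minimal sketch of what such a cleanup could look like, assuming it deletes rows whose `timestamp` is older than a cutoff. `CLEANUP_SQL` and `cleanup_old_tasks_sketch` are hypothetical. The connection can be any asyncpg-style object whose `execute` returns a command tag such as `DELETE 42`.

```python
from datetime import datetime, timedelta
from typing import Any, Optional

# Hypothetical statement: assumes cleanup removes whole rows by their timestamp
CLEANUP_SQL = "DELETE FROM task_history WHERE timestamp < $1"


def cleanup_cutoff(days_old: int, now: Optional[datetime] = None) -> datetime:
    """Return the instant before which task_history rows count as old."""
    if days_old < 0:
        raise ValueError("days_old must be non-negative")
    # Naive local time, matching the datetime.now() values written by save_task_history
    now = now or datetime.now()
    return now - timedelta(days=days_old)


async def cleanup_old_tasks_sketch(conn: Any, days_old: int = 30) -> int:
    """Delete old rows and return how many were removed."""
    tag = await conn.execute(CLEANUP_SQL, cleanup_cutoff(days_old))
    # asyncpg's execute() returns the command status, e.g. "DELETE 42"
    return int(tag.split()[-1])
```

Passing the cutoff as a bound parameter (`$1`) lets `idx_task_history_timestamp` drive the range scan, and keeps the age arithmetic out of the SQL text.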
4.5 Batch Operations
Batch Save Task History
# Batch save multiple task steps
async def save_batch_task_steps(user_id: str, task_id: str, steps: List[Dict[str, Any]]):
    """Batch save task steps (one INSERT per step, executed sequentially)"""
    for i, step_data in enumerate(steps):
        step_result = TaskStepResult(
            step=f"batch_step_{i}",
            result=step_data.get("result"),
            completed=step_data.get("completed", True),
            message=step_data.get("message", f"Batch step {i}"),
            status=step_data.get("status", TaskStatus.COMPLETED)
        )
        await db_manager.save_task_history(
            user_id=user_id,
            task_id=task_id,
            step=i,
            step_result=step_result
        )
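The loop above performs one INSERT, and one pool acquisition, per step. `save_batch_task_history` in section 6.4 instead sends many rows in a single `executemany` call. For very large lists, it can help to split the rows into bounded chunks so that no single call holds a connection for too long. The following is a minimal sketch using a hypothetical `chunked` helper.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items, preserving order."""
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(items)
    # islice pulls the next `size` items; an empty list means the input is exhausted
    while chunk := list(islice(it, size)):
        yield chunk
```

As a usage sketch, assuming `save_batch_task_history` from section 6.4 is available on the manager: `for chunk in chunked(records, 500): await db_manager.save_batch_task_history(chunk)`. The chunk size of 500 is an arbitrary example.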
5. API Reference
5.1 Class Definition
DatabaseManager
class DatabaseManager:
    """Database connection and operation manager"""
    def __init__(self, db_config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize database manager
        Args:
            db_config: Database configuration dictionary; if None, it is read from the configuration system
        """
5.2 Public Methods
init_connection_pool
async def init_connection_pool(self, min_size: int = 10, max_size: int = 20) -> None
Function: Initialize database connection pool
Parameters:
min_size(int): Minimum connections, default 10
max_size(int): Maximum connections, default 20
Exceptions:
Exception: Connection pool initialization failed
init_database_schema
async def init_database_schema(self) -> bool
Function: Initialize database table structure
Returns:
bool: Whether initialization succeeded
save_task_history
async def save_task_history(self, user_id: str, task_id: str, step: int, step_result: TaskStepResult) -> bool
Function: Save task execution history
Parameters:
user_id(str): User ID
task_id(str): Task ID
step(int): Step number
step_result(TaskStepResult): Step result
Returns:
bool: Whether save succeeded
Exceptions:
Exception: Database operation failed
load_task_history
async def load_task_history(self, user_id: str, task_id: str) -> List[Dict]
Function: Load task execution history
Parameters:
user_id(str): User ID
task_id(str): Task ID
Returns:
List[Dict]: Task history record list
Exceptions:
Exception: Database operation failed
mark_task_as_cancelled
async def mark_task_as_cancelled(self, user_id: str, task_id: str) -> bool
Function: Mark task as cancelled
Parameters:
user_id(str): User ID
task_id(str): Task ID
Returns:
bool: Whether operation succeeded
Exceptions:
Exception: Database operation failed
check_task_status
async def check_task_status(self, user_id: str, task_id: str) -> TaskStatus
Function: Check task status
Parameters:
user_id(str): User ID
task_id(str): Task ID
Returns:
TaskStatus: Task status
Exceptions:
Exception: Database operation failed
get_user_tasks
async def get_user_tasks(self, user_id: str, limit: int = 100) -> List[Dict]
Function: Get user task list
Parameters:
user_id(str): User ID
limit(int): Maximum number of records to return, default 100
Returns:
List[Dict]: User task list
Exceptions:
Exception: Database operation failed
cleanup_old_tasks
async def cleanup_old_tasks(self, days_old: int = 30) -> bool
Function: Clean old task records
Parameters:
days_old(int): Age threshold in days; records older than this are removed, default 30
Returns:
bool: Whether cleanup succeeded
close
async def close(self) -> None
Function: Close database connection pool
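The API pairs `init_connection_pool` with `close`, so callers must remember to close the pool on every exit path. The following is a minimal sketch of a hypothetical `managed_database` async context manager that guarantees `close()` runs even if the body raises. It relies only on the method names listed above.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def managed_database(manager: Any, min_size: int = 10,
                           max_size: int = 20) -> AsyncIterator[Any]:
    """Open the pool and schema on entry; always close the pool on exit."""
    await manager.init_connection_pool(min_size=min_size, max_size=max_size)
    try:
        await manager.init_database_schema()
        yield manager
    finally:
        # Runs on normal exit and when the body (or schema setup) raises
        await manager.close()
```

A typical use would be `async with managed_database(DatabaseManager()) as db: await db.save_task_history(...)`, which keeps the pool's lifetime visible in one block.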
6. Technical Implementation Details
6.1 Connection Pool Management
Connection Pool Configuration
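async def init_connection_pool(self, min_size: int = 10, max_size: int = 20):
    """Initialize database connection pool"""
    # Caution: if self.db_config also contains a keyword passed explicitly below
    # (min_size, max_size, command_timeout, server_settings), Python raises
    # TypeError: got multiple values for keyword argument
    self.connection_pool = await asyncpg.create_pool(
        **self.db_config,
        min_size=min_size,   # Minimum connections
        max_size=max_size,   # Maximum connections
        command_timeout=60,  # Command timeout (seconds)
        server_settings={
            'application_name': 'aiecs_database_manager',
            'timezone': 'UTC'
        }
    )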
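`create_pool` receives both `**self.db_config` and explicit pool keywords. A `db_config` that carries `min_size`/`max_size` (as in the configuration examples in section 7) would therefore clash with them. The direct-connection fallback `asyncpg.connect(**self.db_config)` also does not accept `min_size` or `max_size`. The following is a minimal sketch of separating the two groups. `split_pool_config` is a hypothetical helper, not part of DatabaseManager, and the key list is an assumption based on the snippet above.

```python
from typing import Any, Dict, Tuple

# Keywords the snippet above passes explicitly to create_pool (assumption);
# leaving them in **db_config as well would duplicate them
POOL_KEYS = ("min_size", "max_size", "command_timeout", "server_settings")


def split_pool_config(db_config: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split a config into (connection kwargs, pool kwargs) with no overlap."""
    connect_kwargs = {k: v for k, v in db_config.items() if k not in POOL_KEYS}
    pool_kwargs = {k: v for k, v in db_config.items() if k in POOL_KEYS}
    return connect_kwargs, pool_kwargs
```

With this split, the pool could be created as `await asyncpg.create_pool(**connect_kwargs, **{"min_size": 10, "max_size": 20, **pool_kwargs})`, so values from the config override the defaults instead of colliding with them. The fallback path would then call `asyncpg.connect(**connect_kwargs)`.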
Connection Acquisition and Release
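from contextlib import asynccontextmanager
@asynccontextmanager
async def _get_connection(self):
    """Yield a database connection and always release or close it"""
    if self.connection_pool:
        # Borrow a connection from the pool; it is returned on exit
        async with self.connection_pool.acquire() as conn:
            yield conn
    else:
        # No pool: open a direct connection and close it on exit
        conn = await asyncpg.connect(**self.db_config)
        try:
            yield conn
        finally:
            await conn.close()
# Either way, callers use a context manager and never release connections manually
async with self._get_connection() as conn:
    result = await conn.fetch("SELECT * FROM task_history")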
6.2 Database Schema Management
Table Structure Creation
async def _create_tables(self, conn):
    """Create database tables"""
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS task_history (
            id SERIAL PRIMARY KEY,
            user_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            step INTEGER NOT NULL,
            result JSONB NOT NULL,
            timestamp TIMESTAMP NOT NULL,
            status TEXT NOT NULL DEFAULT 'pending'
        );
        -- Create indexes to optimize query performance
        CREATE INDEX IF NOT EXISTS idx_task_history_user_id ON task_history (user_id);
        CREATE INDEX IF NOT EXISTS idx_task_history_task_id ON task_history (task_id);
        CREATE INDEX IF NOT EXISTS idx_task_history_status ON task_history (status);
        CREATE INDEX IF NOT EXISTS idx_task_history_timestamp ON task_history (timestamp);
        -- Composite indexes to optimize common queries
        CREATE INDEX IF NOT EXISTS idx_task_history_user_task ON task_history (user_id, task_id);
        CREATE INDEX IF NOT EXISTS idx_task_history_user_timestamp ON task_history (user_id, timestamp DESC);
    ''')
6.3 Error Handling Strategy
Database Operation Fault Tolerance
async def save_task_history(self, user_id: str, task_id: str, step: int, step_result: TaskStepResult):
    """Save task history (with fault tolerance)"""
    query = ('INSERT INTO task_history (user_id, task_id, step, result, timestamp, status) '
             'VALUES ($1, $2, $3, $4, $5, $6)')
    args = (user_id, task_id, step, json.dumps(step_result.dict()), datetime.now(), step_result.status)
    try:
        if self.connection_pool:
            async with self.connection_pool.acquire() as conn:
                await conn.execute(query, *args)
        else:
            conn = await asyncpg.connect(**self.db_config)
            try:
                await conn.execute(query, *args)
            finally:
                await conn.close()
        return True
    except Exception as e:
        logger.error(f"Database error saving task history: {e}")
        # Chain the original asyncpg error so callers can still inspect e.__cause__
        raise Exception(f"Database error: {e}") from e
Retry Mechanism
import asyncio
import logging
from functools import wraps
logger = logging.getLogger(__name__)
def retry_db_operation(max_retries: int = 3, delay: float = 1.0):
    """Database operation retry decorator"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    logger.warning(f"Database operation failed (attempt {attempt + 1}/{max_retries}): {e}")
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    await asyncio.sleep(delay * (2 ** attempt))
            return None  # Only reached when max_retries <= 0
        return wrapper
    return decorator
# Use retry decorator
@retry_db_operation(max_retries=3, delay=1.0)
async def save_task_history_with_retry(self, user_id: str, task_id: str, step: int, step_result: TaskStepResult):
    """Save task history with retry"""
    # Original save logic
    pass
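The decorator above retries on every `Exception`, including errors that cannot succeed on a second attempt, such as a unique-constraint violation. The following variant, sketched under the hypothetical name `retry_on`, retries only the exception types passed in. Which asyncpg errors count as transient (for example `asyncpg.exceptions.ConnectionDoesNotExistError` or `asyncio.TimeoutError`) is left to the caller.

```python
import asyncio
import logging
from functools import wraps
from typing import Tuple, Type

logger = logging.getLogger(__name__)


def retry_on(exceptions: Tuple[Type[BaseException], ...],
             max_retries: int = 3, delay: float = 1.0):
    """Retry an async function only for the listed exception types.

    Any other exception propagates immediately without a retry.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    # Last attempt: surface the original error unchanged
                    if attempt == max_retries - 1:
                        raise
                    logger.warning("Attempt %d/%d failed: %s", attempt + 1, max_retries, e)
                    await asyncio.sleep(delay * (2 ** attempt))  # exponential backoff
        return wrapper
    return decorator
```

Keeping the retryable set explicit means a data error fails fast on the first attempt, while a dropped connection still gets the backoff schedule.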
6.4 Performance Optimization
Batch Operation Optimization
async def save_batch_task_history(self, records: List[Dict[str, Any]]):
    """Batch save task history"""
    if not records:
        return
    query = '''INSERT INTO task_history (user_id, task_id, step, result, timestamp, status)
               VALUES ($1, $2, $3, $4, $5, $6)'''
    # Build the argument tuples once so both code paths send the same batch
    args = [
        (
            record['user_id'],
            record['task_id'],
            record['step'],
            json.dumps(record['result']),
            record['timestamp'],
            record['status']
        )
        for record in records
    ]
    try:
        if self.connection_pool:
            async with self.connection_pool.acquire() as conn:
                # Use batch insert
                await conn.executemany(query, args)
        else:
            conn = await asyncpg.connect(**self.db_config)
            try:
                await conn.executemany(query, args)
            finally:
                await conn.close()
    except Exception as e:
        logger.error(f"Batch save error: {e}")
        raise
Query Optimization
async def get_user_tasks_optimized(self, user_id: str, limit: int = 100, offset: int = 0):
    """Optimized user task query"""
    # One row per task: the latest timestamp plus the status of the highest step.
    # GROUP BY already yields one row per task_id, so DISTINCT is unnecessary.
    query = '''SELECT task_id,
                      MAX(timestamp) AS last_updated,
                      (SELECT status FROM task_history th2
                       WHERE th2.user_id = $1 AND th2.task_id = th1.task_id
                       ORDER BY step DESC LIMIT 1) AS status
               FROM task_history th1
               WHERE user_id = $1
               GROUP BY task_id
               ORDER BY last_updated DESC
               LIMIT $2 OFFSET $3'''
    try:
        if self.connection_pool:
            async with self.connection_pool.acquire() as conn:
                # Use pagination and optimized query
                records = await conn.fetch(query, user_id, limit, offset)
        else:
            # Direct connection query
            conn = await asyncpg.connect(**self.db_config)
            try:
                records = await conn.fetch(query, user_id, limit, offset)
            finally:
                await conn.close()
        return [dict(r) for r in records]
    except Exception as e:
        logger.error(f"Optimized query error: {e}")
        raise
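The following in-memory equivalent makes the semantics of that query explicit. For each task, `last_updated` is the newest timestamp and `status` comes from the highest step. Results are ordered newest first and then paginated. It is a sketch for reasoning and unit tests only. It assumes rows shaped like `task_history` records (dicts with `task_id`, `step`, `status`, `timestamp`), and `summarize_user_tasks` is a hypothetical name.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List


def summarize_user_tasks(rows: Iterable[Dict[str, Any]], limit: int = 100,
                         offset: int = 0) -> List[Dict[str, Any]]:
    """Group history rows per task, mirroring the SQL above."""
    tasks: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        # First row for a task initializes its summary
        t = tasks.setdefault(row["task_id"], {
            "task_id": row["task_id"], "last_updated": row["timestamp"],
            "_step": row["step"], "status": row["status"]})
        # MAX(timestamp)
        if row["timestamp"] > t["last_updated"]:
            t["last_updated"] = row["timestamp"]
        # Status of the row with the highest step (ORDER BY step DESC LIMIT 1)
        if row["step"] > t["_step"]:
            t["_step"], t["status"] = row["step"], row["status"]
    # ORDER BY last_updated DESC, then LIMIT/OFFSET
    ordered = sorted(tasks.values(), key=lambda t: t["last_updated"], reverse=True)
    page = ordered[offset:offset + limit]
    return [{k: v for k, v in t.items() if k != "_step"} for t in page]
```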
7. Configuration & Deployment
7.1 Basic Configuration
Database Configuration
# Basic database configuration
db_config = {
"host": "localhost",
"port": 5432,
"database": "aiecs_db",
"user": "aiecs_user",
"password": "aiecs_password",
"min_size": 10,
"max_size": 20
}
# Create database manager
db_manager = DatabaseManager(db_config)
Environment Variable Configuration
# Database connection configuration
export DB_HOST="localhost"
export DB_PORT="5432"
export DB_NAME="aiecs_db"
export DB_USER="aiecs_user"
export DB_PASSWORD="aiecs_password"
# Connection pool configuration
export DB_MIN_SIZE="10"
export DB_MAX_SIZE="20"
export DB_COMMAND_TIMEOUT="60"
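Environment variables always arrive as strings, while asyncpg expects an integer `port` and numeric timeouts. The following is a minimal sketch of turning the variables above into a configuration dictionary with a hypothetical `db_config_from_env` helper. The variable names come from this section. The defaults are assumptions, not DatabaseManager's documented behavior.

```python
import os
from typing import Any, Dict, Mapping, Optional


def db_config_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build a config dict from DB_* variables, converting numeric values."""
    env = os.environ if env is None else env
    return {
        "host": env.get("DB_HOST", "localhost"),
        "port": int(env.get("DB_PORT", "5432")),
        "database": env.get("DB_NAME", "aiecs_db"),
        "user": env.get("DB_USER", "aiecs_user"),
        # No real default for secrets; an empty password fails loudly on connect
        "password": env.get("DB_PASSWORD", ""),
        "min_size": int(env.get("DB_MIN_SIZE", "10")),
        "max_size": int(env.get("DB_MAX_SIZE", "20")),
        "command_timeout": float(env.get("DB_COMMAND_TIMEOUT", "60")),
    }
```

Passing a mapping instead of reading `os.environ` directly keeps the function testable without touching the process environment.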
7.2 Docker Deployment
Docker Compose Configuration
version: '3.8'
services:
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=aiecs_db
      - POSTGRES_USER=aiecs_user
      - POSTGRES_PASSWORD=aiecs_password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
  aiecs-app:
    build: .
    environment:
      - DB_HOST=postgres
      - DB_PORT=5432
      - DB_NAME=aiecs_db
      - DB_USER=aiecs_user
      - DB_PASSWORD=aiecs_password
    depends_on:
      - postgres
volumes:
  postgres_data:
7.3 Production Environment Configuration
High Availability Configuration
# Production environment database configuration
production_config = {
"host": "postgres-cluster.internal",
"port": 5432,
"database": "aiecs_prod",
"user": "aiecs_prod_user",
"password": "secure_password",
"min_size": 20,
"max_size": 100,
"command_timeout": 30,
"server_settings": {
"application_name": "aiecs_production",
"timezone": "UTC",
"statement_timeout": "30s"
}
}
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics
Connection Pool Usage: (Active connections / Maximum connections) * 100%
Query Response Time: Average response time of database queries
Error Rate: Proportion of failed database operations
Data Growth: Growth trend of the task history table
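The pool usage metric above can be computed from asyncpg's pool accessors. Note that `Pool.get_size()` counts both idle and in-use connections, so idle connections must be subtracted. The following is a minimal sketch with a hypothetical `pool_usage_percent` helper that takes plain integers, so it can be tested without a database.

```python
def pool_usage_percent(size: int, idle: int, max_size: int) -> float:
    """Connections in use as a percentage of the pool's maximum size.

    size: open connections (idle + in use); idle: open but unused connections.
    """
    if max_size <= 0:
        raise ValueError("max_size must be positive")
    in_use = size - idle
    return in_use / max_size * 100.0
```

Against a live pool, the inputs would be `pool.get_size()`, `pool.get_idle_size()` and `pool.get_max_size()`.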
Monitoring Implementation
class DatabaseMonitor:
    def __init__(self, db_manager: DatabaseManager):
        self.db_manager = db_manager
        self.metrics = {
            "connection_pool_size": 0,
            "active_connections": 0,
            "query_count": 0,
            "error_count": 0
        }
    async def get_connection_pool_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics"""
        pool = self.db_manager.connection_pool
        if pool:
            return {
                "size": pool.get_size(),       # Open connections (idle + in use)
                "idle": pool.get_idle_size(),  # Open connections not in use
                "min_size": pool.get_min_size(),
                "max_size": pool.get_max_size(),
                # asyncpg.Pool provides is_closing() (asyncpg >= 0.28), not is_closed()
                "closing": pool.is_closing()
            }
        return {"error": "No connection pool"}
    async def get_database_health(self) -> Dict[str, Any]:
        """Get database health status"""
        if not self.db_manager.connection_pool:
            return {"status": "unhealthy", "error": "No connection pool"}
        try:
            async with self.db_manager.connection_pool.acquire() as conn:
                # Round-trip a trivial query to confirm the database responds
                result = await conn.fetchval("SELECT 1")
                return {
                    "status": "healthy" if result == 1 else "unhealthy",
                    "connection_test": "passed" if result == 1 else "failed"
                }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }
8.2 Common Issues & Solutions
Issue 1: Connection Pool Exhausted
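Symptoms: pool.acquire() calls wait for a free connection, or fail with asyncio.TimeoutError when a timeout is passed; if PostgreSQL's own max_connections limit is reached, opening new connections fails with asyncpg.exceptions.TooManyConnectionsError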
Possible Causes:
Connections not properly released
Improper connection pool size configuration
Long-running queries occupying connections
Solutions:
# 1. Check connection pool usage
async def check_connection_pool_config():
    pool = db_manager.connection_pool
    if pool:
        # get_size() also counts idle connections, so subtract them to get connections in use
        in_use = pool.get_size() - pool.get_idle_size()
        print(f"Connections in use: {in_use}/{pool.get_max_size()}")
        if in_use >= pool.get_max_size() * 0.9:
            print("Warning: Connection pool usage too high")
# 2. Increase connection pool size (close the old pool first so it is not leaked)
await db_manager.close()
await db_manager.init_connection_pool(min_size=20, max_size=50)
# 3. Check connection leaks
import gc
import asyncpg
def check_connection_leaks():
    """Check connection leaks"""
    # Force garbage collection
    gc.collect()
    # List open connections that are still referenced; idle pool connections
    # also appear here, so compare the count against pool.get_size()
    for obj in gc.get_objects():
        if isinstance(obj, asyncpg.Connection) and not obj.is_closed():
            print(f"Found unclosed connection: {obj}")
Issue 2: Database Connection Timeout
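Symptoms: asyncpg.exceptions.QueryCanceledError (the server-side statement_timeout was exceeded) or asyncio.TimeoutError (the client-side command_timeout was exceeded)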
Possible Causes:
Query execution time too long
Unstable network connection
Database overload
Solutions:
# 1. Increase query timeout
db_config = {
    "command_timeout": 120,  # Client-side timeout: 2 minutes
    "server_settings": {
        "statement_timeout": "120s"  # Server-side timeout
    }
}
# 2. Optimize query performance
async def optimize_slow_queries():
    """Optimize slow queries"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            # EXPLAIN ANALYZE executes the query and reports the actual plan and timings
            result = await conn.fetch("""
                EXPLAIN ANALYZE
                SELECT * FROM task_history
                WHERE user_id = $1 AND task_id = $2
            """, "user_123", "task_456")
            # Each returned row holds one line of the plan
            for row in result:
                print(row[0])
# 3. Add query retry mechanism
@retry_db_operation(max_retries=3, delay=2.0)
async def robust_query(query: str, *args):
    """Query with retry"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            return await conn.fetch(query, *args)
Issue 3: Data Consistency Issues
Symptoms: Task status inconsistent, data duplicate or missing
Possible Causes:
Concurrent write conflicts
Transactions not properly committed
Data inconsistency due to network partitions
Solutions:
# 1. Use transactions to ensure data consistency
async def save_task_with_transaction(user_id: str, task_id: str, step: int, step_result: TaskStepResult):
    """Save task data using transaction"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            async with conn.transaction():
                # Under the default READ COMMITTED isolation, two concurrent calls can both
                # find no row and both insert; the unique constraint in step 2 closes that gap
                existing = await conn.fetchrow(
                    "SELECT id FROM task_history WHERE user_id = $1 AND task_id = $2 AND step = $3",
                    user_id, task_id, step
                )
                if existing:
                    # Update existing record
                    await conn.execute(
                        "UPDATE task_history SET result = $1, timestamp = $2, status = $3 WHERE id = $4",
                        json.dumps(step_result.dict()), datetime.now(), step_result.status, existing['id']
                    )
                else:
                    # Insert new record
                    await conn.execute(
                        "INSERT INTO task_history (user_id, task_id, step, result, timestamp, status) VALUES ($1, $2, $3, $4, $5, $6)",
                        user_id, task_id, step, json.dumps(step_result.dict()), datetime.now(), step_result.status
                    )
# 2. Add unique constraints to prevent duplicates
async def add_unique_constraints():
    """Add unique constraints"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            # Fails if duplicate (user_id, task_id, step) rows already exist or the
            # constraint was added before; deduplicate first and run this only once
            await conn.execute("""
                ALTER TABLE task_history
                ADD CONSTRAINT unique_user_task_step
                UNIQUE (user_id, task_id, step)
            """)
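Once the `unique_user_task_step` constraint from step 2 exists, the check-then-write transaction can be replaced by a single `INSERT ... ON CONFLICT` statement. PostgreSQL executes that statement atomically even under concurrent writers. The following is a minimal sketch that assumes an asyncpg-style connection object. `UPSERT_SQL` and `upsert_task_step` are hypothetical names.

```python
from datetime import datetime
from typing import Any

# Requires a unique constraint on (user_id, task_id, step), as added in step 2
UPSERT_SQL = """
INSERT INTO task_history (user_id, task_id, step, result, timestamp, status)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (user_id, task_id, step)
DO UPDATE SET result = EXCLUDED.result,
              timestamp = EXCLUDED.timestamp,
              status = EXCLUDED.status
"""


async def upsert_task_step(conn: Any, user_id: str, task_id: str, step: int,
                           result_json: str, status: str, ts: datetime) -> str:
    """Insert a step, or overwrite the existing one, in one atomic statement."""
    # EXCLUDED refers to the row that the INSERT attempted to add
    return await conn.execute(UPSERT_SQL, user_id, task_id, step, result_json, ts, status)
```

Because there is no separate SELECT, the race described in step 1 cannot occur, and no explicit transaction is needed for a single-row write.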
Issue 4: Database Performance Issues
Symptoms: Slow query response, overall system performance degradation
Possible Causes:
Missing necessary indexes
Inefficient query statements
Excessive data volume
Solutions:
# 1. Analyze query performance
async def analyze_query_performance():
    """Analyze query performance (requires the pg_stat_statements extension)"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            # View slow queries; PostgreSQL 13+ renamed mean_time to mean_exec_time
            slow_queries = await conn.fetch("""
                SELECT query, mean_exec_time, calls
                FROM pg_stat_statements
                ORDER BY mean_exec_time DESC
                LIMIT 10
            """)
            for query in slow_queries:
                print(f"Slow query: {query['query'][:100]}... Average time: {query['mean_exec_time']}ms")
# 2. Optimize indexes
async def optimize_indexes():
    """Optimize database indexes"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            # Refresh planner statistics for the table
            await conn.execute("ANALYZE task_history")
            # Rebuild indexes; a plain REINDEX blocks writes to the table while it runs
            await conn.execute("REINDEX TABLE task_history")
# 3. Data partitioning
async def partition_task_history():
    """Create a monthly partition of the task history table"""
    if db_manager.connection_pool:
        async with db_manager.connection_pool.acquire() as conn:
            # Only works if task_history was created with PARTITION BY RANGE (timestamp);
            # the plain table from section 6.2 cannot be converted in place
            await conn.execute("""
                CREATE TABLE task_history_2024_01
                PARTITION OF task_history
                FOR VALUES FROM ('2024-01-01') TO ('2024-02-01')
            """)
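The partition statement above hard-codes one month. The following is a minimal sketch of generating the same statement for any month with a hypothetical `monthly_partition_ddl` helper. It handles the December-to-January rollover of the exclusive upper bound. Like the example above, it assumes `task_history` is already a partitioned table.

```python
from datetime import date


def monthly_partition_ddl(year: int, month: int, parent: str = "task_history") -> str:
    """Return a CREATE TABLE ... PARTITION OF statement covering one calendar month."""
    if not 1 <= month <= 12:
        raise ValueError("month must be 1..12")
    start = date(year, month, 1)
    # The upper bound is exclusive, so December rolls over to January of the next year
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    # `parent` is interpolated into SQL: pass only trusted, fixed table names
    name = f"{parent}_{year:04d}_{month:02d}"
    return (f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
            f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}')")
```

A maintenance job could run this for the upcoming month ahead of time, so inserts never arrive before their partition exists.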
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "Application Layer"
APP[AIECS Application]
TS[TaskService]
DS[DSLProcessor]
end
subgraph "Infrastructure Layer"
DM[DatabaseManager]
CTM[CeleryTaskManager]
WSM[WebSocketManager]
end
subgraph "Data Layer"
CP[Connection Pool]
PG[(PostgreSQL)]
IDX[Indexes]
end
APP --> DM
TS --> DM
DS --> DM
CTM --> DM
WSM --> DM
DM --> CP
CP --> PG
PG --> IDX
9.2 Database Operation Flow Diagram
sequenceDiagram
participant App as Application
participant DM as DatabaseManager
participant CP as Connection Pool
participant DB as PostgreSQL
App->>DM: Save Task History
DM->>CP: Acquire Connection
CP->>DM: Return Connection
DM->>DB: Execute INSERT
DB->>DM: Return Result
DM->>CP: Release Connection
DM->>App: Return Success
9.3 Connection Pool Management Diagram
graph LR
subgraph "Connection Pool Management"
CP[Connection Pool]
AC[Active Connections]
IC[Idle Connections]
WC[Waiting Clients]
end
subgraph "Connection States"
NEW[New Connection]
ACTIVE[Active Connection]
IDLE[Idle Connection]
CLOSED[Closed Connection]
end
CP --> AC
CP --> IC
AC --> ACTIVE
IC --> IDLE
WC --> CP
10. Version History
v1.0.0 (2024-01-15)
New Features:
Basic database connection management
Support asyncpg asynchronous driver
Implement task history save and query
Provide basic error handling
Technical Features:
Built on asyncpg
Support connection pool management
Implement JSONB data storage
Provide index optimization
v1.1.0 (2024-02-01)
Feature Enhancements:
Add task status management functionality
Implement user task list query
Support task cancellation operation
Add data cleanup functionality
Performance Optimizations:
Optimize query performance
Improve connection pool management
Enhance error handling mechanism
v1.2.0 (2024-03-01)
New Features:
Support batch operations
Add retry mechanism
Implement connection pool monitoring
Provide health check interface
Stability Improvements:
Enhance transaction support
Improve concurrency control
Optimize memory usage
v1.3.0 (2024-04-01)
Architecture Upgrades:
Upgrade to asyncpg 0.28.x
Support advanced connection pool configuration
Add query performance analysis
Implement data partitioning support
Monitoring Enhancements:
Add detailed performance metrics
Implement slow query monitoring
Support database health check
Provide operational management tools
Appendix
B. External Dependencies
C. Best Practices
# 1. Connection pool configuration best practices
optimal_config = {
    "min_size": 10,         # Minimum connections
    "max_size": 50,         # Maximum connections
    "command_timeout": 60,  # Command timeout (seconds)
    "server_settings": {
        "application_name": "aiecs_app",
        "timezone": "UTC",
        "statement_timeout": "60s"
    }
}
# 2. Query optimization best practices
async def optimized_query_example(conn, user_id: str, task_id: str):
    """Optimized query example (conn is an acquired asyncpg connection)"""
    # Use parameterized queries to prevent SQL injection
    query = "SELECT * FROM task_history WHERE user_id = $1 AND task_id = $2"
    # Use appropriate indexes
    # CREATE INDEX idx_user_task ON task_history (user_id, task_id)
    # Limit return result count
    query += " LIMIT 100"
    return await conn.fetch(query, user_id, task_id)
# 3. Error handling best practices
async def robust_database_operation(user_id: str, task_id: str, step: int, step_result: TaskStepResult):
    """Robust database operation"""
    # The specific handlers below only fire if save_task_history lets asyncpg
    # exceptions propagate; the section 6.3 version wraps them in a generic Exception
    try:
        return await db_manager.save_task_history(user_id, task_id, step, step_result)
    except asyncpg.exceptions.UniqueViolationError:
        logger.warning("Record already exists, skip insert")
        return True
    except asyncpg.exceptions.ConnectionDoesNotExistError:
        logger.error("Database connection lost, attempting reconnect")
        await db_manager.init_connection_pool()
        raise
    except Exception as e:
        logger.error(f"Database operation failed: {e}")
        raise
D. Contact Information
Technical Lead: AIECS Development Team
Issue Reporting: Through project Issue system
Documentation Updates: Regular maintenance, version synchronization