RedisClient Technical Documentation
1. Overview
Purpose
RedisClient is a component specifically designed for Redis database connection and operations, built on the redis.asyncio asynchronous Redis driver. It provides core functionalities including connection pool management, asynchronous operations, error handling, etc., serving as key infrastructure for caching and session management in the AIECS system.
Core Value
Asynchronous Redis Operations: High-performance asynchronous Redis access based on asyncio
Connection Pool Management: Automatically manages Redis connection pools, improving concurrent performance
Singleton Pattern: Globally shared Redis client instance, avoiding duplicate connections
Error Handling: Comprehensive error handling and logging mechanisms
Simple and Easy to Use: Provides concise API interface, hiding underlying complexity
2. Problem Background & Design Motivation
Problem Background
In the AIECS system, a large amount of cache and session data needs to be handled, including:
Task Status Caching: Need to cache task execution status and intermediate results
User Session Management: Need to store and manage user session information
Distributed Locks: Need to implement distributed lock mechanisms
Message Queue: Need to serve as Celery’s message broker
Real-Time Data: Need to store real-time updated data
Design Motivation
Performance Optimization: Improve data access performance through Redis caching
State Management: Provide reliable state storage and session management
Distributed Support: Support data sharing in distributed environments
Connection Reuse: Reduce connection overhead through connection pools
Operational Simplification: Provide unified Redis access interface
3. Architecture Positioning & Context
System Architecture Location
┌─────────────────────────────────────────────────────────────┐
│ AIECS System Architecture │
├─────────────────────────────────────────────────────────────┤
│ Application Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TaskService │ │ WebSocketManager│ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ RedisClient │ │ CeleryTaskManager│ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Data Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Redis Server │ │ Connection Pool │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Upstream Callers
TaskService: Task management service that needs to cache task status
WebSocketManager: WebSocket manager that needs to store session information
CeleryTaskManager: Task executor that uses Redis as message broker
DatabaseManager: Database manager that needs to cache query results
Downstream Dependencies
Redis Server: Redis database server
redis.asyncio: Asynchronous Redis client library
Connection Pool: Redis connection pool management
4. Core Features & Use Cases
4.1 Basic Redis Operations
Key-Value Operations
# Get Redis client
redis_client = await get_redis_client()
# Set key-value pairs
await redis_client.set("user:123:name", "John Doe")
await redis_client.set("user:123:email", "john@example.com", ex=3600) # 1 hour expiration
# Get values
name = await redis_client.get("user:123:name")
email = await redis_client.get("user:123:email")
print(f"Username: {name}")
print(f"Email: {email}")
# Check if key exists
if await redis_client.exists("user:123:name"):
print("User information exists")
Hash Table Operations
# Set hash table fields
user_info = {
"name": "John Doe",
"email": "john@example.com",
"age": "25",
"city": "New York"
}
await redis_client.hset("user:123", user_info)
# Get single field
name = await redis_client.hget("user:123", "name")
print(f"Username: {name}")
# Get all fields
user_data = await redis_client.hgetall("user:123")
print(f"User information: {user_data}")
# Atomic increment
await redis_client.hincrby("user:123", "login_count", 1)
login_count = await redis_client.hget("user:123", "login_count")
print(f"Login count: {login_count}")
4.2 Task Status Caching
Cache Task Execution Status
async def cache_task_status(task_id: str, status: str, progress: int):
"""Cache task status"""
task_key = f"task:{task_id}"
task_data = {
"status": status,
"progress": str(progress),
"updated_at": str(int(time.time()))
}
# Set task status
await redis_client.hset(task_key, task_data)
# Set expiration time (24 hours)
await redis_client.expire(task_key, 86400)
print(f"Task {task_id} status cached: {status}")
# Get task status
async def get_task_status(task_id: str) -> dict:
"""Get task status"""
task_key = f"task:{task_id}"
if await redis_client.exists(task_key):
return await redis_client.hgetall(task_key)
else:
return {"status": "not_found"}
# Usage example
await cache_task_status("task_456", "running", 75)
status = await get_task_status("task_456")
print(f"Task status: {status}")
Task Result Caching
async def cache_task_result(task_id: str, result: dict):
"""Cache task result"""
result_key = f"result:{task_id}"
# Serialize result to JSON string
import json
result_json = json.dumps(result, ensure_ascii=False)
# Cache result, set 1 hour expiration
await redis_client.set(result_key, result_json, ex=3600)
print(f"Task {task_id} result cached")
async def get_task_result(task_id: str) -> dict:
"""Get task result"""
result_key = f"result:{task_id}"
result_json = await redis_client.get(result_key)
if result_json:
import json
return json.loads(result_json)
else:
return None
# Usage example
task_result = {
"accuracy": 0.95,
"processing_time": 120,
"output_file": "result.csv"
}
await cache_task_result("task_789", task_result)
result = await get_task_result("task_789")
print(f"Task result: {result}")
4.3 User Session Management
Session Storage and Management
async def create_user_session(user_id: str, session_data: dict) -> str:
"""Create user session"""
session_id = f"session_{user_id}_{int(time.time())}"
session_key = f"session:{session_id}"
# Store session data
session_data["user_id"] = user_id
session_data["created_at"] = str(int(time.time()))
await redis_client.hset(session_key, session_data)
# Set session expiration time (30 minutes)
await redis_client.expire(session_key, 1800)
print(f"User {user_id} session created: {session_id}")
return session_id
async def get_user_session(session_id: str) -> dict:
"""Get user session"""
session_key = f"session:{session_id}"
if await redis_client.exists(session_key):
return await redis_client.hgetall(session_key)
else:
return {"error": "session_not_found"}
async def update_user_session(session_id: str, updates: dict):
"""Update user session"""
session_key = f"session:{session_id}"
if await redis_client.exists(session_key):
await redis_client.hset(session_key, updates)
# Extend expiration time
await redis_client.expire(session_key, 1800)
print(f"Session {session_id} updated")
else:
print(f"Session {session_id} does not exist")
# Usage example
session_id = await create_user_session("user_123", {
"username": "John Doe",
"role": "admin",
"last_activity": str(int(time.time()))
})
session = await get_user_session(session_id)
print(f"Session information: {session}")
await update_user_session(session_id, {
"last_activity": str(int(time.time())),
"current_page": "/dashboard"
})
4.4 Distributed Lock Implementation
Redis-Based Distributed Lock
import asyncio
import uuid
class DistributedLock:
def __init__(self, redis_client, lock_name: str, timeout: int = 10):
self.redis_client = redis_client
self.lock_name = f"lock:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
async def acquire(self) -> bool:
"""Acquire lock"""
# Use SET command with NX and EX options for atomicity
result = await self.redis_client.set(
self.lock_name,
self.identifier,
ex=self.timeout,
nx=True # Only set if key does not exist
)
return result
async def release(self) -> bool:
"""Release lock"""
# Use Lua script to ensure only lock holder can release
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
client = await self.redis_client.get_client()
result = await client.eval(lua_script, 1, self.lock_name, self.identifier)
return bool(result)
async def __aenter__(self):
if await self.acquire():
return self
else:
raise Exception("Failed to acquire lock")
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.release()
# Use distributed lock
async def process_with_lock(resource_id: str):
"""Process resource with distributed lock"""
lock = DistributedLock(redis_client, f"resource_{resource_id}")
try:
async with lock:
print(f"Acquired lock for resource {resource_id}")
# Execute operations requiring mutual exclusion
await asyncio.sleep(2)
print(f"Processing resource {resource_id} completed")
except Exception as e:
print(f"Processing resource {resource_id} failed: {e}")
# Concurrency test
async def test_distributed_lock():
"""Test distributed lock"""
tasks = [
process_with_lock("resource_1") for _ in range(5)
]
await asyncio.gather(*tasks)
4.5 Cache Strategy Implementation
Multi-Level Cache Strategy
class CacheManager:
def __init__(self, redis_client):
self.redis_client = redis_client
self.local_cache = {} # Local cache
self.local_cache_ttl = {}
async def get(self, key: str, ttl: int = 300) -> any:
"""Get cached data (multi-level cache)"""
# 1. Check local cache
if key in self.local_cache:
if time.time() - self.local_cache_ttl[key] < 60: # Local cache 1 minute
return self.local_cache[key]
else:
# Local cache expired, delete
del self.local_cache[key]
del self.local_cache_ttl[key]
# 2. Check Redis cache
value = await self.redis_client.get(key)
if value:
# Update local cache
self.local_cache[key] = value
self.local_cache_ttl[key] = time.time()
return value
return None
async def set(self, key: str, value: str, ttl: int = 300):
"""Set cached data"""
# Set Redis cache
await self.redis_client.set(key, value, ex=ttl)
# Update local cache
self.local_cache[key] = value
self.local_cache_ttl[key] = time.time()
async def delete(self, key: str):
"""Delete cached data"""
# Delete Redis cache
await self.redis_client.delete(key)
# Delete local cache
self.local_cache.pop(key, None)
self.local_cache_ttl.pop(key, None)
# Use cache manager
cache_manager = CacheManager(redis_client)
# Cache database query results
async def get_user_from_cache(user_id: str) -> dict:
"""Get user information from cache"""
cache_key = f"user:{user_id}"
# Try to get from cache
cached_data = await cache_manager.get(cache_key)
if cached_data:
import json
return json.loads(cached_data)
# Cache miss, query from database
user_data = await database.get_user(user_id)
# Store in cache
if user_data:
import json
await cache_manager.set(cache_key, json.dumps(user_data), ttl=600)
return user_data
5. API Reference
5.1 Class Definition
RedisClient
class RedisClient:
"""Redis client singleton, used for sharing across different cache strategies"""
def __init__(self) -> None
"""Initialize Redis client"""
5.2 Public Methods
initialize
async def initialize(self) -> None
Function: Initialize Redis client
Exceptions:
Exception: Initialization failed
get_client
async def get_client(self) -> redis.Redis
Function: Get Redis client instance
Returns:
redis.Redis: Redis client instance
Exceptions:
RuntimeError: Client not initialized
close
async def close(self) -> None
Function: Close Redis connection
set
async def set(self, key: str, value: str, ex: int = None) -> bool
Function: Set key-value pair
Parameters:
key(str): Key namevalue(str): Valueex(int): Expiration time (seconds)
Returns:
bool: Whether setting succeeded
get
async def get(self, key: str) -> Optional[str]
Function: Get value
Parameters:
key(str): Key name
Returns:
Optional[str]: Value corresponding to key, returns None if not exists
hset
async def hset(self, name: str, mapping: dict) -> int
Function: Set hash table fields
Parameters:
name(str): Hash table namemapping(dict): Field mapping
Returns:
int: Number of fields set
hget
async def hget(self, name: str, key: str) -> Optional[str]
Function: Get hash table field value
Parameters:
name(str): Hash table namekey(str): Field name
Returns:
Optional[str]: Field value, returns None if not exists
hgetall
async def hgetall(self, name: str) -> dict
Function: Get all hash table fields
Parameters:
name(str): Hash table name
Returns:
dict: Dictionary of all fields
hincrby
async def hincrby(self, name: str, key: str, amount: int = 1) -> int
Function: Atomically increment hash table field
Parameters:
name(str): Hash table namekey(str): Field nameamount(int): Increment amount
Returns:
int: Value after increment
exists
async def exists(self, name: str) -> bool
Function: Check if key exists
Parameters:
name(str): Key name
Returns:
bool: Whether key exists
expire
async def expire(self, name: str, time: int) -> bool
Function: Set expiration time
Parameters:
name(str): Key nametime(int): Expiration time (seconds)
Returns:
bool: Whether setting succeeded
delete
async def delete(self, *keys) -> int
Function: Delete one or more keys
Parameters:
*keys: Keys to delete
Returns:
int: Number of keys deleted
ping
async def ping(self) -> bool
Function: Test Redis connection
Returns:
bool: Whether connection is normal
info
async def info(self, section: str = None) -> dict
Function: Get Redis server information
Parameters:
section(str): Information section
Returns:
dict: Server information
5.3 Global Functions
initialize_redis_client
async def initialize_redis_client() -> None
Function: Initialize global Redis client instance
close_redis_client
async def close_redis_client() -> None
Function: Close global Redis client instance
get_redis_client
async def get_redis_client() -> RedisClient
Function: Get global Redis client instance
Returns:
RedisClient: Redis client instance
Exceptions:
RuntimeError: Client not initialized
6. Technical Implementation Details
6.1 Connection Pool Management
Connection Pool Configuration
async def initialize(self):
"""Initialize Redis client"""
# Get configuration from environment variables
redis_host = os.getenv('REDIS_HOST', 'localhost')
redis_port = int(os.getenv('REDIS_PORT', 6379))
redis_db = int(os.getenv('REDIS_DB', 0))
redis_password = os.getenv('REDIS_PASSWORD')
# Create connection pool
self._connection_pool = redis.ConnectionPool(
host=redis_host,
port=redis_port,
db=redis_db,
password=redis_password,
decode_responses=True, # Automatically decode responses
max_connections=20, # Maximum connections
retry_on_timeout=True # Retry on timeout
)
# Create Redis client
self._client = redis.Redis(connection_pool=self._connection_pool)
# Test connection
await self._client.ping()
6.2 Singleton Pattern Implementation
Global Instance Management
# Global variable storing singleton instance
redis_client: Optional[RedisClient] = None
async def initialize_redis_client():
"""Create and initialize global Redis client instance"""
global redis_client
if redis_client is None:
redis_client = RedisClient()
await redis_client.initialize()
async def get_redis_client() -> RedisClient:
"""Get global Redis client instance"""
if redis_client is None:
raise RuntimeError("Redis client not initialized. Call initialize_redis_client() first.")
return redis_client
6.3 Error Handling Strategy
Unified Error Handling
async def set(self, key: str, value: str, ex: int = None) -> bool:
"""Set key-value pair (with error handling)"""
try:
client = await self.get_client()
return await client.set(key, value, ex=ex)
except Exception as e:
logger.error(f"Redis set failed for key {key}: {e}")
return False
async def get(self, key: str) -> Optional[str]:
"""Get value (with error handling)"""
try:
client = await self.get_client()
return await client.get(key)
except Exception as e:
logger.error(f"Redis get failed for key {key}: {e}")
return None
6.4 Connection Health Check
Connection Status Monitoring
async def ping(self) -> bool:
"""Test Redis connection"""
try:
client = await self.get_client()
result = await client.ping()
return result
except Exception as e:
logger.error(f"Redis ping failed: {e}")
return False
async def get_connection_info(self) -> dict:
"""Get connection information"""
try:
client = await self.get_client()
info = await client.info('clients')
return {
'connected_clients': info.get('connected_clients', 0),
'client_recent_max_input_buffer': info.get('client_recent_max_input_buffer', 0),
'client_recent_max_output_buffer': info.get('client_recent_max_output_buffer', 0)
}
except Exception as e:
logger.error(f"Failed to get connection info: {e}")
return {}
7. Configuration & Deployment
7.1 Basic Configuration
Environment Variable Configuration
# Redis connection configuration
export REDIS_HOST="localhost"
export REDIS_PORT="6379"
export REDIS_DB="0"
export REDIS_PASSWORD="your_password"
# Connection pool configuration
export REDIS_MAX_CONNECTIONS="20"
export REDIS_RETRY_ON_TIMEOUT="true"
export REDIS_DECODE_RESPONSES="true"
Code Configuration
# Custom configuration
redis_config = {
'host': 'redis.example.com',
'port': 6379,
'db': 0,
'password': 'secure_password',
'max_connections': 50,
'retry_on_timeout': True,
'decode_responses': True
}
# Create custom client
custom_client = RedisClient()
# Note: Current implementation uses environment variables, need to modify code to support custom configuration
7.2 Docker Deployment
Docker Compose Configuration
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
environment:
- REDIS_PASSWORD=your_password
command: redis-server --requirepass your_password
volumes:
- redis_data:/data
restart: unless-stopped
aiecs-app:
build: .
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_DB=0
- REDIS_PASSWORD=your_password
depends_on:
- redis
restart: unless-stopped
volumes:
redis_data:
Dockerfile Configuration
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV REDIS_HOST=redis
ENV REDIS_PORT=6379
ENV REDIS_DB=0
# Start command
CMD ["python", "-m", "aiecs.infrastructure.persistence.redis_client"]
7.3 Production Environment Configuration
High Availability Configuration
# Production environment Redis configuration
production_config = {
'host': 'redis-cluster.internal',
'port': 6379,
'db': 0,
'password': 'production_password',
'max_connections': 100,
'retry_on_timeout': True,
'decode_responses': True,
'socket_keepalive': True,
'socket_keepalive_options': {},
'health_check_interval': 30
}
Redis Cluster Configuration
# redis-cluster.yml
version: '3.8'
services:
redis-node-1:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes --port 7001
ports:
- "7001:7001"
volumes:
- redis_node_1:/data
redis-node-2:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes --port 7002
ports:
- "7002:7002"
volumes:
- redis_node_2:/data
redis-node-3:
image: redis:7-alpine
command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes --port 7003
ports:
- "7003:7003"
volumes:
- redis_node_3:/data
volumes:
redis_node_1:
redis_node_2:
redis_node_3:
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics
Connection Count: Current active connection count
Memory Usage: Redis memory usage
Command Execution Count: Number of commands executed per second
Key Count: Total number of keys in database
Hit Rate: Cache hit rate
Monitoring Implementation
class RedisMonitor:
def __init__(self, redis_client: RedisClient):
self.redis_client = redis_client
async def get_health_status(self) -> dict:
"""Get Redis health status"""
try:
# Test connection
ping_result = await self.redis_client.ping()
if not ping_result:
return {"status": "unhealthy", "error": "ping_failed"}
# Get server information
info = await self.redis_client.info()
return {
"status": "healthy",
"connected_clients": info.get("connected_clients", 0),
"used_memory": info.get("used_memory_human", "0B"),
"total_commands_processed": info.get("total_commands_processed", 0),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0),
"uptime_in_seconds": info.get("uptime_in_seconds", 0)
}
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
async def get_memory_usage(self) -> dict:
"""Get memory usage"""
try:
info = await self.redis_client.info('memory')
return {
"used_memory": info.get("used_memory", 0),
"used_memory_human": info.get("used_memory_human", "0B"),
"used_memory_peak": info.get("used_memory_peak", 0),
"used_memory_peak_human": info.get("used_memory_peak_human", "0B"),
"used_memory_rss": info.get("used_memory_rss", 0),
"used_memory_rss_human": info.get("used_memory_rss_human", "0B")
}
except Exception as e:
return {"error": str(e)}
async def get_performance_metrics(self) -> dict:
"""Get performance metrics"""
try:
info = await self.redis_client.info('stats')
return {
"total_commands_processed": info.get("total_commands_processed", 0),
"instantaneous_ops_per_sec": info.get("instantaneous_ops_per_sec", 0),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0),
"expired_keys": info.get("expired_keys", 0),
"evicted_keys": info.get("evicted_keys", 0)
}
except Exception as e:
return {"error": str(e)}
# Use monitor
monitor = RedisMonitor(redis_client)
health = await monitor.get_health_status()
print(f"Redis health status: {health}")
8.2 Common Issues & Solutions
Issue 1: Connection Timeout
Symptoms: redis.exceptions.TimeoutError error
Possible Causes:
Unstable network connection
Redis server overload
Improper connection pool configuration
Solutions:
# 1. Check network connection
import socket
def check_redis_connection(host: str, port: int) -> bool:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except:
return False
# 2. Adjust connection pool configuration
redis_config = {
'max_connections': 10, # Reduce connection count
'retry_on_timeout': True,
'socket_connect_timeout': 5,
'socket_timeout': 5
}
# 3. Implement connection retry mechanism
async def robust_redis_operation(operation, *args, **kwargs):
"""Redis operation with retry"""
max_retries = 3
for attempt in range(max_retries):
try:
return await operation(*args, **kwargs)
except redis.exceptions.TimeoutError:
if attempt == max_retries - 1:
raise
await asyncio.sleep(1 * (2 ** attempt)) # Exponential backoff
Issue 2: Out of Memory
Symptoms: redis.exceptions.ResponseError: OOM command not allowed when used memory > 'maxmemory'
Possible Causes:
Redis memory usage exceeds limit
No appropriate eviction policy set
Too much cached data
Solutions:
# 1. Check memory usage
async def check_memory_usage():
"""Check memory usage"""
info = await redis_client.info('memory')
used_memory = info.get('used_memory', 0)
max_memory = info.get('maxmemory', 0)
if max_memory > 0:
usage_percent = (used_memory / max_memory) * 100
print(f"Memory usage: {usage_percent:.2f}%")
if usage_percent > 90:
print("Warning: Memory usage too high")
return used_memory, max_memory
# 2. Clean expired keys
async def cleanup_expired_keys():
"""Clean expired keys"""
# Manually trigger expired key cleanup
client = await redis_client.get_client()
await client.execute_command('MEMORY', 'PURGE')
print("Expired keys cleaned")
# 3. Set memory eviction policy
# In Redis configuration set:
# maxmemory-policy allkeys-lru
Issue 3: Keyspace Notification Loss
Symptoms: Key expiration events not triggered correctly
Possible Causes:
Keyspace notifications not enabled
Event listener not properly configured
Network issues causing event loss
Solutions:
# 1. Enable keyspace notifications
# In Redis configuration set:
# notify-keyspace-events Ex
# 2. Implement key expiration listener
async def listen_for_key_expiration():
"""Listen for key expiration events"""
client = await redis_client.get_client()
pubsub = client.pubsub()
# Subscribe to key expiration events
await pubsub.psubscribe('__keyevent@0__:expired')
async for message in pubsub.listen():
if message['type'] == 'pmessage':
expired_key = message['data']
print(f"Key expired: {expired_key}")
# Handle expiration event
await handle_key_expiration(expired_key)
async def handle_key_expiration(key: str):
"""Handle key expiration event"""
if key.startswith('session:'):
print(f"Session expired: {key}")
# Clean up related data
elif key.startswith('task:'):
print(f"Task cache expired: {key}")
# Update task status
Issue 4: Data Inconsistency
Symptoms: Read data inconsistent with expectations
Possible Causes:
Concurrent write conflicts
Transactions not properly executed
Data inconsistency due to network partitions
Solutions:
# 1. Use transactions to ensure atomicity
async def atomic_update_user_score(user_id: str, score_delta: int):
"""Atomically update user score"""
client = await redis_client.get_client()
# Use MULTI/EXEC transaction
pipe = client.pipeline()
pipe.hget(f"user:{user_id}", "score")
pipe.hincrby(f"user:{user_id}", "score", score_delta)
pipe.hset(f"user:{user_id}", "last_updated", str(int(time.time())))
results = await pipe.execute()
old_score = int(results[0] or 0)
new_score = results[1]
print(f"User {user_id} score updated: {old_score} -> {new_score}")
return new_score
# 2. Use Lua script to ensure atomicity
async def atomic_transfer_points(from_user: str, to_user: str, points: int):
"""Atomically transfer points"""
lua_script = """
local from_key = KEYS[1]
local to_key = KEYS[2]
local points = tonumber(ARGV[1])
local from_balance = tonumber(redis.call('hget', from_key, 'points') or 0)
if from_balance < points then
return {false, 'insufficient_balance'}
end
redis.call('hincrby', from_key, 'points', -points)
redis.call('hincrby', to_key, 'points', points)
return {true, 'success'}
"""
client = await redis_client.get_client()
result = await client.eval(
lua_script,
2,
f"user:{from_user}",
f"user:{to_user}",
points
)
return result[0], result[1]
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "Application Layer"
APP[AIECS Application]
TS[TaskService]
WSM[WebSocketManager]
end
subgraph "Infrastructure Layer"
RC[RedisClient]
CTM[CeleryTaskManager]
DM[DatabaseManager]
end
subgraph "Redis Layer"
CP[Connection Pool]
RS[Redis Server]
CACHE[Cache Data]
end
APP --> RC
TS --> RC
WSM --> RC
CTM --> RC
DM --> RC
RC --> CP
CP --> RS
RS --> CACHE
9.2 Connection Management Flow Diagram
sequenceDiagram
participant App as Application
participant RC as RedisClient
participant CP as Connection Pool
participant RS as Redis Server
App->>RC: Initialize
RC->>CP: Create Connection Pool
CP->>RS: Establish Connection
RS-->>CP: Connection Acknowledgment
CP-->>RC: Connection Pool Ready
RC-->>App: Initialization Complete
App->>RC: Execute Operation
RC->>CP: Get Connection
CP->>RS: Execute Command
RS-->>CP: Return Result
CP-->>RC: Return Result
RC-->>App: Operation Complete
9.3 Cache Strategy Diagram
graph LR
subgraph "Cache Strategy"
L1[Local Cache]
L2[Redis Cache]
L3[Database]
end
subgraph "Data Flow"
REQ[Request]
L1_CHECK[Check L1]
L2_CHECK[Check L2]
DB_QUERY[Query Database]
L1_UPDATE[Update L1]
L2_UPDATE[Update L2]
end
REQ --> L1_CHECK
L1_CHECK -->|Hit| L1
L1_CHECK -->|Miss| L2_CHECK
L2_CHECK -->|Hit| L2
L2_CHECK -->|Miss| DB_QUERY
DB_QUERY --> L1_UPDATE
DB_QUERY --> L2_UPDATE
L1_UPDATE --> L1
L2_UPDATE --> L2
10. Version History
v1.0.0 (2024-01-15)
New Features:
Basic Redis client implementation
Support asynchronous operations
Implement connection pool management
Provide basic key-value operations
Technical Features:
Built on redis.asyncio
Support singleton pattern
Implement basic error handling
v1.1.0 (2024-02-01)
Feature Enhancements:
Add hash table operation support
Implement expiration time setting
Add connection health check
Support server information query
Performance Optimizations:
Optimize connection pool configuration
Improve error handling mechanism
Enhance logging
v1.2.0 (2024-03-01)
New Features:
Support global instance management
Add application lifecycle management
Implement atomic operation support
Provide monitoring interface
Stability Improvements:
Enhance connection retry mechanism
Improve resource management
Optimize exception handling
v1.3.0 (2024-04-01)
Architecture Upgrades:
Upgrade to redis.asyncio 5.x
Support Redis cluster
Add data serialization support
Implement advanced cache strategies
Monitoring Enhancements:
Add detailed performance metrics
Implement health check interface
Support alert integration
Provide operational management tools
Appendix
B. External Dependencies
C. Best Practices
# 1. Key naming conventions
# Use meaningful key prefixes and separators
key_patterns = [
'user:{user_id}:profile',
'task:{task_id}:status',
'session:{session_id}:data',
'cache:{type}:{id}'
]
# 2. Expiration time settings
# Set appropriate expiration times based on data characteristics
expiration_times = {
'session': 1800, # 30 minutes
'cache': 3600, # 1 hour
'temp': 300, # 5 minutes
'permanent': None # Never expires
}
# 3. Error handling best practices
async def safe_redis_operation(operation, *args, **kwargs):
"""Safe Redis operation"""
try:
return await operation(*args, **kwargs)
except redis.exceptions.ConnectionError:
logger.error("Redis connection error")
return None
except redis.exceptions.TimeoutError:
logger.error("Redis operation timeout")
return None
except Exception as e:
logger.error(f"Redis operation failed: {e}")
return None
D. Contact Information
Technical Lead: AIECS Development Team
Issue Reporting: Through project Issue system
Documentation Updates: Regular maintenance, version synchronization