Token Usage Repository Technical Documentation

1. Overview

Purpose: token_usage_repository.py is the data access layer component in the AIECS system responsible for recording and managing token usage. It stores, queries, and limit-checks token usage in Redis, giving the rest of the system a reliable data foundation for cost control, usage monitoring, and user management.

Core Value:

  • Precise Statistics: Real-time recording of user prompt tokens, completion tokens, and total usage

  • High-Performance Storage: Redis-based atomic operations, supporting high-concurrency access

  • Flexible Billing: Support custom billing cycles and usage limits

  • Cost Control: Provide usage limit checking to prevent excessive usage

  • Data Consistency: Use Redis atomic operations to ensure data accuracy

2. Problem Background & Design Motivation

2.1 Business Pain Points

During AIECS system development, we faced the following key challenges:

  1. Cost Control Challenges: LLM API calls are expensive, requiring precise statistics of each user’s token usage

  2. Missing Usage Monitoring: Lack of real-time monitoring and analysis of user usage

  3. Billing Cycle Confusion: Different users may have different billing cycles, requiring flexible support

  4. Excessive Usage Risk: Lack of usage limit mechanism may lead to unexpected cost overruns

  5. Data Consistency Issues: Race conditions and inconsistent counters in high-concurrency scenarios

  6. Performance Bottlenecks: A disk-backed relational database can become a bottleneck when a counter is updated on every LLM call

2.2 Design Motivation

Based on the above pain points, we designed a Redis-based token usage repository:

  • High-Performance Storage: Use Redis to provide millisecond-level read/write performance

  • Atomic Operations: Use Redis HINCRBY to ensure data consistency

  • Flexible Billing: Support custom billing cycles and usage limits

  • Real-Time Monitoring: Provide real-time usage queries and limit checking

  • Cost Control: Prevent excessive usage through usage limits

3. Architecture Positioning & Context

3.1 System Architecture Diagram

graph TB
    subgraph "Business Layer"
        A[LLM Client] --> B[Callback Handler]
        B --> C[Token Statistics]
    end
    
    subgraph "Data Access Layer"
        D[Token Usage Repository] --> E[Redis Operations]
        D --> F[Usage Statistics]
        D --> G[Limit Checking]
    end
    
    subgraph "Storage Layer"
        H[Redis Cluster] --> I[Hash Storage]
        H --> J[Atomic Operations]
        H --> K[Data Persistence]
    end
    
    subgraph "Monitoring Layer"
        L[Usage Monitoring] --> D
        M[Cost Analysis] --> D
        N[Alert System] --> D
    end
    
    C --> D
    E --> H
    F --> L
    G --> M

3.2 Upstream and Downstream Dependencies

Upstream Callers:

  • CustomAsyncCallbackHandler: LLM callback handler, records token usage

  • Business Service Layer: Needs to query user usage statistics

  • Monitoring System: Needs to get usage data for alerts

Downstream Dependencies:

  • RedisClient: Redis client, provides data storage capability

  • Redis Server: Provides high-performance data storage and atomic operations

  • Logging System: Records operation logs and error information

Peer Components:

  • Configuration Management: Provides Redis connection configuration

  • Error Handling: Handles Redis connection and operation exceptions

3.3 Data Flow

sequenceDiagram
    participant LLM as LLM Client
    participant CB as Callback Handler
    participant TR as Token Repository
    participant RD as Redis
    participant MON as Monitoring System

    LLM->>CB: Call Complete
    CB->>TR: increment_detailed_usage
    TR->>RD: HINCRBY Operation
    RD->>TR: Return Update Result
    TR->>CB: Record Success
    CB->>LLM: Continue Processing
    
    MON->>TR: get_usage_stats
    TR->>RD: HGETALL Query
    RD->>TR: Return Statistics Data
    TR->>MON: Return Usage Statistics

4. Core Features & Use Cases

4.1 Precise Token Statistics

Function Description: Real-time recording and statistics of user prompt tokens, completion tokens, and total usage.

Core Features:

  • Support separate statistics for prompt and completion tokens

  • Automatically calculate total usage

  • Redis-based atomic operations ensure data consistency

  • Support custom billing cycles

Use Cases:

from aiecs.utils.token_usage_repository import token_usage_repo

# Record detailed token usage
await token_usage_repo.increment_detailed_usage(
    user_id="user_123",
    prompt_tokens=150,
    completion_tokens=75,
    cycle_start_date="2024-01-01"
)

# Record total usage
await token_usage_repo.increment_total_usage(
    user_id="user_123",
    total_tokens=225,
    cycle_start_date="2024-01-01"
)

# Record prompt and completion tokens separately
await token_usage_repo.increment_prompt_tokens(
    user_id="user_123",
    prompt_tokens=150
)

await token_usage_repo.increment_completion_tokens(
    user_id="user_123",
    completion_tokens=75
)

Real-world Application Cases:

  • Cost Accounting: Precisely calculate each user’s API call costs

  • Usage Analysis: Analyze user usage patterns and trends

  • Resource Optimization: Optimize resource allocation based on usage data

  • Billing System: Provide accurate usage data for billing systems
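
The cost accounting case can be sketched as a small helper that turns a get_usage_stats-style dictionary into a cost figure. This is a minimal sketch, not part of the repository: estimate_cost and both per-1K prices are hypothetical, and real prices depend on the model and provider.

```python
from decimal import Decimal
from typing import Dict


def estimate_cost(
    stats: Dict[str, int],
    prompt_price_per_1k: Decimal,
    completion_price_per_1k: Decimal,
) -> Decimal:
    """Estimate cost from usage counters; prices are caller-supplied assumptions."""
    prompt = Decimal(stats.get("prompt_tokens", 0))
    completion = Decimal(stats.get("completion_tokens", 0))
    # Prices are quoted per 1,000 tokens, so divide the weighted sum by 1000.
    return (prompt * prompt_price_per_1k + completion * completion_price_per_1k) / Decimal(1000)


# Example with made-up prices; Decimal avoids float rounding in money math.
stats = {"prompt_tokens": 150, "completion_tokens": 75, "total_tokens": 225}
cost = estimate_cost(stats, Decimal("0.01"), Decimal("0.03"))
```

Prompt and completion tokens are priced separately here because the repository tracks them as separate fields.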

4.2 Usage Limit Management

Function Description: Set usage limits for users and provide real-time limit checking functionality.

Core Features:

  • Support user-level usage limit settings

  • Real-time checking if limits are exceeded

  • Provide remaining usage queries

  • Support limit settings for different billing cycles

Use Cases:

# Set user usage limit
await token_usage_repo.set_usage_limit(
    user_id="user_123",
    limit=10000,  # 10K tokens
    cycle_start_date="2024-01-01"
)

# Check usage limit
limit_check = await token_usage_repo.check_usage_limit(
    user_id="user_123",
    cycle_start_date="2024-01-01"
)

if limit_check["exceeded"]:
    print(f"User has exceeded limit! Current usage: {limit_check['current_usage']}, Limit: {limit_check['limit']}")
else:
    print(f"Remaining usage: {limit_check['remaining']} tokens")

# Check limit before LLM call
async def call_llm_with_limit_check(user_id: str, prompt: str):
    limit_check = await token_usage_repo.check_usage_limit(user_id)
    
    if limit_check["exceeded"]:
        raise Exception("User usage has exceeded limit")
    
    # Continue LLM call
    return await llm_client.generate_text(prompt)

Real-world Application Cases:

  • Cost Control: Prevent users from excessive usage causing cost overruns

  • Resource Management: Reasonably allocate system resources

  • User Management: Set different usage limits for different users

  • Alert System: Send alerts when usage approaches limits
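
The alert case (warning when usage approaches the limit) could look like the sketch below. usage_alert_level and the 80% warning ratio are hypothetical. The sketch also assumes that a non-positive limit means "no limit" and that reaching the limit counts as exceeded; neither is confirmed by the repository code shown here.

```python
def usage_alert_level(current_usage: int, limit: int, warn_ratio: float = 0.8) -> str:
    """Classify usage against a limit as 'ok', 'warning', or 'exceeded'."""
    if limit <= 0:
        # Assumption: no limit configured, so nothing to alert on.
        return "ok"
    if current_usage >= limit:
        return "exceeded"
    if current_usage >= limit * warn_ratio:
        return "warning"
    return "ok"
```

A monitoring job could feed it the current_usage and limit values returned by check_usage_limit and send a notification when the level changes.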

4.3 Usage Query and Statistics

Function Description: Provide rich usage query and statistics functionality, supporting monitoring and analysis.

Core Features:

  • Query usage statistics for specified users

  • Support data queries for different billing cycles

  • Provide detailed usage breakdown

  • Support batch queries and aggregate statistics

Use Cases:

from typing import Dict, List

# Get user usage statistics
stats = await token_usage_repo.get_usage_stats(
    user_id="user_123",
    cycle_start_date="2024-01-01"
)

print(f"Prompt tokens: {stats['prompt_tokens']}")
print(f"Completion tokens: {stats['completion_tokens']}")
print(f"Total tokens: {stats['total_tokens']}")

# Batch query usage for multiple users
async def get_batch_usage_stats(user_ids: List[str]):
    results = {}
    for user_id in user_ids:
        stats = await token_usage_repo.get_usage_stats(user_id)
        results[user_id] = stats
    return results

# Calculate usage rate
def calculate_usage_rate(stats: Dict[str, int], limit: int) -> float:
    total_usage = stats.get("total_tokens", 0)
    return (total_usage / limit) * 100 if limit > 0 else 0

Real-world Application Cases:

  • User Dashboard: Provide usage overview for users

  • Administrator Monitoring: Monitor overall system usage

  • Cost Analysis: Analyze cost distribution across different users and services

  • Trend Analysis: Analyze usage change trends
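
The batch query above awaits one user at a time. For dashboards that load many users, a concurrent variant is one option. The sketch below is an illustration under assumptions: StubUsageRepo is a hypothetical stand-in so the example runs without Redis, and max_concurrency is an arbitrary cap chosen to avoid exhausting the connection pool.

```python
import asyncio
from typing import Dict, List


class StubUsageRepo:
    """Hypothetical stand-in for token_usage_repo, backed by a plain dict."""

    def __init__(self, data: Dict[str, Dict[str, int]]):
        self._data = data

    async def get_usage_stats(self, user_id: str, cycle_start_date=None) -> Dict[str, int]:
        await asyncio.sleep(0)  # simulate a network round trip
        empty = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
        return self._data.get(user_id, empty)


async def get_batch_usage_stats_concurrent(
    repo, user_ids: List[str], max_concurrency: int = 10
) -> Dict[str, Dict[str, int]]:
    """Fetch stats for many users concurrently, at most max_concurrency at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch(user_id: str) -> Dict[str, int]:
        async with semaphore:
            return await repo.get_usage_stats(user_id)

    # gather preserves input order, so results line up with user_ids.
    results = await asyncio.gather(*(fetch(u) for u in user_ids))
    return dict(zip(user_ids, results))
```

Because get_usage_stats returns defaults instead of raising, one failing user does not abort the whole batch.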

4.4 Data Reset and Management

Function Description: Provide data reset and management functionality, supporting billing cycle reset and abnormal data cleanup.

Core Features:

  • Support resetting user usage data

  • Support reset by billing cycle

  • Provide data cleanup and repair functionality

  • Support batch operations

Use Cases:

import logging
from typing import List

logger = logging.getLogger(__name__)

# Reset user usage (new billing cycle starts)
await token_usage_repo.reset_usage(
    user_id="user_123",
    cycle_start_date="2024-02-01"  # New billing cycle
)

# Batch reset usage for multiple users
async def reset_batch_usage(user_ids: List[str], new_cycle: str):
    for user_id in user_ids:
        await token_usage_repo.reset_usage(user_id, new_cycle)

# Data repair: Clean abnormal data
async def cleanup_abnormal_data(user_id: str):
    stats = await token_usage_repo.get_usage_stats(user_id)
    
    # Check for data anomalies
    if stats["total_tokens"] < 0:
        logger.warning(f"Found abnormal data: {stats}")
        await token_usage_repo.reset_usage(user_id)

Real-world Application Cases:

  • Billing Cycle Reset: Monthly reset of user usage

  • Data Repair: Repair abnormal or corrupted data

  • Test Environment Cleanup: Clean test data

  • User Migration: Migrate user data to new billing cycle
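
A monthly reset needs a consistent cycle_start_date string for each user. The sketch below shows one way to derive it. cycle_start_for and the anchor_day parameter are hypothetical; the repository only requires that callers pass a "YYYY-MM-DD" string.

```python
import calendar
from datetime import date


def cycle_start_for(today: date, anchor_day: int = 1) -> str:
    """Return the start date of the monthly billing cycle that contains `today`."""

    def clamp(year: int, month: int) -> date:
        # Months shorter than anchor_day start their cycle on their last day.
        last_day = calendar.monthrange(year, month)[1]
        return date(year, month, min(anchor_day, last_day))

    start = clamp(today.year, today.month)
    if today < start:
        # The anchor has not been reached yet, so the cycle began last month.
        if today.month > 1:
            start = clamp(today.year, today.month - 1)
        else:
            start = clamp(today.year - 1, 12)
    return start.isoformat()
```

Passing the result as cycle_start_date to the increment and query methods keeps all calls within one cycle on the same Redis key.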

5. API Reference

5.1 TokenUsageRepository Class

Constructor

def __init__(self)

Function: Initialize token usage repository

Parameters: None

Returns: TokenUsageRepository instance

Methods

increment_prompt_tokens
async def increment_prompt_tokens(self, user_id: str, prompt_tokens: int, cycle_start_date: Optional[str] = None)

Function: Increment user’s prompt token usage

Parameters:

  • user_id (str, required): User ID

  • prompt_tokens (int, required): Number of prompt tokens to increment

  • cycle_start_date (Optional[str], optional): Billing cycle start date, format “YYYY-MM-DD”

Returns: None

Exceptions:

  • Exception: When Redis operation fails

Usage Example:

await token_usage_repo.increment_prompt_tokens("user_123", 150)

increment_completion_tokens
async def increment_completion_tokens(self, user_id: str, completion_tokens: int, cycle_start_date: Optional[str] = None)

Function: Increment user’s completion token usage

Parameters:

  • user_id (str, required): User ID

  • completion_tokens (int, required): Number of completion tokens to increment

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns: None

Exceptions:

  • Exception: When Redis operation fails

increment_total_usage
async def increment_total_usage(self, user_id: str, total_tokens: int, cycle_start_date: Optional[str] = None)

Function: Increment user’s total token usage

Parameters:

  • user_id (str, required): User ID

  • total_tokens (int, required): Number of total tokens to increment

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns: None

Exceptions:

  • Exception: When Redis operation fails

increment_detailed_usage
async def increment_detailed_usage(
    self,
    user_id: str,
    prompt_tokens: int,
    completion_tokens: int,
    cycle_start_date: Optional[str] = None
)

Function: Increment both user’s prompt and completion token usage simultaneously

Parameters:

  • user_id (str, required): User ID

  • prompt_tokens (int, required): Number of prompt tokens to increment

  • completion_tokens (int, required): Number of completion tokens to increment

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns: None

Exceptions:

  • Exception: When Redis operation fails

Usage Example:

await token_usage_repo.increment_detailed_usage(
    user_id="user_123",
    prompt_tokens=150,
    completion_tokens=75,
    cycle_start_date="2024-01-01"
)

get_usage_stats
async def get_usage_stats(self, user_id: str, cycle_start_date: Optional[str] = None) -> Dict[str, int]

Function: Get user usage statistics

Parameters:

  • user_id (str, required): User ID

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns:

  • Dict[str, int]: Dictionary containing the following keys:

    • prompt_tokens: Prompt token usage

    • completion_tokens: Completion token usage

    • total_tokens: Total token usage

Exceptions:

  • No direct exceptions, returns default values on error

Usage Example:

stats = await token_usage_repo.get_usage_stats("user_123")
print(f"Total usage: {stats['total_tokens']}")

reset_usage
async def reset_usage(self, user_id: str, cycle_start_date: Optional[str] = None)

Function: Reset user usage data

Parameters:

  • user_id (str, required): User ID

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns: None

Exceptions:

  • Exception: When Redis operation fails

set_usage_limit
async def set_usage_limit(self, user_id: str, limit: int, cycle_start_date: Optional[str] = None)

Function: Set user usage limit

Parameters:

  • user_id (str, required): User ID

  • limit (int, required): Usage limit (number of tokens)

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns: None

Exceptions:

  • Exception: When Redis operation fails

check_usage_limit
async def check_usage_limit(self, user_id: str, cycle_start_date: Optional[str] = None) -> Dict[str, Any]

Function: Check if user has exceeded usage limit

Parameters:

  • user_id (str, required): User ID

  • cycle_start_date (Optional[str], optional): Billing cycle start date

Returns:

  • Dict[str, Any]: Dictionary containing the following keys:

    • exceeded: Whether limit is exceeded (bool)

    • current_usage: Current usage (int)

    • limit: Set limit (int)

    • remaining: Remaining usage (int, or float('inf') when no limit applies)

Exceptions:

  • No direct exceptions, returns default values on error

Usage Example:

limit_check = await token_usage_repo.check_usage_limit("user_123")
if limit_check["exceeded"]:
    print("User has exceeded limit")
else:
    print(f"Remaining usage: {limit_check['remaining']}")
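
The shape of the returned dictionary can be illustrated with a pure function. This is a sketch under assumptions, not the repository's implementation: evaluate_limit is a hypothetical name, a non-positive limit is treated as "no limit" (which is where float('inf') would come from), and reaching the limit exactly is treated as exceeded.

```python
from typing import Any, Dict


def evaluate_limit(current_usage: int, limit: int) -> Dict[str, Any]:
    """Build a check_usage_limit-style result from two counters."""
    if limit <= 0:
        # Assumption: no limit configured, so the remaining budget is unbounded.
        return {
            "exceeded": False,
            "current_usage": current_usage,
            "limit": limit,
            "remaining": float("inf"),
        }
    return {
        "exceeded": current_usage >= limit,
        "current_usage": current_usage,
        "limit": limit,
        # Never report a negative remainder once the limit is passed.
        "remaining": max(limit - current_usage, 0),
    }
```

Callers that serialize the result should handle the unlimited case explicitly: json.dumps writes float('inf') as Infinity, which strict JSON parsers reject.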

5.2 Global Instance

token_usage_repo

token_usage_repo = TokenUsageRepository()

Function: Global singleton instance for use throughout the application

Type: TokenUsageRepository

Usage Example:

from aiecs.utils.token_usage_repository import token_usage_repo

# Use global instance directly
await token_usage_repo.increment_total_usage("user_123", 100)

6. Technical Implementation Details

6.1 Redis Key Design

Key Format:

def _get_key_for_current_period(self, user_id: str, cycle_start_date: Optional[str] = None) -> str:
    if cycle_start_date:
        period = cycle_start_date
    else:
        period = datetime.now().strftime("%Y-%m-%d")
    
    return f"token_usage:{user_id}:{period}"

Key Structure Explanation:

  • token_usage: Fixed prefix, identifies token usage data

  • {user_id}: User ID, ensures data isolation

  • {period}: Billing cycle, supports multiple cycle data coexistence
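
The key layout can be exercised without Redis. The sketch below mirrors the format string shown above and adds a parser for tooling such as cleanup scripts. build_usage_key, parse_usage_key, and the today parameter are hypothetical helpers introduced for illustration and testing.

```python
from datetime import date
from typing import Optional, Tuple

KEY_PREFIX = "token_usage"


def build_usage_key(
    user_id: str,
    cycle_start_date: Optional[str] = None,
    today: Optional[date] = None,
) -> str:
    """Build 'token_usage:{user_id}:{period}', defaulting the period to today."""
    period = cycle_start_date or (today or date.today()).strftime("%Y-%m-%d")
    return f"{KEY_PREFIX}:{user_id}:{period}"


def parse_usage_key(key: str) -> Tuple[str, str]:
    """Split a usage key back into (user_id, period)."""
    prefix, rest = key.split(":", 1)
    if prefix != KEY_PREFIX:
        raise ValueError(f"not a token usage key: {key!r}")
    # Split from the right so user IDs that contain ':' survive intact.
    user_id, period = rest.rsplit(":", 1)
    return user_id, period
```

Note that when no cycle_start_date is passed, the period is the current calendar day, so usage recorded on different days lands in different hashes.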

Hash Field Design:

# Redis Hash fields
{
    "prompt_tokens": "150",      # Prompt token usage
    "completion_tokens": "75",   # Completion token usage
    "total_tokens": "225",       # Total token usage
    "usage_limit": "10000"       # Usage limit (optional)
}

6.2 Atomic Operation Mechanism

HINCRBY Operation:

# Each HINCRBY is atomic for its own field; the three separate calls below
# are not applied as one atomic unit
await client.hincrby(redis_key, "prompt_tokens", prompt_tokens)
await client.hincrby(redis_key, "completion_tokens", completion_tokens)
await client.hincrby(redis_key, "total_tokens", total_tokens)

Pipeline Batch Operations:

# Queue the increments in a pipeline so they are sent in one round trip;
# redis-py pipelines are transactional (MULTI/EXEC) by default
pipe = client.pipeline()
for field, value in updates.items():
    pipe.hincrby(redis_key, field, value)
await pipe.execute()

Advantages:

  • Ensure data consistency

  • Support high-concurrency access

  • Reduce network round trips
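
Why a server-side increment matters can be shown with an in-memory stand-in. The sketch below is illustrative only: FakeHashStore is a hypothetical class, not a Redis client, and asyncio.sleep(0) simulates the network round trip during which another coroutine can run.

```python
import asyncio


class FakeHashStore:
    """In-memory stand-in for one Redis hash (illustration only)."""

    def __init__(self):
        self.fields = {}

    async def hget(self, field):
        await asyncio.sleep(0)  # simulated round trip
        return self.fields.get(field, 0)

    async def hset(self, field, value):
        await asyncio.sleep(0)  # simulated round trip
        self.fields[field] = value

    async def hincrby(self, field, amount):
        await asyncio.sleep(0)  # simulated round trip
        # Read and write happen in one step with no await in between,
        # which models the server applying the increment atomically.
        self.fields[field] = self.fields.get(field, 0) + amount
        return self.fields[field]


async def racy_increment(store, field, amount):
    """Client-side read-modify-write: another writer can slip in between."""
    current = await store.hget(field)
    await store.hset(field, current + amount)


async def demo():
    racy = FakeHashStore()
    await asyncio.gather(
        racy_increment(racy, "total_tokens", 5),
        racy_increment(racy, "total_tokens", 7),
    )
    atomic = FakeHashStore()
    await asyncio.gather(
        atomic.hincrby("total_tokens", 5),
        atomic.hincrby("total_tokens", 7),
    )
    return racy.fields["total_tokens"], atomic.fields["total_tokens"]
```

Running asyncio.run(demo()) yields a racy total below 12 because one update overwrites the other, while the atomic total is the full sum.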

6.3 Error Handling Mechanism

Layered Error Handling:

async def increment_detailed_usage(self, user_id: str, prompt_tokens: int, completion_tokens: int, cycle_start_date: Optional[str] = None):
    try:
        # Parameter validation
        if not user_id or (prompt_tokens <= 0 and completion_tokens <= 0):
            return
        
        # Redis operations
        # ... execute operations
        
    except Exception as e:
        logger.error(f"Failed to increment detailed usage for user {user_id}: {e}")
        raise  # Re-raise exception for caller to handle

Error Type Handling:

  • Parameter validation errors: Return directly, don’t execute operation

  • Redis connection errors: Log error and re-raise

  • Data format errors: Log error and return default value

6.4 Performance Optimization Strategies

Connection Pool Management:

# Use Redis connection pool
self._connection_pool = redis.ConnectionPool(
    host=redis_host,
    port=redis_port,
    db=redis_db,
    password=redis_password,
    decode_responses=True,
    max_connections=20,
    retry_on_timeout=True
)

Batch Operation Optimization:

# Batch update multiple fields
updates = {}
if prompt_tokens > 0:
    updates["prompt_tokens"] = prompt_tokens
if completion_tokens > 0:
    updates["completion_tokens"] = completion_tokens

# Use Pipeline for batch execution
pipe = client.pipeline()
for field, value in updates.items():
    pipe.hincrby(redis_key, field, value)
await pipe.execute()

Data Validation Optimization:

# Early validation to avoid invalid operations
if not user_id or prompt_tokens <= 0:
    return

# Type conversion optimization
result = {}
for key, value in stats.items():
    try:
        result[key] = int(value) if value else 0
    except (ValueError, TypeError):
        result[key] = 0

6.5 Data Consistency Guarantees

Atomic Operations:

  • Use Redis HINCRBY to ensure atomicity of single field

  • Use a transactional Pipeline (MULTI/EXEC, the redis-py default) to apply multi-field updates as one unit

Data Validation:

# Ensure required fields exist
result.setdefault("prompt_tokens", 0)
result.setdefault("completion_tokens", 0)
result.setdefault("total_tokens", 0)

Error Recovery:

# Provide default values to ensure system stability
except Exception as e:
    logger.error(f"Failed to get usage stats for user {user_id}: {e}")
    return {
        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0
    }

7. Configuration & Deployment

7.1 Environment Requirements

Python Version:

  • Python 3.8+ (Python 3.9+ recommended)

  • Support asyncio asynchronous programming

  • Support typing type annotations

Dependencies:

# requirements.txt
redis>=4.5.0          # Redis client (includes redis.asyncio)
# asyncio is part of the Python standard library; do not install it from PyPI

Redis Requirements:

  • Redis 6.0+ (Redis 7.0+ recommended)

  • Support Hash data type

  • Support HINCRBY operation

7.2 Environment Variable Configuration

Redis Connection Configuration:

# .env
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=your_password
REDIS_MAX_CONNECTIONS=20
REDIS_RETRY_ON_TIMEOUT=true
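
One way to read these variables into typed settings is sketched below. RedisSettings and load_redis_settings are hypothetical names, and the fallback defaults simply mirror the example values above; the actual AIECS configuration loader may differ.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class RedisSettings:
    host: str
    port: int
    db: int
    password: Optional[str]
    max_connections: int
    retry_on_timeout: bool


def load_redis_settings(env: Mapping[str, str] = os.environ) -> RedisSettings:
    """Parse REDIS_* variables, converting strings to the types Redis clients expect."""
    truthy = {"1", "true", "yes"}
    return RedisSettings(
        host=env.get("REDIS_HOST", "localhost"),
        port=int(env.get("REDIS_PORT", "6379")),
        db=int(env.get("REDIS_DB", "0")),
        # An empty or missing password means "connect without authentication".
        password=env.get("REDIS_PASSWORD") or None,
        max_connections=int(env.get("REDIS_MAX_CONNECTIONS", "20")),
        retry_on_timeout=env.get("REDIS_RETRY_ON_TIMEOUT", "true").strip().lower() in truthy,
    )
```

Accepting any mapping rather than reading os.environ directly makes the loader easy to test with a plain dictionary.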

Token Statistics Configuration:

# Token statistics related configuration
TOKEN_STATS_ENABLED=true
TOKEN_STATS_DEFAULT_CYCLE=monthly
TOKEN_STATS_CLEANUP_INTERVAL=3600
TOKEN_STATS_BATCH_SIZE=100

7.3 Deployment Configuration

Docker Configuration:

FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy code
COPY aiecs/ ./aiecs/

# Set environment variables
ENV REDIS_HOST=redis
ENV REDIS_PORT=6379
ENV TOKEN_STATS_ENABLED=true

# Run application
CMD ["python", "-m", "aiecs.utils.token_usage_repository"]

Kubernetes Configuration:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: aiecs-token-repository
spec:
  replicas: 3
  selector:
    matchLabels:
      app: aiecs-token-repository
  template:
    metadata:
      labels:
        app: aiecs-token-repository
    spec:
      containers:
      - name: token-repository
        image: aiecs/token-repository:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: REDIS_PORT
          value: "6379"
        - name: TOKEN_STATS_ENABLED
          value: "true"
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"

7.4 Monitoring Configuration

Prometheus Metrics:

from prometheus_client import Counter, Histogram, Gauge

# Define monitoring metrics
token_increments_total = Counter('token_increments_total', 'Total token increments', ['user_id', 'token_type'])
token_queries_total = Counter('token_queries_total', 'Total token queries', ['user_id'])
token_operations_duration_seconds = Histogram('token_operations_duration_seconds', 'Token operation duration')
redis_operations_total = Counter('redis_operations_total', 'Total Redis operations', ['operation_type', 'status'])

Health Check:

import time

async def health_check():
    """Token usage repository health check"""
    try:
        # Test Redis connection
        redis_client = await get_redis_client()
        await redis_client.ping()
        
        # Test basic operations
        test_user = "health_check_test"
        await token_usage_repo.increment_total_usage(test_user, 1)
        stats = await token_usage_repo.get_usage_stats(test_user)
        await token_usage_repo.reset_usage(test_user)
        
        return {
            "status": "healthy",
            "timestamp": time.time(),
            "version": "1.0.0"
        }
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

8. Maintenance & Troubleshooting

8.1 Monitoring Metrics

Key Metrics:

  • Token increment operation success rate

  • Query operation response time

  • Redis connection status

  • Data consistency checks

Monitoring Dashboard:

# Grafana query examples
# Token increment rate (per second, averaged over 5 minutes)
sum(rate(token_increments_total[5m]))

# 95th percentile operation duration
histogram_quantile(0.95, sum by (le) (rate(token_operations_duration_seconds_bucket[5m])))

# Redis operation success rate
sum(rate(redis_operations_total{status="success"}[5m])) / sum(rate(redis_operations_total[5m]))

8.2 Common Issues & Solutions

8.2.1 Redis Connection Failure

Symptoms:

  • “Failed to initialize Redis client” error in logs

  • Token statistics operations fail

  • System functionality limited

Troubleshooting Steps:

  1. Check Redis service status: redis-cli ping

  2. Verify network connection: telnet redis_host 6379

  3. Check authentication: Verify password and permissions

  4. View Redis logs: tail -f /var/log/redis/redis.log

Solutions:

# Add connection retry mechanism
class ResilientTokenRepository(TokenUsageRepository):
    def __init__(self, max_retries=3, retry_delay=1):
        super().__init__()
        self.max_retries = max_retries
        self.retry_delay = retry_delay
    
    async def _execute_with_retry(self, operation, *args, **kwargs):
        """Execute operation with retry"""
        for attempt in range(self.max_retries):
            try:
                return await operation(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                logger.warning(f"Operation failed, retrying {attempt + 1}/{self.max_retries}: {e}")
                await asyncio.sleep(self.retry_delay * (2 ** attempt))
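
The class above defines _execute_with_retry but does not show it in use. The same backoff loop can be sketched as a standalone helper and exercised against a simulated failure. retry_async and FlakyOperation are hypothetical names introduced for this example.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    retry_delay: float = 1.0,
) -> T:
    """Await operation(), retrying with exponential backoff on any exception."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(retry_delay * (2 ** attempt))
    raise ValueError("max_retries must be at least 1")


class FlakyOperation:
    """Fails a fixed number of times, then succeeds (simulated outage)."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated Redis outage")
        return "ok"
```

Retrying increments carries a risk: HINCRBY is not idempotent, so if a timeout occurs after the server has already applied the increment, the retry counts the same tokens twice. Retries are safest for reads and for failures known to happen before the command is sent.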

8.2.2 Data Inconsistency

Symptoms:

  • Statistics data inaccurate

  • Different queries return different results

  • Data shows negative values

Troubleshooting Steps:

  1. Check concurrent operations: See if multiple processes are operating simultaneously

  2. Verify Redis data: Query data directly in Redis

  3. Analyze operation logs: Find abnormal operation records

Solutions:

# Data consistency check tool
async def verify_data_consistency(user_id: str, cycle_start_date: Optional[str] = None):
    """Verify data consistency"""
    try:
        # Get statistics data
        stats = await token_usage_repo.get_usage_stats(user_id, cycle_start_date)
        
        # Verify data consistency
        calculated_total = stats.get("prompt_tokens", 0) + stats.get("completion_tokens", 0)
        stored_total = stats.get("total_tokens", 0)
        
        if calculated_total != stored_total:
            logger.warning(f"Data inconsistency: calculated total {calculated_total} != stored total {stored_total}")
            
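            # Repair data by applying the difference. The delta is negative when the
            # stored total is too high, so confirm that increment_total_usage accepts
            # negative values before relying on this repair.
            await token_usage_repo.increment_total_usage(
                user_id,
                calculated_total - stored_total,
                cycle_start_date
            )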
            
        return True
    except Exception as e:
        logger.error(f"Data consistency check failed: {e}")
        return False

8.2.3 Performance Issues

Symptoms:

  • Token statistics operations respond slowly

  • Redis operations timeout

  • Overall system performance degrades

Troubleshooting Steps:

  1. Analyze operation time: Use performance profiling tools

  2. Check Redis performance: Monitor Redis CPU and memory usage

  3. Optimize batch operations: Reduce network round trips

Solutions:

# Performance optimized version
class OptimizedTokenRepository(TokenUsageRepository):
    def __init__(self, batch_size=100, cache_ttl=300):
        super().__init__()
        self.batch_size = batch_size
        self.cache_ttl = cache_ttl
        self._cache = {}
    
    async def batch_increment_usage(self, operations: List[Dict]):
        """Batch increment usage"""
        # Group by user
        user_operations = {}
        for op in operations:
            user_id = op["user_id"]
            if user_id not in user_operations:
                user_operations[user_id] = []
            user_operations[user_id].append(op)
        
        # Batch execute
        tasks = []
        for user_id, ops in user_operations.items():
            task = self._batch_process_user(user_id, ops)
            tasks.append(task)
        
        await asyncio.gather(*tasks)
    
    async def _batch_process_user(self, user_id: str, operations: List[Dict]):
        """Batch process operations for single user"""
        total_prompt = sum(op.get("prompt_tokens", 0) for op in operations)
        total_completion = sum(op.get("completion_tokens", 0) for op in operations)
        
        if total_prompt > 0 or total_completion > 0:
            await self.increment_detailed_usage(
                user_id, total_prompt, total_completion
            )
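
The batching code above groups by user only, so operations that belong to different billing cycles would be merged into one increment. A variant that groups by user and cycle is sketched below as a pure function. aggregate_operations is a hypothetical helper, and the optional cycle_start_date key on each operation is an assumption.

```python
from collections import defaultdict
from typing import Dict, Iterable, Optional, Tuple

UsageKey = Tuple[str, Optional[str]]  # (user_id, cycle_start_date)


def aggregate_operations(operations: Iterable[dict]) -> Dict[UsageKey, Dict[str, int]]:
    """Sum prompt/completion tokens per (user, billing cycle) pair."""
    totals: Dict[UsageKey, Dict[str, int]] = defaultdict(
        lambda: {"prompt_tokens": 0, "completion_tokens": 0}
    )
    for op in operations:
        key = (op["user_id"], op.get("cycle_start_date"))
        totals[key]["prompt_tokens"] += op.get("prompt_tokens", 0)
        totals[key]["completion_tokens"] += op.get("completion_tokens", 0)
    return dict(totals)
```

Each resulting entry maps to one increment_detailed_usage call, with the second element of the key passed as cycle_start_date.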

8.3 Data Backup and Recovery

Backup Strategy:

# Redis data backup
redis-cli --rdb /backup/token_usage_$(date +%Y%m%d).rdb

# Regular backup script
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
redis-cli --rdb /backup/token_usage_$DATE.rdb
gzip /backup/token_usage_$DATE.rdb

Recovery Process:

# Stop Redis service
systemctl stop redis

# Restore data
gunzip /backup/token_usage_20240101_120000.rdb.gz
cp /backup/token_usage_20240101_120000.rdb /var/lib/redis/dump.rdb

# Start Redis service
systemctl start redis

8.4 Log Analysis

Log Configuration:

import logging

# Configure token repository logs
token_logger = logging.getLogger('aiecs.token_usage_repository')
token_logger.setLevel(logging.INFO)

# Add file handler
file_handler = logging.FileHandler('/var/log/aiecs/token_usage.log')
file_handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
token_logger.addHandler(file_handler)

Key Log Patterns:

# Find error logs
grep "ERROR" /var/log/aiecs/token_usage.log | tail -100

# Analyze performance issues
grep "duration" /var/log/aiecs/token_usage.log

# Monitor usage changes
grep "incremented by" /var/log/aiecs/token_usage.log | tail -50

9. Visualizations

9.1 System Architecture Diagram

Identical to the system architecture diagram in Section 3.1.

9.2 Data Flow Diagram

flowchart TD
    A[LLM Call] --> B[Token Statistics]
    B --> C[Increment Operation]
    C --> D[Redis Storage]
    D --> E[Data Update]
    
    F[Usage Query] --> G[Statistics Data]
    G --> H[Redis Query]
    H --> I[Data Return]
    
    J[Limit Check] --> K[Usage Comparison]
    K --> L[Limit Judgment]
    L --> M[Result Return]

9.3 Token Usage Trend Chart

xychart-beta
    title "Token Usage Trend"
    x-axis ["Jan", "Feb", "Mar", "Apr", "May", "Jun"]
    y-axis "Token Count" 0 --> 100000
    bar [10000, 15000, 12000, 18000, 22000, 25000]
    line [8000, 12000, 10000, 15000, 20000, 23000]

9.4 User Usage Distribution Chart

pie title "User Token Usage Distribution"
    "User A" : 35
    "User B" : 25
    "User C" : 20
    "Other Users" : 20

10. Version History

v1.0.0 (2024-01-15)

New Features:

  • Implement basic TokenUsageRepository class

  • Support separate statistics for prompt and completion tokens

  • Implement total usage statistics

  • Add Redis atomic operation support

Technical Features:

  • Redis Hash-based data storage

  • Use HINCRBY to ensure atomicity

  • Support custom billing cycles

  • Complete error handling mechanism

v1.1.0 (2024-02-01)

New Features:

  • Implement detailed usage statistics (increment_detailed_usage)

  • Add usage limit management functionality

  • Implement usage query and statistics

  • Add data reset functionality

Improvements:

  • Optimize Redis operation performance

  • Enhance error handling mechanism

  • Add detailed log recording

  • Improve data validation logic

v1.2.0 (2024-03-01)

New Features:

  • Implement Pipeline batch operations

  • Add data consistency checks

  • Support batch queries and operations

  • Add performance monitoring metrics

Performance Optimizations:

  • Use Pipeline to reduce network round trips

  • Optimize data validation logic

  • Add connection pool management

  • Improve concurrent processing capability

v1.3.0 (2024-04-01) [Planned]

Planned Features:

  • Support distributed Redis cluster

  • Add data sharding functionality

  • Implement real-time data synchronization

  • Support multi-tenant data isolation

Performance Goals:

  • Operation latency < 10ms

  • Support 10000+ concurrent operations

  • 99.9% availability guarantee

  • 100% data consistency


Appendix

B. Example Code Repositories

C. Technical Support

  • Technical Documentation: https://docs.aiecs.com

  • Issue Reporting: https://github.com/aiecs/issues

  • Community Discussion: https://discord.gg/aiecs

D. Best Practices

D.1 Usage Statistics Best Practices

# Batch statistics best practices
import asyncio
import time

# TokenUsageRepository is the repository class described in this document.


class TokenUsageManager:
    """Buffers usage records in memory and writes them to the repository in batches."""
    
    def __init__(self):
        self.repository = TokenUsageRepository()
        self.batch_queue = []
        self.batch_size = 100
        # Seconds between time-based flushes. This class only flushes when the
        # batch fills up; a time-based flush needs a separate periodic task.
        self.flush_interval = 30
    
    async def record_usage(self, user_id: str, prompt_tokens: int, completion_tokens: int):
        """Queue one usage record and flush once the batch is full."""
        self.batch_queue.append({
            "user_id": user_id,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "timestamp": time.time()
        })
        
        if len(self.batch_queue) >= self.batch_size:
            await self.flush_batch()
    
    async def flush_batch(self):
        """Aggregate queued records per user and write them to the repository."""
        if not self.batch_queue:
            return
        
        # Swap the queue out before awaiting, so records appended while the
        # flush is in flight stay queued instead of being cleared unwritten.
        pending, self.batch_queue = self.batch_queue, []
        
        # Group by user
        user_operations = {}
        for op in pending:
            user_id = op["user_id"]
            if user_id not in user_operations:
                user_operations[user_id] = {"prompt_tokens": 0, "completion_tokens": 0}
            
            user_operations[user_id]["prompt_tokens"] += op["prompt_tokens"]
            user_operations[user_id]["completion_tokens"] += op["completion_tokens"]
        
        # Batch update: one repository call per user, run concurrently
        tasks = []
        for user_id, tokens in user_operations.items():
            task = self.repository.increment_detailed_usage(
                user_id, tokens["prompt_tokens"], tokens["completion_tokens"]
            )
            tasks.append(task)
        
        await asyncio.gather(*tasks)
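
The manager above defines flush_interval, but records are only written when the queue reaches batch_size, so a quiet period can leave records buffered indefinitely. One way to close that gap is a background task that flushes on a timer. The following is a minimal sketch, not part of the repository module: run_periodic_flush and its parameters are hypothetical names, and flush is assumed to be any coroutine function such as manager.flush_batch.

```python
import asyncio
from typing import Awaitable, Callable


async def run_periodic_flush(
    flush: Callable[[], Awaitable[None]],
    interval: float,
    stop: asyncio.Event,
) -> None:
    """Call `flush` every `interval` seconds until `stop` is set.

    A final flush runs after `stop` is set so buffered records are not
    left behind on shutdown.
    """
    while not stop.is_set():
        try:
            # Sleep for one interval, but wake up early on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a shutdown request
        await flush()
```

In an application this would typically be started once with asyncio.create_task(run_periodic_flush(manager.flush_batch, manager.flush_interval, stop)), and stop.set() would be called during shutdown before awaiting the task.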

D.2 Usage Limit Best Practices

# Usage limit management best practices
import time
from typing import Optional


class UsageLimitManager:
    """Checks usage limits with a short-lived in-process cache."""
    
    def __init__(self):
        self.repository = TokenUsageRepository()
        self.limit_cache = {}
        self.cache_ttl = 300  # 5 minutes
    
    async def check_and_enforce_limit(self, user_id: str, requested_tokens: int) -> bool:
        """Return True if the request fits within the user's limit.

        Cached data can be up to cache_ttl seconds old, so current_usage may
        lag behind Redis. Use a shorter TTL where strict enforcement matters.
        """
        cache_key = f"limit_{user_id}"
        cached_data = self.limit_cache.get(cache_key)
        if cached_data and time.time() - cached_data["timestamp"] < self.cache_ttl:
            limit_info = cached_data["data"]
        else:
            # Cache miss or expired entry: fetch fresh data and cache it again
            limit_info = await self.repository.check_usage_limit(user_id)
            self.limit_cache[cache_key] = {
                "data": limit_info,
                "timestamp": time.time()
            }
        
        # Check if limit exceeded
        if limit_info["exceeded"]:
            return False
        
        # Check if request would exceed limit (a limit of 0 means no limit)
        if limit_info["limit"] > 0:
            if limit_info["current_usage"] + requested_tokens > limit_info["limit"]:
                return False
        
        return True
    
    async def set_user_limit(self, user_id: str, limit: int, cycle_start_date: Optional[str] = None):
        """Set the user's limit and invalidate the cached limit data."""
        await self.repository.set_usage_limit(user_id, limit, cycle_start_date)
        
        # Clear cache so the next check reads the new limit
        self.limit_cache.pop(f"limit_{user_id}", None)
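
The admission decision in check_and_enforce_limit depends only on the dictionary returned by check_usage_limit and the requested token count, so it can be isolated and exercised without Redis. The sketch below assumes the dictionary carries the "exceeded", "limit", and "current_usage" keys used above; request_allowed is a hypothetical helper name, not part of the repository API.

```python
from typing import Any, Mapping


def request_allowed(limit_info: Mapping[str, Any], requested_tokens: int) -> bool:
    """Decide whether a request for `requested_tokens` fits within the limit."""
    if limit_info["exceeded"]:
        return False  # already over the limit before this request
    limit = limit_info["limit"]
    # A limit of 0 is treated as "no limit configured".
    if limit > 0 and limit_info["current_usage"] + requested_tokens > limit:
        return False
    return True


# A user at 900 of 1000 tokens may use exactly the remaining 100, but not 101.
info = {"exceeded": False, "limit": 1000, "current_usage": 900}
print(request_allowed(info, 100))  # True
print(request_allowed(info, 101))  # False
```

Note that the comparison is strict: a request that lands exactly on the limit is accepted, and only a request that would push usage past it is rejected.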

D.3 Monitoring and Alerting Best Practices

# Monitoring and alerting best practices
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class TokenUsageMonitor:
    """Token usage monitor"""
    
    def __init__(self):
        self.repository = TokenUsageRepository()
        self.alert_thresholds = {
            "high_usage": 0.8,  # 80% usage rate alert
            "critical_usage": 0.95,  # 95% usage rate critical alert
            "limit_exceeded": 1.0  # 100% usage rate exceeded alert
        }
    
    async def check_usage_alerts(self, user_id: str) -> List[Dict]:
        """Check usage alerts"""
        alerts = []
        
        try:
            limit_check = await self.repository.check_usage_limit(user_id)
            
            if limit_check["limit"] == 0:
                return alerts  # No limit, don't check alerts
            
            usage_rate = limit_check["current_usage"] / limit_check["limit"]
            
            if usage_rate >= self.alert_thresholds["limit_exceeded"]:
                alerts.append({
                    "level": "critical",
                    "message": f"User {user_id} has exceeded usage limit",
                    "usage_rate": usage_rate,
                    "current_usage": limit_check["current_usage"],
                    "limit": limit_check["limit"]
                })
            elif usage_rate >= self.alert_thresholds["critical_usage"]:
                alerts.append({
                    "level": "warning",
                    "message": f"User {user_id} usage approaching limit",
                    "usage_rate": usage_rate,
                    "remaining": limit_check["remaining"]
                })
            elif usage_rate >= self.alert_thresholds["high_usage"]:
                alerts.append({
                    "level": "info",
                    "message": f"User {user_id} has high usage",
                    "usage_rate": usage_rate,
                    "remaining": limit_check["remaining"]
                })
            
        except Exception as e:
            logger.error(f"Failed to check usage alerts: {e}")
            alerts.append({
                "level": "error",
                "message": f"Error checking usage alerts for user {user_id}: {e}"
            })
        
        return alerts
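
The threshold ladder in check_usage_alerts can be read as a mapping from usage rate to alert level: 80% and above is "info", 95% and above is "warning", and 100% and above is "critical". The sketch below restates that mapping as a standalone function for illustration. classify_usage is a hypothetical name, the constants mirror alert_thresholds above, and treating any non-positive limit as "no limit" is an assumption (the monitor above checks only for 0).

```python
from typing import Optional

# Thresholds mirror the alert_thresholds dict in TokenUsageMonitor.
HIGH_USAGE = 0.8
CRITICAL_USAGE = 0.95
LIMIT_EXCEEDED = 1.0


def classify_usage(current_usage: int, limit: int) -> Optional[str]:
    """Return the alert level for a usage/limit pair, or None if no alert applies."""
    if limit <= 0:
        return None  # assumption: non-positive limit means no limit configured
    usage_rate = current_usage / limit
    # Check the highest threshold first so each rate maps to one level.
    if usage_rate >= LIMIT_EXCEEDED:
        return "critical"
    if usage_rate >= CRITICAL_USAGE:
        return "warning"
    if usage_rate >= HIGH_USAGE:
        return "info"
    return None


print(classify_usage(850, 1000))   # info (usage rate 0.85)
print(classify_usage(1000, 1000))  # critical (usage rate 1.0)
```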