LLM Custom Callback Handlers Technical Documentation
1. Overview
Purpose: The custom_callbacks.py module is a core component in the AIECS system responsible for managing the lifecycle of LLM (Large Language Model) calls. Through asynchronous callback mechanisms, this module implements fine-grained monitoring of LLM call processes, token usage statistics, performance metrics collection, and error handling.
Core Value:
Precise Token Usage Statistics: Automatically records prompt and completion token usage for each user
Performance Monitoring: Real-time monitoring of LLM call response times and success rates
Cost Control: Achieves precise cost accounting and usage limits through detailed token statistics
Extensibility: Provides composite callback handlers supporting parallel execution of multiple monitoring strategies
Fault Tolerance: Ensures callback processing failures do not affect main business processes
2. Problem Background & Design Motivation
2.1 Business Pain Points
During AIECS system development, we face the following key challenges:
Cost Control Challenges: LLM API calls are expensive, requiring precise statistics for each user’s token usage
Missing Performance Monitoring: Lack of real-time monitoring of LLM call performance, making it difficult to identify performance bottlenecks
Usage Limit Requirements: Need to set different token usage limits for different users
Multi-Dimensional Statistics: Need to distinguish prompt tokens and completion tokens for fine-grained analysis
System Observability: Need complete call chain tracking and error monitoring
2.2 Design Motivation
Based on the above pain points, we designed a monitoring system based on callback patterns:
Decoupled Design: Separates monitoring logic from business logic, avoiding code coupling
Asynchronous Processing: Uses asynchronous callbacks to ensure monitoring operations do not affect main business performance
Flexible Extension: Supports flexible combination of multiple monitoring strategies through composite pattern
Data Persistence: Integrates Redis for high-performance statistics data storage
3. Architecture Positioning & Context
3.1 System Architecture Diagram
graph TB
A[LLM Client] --> B[Custom Callback Handler]
B --> C[Token Usage Repository]
C --> D[Redis Storage]
B --> E[Performance Monitoring]
B --> F[Error Handling]
B --> G[Logging System]
H[Composite Handler] --> I[Basic Token Handler]
H --> J[Detailed Token Handler]
H --> K[Custom Handler]
L[Business Logic] --> A
M[User Management] --> C
N[Cost Control] --> C
3.2 Upstream and Downstream Dependencies
Upstream Callers:
BaseLLMClient and its subclasses (OpenAI, Anthropic, etc.)
Business service layer (chat services, document processing services, etc.)
Downstream Dependencies:
TokenUsageRepository: Responsible for Redis data storage
BaseCallback: Provides abstract callback interface
Redis Client: Data persistence layer
Peer Components:
LLMMessage and LLMResponse: Data transfer objects
Logging system: For monitoring and debugging
3.3 Data Flow
sequenceDiagram
participant BL as Business Logic
participant LC as LLM Client
participant CB as Callback Handler
participant TR as Token Repository
participant RD as Redis
BL->>LC: generate_text(messages)
LC->>CB: on_llm_start(messages)
CB->>CB: Record start time
CB->>LC: Continue processing
LC->>LC: Call LLM API
LC->>CB: on_llm_end(response)
CB->>TR: increment_usage(tokens)
TR->>RD: Store token data
CB->>LC: Return to client
LC->>BL: Return response
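The sequence above can be sketched from the client's side. This is a minimal illustration, not the actual BaseLLMClient code: RecordingCallback, fake_llm_api, and the standalone generate_text function are hypothetical stand-ins, and only the three callback method names come from this document.

```python
import asyncio
import time
from typing import Any, Dict, List


class RecordingCallback:
    """Hypothetical handler that records the order of lifecycle events."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.last_duration: float = 0.0
        self._start: float = 0.0

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        # Record start time so the duration can be computed at the end.
        self._start = time.monotonic()
        self.events.append("start")

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        self.last_duration = time.monotonic() - self._start
        self.events.append(f"end:{response.get('tokens_used', 0)}")

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        self.events.append(f"error:{type(error).__name__}")


async def fake_llm_api(messages: List[dict]) -> Dict[str, Any]:
    """Stand-in for the real provider call (assumption for this sketch)."""
    return {"content": "hello", "tokens_used": 12, "model": "demo"}


async def generate_text(messages: List[dict], callbacks: List[Any]) -> Dict[str, Any]:
    # Notify handlers before the provider call.
    for cb in callbacks:
        await cb.on_llm_start(messages)
    try:
        response = await fake_llm_api(messages)
    except Exception as exc:
        # On failure, handlers see the error and the exception still propagates.
        for cb in callbacks:
            await cb.on_llm_error(exc)
        raise
    # On success, handlers receive the response before it is returned.
    for cb in callbacks:
        await cb.on_llm_end(response)
    return response
```

Calling asyncio.run(generate_text(messages, [RecordingCallback()])) drives one full start/end cycle without any Redis dependency, which can be handy in unit tests.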
4. Core Features & Use Cases
4.1 Basic Token Statistics Callback Handler
Function Description: Records total token usage of LLM calls, suitable for simple usage statistics scenarios.
Core Features:
Automatically records call start and end times
Tracks total token usage
Integrated Redis persistent storage
Exception fault tolerance handling
Usage Scenarios:
# Basic usage statistics
from aiecs.llm.custom_callbacks import create_token_callback
# Create callback handler
callback = create_token_callback(
    user_id="user_123",
    cycle_start_date="2024-01-01"
)
# Use in LLM client (run inside an async function; LLMClient and messages come from the caller)
async with LLMClient() as client:
    response = await client.generate_text(
        messages=messages,
        callbacks=[callback]
    )
Real-World Application Cases:
User Usage Limits: Set monthly token limits for each user
Cost Accounting: Calculate API call costs for different users
Usage Analysis: Generate user usage reports and trend analysis
4.2 Detailed Token Statistics Callback Handler
Function Description: Separately records prompt tokens and completion tokens, providing more fine-grained usage analysis.
Core Features:
Intelligent token estimation algorithm
Separate statistics for prompt and completion tokens
Support for multiple token data sources
Automatic data validation and correction
Usage Scenarios:
# Detailed usage statistics
from aiecs.llm.custom_callbacks import (
    create_composite_callback,
    create_detailed_token_callback,
)
# Create detailed statistics callback
detailed_callback = create_detailed_token_callback(
    user_id="user_123",
    cycle_start_date="2024-01-01"
)
# Combine multiple callbacks
# (performance_callback and error_callback stand for handlers defined elsewhere)
composite_callback = create_composite_callback(
    detailed_callback,
    performance_callback,
    error_callback
)
Real-World Application Cases:
Cost Optimization: Analyze cost ratio of prompt and completion
Model Selection: Choose optimal model based on token usage patterns
Performance Tuning: Optimize prompt length to reduce unnecessary token consumption
4.3 Composite Callback Handler
Function Description: Supports simultaneous execution of multiple callback handlers, implementing complex monitoring strategies.
Core Features:
Parallel execution of multiple callbacks
Independent exception handling
Dynamic add/remove handlers
Unified execution interface
Usage Scenarios:
# Create composite monitoring strategy
from aiecs.llm.custom_callbacks import (
    create_token_callback,
    create_detailed_token_callback,
    create_composite_callback
)
# Build complete monitoring system
# (custom_performance_callback and custom_alert_callback stand for user-defined handler factories)
monitoring_callbacks = create_composite_callback(
    create_token_callback("user_123"),
    create_detailed_token_callback("user_123"),
    custom_performance_callback(),
    custom_alert_callback()
)
# Use in business code
async def process_user_query(user_id: str, query: str):
    messages = [{"role": "user", "content": query}]
    async with LLMClient() as client:
        response = await client.generate_text(
            messages=messages,
            callbacks=[monitoring_callbacks]
        )
    return response
Real-World Application Cases:
End-to-End Monitoring: Simultaneously perform usage statistics, performance monitoring, and error tracking
Multi-Tenant Management: Configure different monitoring strategies for different tenants
A/B Testing: Compare performance under different configurations
5. API Reference
5.1 RedisTokenCallbackHandler
Constructor
def __init__(self, user_id: str, cycle_start_date: Optional[str] = None)
Parameters:
user_id (str, required): User unique identifier
cycle_start_date (str, optional): Billing cycle start date in “YYYY-MM-DD” format, defaults to current month
Exceptions:
ValueError: Raised when user_id is empty
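The constructor contract above (non-empty user_id, optional cycle start date) could look roughly like the following. This is a sketch under assumptions: HandlerArgs and resolve_cycle_start are hypothetical names, and "defaults to current month" is interpreted here as the first day of the current month.

```python
from datetime import date, datetime
from typing import Optional


def resolve_cycle_start(cycle_start_date: Optional[str] = None) -> str:
    """Return a YYYY-MM-DD cycle start, defaulting to the first day of this month."""
    if cycle_start_date is None:
        # Assumption: "current month" means the month's first day.
        return date.today().replace(day=1).isoformat()
    # strptime raises ValueError when the string is not a valid date.
    parsed = datetime.strptime(cycle_start_date, "%Y-%m-%d").date()
    return parsed.isoformat()


class HandlerArgs:
    """Hypothetical holder mirroring the documented constructor parameters."""

    def __init__(self, user_id: str, cycle_start_date: Optional[str] = None) -> None:
        if not user_id:
            raise ValueError("user_id must be provided")
        self.user_id = user_id
        self.cycle_start_date = resolve_cycle_start(cycle_start_date)
```

Validating the date string up front keeps malformed cycle keys out of storage, at the cost of raising ValueError for a second reason beyond the documented empty user_id case.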
Methods
on_llm_start
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None
Function: Triggered when LLM call starts
Parameters:
messages (List[dict]): Message list, each message contains ‘role’ and ‘content’ keys
**kwargs: Additional parameters (e.g., provider, model, etc.)
on_llm_end
async def on_llm_end(self, response: dict, **kwargs: Any) -> None
Function: Triggered when LLM call successfully ends
Parameters:
response (dict): Response dictionary containing ‘content’, ‘tokens_used’, ‘model’ keys
**kwargs: Additional parameters
Processing Logic:
Calculate call duration
Extract token usage
Call repository to record data
Log record
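The four processing steps above can be sketched end to end with an in-memory repository. InMemoryUsageRepo and SketchTokenHandler are hypothetical names used only for illustration; the real handler writes through TokenUsageRepository to Redis, and the log message format here is an assumption.

```python
import asyncio
import logging
import time
from typing import Any, Dict, List, Optional

logger = logging.getLogger("aiecs.callbacks")


class InMemoryUsageRepo:
    """Hypothetical stand-in for the Redis-backed TokenUsageRepository."""

    def __init__(self) -> None:
        self.totals: Dict[str, int] = {}

    async def increment_total_usage(
        self, user_id: str, tokens: int, cycle_start_date: Optional[str]
    ) -> None:
        key = f"{user_id}:{cycle_start_date}"
        self.totals[key] = self.totals.get(key, 0) + tokens


class SketchTokenHandler:
    """Illustrative handler, not the module's actual class."""

    def __init__(self, repo: InMemoryUsageRepo, user_id: str,
                 cycle_start_date: Optional[str] = None) -> None:
        self.repo = repo
        self.user_id = user_id
        self.cycle_start_date = cycle_start_date
        self.start_time: Optional[float] = None

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        self.start_time = time.monotonic()

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        try:
            # 1. Calculate call duration (0.0 if on_llm_start was never called).
            duration = 0.0 if self.start_time is None else time.monotonic() - self.start_time
            # 2. Extract token usage; a missing or None value counts as 0.
            tokens_used = int(response.get("tokens_used") or 0)
            # 3. Call the repository to record data.
            if tokens_used > 0:
                await self.repo.increment_total_usage(
                    self.user_id, tokens_used, self.cycle_start_date
                )
            # 4. Write a log entry.
            logger.info("Recorded %d tokens for user %s in %.3fs",
                        tokens_used, self.user_id, duration)
        except Exception as exc:
            # Never let bookkeeping failures reach the caller.
            logger.error("Failed to record token usage: %s", exc)
```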
on_llm_error
async def on_llm_error(self, error: Exception, **kwargs: Any) -> None
Function: Triggered when LLM call encounters an error
Parameters:
error (Exception): The exception that occurred
**kwargs: Additional parameters
5.2 DetailedRedisTokenCallbackHandler
Constructor
def __init__(self, user_id: str, cycle_start_date: Optional[str] = None)
Parameters: Same as RedisTokenCallbackHandler
Methods
on_llm_start
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None
Function: Record start time and estimate prompt tokens
Special Processing: Calls _estimate_prompt_tokens method to estimate input token count
on_llm_end
async def on_llm_end(self, response: dict, **kwargs: Any) -> None
Function: Record detailed token usage
Special Processing:
Calls _extract_detailed_tokens to extract detailed token information
Uses increment_detailed_usage method to record data
_estimate_prompt_tokens
def _estimate_prompt_tokens(self, messages: List[dict]) -> int
Function: Estimate token count of input messages
Algorithm: Uses rough estimation of 4 characters ≈ 1 token
Returns: Estimated token count
_extract_detailed_tokens
def _extract_detailed_tokens(self, response: dict) -> tuple[int, int]
Function: Extract detailed token information from response
Returns: (prompt_tokens, completion_tokens) tuple
Processing Strategy:
Prioritize using detailed token information from response
If only total is available, use estimated prompt tokens to calculate completion tokens
If no information is available, estimate based on content length
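The three-tier strategy above might be implemented along these lines. This is a sketch, not the module's code: the 'prompt_tokens' and 'completion_tokens' response keys are assumptions (only 'content', 'tokens_used', and 'model' are documented), and the estimated prompt count is passed in as a parameter rather than read from handler state.

```python
from typing import Tuple


def extract_detailed_tokens(response: dict, estimated_prompt_tokens: int = 0) -> Tuple[int, int]:
    """Return (prompt_tokens, completion_tokens) using a three-tier fallback."""
    prompt = response.get("prompt_tokens")          # assumed key
    completion = response.get("completion_tokens")  # assumed key

    # Tier 1: the provider reported both counts, so use them directly.
    if prompt is not None and completion is not None:
        return int(prompt), int(completion)

    # Tier 2: only a total is known; split it using the prompt estimate,
    # clamped so the completion count never goes negative.
    total = response.get("tokens_used")
    if total:
        prompt_part = min(estimated_prompt_tokens, int(total))
        return prompt_part, int(total) - prompt_part

    # Tier 3: nothing reported; estimate completion from content length
    # with the same 4 characters per token rule of thumb.
    content = response.get("content") or ""
    return estimated_prompt_tokens, len(content) // 4
```

Clamping in tier 2 is one way to avoid the negative values listed under Token Data Inconsistency when the prompt estimate exceeds the reported total.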
5.3 CompositeCallbackHandler
Constructor
def __init__(self, handlers: List[CustomAsyncCallbackHandler])
Parameters:
handlers(List[CustomAsyncCallbackHandler]): Callback handler list
Methods
add_handler
def add_handler(self, handler: CustomAsyncCallbackHandler)
Function: Dynamically add callback handler
Parameters:
handler(CustomAsyncCallbackHandler): Handler to add
on_llm_start
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None
Function: Execute all handlers’ start callbacks in parallel
Exception Handling: Failure of a single handler does not affect other handlers
on_llm_end
async def on_llm_end(self, response: dict, **kwargs: Any) -> None
Function: Execute all handlers’ end callbacks in parallel
on_llm_error
async def on_llm_error(self, error: Exception, **kwargs: Any) -> None
Function: Execute all handlers’ error callbacks in parallel
5.4 Convenience Functions
create_token_callback
def create_token_callback(user_id: str, cycle_start_date: Optional[str] = None) -> RedisTokenCallbackHandler
Function: Create basic token statistics callback handler
Returns: RedisTokenCallbackHandler instance
create_detailed_token_callback
def create_detailed_token_callback(user_id: str, cycle_start_date: Optional[str] = None) -> DetailedRedisTokenCallbackHandler
Function: Create detailed token statistics callback handler
Returns: DetailedRedisTokenCallbackHandler instance
create_composite_callback
def create_composite_callback(*handlers: CustomAsyncCallbackHandler) -> CompositeCallbackHandler
Function: Create composite callback handler
Parameters: Variable number of callback handlers
Returns: CompositeCallbackHandler instance
6. Technical Implementation Details
6.1 Asynchronous Processing Mechanism
Design Principles:
All callback methods are asynchronous, avoiding blocking main business threads
Use async/await syntax to ensure non-blocking execution
Exception handling does not affect main business processes
Implementation Details:
async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
    try:
        # Read the token count reported in the response (0 if absent)
        tokens_used = response.get("tokens_used", 0)
        # Asynchronously execute token recording
        await token_usage_repo.increment_total_usage(
            self.user_id,
            tokens_used,
            self.cycle_start_date
        )
    except Exception as e:
        # Log error but don't re-raise, avoid affecting main process
        logger.error(f"Failed to record token usage: {e}")
6.2 Token Estimation Algorithm
Basic Estimation:
Uses rough estimation of 4 characters ≈ 1 token
Suitable for quick estimation of English text
May not be accurate for languages like Chinese
Improvement Strategy:
def _estimate_prompt_tokens(self, messages: List[dict]) -> int:
    # Treat a missing or None 'content' as an empty string
    total_chars = sum(len(msg.get('content') or '') for msg in messages)
    # Can adjust estimation ratio based on language type
    return total_chars // 4
Future Optimization Directions:
Integrate professional token calculation libraries like tiktoken
Support multi-language token estimation
Dynamically adjust estimation parameters based on historical data
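As one possible step toward multi-language estimation without adding a tokenizer dependency, the ratio can be varied by script. The sketch below is a heuristic only: counting each CJK character as one token is an assumption rather than a measured figure, and estimate_tokens and _is_cjk are hypothetical names.

```python
from typing import List


def _is_cjk(ch: str) -> bool:
    """Rough check for common Chinese, Japanese, and Korean code point ranges."""
    code = ord(ch)
    return (
        0x4E00 <= code <= 0x9FFF       # CJK Unified Ideographs
        or 0x3040 <= code <= 0x30FF    # Hiragana and Katakana
        or 0xAC00 <= code <= 0xD7AF    # Hangul syllables
    )


def estimate_tokens(messages: List[dict], chars_per_token: int = 4) -> int:
    """Estimate tokens: one per CJK character, one per chars_per_token otherwise."""
    cjk_chars = 0
    other_chars = 0
    for msg in messages:
        for ch in msg.get("content") or "":
            if _is_cjk(ch):
                cjk_chars += 1
            else:
                other_chars += 1
    return cjk_chars + other_chars // chars_per_token
```

A real tokenizer will still be more accurate; this only narrows the gap for text where the 4-character rule undercounts badly.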
6.3 Error Handling Strategy
Layered Error Handling:
Method Level: Each callback method internally catches exceptions
Handler Level: CompositeCallbackHandler provides independent error handling for each sub-handler
System Level: All errors are recorded through logging system
Fault Tolerance Mechanism:
async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
    for handler in self.handlers:
        try:
            await handler.on_llm_end(response, **kwargs)
        except Exception as e:
            # Log error but continue executing other handlers
            logger.error(f"Error in callback handler {type(handler).__name__}: {e}")
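The loop above awaits each handler in turn, so a slow handler delays the ones after it. If concurrent execution is wanted, one possible variant uses asyncio.gather with return_exceptions=True, which keeps the same isolation guarantee. The function name run_end_callbacks_concurrently is hypothetical, and this is a sketch rather than the module's implementation.

```python
import asyncio
import logging
from typing import Any, List

logger = logging.getLogger("aiecs.callbacks")


async def run_end_callbacks_concurrently(
    handlers: List[Any], response: dict, **kwargs: Any
) -> int:
    """Run every handler's on_llm_end concurrently; return the number that failed."""
    # return_exceptions=True collects exceptions as results instead of raising,
    # so one failing handler cannot cancel or hide the others.
    results = await asyncio.gather(
        *(handler.on_llm_end(response, **kwargs) for handler in handlers),
        return_exceptions=True,
    )
    failures = 0
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            failures += 1
            logger.error("Error in callback handler %s: %s",
                         type(handler).__name__, result)
    return failures
```

The trade-off is ordering: with gather, handlers no longer run in list order, which matters only if one handler depends on another's side effects.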
6.4 Performance Optimization
Batch Operations:
Use Redis pipeline for batch updates
Reduce network round trips
Improve data write efficiency
Memory Management:
Clean up temporary variables promptly
Avoid storing large amounts of data in callbacks
Use weak references to avoid circular references
Concurrency Control:
Use async locks to avoid race conditions
Reasonably control concurrent callback count
Implement backpressure mechanism to prevent memory overflow
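A semaphore is one simple way to cap the number of callbacks in flight, which also acts as a basic form of backpressure because excess work waits instead of piling up. The helper below is an illustrative sketch; run_bounded and its max_concurrent parameter are hypothetical and not part of the module.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(
    tasks: List[Callable[[], Awaitable[None]]], max_concurrent: int = 10
) -> int:
    """Run callback coroutines with at most max_concurrent active; return the peak seen."""
    # Create the semaphore inside the running loop to avoid loop-binding issues.
    semaphore = asyncio.Semaphore(max_concurrent)
    active = 0
    peak = 0

    async def guarded(task: Callable[[], Awaitable[None]]) -> None:
        nonlocal active, peak
        async with semaphore:  # waits here when the limit is reached
            active += 1
            peak = max(peak, active)
            try:
                await task()
            finally:
                active -= 1

    await asyncio.gather(*(guarded(task) for task in tasks))
    return peak
```

The returned peak is only there to make the bound observable in tests; production code would more likely export it as a metric.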
6.5 Data Consistency
Atomic Operations:
Use Redis HINCRBY command to ensure atomicity
Avoid data inconsistency caused by concurrent updates
Transaction Processing:
# Use a pipeline so all increments are sent together
# (redis-py pipelines are wrapped in MULTI/EXEC by default)
pipe = client.pipeline()
for field, value in updates.items():
    pipe.hincrby(redis_key, field, value)
await pipe.execute()
7. Configuration & Deployment
7.1 Environment Variable Configuration
Required Configuration:
# Redis connection configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your_password
REDIS_DB=0
# Logging configuration
LOG_LEVEL=INFO
LOG_FORMAT=json
Optional Configuration:
# Token estimation parameters
TOKEN_ESTIMATION_RATIO=4 # Character to token ratio
# Performance monitoring configuration
ENABLE_PERFORMANCE_MONITORING=true
PERFORMANCE_LOG_THRESHOLD=1000 # milliseconds
# Error retry configuration
CALLBACK_RETRY_ATTEMPTS=3
CALLBACK_RETRY_DELAY=100 # milliseconds
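The variables above could be read into a typed settings object at startup. The sketch below is hypothetical: CallbackSettings and load_settings are illustrative names, the defaults simply mirror the example values in this section, and whether the module itself reads these variables this way is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class CallbackSettings:
    redis_host: str
    redis_port: int
    redis_password: Optional[str]
    redis_db: int
    token_estimation_ratio: int
    enable_performance_monitoring: bool
    performance_log_threshold_ms: int
    callback_retry_attempts: int
    callback_retry_delay_ms: int


def load_settings(env: Optional[Mapping[str, str]] = None) -> CallbackSettings:
    """Build settings from environment variables, falling back to example defaults."""
    env = os.environ if env is None else env
    truthy = ("1", "true", "yes")
    return CallbackSettings(
        redis_host=env.get("REDIS_HOST", "localhost"),
        redis_port=int(env.get("REDIS_PORT", "6379")),
        redis_password=env.get("REDIS_PASSWORD") or None,
        redis_db=int(env.get("REDIS_DB", "0")),
        token_estimation_ratio=int(env.get("TOKEN_ESTIMATION_RATIO", "4")),
        enable_performance_monitoring=(
            env.get("ENABLE_PERFORMANCE_MONITORING", "true").lower() in truthy
        ),
        performance_log_threshold_ms=int(env.get("PERFORMANCE_LOG_THRESHOLD", "1000")),
        callback_retry_attempts=int(env.get("CALLBACK_RETRY_ATTEMPTS", "3")),
        callback_retry_delay_ms=int(env.get("CALLBACK_RETRY_DELAY", "100")),
    )
```

Accepting an explicit mapping makes the loader easy to test without touching the process environment.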
7.2 Dependency Management
Core Dependencies:
# requirements.txt
redis>=4.5.0
asyncio-mqtt>=0.11.0
Development Dependencies:
# requirements-dev.txt
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-mock>=3.10.0
7.3 Deployment Configuration
Docker Configuration:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "-m", "aiecs.llm.custom_callbacks"]
Kubernetes Configuration:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: aiecs-callbacks
spec:
  replicas: 3
  selector:
    matchLabels:
      app: aiecs-callbacks
  template:
    metadata:
      labels:
        app: aiecs-callbacks
    spec:
      containers:
      - name: callbacks
        image: aiecs/callbacks:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: REDIS_PORT
          value: "6379"
7.4 Monitoring Configuration
Prometheus Metrics:
from prometheus_client import Counter, Histogram
# Define monitoring metrics
token_usage_total = Counter('token_usage_total', 'Total token usage', ['user_id', 'type'])
callback_duration = Histogram('callback_duration_seconds', 'Callback execution time')
callback_errors = Counter('callback_errors_total', 'Callback errors', ['handler_type'])
Health Check:
import time
async def health_check():
    """Check callback handler health status"""
    try:
        # Check Redis connection
        redis_client = await get_redis_client()
        await redis_client.ping()
        # Check handler status
        return {"status": "healthy", "timestamp": time.time()}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics:
Token usage trends
Callback execution time
Error rate and exception types
Redis connection status
Memory usage
Monitoring Dashboard:
# Grafana query examples
# Token usage trends
sum(rate(token_usage_total[5m])) by (user_id)
# Callback execution time
histogram_quantile(0.95, rate(callback_duration_seconds_bucket[5m]))
# Error rate
rate(callback_errors_total[5m])
8.2 Common Issues and Solutions
8.2.1 Redis Connection Failure
Symptoms:
“Failed to connect to Redis” errors in logs
Token data cannot be recorded
Callback handler exits abnormally
Troubleshooting Steps:
Check Redis service status: redis-cli ping
Verify network connection: telnet redis_host 6379
Check authentication: Verify username and password
View Redis logs: tail -f /var/log/redis/redis.log
Solution:
# Add connection retry mechanism
import asyncio
async def get_redis_client_with_retry(max_retries=3):
    for attempt in range(max_retries):
        try:
            return await get_redis_client()
        except Exception:
            # Give up after the final attempt
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
8.2.2 Token Data Inconsistency
Symptoms:
Recorded token count does not match actual usage
Query results inconsistent at different times
Data shows negative values
Troubleshooting Steps:
Check concurrent updates: See if multiple processes are updating simultaneously
Verify data integrity: Check raw data in Redis
Analyze logs: Find abnormal callback execution records
Solution:
# Add data validation
async def validate_token_data(user_id: str, tokens: int):
    if tokens < 0:
        logger.warning(f"Negative token count detected for user {user_id}: {tokens}")
        return 0
    return tokens
8.2.3 Performance Issues
Symptoms:
Callback execution time too long
System response slows down
Memory usage continuously grows
Troubleshooting Steps:
Analyze callback execution time: Use performance profiling tools
Check memory leaks: Monitor memory usage trends
Optimize Redis operations: Reduce unnecessary queries
Solution:
# Add performance monitoring
import time
from functools import wraps
def monitor_performance(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so it is safer than time.time() for durations
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            duration = time.perf_counter() - start_time
            if duration > 1.0:  # Log warning if exceeds 1 second
                logger.warning(f"Slow callback execution: {func.__name__} took {duration:.2f}s")
    return wrapper
8.3 Log Analysis
Log Level Configuration:
import logging
# Configure callback handler logger
callback_logger = logging.getLogger('aiecs.callbacks')
callback_logger.setLevel(logging.INFO)
# Add file handler
file_handler = logging.FileHandler('/var/log/aiecs/callbacks.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
callback_logger.addHandler(file_handler)
Key Log Patterns:
# Find error logs
grep "ERROR" /var/log/aiecs/callbacks.log | tail -100
# Analyze performance issues
grep "Slow callback" /var/log/aiecs/callbacks.log
# Monitor token usage
grep "Recorded.*tokens" /var/log/aiecs/callbacks.log | tail -50
8.4 Data Backup and Recovery
Backup Strategy:
# Redis data backup
redis-cli --rdb /backup/redis-$(date +%Y%m%d).rdb
# Regular backup script
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
redis-cli --rdb /backup/redis-$DATE.rdb
gzip /backup/redis-$DATE.rdb
Recovery Process:
# Stop Redis service
systemctl stop redis
# Restore data
gunzip /backup/redis-20240101_120000.rdb.gz
cp /backup/redis-20240101_120000.rdb /var/lib/redis/dump.rdb
# Start Redis service
systemctl start redis
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "Business Layer"
A[Chat Service] --> B[Document Processing Service]
B --> C[Other Business Services]
end
subgraph "LLM Client Layer"
D[OpenAI Client] --> E[Anthropic Client]
E --> F[Other LLM Clients]
end
subgraph "Callback Processing Layer"
G[RedisTokenCallbackHandler] --> H[DetailedRedisTokenCallbackHandler]
H --> I[CompositeCallbackHandler]
I --> J[Custom Callback Handlers]
end
subgraph "Data Storage Layer"
K[Token Usage Repository] --> L[Redis Cluster]
L --> M[Monitoring Data]
end
subgraph "Monitoring Layer"
N[Prometheus] --> O[Grafana]
O --> P[Alert System]
end
A --> D
B --> E
C --> F
D --> G
E --> H
F --> I
G --> K
H --> K
I --> K
J --> K
K --> N
L --> N
9.2 Data Flow Diagram
sequenceDiagram
participant U as User
participant BS as Business Service
participant LC as LLM Client
participant CB as Callback Handler
participant TR as Token Repository
participant RD as Redis
participant MT as Monitoring System
U->>BS: Send request
BS->>LC: Call generate_text
LC->>CB: on_llm_start
CB->>CB: Record start time
CB->>LC: Continue processing
LC->>LC: Call LLM API
LC->>CB: on_llm_end
CB->>TR: increment_usage
TR->>RD: Store token data
CB->>MT: Send monitoring data
CB->>LC: Return result
LC->>BS: Return response
BS->>U: Return result
9.3 Token Usage Trend Chart
xychart-beta
title "Token Usage Trend"
x-axis ["Jan", "Feb", "Mar", "Apr", "May", "Jun"]
y-axis "Token Count" 0 --> 100000
bar [10000, 15000, 12000, 18000, 22000, 25000]
line [8000, 12000, 10000, 15000, 20000, 23000]
9.4 Error Rate Monitoring Chart
pie title "Callback Handler Error Distribution"
"Redis Connection Error" : 45
"Token Data Anomaly" : 25
"Timeout Error" : 15
"Other Errors" : 15
10. Version History
v1.0.0 (2024-01-15)
New Features:
Implement basic RedisTokenCallbackHandler
Support total token usage statistics
Integrate Redis data storage
Add asynchronous callback mechanism
Technical Features:
Based on CustomAsyncCallbackHandler abstract base class
Use Redis HINCRBY to ensure atomic operations
Complete error handling and logging
v1.1.0 (2024-02-01)
New Features:
Implement DetailedRedisTokenCallbackHandler
Support separate statistics for prompt and completion tokens
Add intelligent token estimation algorithm
Implement CompositeCallbackHandler composite pattern
Performance Optimizations:
Use Redis pipeline for batch operations
Optimize memory usage and concurrent processing
Add performance monitoring metrics
v1.2.0 (2024-03-01)
New Features:
Add convenience functions to simplify usage
Support custom billing cycles
Implement data validation and correction mechanism
Add health check interface
Improvements:
Enhance error handling and retry mechanisms
Optimize log format and monitoring metrics
Add complete unit test coverage
v1.3.0 (2024-04-01) [Planned]
Planned Features:
Integrate tiktoken for precise token calculation
Support multi-language token estimation
Add cost prediction and usage alerts
Implement distributed locks to avoid concurrent conflicts
Performance Goals:
Callback execution time < 100ms
Support 1000+ concurrent users
99.9% availability guarantee
Appendix
B. Example Code Repositories
C. Technical Support
Technical Documentation: https://docs.aiecs.com
Issue Reporting: https://github.com/aiecs/issues
Community Discussion: https://discord.gg/aiecs