Base Callback Handler Technical Documentation
1. Overview
Purpose: base_callback.py is the core abstraction layer of the LLM (Large Language Model) callback handling mechanism in the AIECS system. This module defines unified asynchronous callback interfaces, providing standardized extension points for LLM call lifecycle management across the entire system.
Core Value:
Unified Interface Specification: Provides standardized abstract interfaces for all LLM callback handlers
Decoupled Design: Completely separates callback logic from business logic, improving code maintainability
Asynchronous Support: Built on Python's async/await model, so callbacks can perform I/O (logging, Redis writes, metrics export) without blocking the event loop
Type Safety: Uses generic data structures to avoid circular dependencies, maintaining architectural clarity
Extensibility: Provides flexible extension foundation for custom callback handlers
2. Problem Background & Design Motivation
2.1 Business Pain Points
During AIECS system development, we face the following key challenges:
Missing Callback Mechanism: Lack of unified LLM call lifecycle management mechanism
Insufficient Monitoring Capabilities: Unable to perform fine-grained monitoring and statistics on LLM call processes
Severe Code Coupling: Business logic mixed with monitoring logic, difficult to maintain and extend
Complex Type Dependencies: Direct use of specific LLM types causes circular dependency issues
Asynchronous Processing Difficulties: Lack of standardized asynchronous callback processing patterns
2.2 Design Motivation
Based on the above pain points, we designed a callback mechanism based on abstract base classes:
Interface Standardization: Define unified callback interfaces through abstract base classes, ensuring consistency of all implementations
Type Decoupling: Use generic dictionary types to avoid circular dependencies with specific LLM implementations
Asynchronous First: All callback methods are coroutines, so callbacks can await I/O without blocking the event loop that serves the main business flow
Extension Friendly: Support various custom callback handlers through inheritance mechanism
Clear Architecture: Abstract callback logic as an independent layer, maintaining system architecture clarity
3. Architecture Positioning & Context
3.1 System Architecture Diagram
graph TB
subgraph "Business Layer"
A[Chat Service] --> B[Document Processing Service]
B --> C[Task Execution Service]
end
subgraph "LLM Client Layer"
D[OpenAI Client] --> E[Vertex AI Client]
E --> F[XAI Client]
F --> G[Other LLM Client]
end
subgraph "Callback Processing Layer"
H[Base Callback Handler] --> I[Token Statistics Callback]
H --> J[Performance Monitoring Callback]
H --> K[Error Handling Callback]
H --> L[Custom Callback]
end
subgraph "Data Storage Layer"
M[Token Repository] --> N[Redis]
O[Monitoring Data] --> P[Prometheus]
end
A --> D
B --> E
C --> F
D --> H
E --> H
F --> H
G --> H
I --> M
J --> O
K --> O
L --> M
3.2 Upstream and Downstream Dependencies
Upstream Callers:
BaseLLMClient and its subclasses (OpenAI, Vertex AI, XAI, etc.)
LLMClientManager: Advanced LLM operation manager
Business service layer: Chat service, document processing service, etc.
Downstream Dependencies:
Specific callback handler implementations (Token statistics, performance monitoring, etc.)
Data storage layer (Redis, database, etc.)
Monitoring systems (Prometheus, Grafana, etc.)
Peer Components:
LLMMessage and LLMResponse: Data transfer objects
Logging system: For debugging and monitoring
3.3 Data Flow
sequenceDiagram
participant BL as Business Logic
participant LC as LLM Client
participant CB as Callback Handler
participant DS as Data Storage
participant MT as Monitoring System
BL->>LC: Initiate LLM Call
LC->>CB: on_llm_start(messages)
CB->>CB: Record Start State
CB->>LC: Continue Processing
LC->>LC: Execute LLM API Call
alt Call Success
LC->>CB: on_llm_end(response)
CB->>DS: Store Statistics
CB->>MT: Send Monitoring Data
CB->>LC: Return Success
LC->>BL: Return Response
else Call Failure
LC->>CB: on_llm_error(error)
CB->>MT: Record Error Information
CB->>LC: Return Error
LC->>BL: Raise Exception
end
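The sequence above can be sketched from the client side. This is a minimal sketch, not the actual BaseLLMClient implementation: `call_with_callbacks`, `_notify`, and the `api_call` parameter are hypothetical names. Handlers are called one after another, and any handler failure is logged and swallowed so it cannot affect the LLM call itself.

```python
import logging
from typing import Any, Awaitable, Callable, List

logger = logging.getLogger(__name__)


async def _notify(handlers: List[Any], method: str, *args: Any, **kwargs: Any) -> None:
    """Invoke one lifecycle method on every handler, isolating handler failures."""
    for handler in handlers:
        try:
            await getattr(handler, method)(*args, **kwargs)
        except Exception:
            # A broken callback is logged but never aborts the LLM call
            logger.exception("Callback %s.%s failed", type(handler).__name__, method)


async def call_with_callbacks(
    handlers: List[Any],
    messages: List[dict],
    api_call: Callable[[List[dict]], Awaitable[dict]],
    **kwargs: Any,
) -> dict:
    """Hypothetical client helper following the sequence diagram above."""
    await _notify(handlers, "on_llm_start", messages, **kwargs)
    try:
        response = await api_call(messages)
    except Exception as error:
        await _notify(handlers, "on_llm_error", error, **kwargs)
        raise  # the business layer still receives the original exception
    await _notify(handlers, "on_llm_end", response, **kwargs)
    return response
```

The original exception is re-raised after `on_llm_error` runs, so the business layer still sees the failure, while a faulty handler can never turn a successful call into a failed one.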
4. Core Features & Use Cases
4.1 Abstract Callback Interface Definition
Function Description: Provides standardized asynchronous callback interfaces, defining three key points in the LLM call lifecycle.
Core Features:
Design pattern based on ABC (Abstract Base Class)
Asynchronous method definitions, supporting modern Python asynchronous programming
Generic data structures, avoiding type dependency issues
Clear interface contracts, ensuring implementation consistency
Use Cases:
# Basic callback handler implementation
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class MyCustomCallback(CustomAsyncCallbackHandler):
    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Execute when LLM call starts"""
        print(f"Starting to process {len(messages)} messages")

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        """Execute when LLM call successfully ends"""
        print(f"Call successful, generated {len(response.get('content', ''))} characters")

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        """Execute when LLM call encounters error"""
        print(f"Call failed: {error}")
Real-world Application Cases:
Token Statistics: Record user token usage
Performance Monitoring: Measure LLM call response time
Error Tracking: Record and analyze reasons for call failures
Cost Control: Calculate costs based on usage
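Token Statistics and Cost Control can share one handler. A minimal sketch, assuming the `model` and `tokens_used` response keys documented in Section 5; `CostTrackingCallback` and `PRICE_PER_1K_TOKENS` are hypothetical, and the prices are placeholders rather than real provider rates. In a real module the class would subclass `CustomAsyncCallbackHandler`; the base class is left out so the block runs on its own.

```python
from collections import defaultdict
from typing import Any, Dict, List

# Placeholder prices in USD per 1,000 tokens; NOT real provider pricing
PRICE_PER_1K_TOKENS: Dict[str, float] = {"model-a": 0.5, "model-b": 2.0}


class CostTrackingCallback:
    """Accumulates token usage and an estimated cost per model."""

    def __init__(self) -> None:
        self.tokens_by_model: Dict[str, int] = defaultdict(int)
        self.cost_by_model: Dict[str, float] = defaultdict(float)
        self.failures = 0

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        pass  # nothing to record until the call finishes

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        model = response.get("model", "unknown")
        tokens = int(response.get("tokens_used", 0))
        self.tokens_by_model[model] += tokens
        # Models without a configured price are counted but priced at 0
        self.cost_by_model[model] += tokens / 1000 * PRICE_PER_1K_TOKENS.get(model, 0.0)

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        self.failures += 1
```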
4.2 Lifecycle Management
Function Description: Manage the complete lifecycle of LLM calls through three key callback methods.
Core Features:
Start Phase: on_llm_start - Record call start state
Success Phase: on_llm_end - Handle successful results
Failure Phase: on_llm_error - Handle exception cases
Use Cases:
# Complete lifecycle management example
import logging
import time
import uuid
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler

logger = logging.getLogger(__name__)


class ComprehensiveCallback(CustomAsyncCallbackHandler):
    def __init__(self):
        # Per-call state lives on the instance, so use one instance per in-flight call
        self.start_time = None
        self.call_id = None

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Record call start"""
        self.start_time = time.time()
        self.call_id = str(uuid.uuid4())
        logger.info(f"LLM call started [ID: {self.call_id}]")
        logger.info(f"Message count: {len(messages)}")

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        """Handle successful result"""
        if self.start_time:
            duration = time.time() - self.start_time
            logger.info(f"LLM call successful [ID: {self.call_id}] Duration: {duration:.2f}s")
        # Record response information
        content_length = len(response.get('content', ''))
        tokens_used = response.get('tokens_used', 0)
        logger.info(f"Generated content length: {content_length}, Token usage: {tokens_used}")

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        """Handle error case"""
        if self.start_time:
            duration = time.time() - self.start_time
            logger.error(f"LLM call failed [ID: {self.call_id}] Duration: {duration:.2f}s")
        logger.error(f"Error details: {type(error).__name__}: {error}")
Real-world Application Cases:
Audit Logging: Record detailed information of all LLM calls
Performance Analysis: Analyze call performance under different scenarios
Fault Diagnosis: Quickly locate and resolve call issues
Usage Statistics: Generate detailed usage reports
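`ComprehensiveCallback` keeps `start_time` and `call_id` on the instance, so if one handler instance is shared by overlapping calls, the second `on_llm_start` overwrites the first call's values. One way around this is to key state by a per-call identifier. The sketch below assumes the client passes a `call_id` keyword argument, which this document does not specify; `PerCallTimingCallback` is a hypothetical name, and the base class is omitted so the block runs on its own.

```python
import time
from typing import Any, Dict, List


class PerCallTimingCallback:
    """Tracks call durations keyed by a per-call ID instead of instance attributes."""

    def __init__(self) -> None:
        self._starts: Dict[Any, float] = {}
        # Finished durations, stored only so the example can inspect them;
        # a real handler would log or export them instead
        self.durations: Dict[Any, float] = {}

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        call_id = kwargs.get("call_id")  # assumed to be supplied by the client
        if call_id is not None:
            self._starts[call_id] = time.monotonic()

    def _finish(self, call_id: Any) -> None:
        # pop() so finished calls do not accumulate in _starts
        start = self._starts.pop(call_id, None)
        if start is not None:
            self.durations[call_id] = time.monotonic() - start

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        self._finish(kwargs.get("call_id"))

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        self._finish(kwargs.get("call_id"))
```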
4.3 Type Safety Design
Function Description: Use generic data structures to avoid circular dependencies, maintaining architectural clarity.
Core Features:
Use Dict[str, Any] instead of specific LLM types
Avoid circular dependencies with specific implementations
Maintain interface generality and extensibility
Support arbitrary additional parameter passing
Use Cases:
# Type-safe data processing
import logging
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler

logger = logging.getLogger(__name__)


class TypeSafeCallback(CustomAsyncCallbackHandler):
    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Safely process message data"""
        # Validate message format
        for i, msg in enumerate(messages):
            if not isinstance(msg, dict):
                logger.warning(f"Message {i} is not in dict format")
                continue
            if 'role' not in msg or 'content' not in msg:
                logger.warning(f"Message {i} missing required fields")
                continue
            # Safely access data
            role = msg.get('role', 'unknown')
            content = str(msg.get('content', ''))
            logger.info(f"Message {i}: {role} - {content[:50]}...")

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        """Safely process response data"""
        # Extract key information
        content = response.get('content', '')
        provider = response.get('provider', 'unknown')
        model = response.get('model', 'unknown')
        tokens = response.get('tokens_used', 0)
        # Validate data validity
        if not content:
            logger.warning("Response content is empty")
        if tokens < 0:
            logger.warning(f"Abnormal token count: {tokens}")
        logger.info(f"Response from {provider}/{model}, using {tokens} tokens")

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        """Required by the abstract base class"""
        logger.error(f"LLM call failed: {type(error).__name__}: {error}")
Real-world Application Cases:
Data Validation: Ensure callback receives correct data format
Error Handling: Gracefully handle abnormal data
Compatibility: Support data formats from different LLM providers
Extensibility: Easily add new data fields
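The callback signatures accept plain dictionaries. If you also want a static type checker to know which keys are expected, without importing `LLMMessage` or `LLMResponse`, `typing.TypedDict` (Python 3.8+) can describe the shape while remaining an ordinary `dict` at runtime. A sketch; `MessageDict`, `ResponseDict`, and `summarize` are hypothetical names, and the keys are taken from the parameter descriptions in Section 5.

```python
from typing import List, TypedDict


class MessageDict(TypedDict):
    role: str
    content: str


class ResponseDict(TypedDict, total=False):
    # total=False: every key is optional, matching the .get() defaults used in callbacks
    content: str
    tokens_used: int
    prompt_tokens: int
    completion_tokens: int
    provider: str
    model: str
    response_time: float
    cost_estimate: float


def summarize(messages: List[MessageDict], response: ResponseDict) -> str:
    # TypedDicts are plain dicts at runtime, so .get() defaults still apply
    model = response.get("model", "unknown")
    tokens = response.get("tokens_used", 0)
    return f"{len(messages)} message(s) -> {model}, {tokens} tokens"
```

Because the TypedDicts exist only for type checkers, this keeps the decoupling goal intact: no LLM client module has to be imported to describe the data.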
5. API Reference
5.1 CustomAsyncCallbackHandler
Class Definition
class CustomAsyncCallbackHandler(ABC):
    """
    Abstract base class for asynchronous callback handlers

    This is an abstract base class that defines callback interfaces for LLM calls.
    All specific callback handlers should inherit from this class and implement its abstract methods.
    Uses generic data structures (Dict[str, Any]) instead of specific LLM types
    to avoid circular import issues and maintain clear architecture.
    """
Abstract Methods
on_llm_start
@abstractmethod
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
    """
    Callback triggered when LLM call starts

    Args:
        messages: List of message dictionaries, each containing 'role' and 'content' keys
        **kwargs: Additional parameters such as provider, model, etc.
    """
    pass
Function: Execute when LLM call starts
Parameters:
messages (List[dict]): Input message list, each message contains:
  role (str): Message role (e.g., “user”, “assistant”, “system”)
  content (str): Message content
**kwargs (Any): Additional parameters, may include:
  provider (str): LLM provider name
  model (str): Model name used
  temperature (float): Sampling temperature
  max_tokens (int): Maximum token count
Usage Example:
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
    # Record call start time
    self.start_time = time.time()
    # Extract key information
    provider = kwargs.get('provider', 'unknown')
    model = kwargs.get('model', 'unknown')
    # Record logs
    logger.info(f"Starting LLM call: {provider}/{model}")
    logger.info(f"Message count: {len(messages)}")
on_llm_end
@abstractmethod
async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
    """
    Callback triggered when LLM call successfully ends

    Args:
        response: Response dictionary containing 'content', 'tokens_used', 'model', etc.
        **kwargs: Additional parameters such as provider, model, etc.
    """
    pass
Function: Execute when LLM call successfully ends
Parameters:
response (dict): Response dictionary containing:
  content (str): Generated text content
  tokens_used (int): Total tokens used
  prompt_tokens (int): Input token count (optional)
  completion_tokens (int): Output token count (optional)
  provider (str): Provider name
  model (str): Model name
  response_time (float): Response time (seconds)
  cost_estimate (float): Cost estimate (optional)
**kwargs (Any): Additional parameters
Usage Example:
async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
    # Calculate call duration
    if hasattr(self, 'start_time'):
        duration = time.time() - self.start_time
        logger.info(f"LLM call completed, Duration: {duration:.2f}s")
    # Extract response information
    content = response.get('content', '')
    tokens = response.get('tokens_used', 0)
    provider = response.get('provider', 'unknown')
    model = response.get('model', 'unknown')
    # Record statistics
    logger.info(f"Generated content length: {len(content)} characters")
    logger.info(f"Tokens used: {tokens}")
    logger.info(f"Call source: {provider}/{model}")
on_llm_error
@abstractmethod
async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
    """
    Callback triggered when LLM call encounters an error

    Args:
        error: Exception object that occurred during LLM call
        **kwargs: Additional parameters such as provider, model, etc.
    """
    pass
Function: Execute when LLM call encounters an error
Parameters:
error (Exception): Exception object that occurred
**kwargs (Any): Additional parameters
Usage Example:
async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
    # Calculate duration before failure
    if hasattr(self, 'start_time'):
        duration = time.time() - self.start_time
        logger.error(f"LLM call failed, Duration: {duration:.2f}s")
    # Extract context information
    provider = kwargs.get('provider', 'unknown')
    model = kwargs.get('model', 'unknown')
    # Record error details
    logger.error(f"Call failed: {provider}/{model}")
    logger.error(f"Error type: {type(error).__name__}")
    logger.error(f"Error message: {str(error)}")
    # Handle differently based on error type
    # (asyncio.TimeoutError is only an alias of the built-in TimeoutError from Python 3.11)
    if isinstance(error, (TimeoutError, asyncio.TimeoutError)):
        logger.error("Call timeout, may need retry")
    elif isinstance(error, ConnectionError):
        logger.error("Network connection issue")
    else:
        logger.error("Unclassified error type")
6. Technical Implementation Details
6.1 Abstract Base Class Design Pattern
Design Principles:
Use Python’s ABC (Abstract Base Class) to ensure interface contracts
Force subclasses to implement necessary methods through the @abstractmethod decorator
Provide clear docstrings explaining the purpose of each method
Implementation Details:
from abc import ABC, abstractmethod
from typing import Any, List


class CustomAsyncCallbackHandler(ABC):
    """
    Abstract base class design
    - Inherit ABC to ensure cannot be directly instantiated
    - Use abstractmethod to force implementation
    - Provide complete type annotations
    """

    @abstractmethod
    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Method that must be implemented by subclasses"""
        pass
Advantages:
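Type Safety: Python raises TypeError as soon as a subclass that does not implement every abstract method is instantiated (a runtime check at instantiation, not a compile-time check), instead of failing later at the first missing call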
Clear Documentation: Provide clear interface documentation through abstract methods
IDE Support: Modern IDEs can provide better code completion and error checking
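The abstract-method check happens when a class is instantiated, not when it is defined. The sketch below uses a local stand-in for `CustomAsyncCallbackHandler` with the same three abstract methods so it runs without the aiecs package; `IncompleteCallback` and `try_instantiate` are illustrative names.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class CustomAsyncCallbackHandler(ABC):
    """Local stand-in with the same three abstract methods as the documented class."""

    @abstractmethod
    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None: ...

    @abstractmethod
    async def on_llm_end(self, response: dict, **kwargs: Any) -> None: ...

    @abstractmethod
    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None: ...


class IncompleteCallback(CustomAsyncCallbackHandler):
    # Defining the class succeeds even though two abstract methods are missing
    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        pass


def try_instantiate(cls: type) -> str:
    """Return "ok" or the TypeError message raised by the ABC machinery."""
    try:
        cls()
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"
```

The exact wording of the error message differs between Python versions, but it names the missing methods, here `on_llm_end` and `on_llm_error`.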
6.2 Asynchronous Programming Support
Design Principles:
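All callback methods are coroutines, so awaiting I/O inside a callback yields to the event loop instead of blocking it (synchronous calls such as time.sleep or a blocking HTTP client still block it)
Use async/await syntax to ensure non-blocking execution
Support concurrent execution of multiple callback handlers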
Implementation Details:
# Asynchronous method definition
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
    """Asynchronous callback method"""
    # Can perform asynchronous operations
    await self._async_operation()

async def _async_operation(self):
    """Internal asynchronous operation"""
    # `database` and `http_client` are placeholders for any async-capable clients
    # For example: asynchronous database operations
    await database.save_log(...)
    # For example: asynchronous network requests
    await http_client.post(...)
Performance Advantages:
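Non-blocking: While a callback awaits I/O, the event loop keeps serving other requests; the LLM call that awaits the callback still waits for it to finish
Concurrency Support: Multiple callback handlers can run simultaneously when the caller dispatches them concurrently (for example with asyncio.gather)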
Resource Efficiency: Fully utilize advantages of asynchronous I/O
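Concurrent execution of several handlers depends on how the caller dispatches them. A minimal sketch using `asyncio.gather` with `return_exceptions=True`, so one failing handler neither cancels the others nor propagates to the LLM call; `dispatch_concurrently` is a hypothetical helper, not part of the documented API.

```python
import asyncio
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


async def dispatch_concurrently(
    handlers: List[Any], method: str, *args: Any, **kwargs: Any
) -> List[BaseException]:
    """Run one callback method on all handlers at once and return the failures."""
    results = await asyncio.gather(
        *(getattr(h, method)(*args, **kwargs) for h in handlers),
        return_exceptions=True,  # exceptions come back as results instead of raising
    )
    failures = []
    # gather() preserves input order, so results line up with handlers
    for handler, result in zip(handlers, results):
        if isinstance(result, BaseException):
            logger.error("Callback %s.%s failed: %r", type(handler).__name__, method, result)
            failures.append(result)
    return failures
```

Compared with sequential dispatch, total latency is roughly that of the slowest handler rather than the sum of all handlers; the trade-off is that handlers must not depend on each other's side effects or ordering.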
6.3 Type Safety Design
Design Principles:
Use generic data structures to avoid circular dependencies
Provide complete type annotations
Support arbitrary additional parameters
Implementation Details:
from typing import Any, List

# Use generic types to avoid circular dependencies
async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
    """
    Type-safe design
    - List[dict]: Generic dictionary list, avoiding specific type dependencies
    - **kwargs: Support arbitrary additional parameters
    - Any: Additional parameter values may be of any type (the method itself returns None)
    """
    # Safely access data
    for msg in messages:
        role = msg.get('role', 'unknown')  # Safe access
        content = msg.get('content', '')  # Provide default value
Advantages:
Decoupled Design: Avoid circular dependencies with specific LLM implementations
Flexibility: Support data formats from different LLM providers
Extensibility: Easily add new data fields
6.4 Error Handling Mechanism
Design Principles:
Callback execution failures should not affect main business flow
Provide detailed error log recording
Support graceful error recovery
Implementation Details:
# Error handling in caller implementation
import logging

logger = logging.getLogger(__name__)


async def execute_callbacks(callbacks, method_name, *args, **kwargs):
    """Safely execute callback methods"""
    for callback in callbacks:
        try:
            method = getattr(callback, method_name)
            await method(*args, **kwargs)
        except Exception as e:
            # Record error but don't interrupt main flow
            logger.error(f"Callback execution failed: {callback.__class__.__name__}.{method_name}")
            logger.error(f"Error details: {e}")
            # Continue with the remaining callbacks
            continue
Fault Tolerance Strategy:
Isolated Execution: Each callback executes independently, not affecting each other
Error Recording: Record all error information in detail
Graceful Degradation: Partial callback failures do not affect overall functionality
6.5 Performance Optimization
Design Principles:
Minimize callback execution overhead
Support batch operations
Avoid unnecessary computations
Implementation Details:
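import asyncio
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class OptimizedCallback(CustomAsyncCallbackHandler):
    def __init__(self):
        # Cache commonly used data
        self._cache = {}
        # Keep references to background tasks: the event loop holds only weak
        # references, so an unreferenced task can be garbage-collected mid-run
        self._background_tasks = set()

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Optimized start callback"""
        # Only calculate necessary information
        message_count = len(messages)
        provider = kwargs.get('provider', 'unknown')
        # Cache calculation results
        self._cache['message_count'] = message_count
        self._cache['provider'] = provider
        # Execute non-critical operations asynchronously
        task = asyncio.create_task(self._background_processing())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _background_processing(self):
        """Background processing, non-blocking main flow"""
        # Execute time-consuming statistics or analysis operations
        pass

    # on_llm_end and on_llm_error omitted for brevity; they are still required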
Optimization Strategies:
Delayed Computation: Put non-critical operations in background execution
Caching Mechanism: Avoid repeated computations
Batch Processing: Combine multiple operations to reduce overhead
7. Configuration & Deployment
7.1 Environment Requirements
Python Version:
Python 3.8+ (Python 3.9+ recommended)
Support complete functionality of the typing module
Support asyncio asynchronous programming
Dependencies:
# requirements.txt
# Core dependencies
typing-extensions>=4.0.0 # Enhanced type support
# asyncio is part of the standard library; do not install the obsolete
# "asyncio" backport package from PyPI
# Optional dependencies (for specific implementations)
redis>=4.5.0 # Redis client
prometheus-client>=0.15.0 # Monitoring metrics
7.2 Configuration Options
Basic Configuration:
# config.py
class CallbackConfig:
"""Callback handler configuration"""
# Logging configuration
LOG_LEVEL = "INFO"
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Performance configuration
MAX_CONCURRENT_CALLBACKS = 100
CALLBACK_TIMEOUT = 30 # seconds
# Error handling configuration
ENABLE_ERROR_RECOVERY = True
MAX_RETRY_ATTEMPTS = 3
RETRY_DELAY = 1 # seconds
Environment Variable Configuration:
# .env
CALLBACK_LOG_LEVEL=INFO
CALLBACK_MAX_CONCURRENT=100
CALLBACK_TIMEOUT=30
CALLBACK_ENABLE_RECOVERY=true
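The environment variables above can be mapped onto the configuration values at startup. A sketch, assuming a one-to-one mapping between the `CALLBACK_*` variables and the corresponding `CallbackConfig` fields; `CallbackSettings`, `from_env`, and `_parse_bool` are hypothetical names, and the retry settings are omitted because no environment variables are listed for them.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _parse_bool(value: str) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class CallbackSettings:
    log_level: str = "INFO"
    max_concurrent: int = 100
    timeout: float = 30.0  # seconds
    enable_recovery: bool = True

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "CallbackSettings":
        """Read CALLBACK_* variables, falling back to the defaults above."""
        env = os.environ if env is None else env
        defaults = cls()
        return cls(
            log_level=env.get("CALLBACK_LOG_LEVEL", defaults.log_level).upper(),
            max_concurrent=int(env.get("CALLBACK_MAX_CONCURRENT", str(defaults.max_concurrent))),
            timeout=float(env.get("CALLBACK_TIMEOUT", str(defaults.timeout))),
            enable_recovery=_parse_bool(
                env.get("CALLBACK_ENABLE_RECOVERY", str(defaults.enable_recovery))
            ),
        )
```

Passing an explicit mapping instead of reading `os.environ` directly keeps the loader easy to test.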
7.3 Deployment Configuration
Docker Configuration:
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy code
COPY aiecs/ ./aiecs/
COPY config/ ./config/
# Set environment variables
ENV PYTHONPATH=/app
ENV CALLBACK_LOG_LEVEL=INFO
# Run application
CMD ["python", "-m", "aiecs.utils.base_callback"]
Kubernetes Configuration:
apiVersion: apps/v1
kind: Deployment
metadata:
name: aiecs-callbacks
spec:
replicas: 3
selector:
matchLabels:
app: aiecs-callbacks
template:
metadata:
labels:
app: aiecs-callbacks
spec:
containers:
- name: callbacks
image: aiecs/callbacks:latest
env:
- name: CALLBACK_LOG_LEVEL
value: "INFO"
- name: CALLBACK_MAX_CONCURRENT
value: "100"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
7.4 Monitoring Configuration
Prometheus Metrics:
from prometheus_client import Counter, Histogram, Gauge
# Define monitoring metrics
callback_executions_total = Counter(
'callback_executions_total',
'Total number of callback executions',
['callback_type', 'status']
)
callback_duration_seconds = Histogram(
'callback_duration_seconds',
'Callback execution duration',
['callback_type']
)
callback_errors_total = Counter(
'callback_errors_total',
'Total number of callback errors',
['callback_type', 'error_type']
)
active_callbacks = Gauge(
'active_callbacks',
'Number of active callback handlers'
)
Health Check:
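import time
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class TestCallback(CustomAsyncCallbackHandler):
    """Minimal concrete handler used only by the health check"""
    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        pass
    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        pass
    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        pass


async def health_check():
    """Callback handler health check"""
    try:
        # Check basic functionality
        test_callback = TestCallback()
        await test_callback.on_llm_start([{"role": "user", "content": "test"}])
        return {
            "status": "healthy",
            "timestamp": time.time(),
            "version": "1.0.0"
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": time.time()
        }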
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics:
Callback execution count and success rate
Callback execution time distribution
Error types and frequency
Memory usage
Monitoring Dashboard:
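# Grafana query examples
# Callback execution success rate, per callback type
sum by (callback_type) (rate(callback_executions_total{status="success"}[5m]))
  / sum by (callback_type) (rate(callback_executions_total[5m]))
# Average execution time
rate(callback_duration_seconds_sum[5m]) / rate(callback_duration_seconds_count[5m])
# 95th percentile execution time
histogram_quantile(0.95, sum by (le) (rate(callback_duration_seconds_bucket[5m])))
# Error rate trend
rate(callback_errors_total[5m])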
8.2 Common Issues & Solutions
8.2.1 Callback Execution Timeout
Symptoms:
“Callback timeout” errors in logs
System response slows down
Callback handlers unresponsive
Troubleshooting Steps:
Check callback execution time: grep -i "duration" /var/log/aiecs/callbacks.log
Analyze performance bottlenecks: Use performance profiling tools
Check resource usage: Monitor CPU and memory usage
Solutions:
# Add timeout control
import asyncio
import logging

logger = logging.getLogger(__name__)


async def execute_callback_with_timeout(callback, method, *args, timeout=30, **kwargs):
    """Callback execution with timeout"""
    try:
        await asyncio.wait_for(
            getattr(callback, method)(*args, **kwargs),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        # wait_for has already cancelled the timed-out callback at this point
        logger.error(f"Callback execution timeout: {callback.__class__.__name__}.{method}")
8.2.2 Memory Leak
Symptoms:
Memory usage continuously increases
System response slows down
Eventually causes OOM errors
Troubleshooting Steps:
Monitor memory usage trends
Analyze object reference relationships
Check for circular references
Solutions:
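# Add memory monitoring
import gc
import weakref
from typing import Any, List
from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class MemoryAwareCallback(CustomAsyncCallbackHandler):
    def __init__(self):
        # A WeakSet does not keep its members alive, so tracking cannot cause a leak
        self._refs = weakref.WeakSet()

    def track(self, obj: Any) -> None:
        """Register a per-call object (e.g. a request context object) for monitoring"""
        self._refs.add(obj)

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        # Many tracked objects still alive suggests leaked references;
        # a full collection pass reclaims any reference cycles among them
        if len(self._refs) > 1000:
            gc.collect()
    # on_llm_end and on_llm_error omitted for brevity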
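A frequent source of growth in callback handlers is per-call state that is added in `on_llm_start` but removed only in `on_llm_end` or `on_llm_error`; if a call is cancelled or a hook never fires, the entry stays forever. `gc.collect()` cannot help here, because the entry is still reachable from the handler. A bounded store that evicts the oldest entries caps this state instead. A sketch of that swapped-in technique; `BoundedCallState` is a hypothetical name and the 1000-entry default simply mirrors the threshold used above.

```python
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

V = TypeVar("V")


class BoundedCallState(Generic[V]):
    """Per-call state store that evicts the oldest entries beyond max_entries."""

    def __init__(self, max_entries: int = 1000) -> None:
        self.max_entries = max_entries
        self._data: "OrderedDict[Hashable, V]" = OrderedDict()
        self.evicted = 0  # a steadily rising count points to calls that never finish

    def put(self, call_id: Hashable, value: V) -> None:
        self._data[call_id] = value
        self._data.move_to_end(call_id)
        while len(self._data) > self.max_entries:
            self._data.popitem(last=False)  # drop the oldest, likely abandoned, call
            self.evicted += 1

    def pop(self, call_id: Hashable) -> Optional[V]:
        return self._data.pop(call_id, None)

    def __len__(self) -> int:
        return len(self._data)
```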
8.2.3 Concurrency Issues
Symptoms:
Data inconsistency
Race conditions
Deadlock or livelock
Troubleshooting Steps:
Analyze concurrent execution patterns
Check shared resource access
Use concurrency analysis tools
Solutions:
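# Add concurrency control
import asyncio
from typing import Any, List
from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class ConcurrencySafeCallback(CustomAsyncCallbackHandler):
    def __init__(self):
        self._lock = asyncio.Lock()

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        async with self._lock:
            # Safely access shared resources
            await self._safe_operation()

    async def _safe_operation(self):
        """Placeholder: reads and writes of shared state that span an await"""
    # on_llm_end and on_llm_error omitted for brevity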
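On a single event loop, a lock only matters when a read-modify-write spans an `await`; code with no `await` between reading and writing cannot be interleaved. The sketch below makes that visible with a counter whose update awaits in the middle, standing in for an async store write. `SharedCounterCallback` is hypothetical and omits the base class to stay self-contained. On Python 3.8 and 3.9, `asyncio.Lock` binds to the current event loop when constructed, so create such handlers from inside the running loop.

```python
import asyncio
from typing import Any, List


class SharedCounterCallback:
    """Counts calls; the read-modify-write spans an await, so it needs a lock."""

    def __init__(self, use_lock: bool = True) -> None:
        self.calls = 0
        self.use_lock = use_lock
        self._lock = asyncio.Lock()

    async def _increment(self) -> None:
        current = self.calls
        await asyncio.sleep(0)  # stands in for an async write, e.g. to Redis
        self.calls = current + 1

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        if self.use_lock:
            async with self._lock:
                await self._increment()
        else:
            await self._increment()
```

Run without the lock, all 50 concurrent calls in the check below read the same starting value before any of them writes, so updates are lost; with the lock, every increment is preserved.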
8.3 Log Analysis
Log Configuration:
import logging
# Configure callback handler specific logs
callback_logger = logging.getLogger('aiecs.callbacks')
callback_logger.setLevel(logging.INFO)
# Add file handler
file_handler = logging.FileHandler('/var/log/aiecs/callbacks.log')
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
callback_logger.addHandler(file_handler)
# Add console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(levelname)s - %(message)s'
))
callback_logger.addHandler(console_handler)
Key Log Patterns:
# Find error logs
grep "ERROR" /var/log/aiecs/callbacks.log | tail -100
# Analyze performance issues (log messages end with e.g. "Duration: 1.23s")
grep -i "duration" /var/log/aiecs/callbacks.log | awk '{print $NF}' | sort -n
# Monitor callback execution
grep -i "callback execution" /var/log/aiecs/callbacks.log | tail -50
8.4 Performance Tuning
Performance Profiling Tools:
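import cProfile
import pstats
import time
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class ProfiledCallback(CustomAsyncCallbackHandler):
    def __init__(self):
        self.profiler = cProfile.Profile()

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        # Start performance profiling; other tasks that run on the event loop
        # while this coroutine awaits are profiled as well
        self.profiler.enable()
        try:
            await self._actual_processing(messages, **kwargs)
        finally:
            # Stop performance profiling
            self.profiler.disable()
            # Save profiling results (cumulative across calls on this instance)
            stats = pstats.Stats(self.profiler)
            stats.dump_stats(f'/tmp/callback_profile_{time.time()}.prof')

    async def _actual_processing(self, messages: List[dict], **kwargs: Any) -> None:
        """Placeholder for the real callback work"""

    # on_llm_end and on_llm_error omitted for brevity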
Optimization Recommendations:
Use asynchronous I/O operations
Avoid executing time-consuming operations in callbacks
Reasonably use caching mechanisms
Periodically clean up unused objects
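One way to keep time-consuming work from stalling the event loop is to hand blocking functions to a thread pool with `loop.run_in_executor`, which is available on Python 3.8 (`asyncio.to_thread` requires 3.9+). The awaiting call still waits for the result, but other tasks keep running meanwhile. A sketch; `_expensive_digest` is a hypothetical CPU-bound stand-in and `OffloadingCallback` omits the base class to stay self-contained. For pure-Python CPU-bound work a thread still competes for the GIL, so a `concurrent.futures.ProcessPoolExecutor` is the usual alternative.

```python
import asyncio
import hashlib
from typing import Any


def _expensive_digest(text: str, rounds: int) -> str:
    """Blocking, CPU-bound stand-in for heavy analysis of a response."""
    data = text.encode()
    for _ in range(rounds):
        data = hashlib.sha256(data).digest()
    return data.hex()


class OffloadingCallback:
    def __init__(self, rounds: int = 10_000) -> None:
        self.rounds = rounds
        self.last_digest = ""

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        loop = asyncio.get_running_loop()
        # None selects the loop's default ThreadPoolExecutor
        self.last_digest = await loop.run_in_executor(
            None, _expensive_digest, response.get("content", ""), self.rounds
        )
```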
9. Visualizations
9.1 Class Inheritance Diagram
classDiagram
class CustomAsyncCallbackHandler {
<<abstract>>
+on_llm_start(messages, **kwargs) async
+on_llm_end(response, **kwargs) async
+on_llm_error(error, **kwargs) async
}
class RedisTokenCallbackHandler {
-user_id: str
-cycle_start_date: str
-start_time: float
-messages: List[dict]
+on_llm_start(messages, **kwargs) async
+on_llm_end(response, **kwargs) async
+on_llm_error(error, **kwargs) async
}
class DetailedRedisTokenCallbackHandler {
-user_id: str
-cycle_start_date: str
-start_time: float
-messages: List[dict]
-prompt_tokens: int
+on_llm_start(messages, **kwargs) async
+on_llm_end(response, **kwargs) async
+on_llm_error(error, **kwargs) async
+_estimate_prompt_tokens(messages) int
+_extract_detailed_tokens(response) tuple
}
class CompositeCallbackHandler {
-handlers: List[CustomAsyncCallbackHandler]
+add_handler(handler)
+on_llm_start(messages, **kwargs) async
+on_llm_end(response, **kwargs) async
+on_llm_error(error, **kwargs) async
}
class CustomCallback {
+on_llm_start(messages, **kwargs) async
+on_llm_end(response, **kwargs) async
+on_llm_error(error, **kwargs) async
}
CustomAsyncCallbackHandler <|-- RedisTokenCallbackHandler
CustomAsyncCallbackHandler <|-- DetailedRedisTokenCallbackHandler
CustomAsyncCallbackHandler <|-- CompositeCallbackHandler
CustomAsyncCallbackHandler <|-- CustomCallback
CompositeCallbackHandler o-- CustomAsyncCallbackHandler
9.2 Data Flow Diagram
flowchart TD
A[LLM Call Start] --> B[on_llm_start Callback]
B --> C[Record Start State]
C --> D[Execute LLM API Call]
D --> E{Call Result}
E -->|Success| F[on_llm_end Callback]
E -->|Failure| G[on_llm_error Callback]
F --> H[Process Response Data]
H --> I[Update Statistics]
I --> J[Record Success Log]
G --> K[Record Error Information]
K --> L[Update Error Statistics]
L --> M[Record Error Log]
J --> N[Callback Execution Complete]
M --> N
9.3 Performance Monitoring Diagram
xychart-beta
title "Callback Execution Time Distribution"
x-axis ["0-10ms", "10-50ms", "50-100ms", "100-500ms", "500ms+"]
y-axis "Execution Count" 0 --> 1000
bar [800, 150, 30, 15, 5]
10. Version History
v1.0.0 (2024-01-15)
New Features:
Implement CustomAsyncCallbackHandler abstract base class
Define three core callback methods: on_llm_start, on_llm_end, on_llm_error
Use generic data structures to avoid circular dependencies
Provide complete type annotations and documentation
Technical Features:
Abstract base class design based on Python ABC
Asynchronous method definitions, supporting modern Python asynchronous programming
Type-safe design, avoiding runtime errors
Clear interface contracts, ensuring implementation consistency
v1.1.0 (2024-02-01)
New Features:
Enhanced error handling mechanism
Added performance monitoring support
Optimized memory usage
Improved documentation and examples
Improvements:
More detailed error log recording
Better exception handling strategies
Performance optimization and memory management
Complete unit test coverage
v1.2.0 (2024-03-01)
New Features:
Added concurrency control mechanism
Support batch callback execution
Implement health check interface
Added monitoring metrics support
Performance Optimizations:
Support high concurrency scenarios
Optimize memory usage
Reduce callback execution overhead
Add performance profiling tools
v1.3.0 (2024-04-01) [Planned]
Planned Features:
Support callback priority
Add callback chain execution
Implement callback result aggregation
Support dynamic callback registration
Performance Goals:
Support 1000+ concurrent callbacks
Callback execution latency < 1ms
99.99% availability guarantee
50% memory usage optimization
Appendix
B. Example Code Repositories
C. Technical Support
Technical Documentation: https://docs.aiecs.com
Issue Reporting: https://github.com/aiecs/issues
Community Discussion: https://discord.gg/aiecs
D. Best Practices
D.1 Implementing Custom Callback Handlers
import time
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class MyCustomCallback(CustomAsyncCallbackHandler):
    """Custom callback handler example"""

    def __init__(self, config: dict):
        self.config = config
        self.stats = {}

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Implement start callback"""
        # Record start state
        self.stats['start_time'] = time.time()
        self.stats['message_count'] = len(messages)
        # Execute custom logic
        await self._custom_start_logic(messages, **kwargs)

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        """Implement end callback"""
        # Calculate statistics
        if 'start_time' in self.stats:
            duration = time.time() - self.stats['start_time']
            self.stats['duration'] = duration
        # Execute custom logic
        await self._custom_end_logic(response, **kwargs)

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        """Implement error callback"""
        # Record error information
        self.stats['error'] = str(error)
        self.stats['error_type'] = type(error).__name__
        # Execute custom logic
        await self._custom_error_logic(error, **kwargs)

    async def _custom_start_logic(self, messages: List[dict], **kwargs: Any):
        """Custom start logic"""
        pass

    async def _custom_end_logic(self, response: dict, **kwargs: Any):
        """Custom end logic"""
        pass

    async def _custom_error_logic(self, error: Exception, **kwargs: Any):
        """Custom error logic"""
        pass
D.2 Error Handling Best Practices
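import asyncio
import logging
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler

logger = logging.getLogger(__name__)


class RobustCallback(CustomAsyncCallbackHandler):
    """Robust callback handler"""

    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.retry_count = 0

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Start callback with retry"""
        await self._execute_with_retry(self._on_llm_start_impl, messages, **kwargs)

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        """End callback with retry (required by the abstract base class)"""
        await self._execute_with_retry(self._on_llm_end_impl, response, **kwargs)

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        """Error callback with retry (required by the abstract base class)"""
        await self._execute_with_retry(self._on_llm_error_impl, error, **kwargs)

    async def _execute_with_retry(self, func, *args, **kwargs):
        """Execution method with retry"""
        for attempt in range(self.max_retries):
            try:
                await func(*args, **kwargs)
                self.retry_count = 0  # Reset retry count
                return
            except Exception as e:
                self.retry_count += 1
                if attempt == self.max_retries - 1:
                    logger.error(f"Callback execution failed, reached max retries: {e}")
                    raise
                else:
                    logger.warning(f"Callback execution failed, retry {attempt + 1}/{self.max_retries}: {e}")
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

    async def _on_llm_start_impl(self, messages: List[dict], **kwargs: Any):
        """Actual start callback implementation"""
        # Specific business logic
        pass

    async def _on_llm_end_impl(self, response: dict, **kwargs: Any):
        """Actual end callback implementation"""
        pass

    async def _on_llm_error_impl(self, error: Exception, **kwargs: Any):
        """Actual error callback implementation"""
        pass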
D.3 Performance Optimization Best Practices
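import asyncio
from typing import Any, List

from aiecs.utils.base_callback import CustomAsyncCallbackHandler


class OptimizedCallback(CustomAsyncCallbackHandler):
    """Optimized callback handler"""

    def __init__(self):
        self._cache = {}  # unbounded here; cap or clear it in long-running services
        self._batch_queue = []
        self._batch_size = 100

    async def on_llm_start(self, messages: List[dict], **kwargs: Any) -> None:
        """Optimized start callback"""
        # Use cache to avoid recomputing summaries of identical message lists
        cache_key = f"start_{hash(str(messages))}"
        summary = self._cache.get(cache_key)
        if summary is None:
            summary = {
                'message_count': len(messages),
                'total_chars': sum(len(str(m.get('content', ''))) for m in messages),
            }
            self._cache[cache_key] = summary
        # Batch processing to improve efficiency
        await self._enqueue('start', summary, kwargs)

    async def on_llm_end(self, response: dict, **kwargs: Any) -> None:
        await self._enqueue('end', response, kwargs)

    async def on_llm_error(self, error: Exception, **kwargs: Any) -> None:
        await self._enqueue('error', error, kwargs)

    async def _enqueue(self, callback_type: str, data, kwargs):
        self._batch_queue.append((callback_type, data, kwargs))
        if len(self._batch_queue) >= self._batch_size:
            await self._process_batch()

    async def flush(self):
        """Process any partially filled batch (e.g. at shutdown)"""
        await self._process_batch()

    async def _process_batch(self):
        """Batch process callbacks"""
        if not self._batch_queue:
            return
        # Swap the queue out first so items queued while the batch runs are kept
        batch, self._batch_queue = self._batch_queue, []
        # Run the batch concurrently; failures are returned, not raised
        await asyncio.gather(
            *(self._process_single(t, d, kw) for t, d, kw in batch),
            return_exceptions=True,
        )

    async def _process_single(self, callback_type: str, data, kwargs):
        """Process single callback"""
        # Specific processing logic
        pass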