BaseLLMClient Technical Documentation
1. Overview
Purpose
BaseLLMClient is the abstract base class for all Large Language Model (LLM) provider clients. It defines the standard message format, the response structure, and the core methods that every provider client implements, and it supplies shared functionality such as asynchronous operation, streaming, and cost estimation. It is the foundation of the LLM integration layer in the AIECS system.
Core Value
Unified Interface: Provides consistent API interfaces for different LLM providers
Async Support: High-performance asynchronous operations based on asyncio
Streaming Processing: Supports real-time streaming text generation
Cost Control: Provides Token usage and cost estimation functionality
Error Handling: Unified exception handling and error classification mechanisms
2. Problem Background & Design Motivation
Problem Background
The AIECS system needs to integrate multiple large language model services, including:
Multi-Provider Support: Need to simultaneously support different providers like OpenAI, Anthropic, Google, etc.
Interface Unification: Different providers have significantly different API interfaces, requiring unified abstraction
Cost Management: Need to track and estimate usage costs for different models
Performance Optimization: Need to support asynchronous operations and streaming processing to improve user experience
Error Handling: Need unified error handling and retry mechanisms
Design Motivation
Abstraction Unification: Unify interfaces of different LLM providers through abstract base classes
Extensibility: Facilitate adding support for new LLM providers
Cost Control: Provide Token usage and cost estimation functionality
Performance Optimization: Support asynchronous operations and streaming processing
Maintenance Simplification: Unified error handling and logging mechanisms
3. Architecture Positioning & Context
System Architecture Position
┌─────────────────────────────────────────────────────────────┐
│                  AIECS System Architecture                  │
├─────────────────────────────────────────────────────────────┤
│  Application Layer                                          │
│  ┌─────────────────┐    ┌──────────────────────┐            │
│  │ TaskService     │    │ DSLProcessor         │            │
│  └─────────────────┘    └──────────────────────┘            │
├─────────────────────────────────────────────────────────────┤
│  LLM Integration Layer                                      │
│  ┌─────────────────┐    ┌──────────────────────┐            │
│  │ BaseLLMClient   │    │ Concrete LLM Clients │            │
│  └─────────────────┘    └──────────────────────┘            │
├─────────────────────────────────────────────────────────────┤
│  External Services Layer                                    │
│  ┌─────────────────┐    ┌──────────────────────┐            │
│  │ OpenAI API      │    │ Anthropic API        │            │
│  └─────────────────┘    └──────────────────────┘            │
└─────────────────────────────────────────────────────────────┘
Upstream Callers
TaskService: Task management service that needs to call LLM for text processing
DSLProcessor: DSL processor that needs LLM for natural language understanding
ChatService: Chat service that needs LLM for conversation generation
ContentGenerator: Content generation service that needs LLM for content creation
Downstream Dependencies
OpenAI API: OpenAI large language model service
Anthropic API: Anthropic Claude model service
Google AI API: Google Gemini model service
Other LLM Providers: Various third-party LLM services
4. Core Features & Use Cases
4.1 Basic Text Generation
Single Text Generation
import asyncio

from aiecs.llm import BaseLLMClient, LLMMessage, LLMResponse

# Create message list
messages = [
    LLMMessage(role="system", content="You are a professional AI assistant specializing in technical questions."),
    LLMMessage(role="user", content="Please explain what asynchronous programming is."),
]

# Call LLM to generate text
async def generate_explanation() -> LLMResponse:
    """Generate technical explanation"""
    # `client` must be a concrete BaseLLMClient implementation created beforehand
    # client = OpenAIClient()  # or other concrete implementation
    response = await client.generate_text(
        messages=messages,
        model="gpt-4",
        temperature=0.7,
        max_tokens=500,
    )
    print(f"Generated content: {response.content}")
    print(f"Model used: {response.model}")
    print(f"Tokens used: {response.tokens_used}")
    # cost_estimate is Optional, so fall back to 0.0 before formatting
    print(f"Cost estimate: ${(response.cost_estimate or 0.0):.4f}")
    return response

# Usage example (a bare top-level `await` only works in an async REPL or notebook)
response = asyncio.run(generate_explanation())
Conversational Interaction
async def chat_with_llm():
    """Chat with LLM"""
    conversation = [
        LLMMessage(role="system", content="You are a friendly AI assistant."),
        LLMMessage(role="user", content="Hello, I want to learn Python programming."),
    ]
    # First round of conversation
    response1 = await client.generate_text(
        messages=conversation,
        temperature=0.8
    )
    print(f"AI: {response1.content}")
    # Add AI response to conversation history
    conversation.append(LLMMessage(role="assistant", content=response1.content))
    conversation.append(LLMMessage(role="user", content="Please recommend some learning resources."))
    # Second round of conversation
    response2 = await client.generate_text(
        messages=conversation,
        temperature=0.8
    )
    print(f"AI: {response2.content}")
    return conversation
4.2 Streaming Text Generation
Real-Time Streaming Output
async def stream_text_generation():
    """Stream text generation"""
    messages = [
        LLMMessage(role="system", content="You are a creative writing assistant."),
        LLMMessage(role="user", content="Please write a short story about artificial intelligence.")
    ]
    print("Starting story generation...")
    # Stream generation
    async for chunk in client.stream_text(
        messages=messages,
        model="gpt-4",
        temperature=0.9,
        max_tokens=1000
    ):
        print(chunk, end='', flush=True)  # Real-time output
    print("\nStory generation completed!")

# Usage example (run inside an async function or an async REPL)
await stream_text_generation()
Streaming Chat System
import asyncio

async def interactive_chat():
    """Interactive chat system"""
    conversation = [
        LLMMessage(role="system", content="You are a professional programming tutor.")
    ]
    while True:
        # input() blocks, so run it in a worker thread to keep the event loop free
        # (asyncio.to_thread requires Python 3.9+)
        user_input = await asyncio.to_thread(input, "\nUser: ")
        if user_input.strip().lower() in ('quit', 'exit'):
            break
        if not user_input.strip():
            continue  # LLMMessage rejects empty content
        conversation.append(LLMMessage(role="user", content=user_input))
        print("AI: ", end='', flush=True)
        # Stream generate response
        ai_response = ""
        async for chunk in client.stream_text(
            messages=conversation,
            temperature=0.7
        ):
            print(chunk, end='', flush=True)
            ai_response += chunk
        conversation.append(LLMMessage(role="assistant", content=ai_response))
        print()  # New line

# Usage example (run inside an async function or an async REPL)
await interactive_chat()
4.3 Task Processing Integration
Task Description Generation
async def generate_task_description(user_input: str) -> str:
    """Generate task description from user input"""
    messages = [
        LLMMessage(
            role="system",
            content="You are a task analysis expert capable of converting user natural language descriptions into structured task descriptions."
        ),
        LLMMessage(
            role="user",
            content=f"Please analyze the following user requirement and generate a detailed task description: {user_input}"
        )
    ]
    response = await client.generate_text(
        messages=messages,
        model="gpt-4",
        temperature=0.3,  # Lower temperature makes output more consistent, though not deterministic
        max_tokens=300
    )
    return response.content

# Usage example (run inside an async function or an async REPL)
user_request = "I want to analyze sales data and generate a report"
task_description = await generate_task_description(user_request)
print(f"Generated task description: {task_description}")
Code Generation and Optimization
async def generate_code(requirements: str, language: str = "python") -> str:
    """Generate code based on requirements"""
    messages = [
        LLMMessage(
            role="system",
            content=f"You are a professional {language} programmer capable of generating high-quality code based on requirements."
        ),
        LLMMessage(
            role="user",
            content=f"Please generate {language} code based on the following requirements:\n{requirements}"
        )
    ]
    response = await client.generate_text(
        messages=messages,
        model="gpt-4",
        temperature=0.2,  # Low temperature reduces randomness; generated code still needs review
        max_tokens=1000
    )
    return response.content

# Usage example (run inside an async function or an async REPL)
requirements = "Implement a quicksort algorithm with detailed comments"
code = await generate_code(requirements, "python")
print(f"Generated code:\n{code}")
4.4 Content Analysis and Processing
Text Summarization
async def summarize_text(text: str, max_length: int = 200) -> str:
    """Generate text summary"""
    messages = [
        LLMMessage(
            role="system",
            content=f"You are a professional text summarization expert capable of generating concise and accurate summaries, not exceeding {max_length} characters."
        ),
        LLMMessage(
            role="user",
            content=f"Please generate a summary for the following text:\n\n{text}"
        )
    ]
    response = await client.generate_text(
        messages=messages,
        model="gpt-3.5-turbo",
        temperature=0.3,
        # max_tokens counts tokens, not characters; it only acts as an upper bound here.
        # The character limit itself is requested through the system prompt.
        max_tokens=max_length
    )
    return response.content

# Usage example (run inside an async function or an async REPL)
long_text = """
Artificial Intelligence (AI) is a branch of computer science that attempts to understand the essence of intelligence,
and produce a new type of intelligent machine that can react in ways similar to human intelligence...
"""
summary = await summarize_text(long_text, 100)
print(f"Summary: {summary}")
Sentiment Analysis
import json

async def analyze_sentiment(text: str) -> dict:
    """Analyze text sentiment"""
    messages = [
        LLMMessage(
            role="system",
            # The example uses double quotes so that a reply following it is valid JSON
            content='You are a sentiment analysis expert capable of accurately analyzing text sentiment. Please return JSON in the format: {"sentiment": "positive/negative/neutral", "confidence": 0.0-1.0, "reason": "analysis reason"}'
        ),
        LLMMessage(
            role="user",
            content=f"Please analyze the sentiment of the following text: {text}"
        )
    ]
    response = await client.generate_text(
        messages=messages,
        model="gpt-3.5-turbo",
        temperature=0.1,  # Low temperature makes output more consistent
        max_tokens=200
    )
    try:
        return json.loads(response.content)
    except json.JSONDecodeError:
        return {"error": "Unable to parse response", "raw_response": response.content}

# Usage example (run inside an async function or an async REPL)
text = "This product is really great, I'm very satisfied!"
sentiment = await analyze_sentiment(text)
print(f"Sentiment analysis result: {sentiment}")
4.5 Batch Processing
Batch Text Processing
import asyncio
from typing import List

async def batch_process_texts(texts: List[str], operation: str) -> List[str]:
    """Batch process texts"""
    results = []
    for i, text in enumerate(texts):
        print(f"Processing text {i+1}/{len(texts)}")
        messages = [
            LLMMessage(
                role="system",
                content=f"You are a text processing expert, please perform the following operation: {operation}"
            ),
            LLMMessage(role="user", content=text)
        ]
        response = await client.generate_text(
            messages=messages,
            model="gpt-3.5-turbo",
            temperature=0.3
        )
        results.append(response.content)
        # Add delay to avoid rate limiting
        await asyncio.sleep(0.1)
    return results

# Usage example (run inside an async function or an async REPL)
texts = [
    "This is the first text",
    "This is the second text",
    "This is the third text"
]
processed = await batch_process_texts(texts, "Convert text to formal business language")
for i, result in enumerate(processed):
    print(f"Text {i+1}: {result}")
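The loop above handles one text at a time. Where the provider's rate limits allow it, the same batch can be processed concurrently with a bounded number of in-flight requests. The following is a minimal sketch under stated assumptions: `fake_generate` is a hypothetical stand-in for `client.generate_text`, and `batch_process_concurrently` and `max_concurrent` are illustrative names that are not part of the library.

```python
import asyncio
from typing import List


async def fake_generate(text: str) -> str:
    """Hypothetical stand-in for client.generate_text(); no network call is made."""
    await asyncio.sleep(0.01)  # simulate request latency
    return text.upper()


async def batch_process_concurrently(texts: List[str], max_concurrent: int = 2) -> List[str]:
    """Process texts concurrently, with at most max_concurrent requests in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(text: str) -> str:
        async with semaphore:  # wait for a free slot before sending the request
            return await fake_generate(text)

    # gather() returns results in the order of its arguments, not completion order
    return await asyncio.gather(*(process_one(t) for t in texts))


results = asyncio.run(batch_process_concurrently(["first", "second", "third"]))
print(results)
```

Because `asyncio.gather` preserves argument order, the results line up with the input texts even when requests finish out of order.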
5. API Reference
5.1 Data Class Definitions
LLMMessage
@dataclass
class LLMMessage:
    """LLM message data class"""
    role: str     # Role: "system", "user", "assistant"
    content: str  # Message content
LLMResponse
@dataclass
class LLMResponse:
    """LLM response data class"""
    content: str                             # Response content
    provider: str                            # Provider name
    model: str                               # Model name
    tokens_used: Optional[int] = None        # Total tokens
    prompt_tokens: Optional[int] = None      # Input tokens
    completion_tokens: Optional[int] = None  # Output tokens
    cost_estimate: Optional[float] = None    # Cost estimate
    response_time: Optional[float] = None    # Response time
5.2 Exception Class Definitions
LLMClientError
class LLMClientError(Exception):
    """LLM client base exception"""
    pass
ProviderNotAvailableError
class ProviderNotAvailableError(LLMClientError):
    """Provider not available exception"""
    pass
RateLimitError
class RateLimitError(LLMClientError):
    """Rate limit exception"""
    pass
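Because both specific exceptions derive from LLMClientError, callers can handle the specific cases first and fall back to the base class for everything else. The snippet below is a minimal sketch of that pattern. The exception classes are redefined locally so that it runs on its own, and `describe_failure` is a hypothetical helper, not part of the library.

```python
class LLMClientError(Exception):
    """LLM client base exception"""


class ProviderNotAvailableError(LLMClientError):
    """Provider not available exception"""


class RateLimitError(LLMClientError):
    """Rate limit exception"""


def describe_failure(exc: Exception) -> str:
    """Hypothetical helper: map an exception to a coarse handling decision."""
    try:
        raise exc
    except RateLimitError:
        return "retry-later"       # most specific handlers come first
    except ProviderNotAvailableError:
        return "switch-provider"
    except LLMClientError:
        return "client-error"      # any other error from the hierarchy
    except Exception:
        return "unexpected"        # errors raised outside the LLM client


outcomes = [
    describe_failure(RateLimitError("429")),
    describe_failure(ProviderNotAvailableError("down")),
    describe_failure(LLMClientError("generic")),
    describe_failure(ValueError("not an LLM error")),
]
print(outcomes)
```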
5.3 Abstract Base Class
BaseLLMClient
class BaseLLMClient(ABC):
    """Abstract base class for all LLM provider clients"""
    def __init__(self, provider_name: str) -> None
        """Initialize LLM client
        Args:
            provider_name: Provider name
        """
5.4 Abstract Methods
generate_text
@abstractmethod
async def generate_text(
    self,
    messages: List[LLMMessage],
    model: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs
) -> LLMResponse
Function: Generate text
Parameters:
messages(List[LLMMessage]): Message list
model(Optional[str]): Model name
temperature(float): Temperature parameter, default 0.7
max_tokens(Optional[int]): Maximum tokens
**kwargs: Other parameters
Returns:
LLMResponse: LLM response object
stream_text
@abstractmethod
async def stream_text(
    self,
    messages: List[LLMMessage],
    model: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs
) -> AsyncGenerator[str, None]
Function: Stream generate text
Parameters: Same as generate_text
Returns:
AsyncGenerator[str, None]: Text chunk generator
close
@abstractmethod
async def close(self) -> None
Function: Clean up resources
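To illustrate what a provider has to supply, here is a minimal sketch of a subclass that implements the three abstract methods without any network access. The base class and data classes are reduced local stand-ins that mirror the signatures documented above, so the snippet runs on its own. `EchoClient` and the model name `echo-1` are hypothetical and exist only for this example.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional


@dataclass
class LLMMessage:
    role: str
    content: str


@dataclass
class LLMResponse:
    content: str
    provider: str
    model: str


class BaseLLMClient(ABC):
    """Local stand-in mirroring the abstract methods documented above."""

    def __init__(self, provider_name: str) -> None:
        self.provider_name = provider_name

    @abstractmethod
    async def generate_text(self, messages: List[LLMMessage], model: Optional[str] = None,
                            temperature: float = 0.7, max_tokens: Optional[int] = None,
                            **kwargs) -> LLMResponse:
        """Generate text"""

    @abstractmethod
    async def stream_text(self, messages: List[LLMMessage], model: Optional[str] = None,
                          temperature: float = 0.7, max_tokens: Optional[int] = None,
                          **kwargs) -> AsyncGenerator[str, None]:
        """Stream generate text"""

    @abstractmethod
    async def close(self) -> None:
        """Clean up resources"""


class EchoClient(BaseLLMClient):
    """Hypothetical provider that echoes the last message; makes no network calls."""

    def __init__(self) -> None:
        super().__init__("echo")
        self.closed = False

    async def generate_text(self, messages, model=None, temperature=0.7,
                            max_tokens=None, **kwargs) -> LLMResponse:
        return LLMResponse(content=messages[-1].content,
                           provider=self.provider_name, model=model or "echo-1")

    async def stream_text(self, messages, model=None, temperature=0.7,
                          max_tokens=None, **kwargs):
        # An async generator: each yielded string is one text chunk
        for word in messages[-1].content.split():
            yield word + " "

    async def close(self) -> None:
        self.closed = True


async def demo():
    client = EchoClient()
    messages = [LLMMessage(role="user", content="hello streaming world")]
    response = await client.generate_text(messages)
    chunks = [chunk async for chunk in client.stream_text(messages)]
    await client.close()
    return response, chunks, client.closed


response, chunks, closed = asyncio.run(demo())
print(response.content, chunks, closed)
```

A stub of this kind can also serve as a test double for upstream callers, since it satisfies the same interface as a real provider client.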
5.5 Utility Methods
_count_tokens_estimate
def _count_tokens_estimate(self, text: str) -> int
Function: Estimate token count
Parameters:
text(str): Text content
Returns:
int: Estimated token count
_estimate_cost
def _estimate_cost(self, model: str, input_tokens: int, output_tokens: int, token_costs: Dict) -> float
Function: Estimate cost
Parameters:
model(str): Model name
input_tokens(int): Input token count
output_tokens(int): Output token count
token_costs(Dict): Token cost configuration
Returns:
float: Estimated cost
6. Technical Implementation Details
6.1 Abstract Base Class Design
Interface Unification
class BaseLLMClient(ABC):
    """Abstract base class design"""
    def __init__(self, provider_name: str):
        self.provider_name = provider_name
        self.logger = logging.getLogger(f"{__name__}.{provider_name}")

    @abstractmethod
    async def generate_text(self, messages: List[LLMMessage], **kwargs) -> LLMResponse:
        """Unified text generation interface"""
        pass

    @abstractmethod
    async def stream_text(self, messages: List[LLMMessage], **kwargs) -> AsyncGenerator[str, None]:
        """Unified streaming generation interface"""
        pass
6.2 Data Class Design
Message Structure
@dataclass
class LLMMessage:
    """Standardized message structure"""
    role: str     # Role type
    content: str  # Message content

    def __post_init__(self):
        """Validate message format"""
        if self.role not in ["system", "user", "assistant"]:
            raise ValueError(f"Invalid role: {self.role}")
        if not self.content.strip():
            raise ValueError("Message content cannot be empty")
Response Structure
@dataclass
class LLMResponse:
    """Standardized response structure"""
    content: str
    provider: str
    model: str
    tokens_used: Optional[int] = None
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None
    cost_estimate: Optional[float] = None
    response_time: Optional[float] = None

    def __post_init__(self):
        """Ensure Token data consistency"""
        # If detailed Token info exists but no total, calculate total
        if (self.prompt_tokens is not None and
                self.completion_tokens is not None and
                self.tokens_used is None):
            self.tokens_used = self.prompt_tokens + self.completion_tokens
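The effect of the two `__post_init__` hooks can be seen in a short, self-contained sketch. The data classes are repeated here only so that the snippet runs on its own; the sample values are illustrative.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LLMMessage:
    role: str
    content: str

    def __post_init__(self):
        if self.role not in ["system", "user", "assistant"]:
            raise ValueError(f"Invalid role: {self.role}")
        if not self.content.strip():
            raise ValueError("Message content cannot be empty")


@dataclass
class LLMResponse:
    content: str
    provider: str
    model: str
    tokens_used: Optional[int] = None
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None

    def __post_init__(self):
        if (self.prompt_tokens is not None and
                self.completion_tokens is not None and
                self.tokens_used is None):
            self.tokens_used = self.prompt_tokens + self.completion_tokens


# Total is derived when only the two partial counts are given
derived = LLMResponse(content="ok", provider="openai", model="gpt-4",
                      prompt_tokens=120, completion_tokens=30)

# An explicit total is kept as provided
explicit = LLMResponse(content="ok", provider="openai", model="gpt-4",
                       tokens_used=200, prompt_tokens=120, completion_tokens=30)

# An unsupported role is rejected at construction time
try:
    LLMMessage(role="tool", content="x")
    role_error = None
except ValueError as exc:
    role_error = str(exc)

print(derived.tokens_used, explicit.tokens_used, role_error)
```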
6.3 Error Handling Mechanism
Exception Hierarchy
class LLMClientError(Exception):
    """Base exception class"""
    pass

class ProviderNotAvailableError(LLMClientError):
    """Provider not available"""
    pass

class RateLimitError(LLMClientError):
    """Rate limit"""
    pass

class AuthenticationError(LLMClientError):
    """Authentication error"""
    pass

class ModelNotFoundError(LLMClientError):
    """Model not found"""
    pass
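The troubleshooting section associates these exceptions with HTTP status codes: 401 with AuthenticationError, 404 with ModelNotFoundError, and 429 with RateLimitError. A concrete client could centralize that translation in one place. The following is a minimal sketch: `error_from_status` is a hypothetical helper, and treating 5xx responses as ProviderNotAvailableError is an assumption made for this example rather than documented behavior.

```python
class LLMClientError(Exception):
    """Base exception class"""


class ProviderNotAvailableError(LLMClientError):
    """Provider not available"""


class RateLimitError(LLMClientError):
    """Rate limit"""


class AuthenticationError(LLMClientError):
    """Authentication error"""


class ModelNotFoundError(LLMClientError):
    """Model not found"""


# Status codes named in the troubleshooting section
_STATUS_TO_ERROR = {
    401: AuthenticationError,
    404: ModelNotFoundError,
    429: RateLimitError,
}


def error_from_status(status: int, detail: str = "") -> LLMClientError:
    """Hypothetical helper: build the matching exception for an HTTP status."""
    message = f"HTTP {status}: {detail}" if detail else f"HTTP {status}"
    error_cls = _STATUS_TO_ERROR.get(status)
    if error_cls is None:
        # Assumption: server-side failures mean the provider is unavailable
        error_cls = ProviderNotAvailableError if 500 <= status < 600 else LLMClientError
    return error_cls(message)


errors = {status: type(error_from_status(status)).__name__
          for status in (401, 404, 429, 503, 418)}
print(errors)
```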
Error Handling Strategy
async def safe_generate_text(self, messages: List[LLMMessage], **kwargs) -> Optional[LLMResponse]:
    """Safe text generation (with error handling)"""
    try:
        return await self.generate_text(messages, **kwargs)
    except RateLimitError as e:
        self.logger.warning(f"Rate limit exceeded: {e}")
        # Single retry after a fixed 1-second delay; a second failure propagates to the caller
        await asyncio.sleep(1)
        return await self.generate_text(messages, **kwargs)
    except ProviderNotAvailableError as e:
        self.logger.error(f"Provider not available: {e}")
        return None
    except LLMClientError:
        # Keep already-classified errors (e.g. AuthenticationError) intact
        raise
    except Exception as e:
        self.logger.error(f"Unexpected error: {e}")
        raise LLMClientError(f"Text generation failed: {e}") from e
6.4 Cost Estimation Mechanism
Token Counting
def _count_tokens_estimate(self, text: str) -> int:
    """Token count estimation"""
    # Simple estimation: approximately 4 characters equals 1 Token (English)
    return len(text) // 4

def _count_tokens_accurate(self, text: str, model: str) -> int:
    """Accurate Token counting (requires concrete implementation)"""
    # This needs to be implemented based on specific provider
    # For example, using tiktoken library
    raise NotImplementedError("Provider-specific token counting is not implemented")
Cost Calculation
def _estimate_cost(self, model: str, input_tokens: int, output_tokens: int, token_costs: Dict) -> float:
    """Cost estimation"""
    if model in token_costs:
        costs = token_costs[model]
        # Prices are quoted per 1K tokens, hence the division by 1000
        return (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1000
    return 0.0

# Cost configuration example
TOKEN_COSTS = {
    "gpt-4": {
        "input": 0.03,   # Per 1K input tokens
        "output": 0.06   # Per 1K output tokens
    },
    "gpt-3.5-turbo": {
        "input": 0.0015,
        "output": 0.002
    }
}
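As a worked example of the formula above, a gpt-4 request with 1000 input tokens and 500 output tokens costs (1000 × 0.03 + 500 × 0.06) / 1000 = 0.06 under the example price table. The sketch below reproduces that arithmetic with a standalone function. `estimate_cost` mirrors `_estimate_cost` without the `self` parameter so that it runs on its own, and the prices are the illustrative values from the configuration example, not current provider pricing.

```python
from typing import Dict

# Illustrative prices per 1K tokens, copied from the configuration example
TOKEN_COSTS: Dict[str, Dict[str, float]] = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int,
                  token_costs: Dict[str, Dict[str, float]]) -> float:
    """Standalone version of _estimate_cost for demonstration."""
    if model in token_costs:
        costs = token_costs[model]
        return (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1000
    return 0.0  # unknown models are reported as free


gpt4_cost = estimate_cost("gpt-4", 1000, 500, TOKEN_COSTS)
turbo_cost = estimate_cost("gpt-3.5-turbo", 2000, 1000, TOKEN_COSTS)
unknown_cost = estimate_cost("some-unlisted-model", 1000, 500, TOKEN_COSTS)

print(f"gpt-4: ${gpt4_cost:.4f}")
print(f"gpt-3.5-turbo: ${turbo_cost:.4f}")
print(f"unlisted model: ${unknown_cost:.4f}")
```

Note that a model missing from the table yields 0.0, so cost totals silently undercount usage of unlisted models. Logging a warning in that branch is one way to surface the gap.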
6.5 Context Management
Async Context Manager
async def __aenter__(self):
    """Async context manager entry"""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit"""
    await self.close()

# Usage example
async with client as llm:
    response = await llm.generate_text(messages)
    # Automatically clean up resources
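The main benefit of the context manager is that `close()` runs even when the body raises. The sketch below demonstrates this with a hypothetical `DemoClient` that only records whether it was closed; it stands in for a real provider client.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a concrete client; tracks whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returning None lets any exception from the body propagate after cleanup
        await self.close()


async def demo() -> bool:
    client = DemoClient()
    try:
        async with client as llm:
            assert llm is client
            raise RuntimeError("simulated failure inside the block")
    except RuntimeError:
        pass  # the error still reaches the caller, after close() has run
    return client.closed


closed_after_error = asyncio.run(demo())
print(closed_after_error)
```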
7. Configuration & Deployment
7.1 Basic Configuration
Environment Variable Configuration
# OpenAI configuration
export OPENAI_API_KEY="your_openai_api_key"
export OPENAI_BASE_URL="https://api.openai.com/v1"
# Anthropic configuration
export ANTHROPIC_API_KEY="your_anthropic_api_key"
# Google AI configuration
export GOOGLE_AI_API_KEY="your_google_ai_api_key"
# General configuration
export LLM_DEFAULT_MODEL="gpt-3.5-turbo"
export LLM_DEFAULT_TEMPERATURE="0.7"
export LLM_MAX_TOKENS="1000"
export LLM_TIMEOUT="30"
Configuration File
# config/llm_config.py
import os

LLM_CONFIG = {
    "providers": {
        "openai": {
            "api_key": os.getenv("OPENAI_API_KEY"),
            "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"),
            "models": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"]
        },
        "anthropic": {
            "api_key": os.getenv("ANTHROPIC_API_KEY"),
            "models": ["claude-3-opus", "claude-3-sonnet", "claude-3-haiku"]
        },
        "google": {
            "api_key": os.getenv("GOOGLE_AI_API_KEY"),
            "models": ["gemini-pro", "gemini-pro-vision"]
        }
    },
    "defaults": {
        "model": "gpt-3.5-turbo",
        "temperature": 0.7,
        "max_tokens": 1000,
        "timeout": 30
    },
    # Prices per 1K tokens
    "costs": {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
        "claude-3-opus": {"input": 0.015, "output": 0.075}
    }
}
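Environment variables are always strings, so values such as LLM_DEFAULT_TEMPERATURE and LLM_MAX_TOKENS need to be converted before they can replace the numeric defaults. The following is a minimal sketch of one way to do that. `LLMDefaults` and `load_defaults` are hypothetical names, and the fallback values are the defaults listed above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class LLMDefaults:
    """Hypothetical typed container for the general settings."""
    model: str
    temperature: float
    max_tokens: int
    timeout: int


def load_defaults(env: Mapping[str, str] = os.environ) -> LLMDefaults:
    """Read the general settings from env, converting strings to numbers."""
    return LLMDefaults(
        model=env.get("LLM_DEFAULT_MODEL", "gpt-3.5-turbo"),
        temperature=float(env.get("LLM_DEFAULT_TEMPERATURE", "0.7")),
        max_tokens=int(env.get("LLM_MAX_TOKENS", "1000")),
        timeout=int(env.get("LLM_TIMEOUT", "30")),
    )


# A plain dict stands in for os.environ so the example is reproducible
defaults = load_defaults({"LLM_DEFAULT_TEMPERATURE": "0.2", "LLM_MAX_TOKENS": "256"})
print(defaults)
```

A malformed value such as `LLM_MAX_TOKENS="many"` raises ValueError at load time, which surfaces configuration mistakes before the first request is sent.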
7.2 Concrete Implementation Examples
OpenAI Client Implementation
# llm/openai_client.py
import time
from typing import AsyncGenerator, List, Optional

import openai

from .base_client import BaseLLMClient, LLMMessage, LLMResponse


class OpenAIClient(BaseLLMClient):
    def __init__(self, api_key: str, base_url: Optional[str] = None):
        super().__init__("openai")
        self.client = openai.AsyncOpenAI(
            api_key=api_key,
            base_url=base_url
        )

    async def generate_text(self, messages: List[LLMMessage], **kwargs) -> LLMResponse:
        """Implement OpenAI text generation"""
        start_time = time.time()
        # Convert message format
        openai_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]
        # Call OpenAI API (kwargs must include `model`, which the API requires)
        response = await self.client.chat.completions.create(
            messages=openai_messages,
            **kwargs
        )
        # Build response; usage and message content may be missing
        usage = response.usage
        return LLMResponse(
            content=response.choices[0].message.content or "",
            provider="openai",
            model=response.model,
            prompt_tokens=usage.prompt_tokens if usage else None,
            completion_tokens=usage.completion_tokens if usage else None,
            tokens_used=usage.total_tokens if usage else None,
            response_time=time.time() - start_time
        )

    async def stream_text(self, messages: List[LLMMessage], **kwargs) -> AsyncGenerator[str, None]:
        """Implement OpenAI streaming generation"""
        openai_messages = [
            {"role": msg.role, "content": msg.content}
            for msg in messages
        ]
        stream = await self.client.chat.completions.create(
            messages=openai_messages,
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            # Some chunks carry no choices or no text; skip them
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def close(self):
        """Clean up resources"""
        await self.client.close()
7.3 Docker Deployment
Dockerfile Configuration
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application code
COPY . .
# Set environment variables
ENV LLM_DEFAULT_MODEL="gpt-3.5-turbo"
ENV LLM_DEFAULT_TEMPERATURE="0.7"
# Start command
CMD ["python", "-m", "aiecs.llm.base_client"]
Docker Compose Configuration
version: '3.8'
services:
  llm-service:
    build: .
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GOOGLE_AI_API_KEY=${GOOGLE_AI_API_KEY}
      - LLM_DEFAULT_MODEL=gpt-3.5-turbo
    restart: unless-stopped
    ports:
      - "8000:8000"
8. Maintenance & Troubleshooting
8.1 Monitoring Metrics
Key Metrics
Request Success Rate: (successful requests / total requests) * 100%
Average Response Time: Average response time of LLM requests
Token Usage: Usage statistics for input and output tokens
Cost Statistics: Usage cost statistics for each model
Error Rate: Error statistics classified by error type
Monitoring Implementation
import time

class LLMMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_tokens": 0,
            "total_cost": 0.0,
            "avg_response_time": 0.0
        }

    def record_request(self, success: bool, tokens: int, cost: float, response_time: float):
        """Record request metrics"""
        self.metrics["total_requests"] += 1
        if success:
            self.metrics["successful_requests"] += 1
        else:
            self.metrics["failed_requests"] += 1
        self.metrics["total_tokens"] += tokens
        self.metrics["total_cost"] += cost
        # Update average response time incrementally (no per-request history is stored)
        total = self.metrics["total_requests"]
        current_avg = self.metrics["avg_response_time"]
        self.metrics["avg_response_time"] = (current_avg * (total - 1) + response_time) / total

    def get_metrics(self) -> dict:
        """Get monitoring metrics"""
        success_rate = (self.metrics["successful_requests"] /
                        max(self.metrics["total_requests"], 1)) * 100
        return {
            **self.metrics,
            "success_rate": success_rate,
            "error_rate": 100 - success_rate
        }

# Use monitor
monitor = LLMMonitor()

# Record metrics in requests (intended as a method on a client class, hence `self`)
async def monitored_generate_text(self, messages: List[LLMMessage], **kwargs) -> LLMResponse:
    start_time = time.time()
    try:
        response = await self.generate_text(messages, **kwargs)
        monitor.record_request(
            success=True,
            tokens=response.tokens_used or 0,
            cost=response.cost_estimate or 0.0,
            response_time=time.time() - start_time
        )
        return response
    except Exception:
        monitor.record_request(
            success=False,
            tokens=0,
            cost=0.0,
            response_time=time.time() - start_time
        )
        raise
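The average response time in `record_request` is maintained with the incremental update new_avg = (old_avg × (n − 1) + x) / n, where n counts the new sample. The short sketch below checks that this update reproduces the ordinary arithmetic mean. `update_average` is a hypothetical standalone version of that one line, and the sample timings are made up for illustration.

```python
from statistics import mean


def update_average(current_avg: float, count: int, new_value: float) -> float:
    """Incremental mean; `count` is the number of samples including new_value."""
    return (current_avg * (count - 1) + new_value) / count


samples = [0.8, 1.2, 0.5, 2.1]  # illustrative response times in seconds

avg = 0.0
for n, value in enumerate(samples, start=1):
    avg = update_average(avg, n, value)

print(f"incremental: {avg:.4f}, direct: {mean(samples):.4f}")
```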
8.2 Common Issues and Solutions
Issue 1: Invalid API Key
Symptoms: AuthenticationError or 401 Unauthorized error
Possible Causes:
API key is incorrect or expired
Insufficient key permissions
Environment variables not set correctly
Solution:
import os

import openai

# 1. Validate API key
async def validate_api_key(provider: str, api_key: str) -> bool:
    """Validate API key"""
    try:
        if provider == "openai":
            client = openai.AsyncOpenAI(api_key=api_key)
            await client.models.list()
            return True
        elif provider == "anthropic":
            # Implement Anthropic key validation
            pass
        return False
    except Exception:
        return False

# 2. Check environment variables
def check_environment_variables():
    """Check environment variables"""
    required_vars = ["OPENAI_API_KEY", "ANTHROPIC_API_KEY"]
    missing_vars = []
    for var in required_vars:
        if not os.getenv(var):
            missing_vars.append(var)
    if missing_vars:
        raise ValueError(f"Missing environment variables: {missing_vars}")

# 3. Implement key rotation
class APIKeyManager:
    def __init__(self):
        # Entries are None when the corresponding environment variable is unset
        self.keys = {
            "openai": [os.getenv("OPENAI_API_KEY_1"), os.getenv("OPENAI_API_KEY_2")],
            "anthropic": [os.getenv("ANTHROPIC_API_KEY_1"), os.getenv("ANTHROPIC_API_KEY_2")]
        }
        self.current_index = {provider: 0 for provider in self.keys}

    def get_key(self, provider: str) -> str:
        """Get currently available API key"""
        keys = self.keys[provider]
        current = self.current_index[provider]
        return keys[current]

    def rotate_key(self, provider: str):
        """Rotate to next API key"""
        self.current_index[provider] = (self.current_index[provider] + 1) % len(self.keys[provider])
Issue 2: Rate Limiting
Symptoms: RateLimitError or 429 Too Many Requests error
Possible Causes:
Request frequency exceeds limit
Too many concurrent requests
Quota exhausted
Solution:
import asyncio
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def rate_limit_retry(max_retries: int = 3, base_delay: float = 1.0):
    """Rate limit retry decorator"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError:
                    if attempt == max_retries - 1:
                        raise
                    # Exponential backoff
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Rate limit hit, retrying in {delay}s (attempt {attempt + 1}/{max_retries})")
                    await asyncio.sleep(delay)
            return None  # Only reached when max_retries is 0
        return wrapper
    return decorator

# Use retry decorator
@rate_limit_retry(max_retries=3, base_delay=1.0)
async def generate_text_with_retry(self, messages: List[LLMMessage], **kwargs) -> LLMResponse:
    return await self.generate_text(messages, **kwargs)

# Implement request queue
class RequestQueue:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue = asyncio.Queue()

    async def process_request(self, func, *args, **kwargs):
        """Process request (with concurrency control)"""
        async with self.semaphore:
            return await func(*args, **kwargs)
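The retry decorator can be exercised without a real provider by wrapping a function that fails a fixed number of times. The sketch below is self-contained: it repeats a trimmed version of the decorator (logging omitted), uses a local RateLimitError, and shortens the delay so that it finishes quickly. `flaky_call` and the `calls` counter are hypothetical test fixtures.

```python
import asyncio
from functools import wraps


class RateLimitError(Exception):
    """Local stand-in for the library's RateLimitError."""


def rate_limit_retry(max_retries: int = 3, base_delay: float = 1.0):
    """Trimmed copy of the retry decorator shown above."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError:
                    if attempt == max_retries - 1:
                        raise  # retries exhausted
                    await asyncio.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator


calls = {"count": 0}


@rate_limit_retry(max_retries=3, base_delay=0.01)
async def flaky_call() -> str:
    """Fails with RateLimitError on the first two calls, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RateLimitError("simulated 429")
    return "ok"


result = asyncio.run(flaky_call())
print(result, calls["count"])
```

With `max_retries=3` the wrapped function is attempted at most three times, waiting 0.01 s and then 0.02 s between attempts in this example.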
Issue 3: Model Not Available
Symptoms: ModelNotFoundError or 404 Not Found error
Possible Causes:
Incorrect model name
Model not available in current region
Model deprecated
Solution:
import logging

logger = logging.getLogger(__name__)

class ModelManager:
    def __init__(self):
        self.available_models = {
            "openai": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
            "anthropic": ["claude-3-opus", "claude-3-sonnet", "claude-3-haiku"],
            "google": ["gemini-pro", "gemini-pro-vision"]
        }
        self.fallback_models = {
            "gpt-4": "gpt-3.5-turbo",
            "claude-3-opus": "claude-3-sonnet"
        }

    def get_available_model(self, provider: str, requested_model: str) -> str:
        """Get available model"""
        available = self.available_models.get(provider, [])
        if requested_model in available:
            return requested_model
        # Try fallback model
        fallback = self.fallback_models.get(requested_model)
        if fallback and fallback in available:
            logger.warning(f"Model {requested_model} not available, using {fallback}")
            return fallback
        # Use default model
        default = available[0] if available else "gpt-3.5-turbo"
        logger.warning(f"Using default model {default}")
        return default

    async def validate_model(self, provider: str, model: str) -> bool:
        """Validate if model is available"""
        try:
            # Implement model validation logic (placeholder: always reports success)
            return True
        except Exception:
            return False
Issue 4: Network Connection Problems
Symptoms: ConnectionError or timeout error
Possible Causes:
Unstable network connection
Proxy configuration issues
DNS resolution failure
Solution:
import asyncio
import logging

import aiohttp
from aiohttp import ClientTimeout, TCPConnector

logger = logging.getLogger(__name__)

class RobustHTTPClient:
    # Instantiate inside a running event loop, since it creates a ClientSession
    def __init__(self, timeout: int = 30, max_retries: int = 3):
        self.timeout = ClientTimeout(total=timeout)
        self.max_retries = max_retries
        # Configure connector
        connector = TCPConnector(
            limit=100,
            limit_per_host=30,
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            connector=connector
        )

    async def request_with_retry(self, method: str, url: str, **kwargs):
        """Request with retry"""
        for attempt in range(self.max_retries):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    return await response.json()
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    raise
                delay = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                logger.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}")
                await asyncio.sleep(delay)

    async def close(self):
        """Close session"""
        await self.session.close()
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "Application Layer"
APP[AIECS Application]
TS[TaskService]
DS[DSLProcessor]
end
subgraph "LLM Integration Layer"
BLC[BaseLLMClient]
OAI[OpenAIClient]
ANT[AnthropicClient]
GAI[GoogleAIClient]
end
subgraph "External Services"
OAI_API[OpenAI API]
ANT_API[Anthropic API]
GAI_API[Google AI API]
end
APP --> BLC
TS --> BLC
DS --> BLC
BLC --> OAI
BLC --> ANT
BLC --> GAI
OAI --> OAI_API
ANT --> ANT_API
GAI --> GAI_API
9.2 Request Flow Diagram
sequenceDiagram
participant App as Application
participant BLC as BaseLLMClient
participant Impl as Concrete Implementation
participant API as External API
App->>BLC: generate_text(messages)
BLC->>Impl: Call concrete implementation
Impl->>API: Send request
API-->>Impl: Return response
Impl->>Impl: Build LLMResponse
Impl-->>BLC: Return response
BLC-->>App: Return result
9.3 Error Handling Flow Diagram
flowchart TD
Start([Start Request]) --> Validate{Validate Parameters}
Validate -->|Failed| ParamError[Parameter Error]
Validate -->|Success| Request[Send Request]
Request --> Success{Request Success?}
Success -->|Yes| Process[Process Response]
Success -->|No| ErrorType{Error Type}
ErrorType -->|Authentication Error| AuthError[Authentication Error]
ErrorType -->|Rate Limit| RateLimit[Rate Limit]
ErrorType -->|Network Error| NetworkError[Network Error]
ErrorType -->|Other| OtherError[Other Error]
RateLimit --> Retry{Retry?}
Retry -->|Yes| Wait[Wait Retry]
Retry -->|No| RateLimitError[Rate Limit Error]
Wait --> Request
Process --> End([End])
ParamError --> End
AuthError --> End
RateLimitError --> End
NetworkError --> End
OtherError --> End
10. Version History
v1.0.0 (2024-01-15)
New Features:
Basic abstract class implementation
Support for message and response data structures
Implement asynchronous operation interfaces
Provide basic error handling
Technical Features:
ABC abstract base class design
Support for asynchronous generation and streaming processing
Implement context manager
v1.1.0 (2024-02-01)
Feature Enhancements:
Add Token counting and cost estimation
Implement error classification and exception handling
Support multiple provider interfaces
Add logging functionality
Performance Optimizations:
Optimize asynchronous operation performance
Improve error handling mechanisms
Enhance resource management
v1.2.0 (2024-03-01)
New Features:
Support streaming text generation
Add cost monitoring and statistics
Implement request retry mechanism
Provide health check interface
Stability Improvements:
Enhance error recovery capabilities
Improve concurrency control
Optimize resource cleanup
v1.3.0 (2024-04-01)
Architecture Upgrades:
Support more LLM providers
Add model management and fallback mechanisms
Implement advanced caching strategies
Support batch processing
Monitoring Enhancements:
Add detailed performance metrics
Implement cost analysis tools
Support alert integration
Provide operations management interface
Appendix
B. External Dependencies
C. Best Practices
import asyncio
import logging
from typing import List, Optional

logger = logging.getLogger(__name__)

# 1. Message building best practices
def build_conversation(system_prompt: str, user_messages: List[str]) -> List[LLMMessage]:
    """Build conversation messages"""
    messages = [LLMMessage(role="system", content=system_prompt)]
    # Entries alternate between user and assistant turns, starting with the user
    for i, content in enumerate(user_messages):
        role = "user" if i % 2 == 0 else "assistant"
        messages.append(LLMMessage(role=role, content=content))
    return messages

# 2. Error handling best practices
async def safe_llm_call(client: BaseLLMClient, messages: List[LLMMessage], **kwargs) -> Optional[LLMResponse]:
    """Safe LLM call"""
    try:
        return await client.generate_text(messages, **kwargs)
    except RateLimitError:
        logger.warning("Rate limit exceeded, implementing backoff")
        # Single retry after 5 seconds; a second failure propagates to the caller
        await asyncio.sleep(5)
        return await client.generate_text(messages, **kwargs)
    except ProviderNotAvailableError:
        logger.error("Provider not available, using fallback")
        return None
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return None

# 3. Cost control best practices
class CostController:
    def __init__(self, daily_budget: float = 10.0):
        self.daily_budget = daily_budget
        self.daily_spent = 0.0

    def can_make_request(self, estimated_cost: float) -> bool:
        """Check if request can be made"""
        return self.daily_spent + estimated_cost <= self.daily_budget

    def record_cost(self, cost: float):
        """Record cost"""
        self.daily_spent += cost
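The CostController above never resets `daily_spent`, so in a long-running process the budget would eventually block all requests. One possible extension, shown as a minimal sketch, clears the counter when the calendar date changes. `DailyCostController` and its injectable `today` callable are hypothetical and are not part of the library; the injected clock exists only to make the rollover testable.

```python
from datetime import date
from typing import Callable


class DailyCostController:
    """Hypothetical variant of CostController that resets spend each day."""

    def __init__(self, daily_budget: float = 10.0,
                 today: Callable[[], date] = date.today) -> None:
        self.daily_budget = daily_budget
        self.daily_spent = 0.0
        self._today = today            # injectable clock for testing
        self._current_day = today()

    def _roll_over(self) -> None:
        """Reset the counter when the date has changed since the last call."""
        now = self._today()
        if now != self._current_day:
            self._current_day = now
            self.daily_spent = 0.0

    def can_make_request(self, estimated_cost: float) -> bool:
        self._roll_over()
        return self.daily_spent + estimated_cost <= self.daily_budget

    def record_cost(self, cost: float) -> None:
        self._roll_over()
        self.daily_spent += cost


# Simulate two consecutive days with a controllable clock
clock = {"day": date(2024, 4, 1)}
controller = DailyCostController(daily_budget=1.0, today=lambda: clock["day"])

controller.record_cost(0.9)
blocked = controller.can_make_request(0.2)   # 0.9 + 0.2 exceeds the 1.0 budget

clock["day"] = date(2024, 4, 2)
allowed = controller.can_make_request(0.2)   # new day, spend starts from zero

print(blocked, allowed)
```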
D. Contact Information
Technical Lead: AIECS Development Team
Issue Reporting: Through project Issue system
Documentation Updates: Regular maintenance, version synchronization