TracingManager Technical Documentation
1. Overview
Purpose
TracingManager is a component for distributed tracing and call-chain tracking, built on Jaeger and the OpenTracing standard. It provides cross-service request tracing, performance analysis, and error localization, and serves as core observability infrastructure in the AIECS system.
Core Value
Distributed Tracing: Track complete call chains of requests across multiple services
Performance Analysis: Identify system bottlenecks and performance hotspots
Error Localization: Quickly locate error sources in distributed systems
Dependency Analysis: Understand dependencies between services
SLA Monitoring: Monitor end-to-end service response times
2. Problem Background & Design Motivation
Problem Background
The AIECS distributed system faces the following challenges:
Complex Call Chains: Requests pass through multiple services, which makes the complete path difficult to track
Performance Bottlenecks Hard to Locate: It is hard to tell quickly which service or operation causes a delay
Complex Error Propagation: Errors propagate between services, which makes root causes difficult to locate
Unclear Dependencies: Dependencies between services are not visualized
Difficult Debugging: Troubleshooting in a distributed environment is slow and inefficient
Design Motivation
Observability Enhancement: Provide comprehensive visibility of system operational status
Performance Optimization: Identify performance bottlenecks through tracing data
Rapid Fault Localization: Reduce problem troubleshooting and resolution time
System Understanding: Help teams understand system architecture and dependencies
SLA Assurance: Ensure the system meets its service level agreements
3. Architecture Positioning & Context
System Architecture Location
┌─────────────────────────────────────────────────────────────┐
│ AIECS System Architecture │
├─────────────────────────────────────────────────────────────┤
│ Monitoring Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TracingManager │ │ Jaeger Agent │ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Infrastructure Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ CeleryTaskManager│ │ WebSocketManager│ │
│ └─────────────────┘ └─────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Domain Layer │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TaskService │ │ DSLProcessor │ │
│ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Upstream Callers
TaskService: Task management service that needs to trace task execution flows
DSLProcessor: DSL processor that needs to trace plan generation and execution
CeleryTaskManager: Task executor that needs to trace task scheduling
WebSocketManager: WebSocket manager that needs to trace connection handling
Downstream Dependencies
Jaeger Agent: Tracing data collection agent
Jaeger Collector: Tracing data collector
Jaeger UI: Tracing data visualization interface
OpenTracing: Distributed tracing standard library
4. Core Features & Use Cases
4.1 Basic Tracing
Manual Span Creation
# Create tracing manager
tracing = TracingManager(service_name="task_executor")
# Manually create and finish Span
span = tracing.start_span("process_user_request", tags={"user_id": "123"})
try:
    result = await process_request()
    tracing.finish_span(span, tags={"success": True})
except Exception as e:
    tracing.finish_span(span, error=e)
    raise
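The try/except pattern above has to be repeated at every call site. One way to package it is a small context manager. The following is only a sketch: `traced` is a hypothetical helper that is not part of TracingManager, and `RecordingTracing` is a stand-in that records calls so the example runs without Jaeger.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional


class RecordingTracing:
    """Stand-in for TracingManager: records calls instead of reporting to Jaeger."""

    def __init__(self) -> None:
        self.finished: List[Dict[str, Any]] = []

    def start_span(self, operation_name: str, parent_span: Any = None,
                   tags: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        return {"operation": operation_name, "tags": dict(tags or {})}

    def finish_span(self, span: Any, tags: Optional[Dict[str, Any]] = None,
                    logs: Optional[Dict[str, Any]] = None,
                    error: Optional[Exception] = None) -> None:
        self.finished.append({"span": span, "tags": tags, "error": error})


@contextmanager
def traced(tracing: Any, operation_name: str,
           tags: Optional[Dict[str, Any]] = None) -> Iterator[Any]:
    """Hypothetical helper: start a span and finish it exactly once."""
    span = tracing.start_span(operation_name, tags=tags)
    try:
        yield span
    except Exception as exc:
        # Failure path: attach the error, then let it propagate.
        tracing.finish_span(span, error=exc)
        raise
    else:
        # Success path: mirror the success tag used in the manual example.
        tracing.finish_span(span, tags={"success": True})


tracing = RecordingTracing()
with traced(tracing, "process_user_request", tags={"user_id": "123"}):
    pass  # the traced work goes here
```

Because `traced` is a plain (synchronous) context manager, it can also wrap `await` expressions inside a coroutine.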
Automatic Tracing with Decorator
# Use decorator to automatically trace functions
@tracing.with_tracing("parse_user_intent", tags={"component": "nlp"})
async def parse_user_intent(user_input: str) -> Dict[str, Any]:
    """Parse user intent"""
    return intent_result

@tracing.with_tracing("generate_task_plan", tags={"component": "planner"})
async def generate_task_plan(intent: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Generate task plan"""
    return plan_steps
4.2 Database Operation Tracing
Database Query Tracing
# Trace database operations
@tracing.trace_database_operation("SELECT", table="tasks", query="SELECT * FROM tasks WHERE status = 'pending'")
async def get_pending_tasks() -> List[Dict[str, Any]]:
    """Get pending tasks"""
    return await db.fetch_all("SELECT * FROM tasks WHERE status = 'pending'")

@tracing.trace_database_operation("INSERT", table="task_results")
async def save_task_result(result: Dict[str, Any]) -> int:
    """Save task result"""
    return await db.execute("INSERT INTO task_results ...")
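The document does not list which tags the database decorator attaches. As an illustration only, the sketch below builds a tag dictionary from the same three arguments. `db.type` and `db.statement` follow the OpenTracing semantic conventions; the helper name, the `db.operation` and `db.table` tag names, and the 200-character limit are assumptions.

```python
from typing import Any, Dict, Optional


def database_span_tags(operation: str, table: Optional[str] = None,
                       query: Optional[str] = None,
                       max_statement_len: int = 200) -> Dict[str, Any]:
    """Hypothetical helper: build span tags for a database operation."""
    tags: Dict[str, Any] = {
        "component": "database",
        "db.type": "sql",
        "db.operation": operation.upper(),  # assumed tag name
    }
    if table:
        tags["db.table"] = table  # assumed tag name
    if query:
        # Collapse whitespace and cap the length so large statements
        # do not bloat the span.
        statement = " ".join(query.split())
        if len(statement) > max_statement_len:
            statement = statement[:max_statement_len] + "..."
        tags["db.statement"] = statement
    return tags
```

Truncating the statement keeps span payloads small. Queries that embed user data may also need redaction before they are recorded.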
4.3 External Service Call Tracing
HTTP Service Call Tracing
# Trace external service calls
@tracing.trace_external_call("ml_service", endpoint="/api/predict")
async def call_ml_service(data: Dict[str, Any]) -> Dict[str, Any]:
    """Call machine learning service"""
    async with httpx.AsyncClient() as client:
        response = await client.post("http://ml-service/api/predict", json=data)
        return response.json()

@tracing.trace_external_call("notification_service", endpoint="/api/send")
async def send_notification(user_id: str, message: str) -> bool:
    """Send notification"""
    async with httpx.AsyncClient() as client:
        response = await client.post("http://notification-service/api/send",
                                     json={"user_id": user_id, "message": message})
        return response.status_code == 200
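For outbound calls, the OpenTracing semantic conventions define the tags `span.kind`, `peer.service`, `http.method`, `http.url`, `http.status_code` and `error`. Whether `trace_external_call` sets exactly these is not stated here, so the helper below is a hypothetical sketch. Treating every status of 400 or above as an error is also an assumption.

```python
from typing import Any, Dict, Optional


def external_call_tags(service_name: str, endpoint: Optional[str] = None,
                       method: str = "POST",
                       status_code: Optional[int] = None) -> Dict[str, Any]:
    """Hypothetical helper: build span tags for an outbound HTTP call."""
    tags: Dict[str, Any] = {
        "span.kind": "client",       # this service is the caller
        "peer.service": service_name,
        "http.method": method,
    }
    if endpoint:
        tags["http.url"] = endpoint
    if status_code is not None:
        tags["http.status_code"] = status_code
        if status_code >= 400:       # assumed error threshold
            tags["error"] = True
    return tags
```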
4.4 Tool Execution Tracing
Tool Call Tracing
# Trace tool execution
@tracing.trace_tool_execution("data_processor", "clean_data")
async def clean_data(raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Clean data"""
    return cleaned_data

@tracing.trace_tool_execution("ml_model", "train")
async def train_model(training_data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Train model"""
    return model_info
4.5 Cross-Service Tracing
Context Propagation
# Inter-service context propagation
async def process_request_with_context(request_data: Dict[str, Any], headers: Dict[str, str]):
    """Process request with context"""
    # Extract tracing context from upstream service
    span_context = tracing.extract_span_context(headers)
    # Create child Span
    span = tracing.start_span("process_request", parent_span=span_context)
    try:
        # Process request
        result = await handle_request(request_data)
        # Propagate context to downstream service
        downstream_headers = {}
        tracing.inject_span_context(span, downstream_headers)
        # Call downstream service
        await call_downstream_service(result, downstream_headers)
        tracing.finish_span(span, tags={"success": True})
        return result
    except Exception as e:
        tracing.finish_span(span, error=e)
        raise
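To make the carrier contents concrete: Jaeger clients propagate context in an `uber-trace-id` entry with the form `{trace-id}:{span-id}:{parent-span-id}:{flags}`, all hex-encoded. The sketch below encodes and parses that format with the standard library only. It illustrates the wire format and is not how TracingManager or the Jaeger client implements it; `TraceContext`, `encode` and `decode` are hypothetical names.

```python
from typing import Dict, NamedTuple, Optional

TRACE_HEADER = "uber-trace-id"  # Jaeger's default propagation key


class TraceContext(NamedTuple):
    trace_id: int
    span_id: int
    parent_id: int
    flags: int  # bit 0 set means "sampled"


def encode(ctx: TraceContext) -> str:
    """Render a context as trace-id:span-id:parent-id:flags in hex."""
    return f"{ctx.trace_id:x}:{ctx.span_id:x}:{ctx.parent_id:x}:{ctx.flags:x}"


def decode(headers: Dict[str, str]) -> Optional[TraceContext]:
    """Parse the header if present and well-formed, otherwise return None."""
    lowered = {key.lower(): value for key, value in headers.items()}
    raw = lowered.get(TRACE_HEADER)
    if raw is None:
        return None
    parts = raw.split(":")
    if len(parts) != 4:
        return None
    try:
        trace_id, span_id, parent_id, flags = (int(part, 16) for part in parts)
    except ValueError:
        return None
    return TraceContext(trace_id, span_id, parent_id, flags)
```

A missing or malformed header yields `None`, which corresponds to starting a new root span instead of a child span.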
5. API Reference
5.1 Class Definition
TracingManager
class TracingManager:
    """Distributed tracing manager"""
    def __init__(self, service_name: str = "service_executor",
                 jaeger_host: Optional[str] = None,
                 jaeger_port: Optional[int] = None,
                 enable_tracing: Optional[bool] = None) -> None:
        """Initialize tracing manager
        Args:
            service_name: Service name
            jaeger_host: Jaeger Agent host address
            jaeger_port: Jaeger Agent port
            enable_tracing: Whether to enable tracing
        """
5.2 Public Methods
start_span
def start_span(self, operation_name: str, parent_span: Optional[Span] = None,
tags: Optional[Dict[str, Any]] = None) -> Optional[Span]
Function: Start a new tracing Span
Parameters:
operation_name(str): Operation name
parent_span(Optional[Span]): Parent Span, or a span context extracted from an upstream carrier (as in section 4.5)
tags(Optional[Dict[str, Any]]): Initial tags
Returns:
Optional[Span]: Span object or None
finish_span
def finish_span(self, span: Optional[Span], tags: Optional[Dict[str, Any]] = None,
logs: Optional[Dict[str, Any]] = None, error: Optional[Exception] = None) -> None
Function: Finish tracing Span
Parameters:
span(Optional[Span]): Span to finish
tags(Optional[Dict[str, Any]]): Additional tags
logs(Optional[Dict[str, Any]]): Log information
error(Optional[Exception]): Error information
with_tracing
def with_tracing(self, operation_name: str, tags: Optional[Dict[str, Any]] = None) -> Callable
Function: Tracing decorator
Parameters:
operation_name(str): Operation name
tags(Optional[Dict[str, Any]]): Initial tags
Returns:
Callable: Decorator function
trace_database_operation
def trace_database_operation(self, operation: str, table: Optional[str] = None, query: Optional[str] = None) -> Callable
Function: Database operation tracing decorator
Parameters:
operation(str): Database operation type
table(Optional[str]): Table name
query(Optional[str]): SQL query
Returns:
Callable: Decorator function
trace_external_call
def trace_external_call(self, service_name: str, endpoint: Optional[str] = None) -> Callable
Function: External service call tracing decorator
Parameters:
service_name(str): Service name
endpoint(Optional[str]): Endpoint path
Returns:
Callable: Decorator function
trace_tool_execution
def trace_tool_execution(self, tool_name: str, operation: str) -> Callable
Function: Tool execution tracing decorator
Parameters:
tool_name(str): Tool name
operation(str): Operation name
Returns:
Callable: Decorator function
6. Technical Implementation Details
6.1 Jaeger Integration
Configuration Initialization
def _init_tracer(self):
    """Initialize Jaeger tracer"""
    config = jaeger_client.config.Config(
        config={
            'sampler': {
                'type': 'const',  # Constant sampling
                'param': 1,       # 100% sampling
            },
            'local_agent': {
                'reporting_host': self.jaeger_host,
                'reporting_port': self.jaeger_port,
            },
            'logging': True,
        },
        service_name=self.service_name,
        validate=True
    )
    self.tracer = config.initialize_tracer()
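Constant sampling at 100% records every request, which can be costly under heavy traffic. The Jaeger client also accepts a `probabilistic` sampler whose `param` is a rate between 0 and 1. The sketch below builds the same configuration dictionary with a configurable rate. The function `build_jaeger_config` and its `sample_rate` parameter are hypothetical and not part of TracingManager.

```python
from typing import Any, Dict


def build_jaeger_config(host: str, port: int,
                        sample_rate: float = 1.0) -> Dict[str, Any]:
    """Hypothetical helper: build the dict passed as Config(config=...)."""
    if not 0.0 <= sample_rate <= 1.0:
        raise ValueError("sample_rate must be between 0.0 and 1.0")
    if sample_rate >= 1.0:
        # Same as the configuration above: sample every trace.
        sampler: Dict[str, Any] = {"type": "const", "param": 1}
    else:
        # Sample roughly this fraction of traces.
        sampler = {"type": "probabilistic", "param": sample_rate}
    return {
        "sampler": sampler,
        "local_agent": {"reporting_host": host, "reporting_port": port},
        "logging": True,
    }
```

For example, `build_jaeger_config("jaeger", 6831, sample_rate=0.1)` would sample about one trace in ten.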
6.2 Decorator Implementation
Async Function Support
def with_tracing(self, operation_name: str, tags: Optional[Dict[str, Any]] = None):
    """Tracing decorator implementation"""
    def decorator(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            if not self.enable_tracing or not self.tracer:
                return await func(*args, **kwargs)
            span = self.start_span(operation_name, tags=tags)
            try:
                # Add function arguments to Span
                self._add_function_args_to_span(span, args, kwargs)
                result = await func(*args, **kwargs)
                if span:
                    span.set_tag("success", True)
                return result
            except Exception as e:
                self.finish_span(span, error=e)
                raise
            finally:
                if span and not span.finished:
                    self.finish_span(span)
        return async_wrapper
    return decorator
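The wrapper above always awaits the wrapped function, so it fits coroutine functions. If synchronous functions also need tracing, one common approach is to branch on `inspect.iscoroutinefunction`. The sketch below shows that branching only. It appends to an `events` list instead of creating spans, and `with_tracing_sketch` is a hypothetical name.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, List, Tuple

# (operation_name, outcome) pairs; stands in for real spans.
events: List[Tuple[str, str]] = []


def with_tracing_sketch(operation_name: str) -> Callable:
    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    events.append((operation_name, "error"))
                    raise
                events.append((operation_name, "ok"))
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                result = func(*args, **kwargs)
            except Exception:
                events.append((operation_name, "error"))
                raise
            events.append((operation_name, "ok"))
            return result
        return sync_wrapper
    return decorator


@with_tracing_sketch("add")
def add(a: int, b: int) -> int:
    return a + b


@with_tracing_sketch("fetch")
async def fetch() -> str:
    return "done"
```

The decorated `add` stays an ordinary function and `fetch` stays a coroutine function, so callers do not need to change.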
6.3 Context Propagation
Cross-Service Context Propagation
def inject_span_context(self, span: Optional[Span], carrier: Dict[str, str]):
    """Inject Span context into carrier"""
    if not self.enable_tracing or not span or not self.tracer:
        return
    try:
        from opentracing.propagation import Format
        self.tracer.inject(span.context, Format.TEXT_MAP, carrier)
    except Exception as e:
        logger.error(f"Error injecting span context: {e}")

def extract_span_context(self, carrier: Dict[str, str]) -> Optional[Any]:
    """Extract Span context from carrier"""
    if not self.enable_tracing or not self.tracer:
        return None
    try:
        from opentracing.propagation import Format
        return self.tracer.extract(Format.TEXT_MAP, carrier)
    except Exception as e:
        logger.error(f"Error extracting span context: {e}")
        return None
7. Configuration & Deployment
7.1 Basic Configuration
Environment Variable Configuration
# Jaeger configuration
export JAEGER_AGENT_HOST="jaeger"
export JAEGER_AGENT_PORT="6831"
export JAEGER_ENABLE_TRACING="true"
# Service configuration
export SERVICE_NAME="task_executor"
Code Configuration
# Basic configuration
tracing = TracingManager(
service_name="task_executor",
jaeger_host="jaeger",
jaeger_port=6831,
enable_tracing=True
)
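The constructor arguments default to `None`, which suggests they fall back to the environment variables listed above. The exact fallback logic is not shown here, so the following is a sketch under that assumption. `resolve_settings`, `TracingSettings`, and the defaults `localhost`, `6831` and `true` are hypothetical.

```python
import os
from typing import Mapping, NamedTuple, Optional


class TracingSettings(NamedTuple):
    host: str
    port: int
    enabled: bool


def resolve_settings(jaeger_host: Optional[str] = None,
                     jaeger_port: Optional[int] = None,
                     enable_tracing: Optional[bool] = None,
                     env: Optional[Mapping[str, str]] = None) -> TracingSettings:
    """Explicit arguments win; otherwise read the environment; otherwise use defaults."""
    env = os.environ if env is None else env
    host = jaeger_host if jaeger_host is not None else env.get("JAEGER_AGENT_HOST", "localhost")
    port = jaeger_port if jaeger_port is not None else int(env.get("JAEGER_AGENT_PORT", "6831"))
    if enable_tracing is None:
        raw = env.get("JAEGER_ENABLE_TRACING", "true")
        enable_tracing = raw.strip().lower() in {"1", "true", "yes"}
    return TracingSettings(host, port, enable_tracing)
```

Passing `env` explicitly keeps the function easy to test without modifying the process environment.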
7.2 Docker Deployment
Docker Compose Configuration
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"   # Jaeger UI
      - "14268:14268"   # HTTP collector
      - "6831:6831/udp" # UDP agent
    environment:
      - COLLECTOR_OTLP_ENABLED=true
  task-executor:
    build: .
    environment:
      - JAEGER_AGENT_HOST=jaeger
      - JAEGER_AGENT_PORT=6831
      - JAEGER_ENABLE_TRACING=true
    depends_on:
      - jaeger
8. Maintenance & Troubleshooting
8.1 Common Issues & Solutions
Issue 1: Jaeger Connection Failure
Symptoms: The error "Failed to initialize Jaeger tracer" appears
Solution:
# Check Jaeger service status
import socket
import time

def check_jaeger_connection(host: str, port: int) -> bool:
    # UDP is connectionless: this confirms that the host resolves and a
    # route exists, not that an agent is actually listening on the port.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(1)
            sock.connect((host, port))
        return True
    except OSError:
        return False

# Retry mechanism
def init_tracer_with_retry(max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            tracing = TracingManager()
            if tracing.tracer:
                return tracing
        except Exception:
            if attempt == max_retries - 1:
                raise
        time.sleep(1)
    return None  # tracer was never initialized
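A fixed one-second pause works for short outages. If the Jaeger agent can take longer to come up, exponential backoff is a common alternative. The sketch below is generic and runs without Jaeger. `retry_with_backoff` is a hypothetical helper, and the `factory` argument would be something like `lambda: TracingManager()` combined with a check on its `tracer` attribute.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def retry_with_backoff(factory: Callable[[], Optional[T]],
                       max_retries: int = 3,
                       base_delay: float = 0.5,
                       sleep: Callable[[float], None] = time.sleep) -> Optional[T]:
    """Call factory until it returns a non-None value or retries run out."""
    for attempt in range(max_retries):
        try:
            result = factory()
            if result is not None:
                return result
        except Exception:
            # Re-raise only when no attempts are left.
            if attempt == max_retries - 1:
                raise
        if attempt < max_retries - 1:
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * (2 ** attempt))
    return None
```

The `sleep` parameter is injectable so that the delays can be observed in tests without actually waiting.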
Issue 2: Span Creation Failed
Symptoms: start_span returns None instead of a Span
Solution:
# Check tracing status
def validate_tracing_status(tracing: TracingManager):
    info = tracing.get_tracer_info()
    if not info["enabled"]:
        print("Tracing disabled")
        return False
    if not info["tracer_initialized"]:
        print("Tracer not initialized")
        return False
    return True
9. Visualizations
9.1 System Architecture Diagram
graph TB
subgraph "Application Layer"
APP[AIECS Application]
TS[TaskService]
DS[DSLProcessor]
end
subgraph "Tracing Layer"
TM[TracingManager]
JA[Jaeger Agent]
JC[Jaeger Collector]
JU[Jaeger UI]
end
subgraph "Infrastructure Layer"
CTM[CeleryTaskManager]
WSM[WebSocketManager]
end
APP --> TM
TS --> TM
DS --> TM
CTM --> TM
WSM --> TM
TM --> JA
JA --> JC
JC --> JU
9.2 Tracing Flow Diagram
sequenceDiagram
participant Client
participant Service1
participant Service2
participant Jaeger
Client->>Service1: Request (with tracing headers)
Service1->>Service1: Create Span
Service1->>Service2: Call (propagate context)
Service2->>Service2: Create Child Span
Service2->>Jaeger: Send Tracing Data
Service2->>Service1: Return Result
Service1->>Jaeger: Send Tracing Data
Service1->>Client: Return Result
10. Version History
v1.0.0 (2024-01-15)
New Features:
Basic Jaeger integration
Support manual and decorator tracing
Implement context propagation
Provide various tracing decorators
v1.1.0 (2024-02-01)
Feature Enhancements:
Add database operation tracing
Support external service call tracing
Implement tool execution tracing
Enhance error handling
v1.2.0 (2024-03-01)
New Features:
Support async function tracing
Add automatic function argument recording
Implement Span context management
Provide tracing information query
Appendix
B. External Dependencies
C. Contact Information
Technical Lead: AIECS Development Team
Issue Reporting: Through project Issue system
Documentation Updates: Regular maintenance, version synchronization