Examples
Table of Contents
Basic Examples
Example 1: Creating a Research Community
import asyncio
from aiecs.domain import (
CommunityBuilder, CommunityManager, DecisionEngine,
CommunicationHub, ResourceManager, CollaborativeWorkflowEngine,
GovernanceType, CommunityRole, ConsensusAlgorithm, ResourceType
)
async def create_research_community():
"""Create a research community with democratic governance."""
# Initialize components
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
decision_engine = DecisionEngine(community_manager)
communication_hub = CommunicationHub(community_manager)
resource_manager = ResourceManager(community_manager)
workflow_engine = CollaborativeWorkflowEngine(community_manager)
# Create community using builder
community = (CommunityBuilder(integration)
.with_name("AI Research Lab")
.with_description("Collaborative AI research and development community")
.with_governance(GovernanceType.DEMOCRATIC)
.with_capacity(20)
.with_template("research_team", {
"default_roles": ["researcher", "reviewer", "coordinator"],
"auto_approve_members": False,
"voting_threshold": 0.6
})
.build())
print(f"Created community: {community.name} (ID: {community.community_id})")
return community
# Run the example
if __name__ == "__main__":
asyncio.run(create_research_community())
Example 2: Adding Members and Managing Roles
async def manage_community_members():
"""Demonstrate member management operations."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
# Create a community
community = await community_manager.create_community(
name="Development Team",
description="Software development collaboration",
governance_type=GovernanceType.HIERARCHICAL,
creator_agent_id="admin_agent",
capacity=10
)
# Add members with different roles
members = [
("lead_dev", CommunityRole.ADMIN, {"expertise": "full-stack", "experience": "5_years"}),
("senior_dev", CommunityRole.MODERATOR, {"expertise": "backend", "experience": "3_years"}),
("junior_dev", CommunityRole.MEMBER, {"expertise": "frontend", "experience": "1_year"}),
("qa_engineer", CommunityRole.MEMBER, {"expertise": "testing", "experience": "2_years"}),
("observer", CommunityRole.OBSERVER, {"expertise": "product", "experience": "4_years"})
]
for agent_id, role, metadata in members:
member = await community_manager.add_member(
community.community_id, agent_id, role, metadata
)
print(f"Added {agent_id} as {role.value}")
# Update a member's role
await community_manager.update_member_role(
community.community_id, "junior_dev", CommunityRole.MODERATOR
)
print("Promoted junior_dev to moderator")
# List all members
community = await community_manager.get_community(community.community_id)
for member in community.members:
print(f"Member: {member.agent_id}, Role: {member.role.value}")
asyncio.run(manage_community_members())
Example 3: Decision Making Process
async def decision_making_example():
"""Demonstrate the complete decision-making process."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
decision_engine = DecisionEngine(community_manager)
# Create community and add members
community = await community_manager.create_community(
name="Product Team",
description="Product development decisions",
governance_type=GovernanceType.DEMOCRATIC,
creator_agent_id="product_manager",
capacity=8
)
# Add team members
team_members = ["product_manager", "designer", "developer1", "developer2", "qa_lead"]
for member_id in team_members:
await community_manager.add_member(
community.community_id, member_id, CommunityRole.MEMBER
)
# Propose a decision
decision = await decision_engine.propose_decision(
community_id=community.community_id,
proposer_id="product_manager",
title="Choose Next Feature Priority",
description="Which feature should we prioritize for the next sprint?",
options=["User Authentication", "Dashboard Redesign", "API Integration", "Mobile App"],
algorithm=ConsensusAlgorithm.SUPERMAJORITY
)
print(f"Decision proposed: {decision.title}")
print(f"Options: {decision.options}")
# Cast votes
votes = [
("product_manager", "User Authentication", 2.0), # Weighted vote
("designer", "Dashboard Redesign", 1.0),
("developer1", "API Integration", 1.0),
("developer2", "API Integration", 1.0),
("qa_lead", "User Authentication", 1.0)
]
for voter_id, choice, weight in votes:
await decision_engine.cast_vote(decision.decision_id, voter_id, choice, weight)
print(f"{voter_id} voted for {choice} (weight: {weight})")
# Evaluate the decision
passed, details = await decision_engine.evaluate_decision(
decision.decision_id, community.community_id, ConsensusAlgorithm.SUPERMAJORITY
)
print(f"\nDecision Result:")
print(f"Passed: {passed}")
print(f"Details: {details}")
if passed:
winning_option = details.get('winning_option')
print(f"Winning option: {winning_option}")
asyncio.run(decision_making_example())
Advanced Use Cases
Example 4: Resource Sharing and Collaboration
async def resource_sharing_example():
"""Demonstrate resource sharing and collaborative work."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
resource_manager = ResourceManager(community_manager)
communication_hub = CommunicationHub(community_manager)
# Create a data science community
community = await community_manager.create_community(
name="Data Science Team",
description="Collaborative data science and ML research",
governance_type=GovernanceType.DEMOCRATIC,
creator_agent_id="data_lead",
capacity=12
)
# Add team members
team_members = ["data_lead", "ml_engineer", "data_analyst", "researcher"]
for member_id in team_members:
await community_manager.add_member(
community.community_id, member_id, CommunityRole.MEMBER
)
# Create and share a dataset
dataset_resource = await resource_manager.create_resource(
community_id=community.community_id,
creator_id="data_lead",
name="Customer Behavior Dataset",
resource_type=ResourceType.DATA,
data={
"file_path": "/data/customer_behavior_2024.parquet",
"size": "1.2GB",
"records": 100000,
"features": 45,
"schema": {
"user_id": "string",
"timestamp": "datetime",
"action": "string",
"value": "float"
}
},
metadata={
"description": "Customer interaction data for Q1 2024",
"version": "1.0",
"privacy_level": "internal",
"retention_days": 365
}
)
print(f"Created dataset resource: {dataset_resource.name}")
# Share with specific team members
await resource_manager.share_resource(
resource_id=dataset_resource.resource_id,
from_agent_id="data_lead",
to_agent_ids=["ml_engineer", "data_analyst"],
permissions={
"read": True,
"write": False,
"share": False,
"download": True
}
)
# Create a trained model resource
model_resource = await resource_manager.create_resource(
community_id=community.community_id,
creator_id="ml_engineer",
name="Customer Churn Prediction Model",
resource_type=ResourceType.MODEL,
data={
"model_path": "/models/churn_prediction_v2.pkl",
"algorithm": "Random Forest",
"accuracy": 0.89,
"precision": 0.87,
"recall": 0.91,
"training_data": "customer_behavior_2024",
"features_used": ["action_frequency", "session_duration", "page_views"]
},
metadata={
"framework": "scikit-learn",
"python_version": "3.9",
"dependencies": ["pandas", "numpy", "scikit-learn"],
"model_version": "2.0"
}
)
# Share model with all team members
all_members = [member.agent_id for member in community.members]
await resource_manager.share_resource(
resource_id=model_resource.resource_id,
from_agent_id="ml_engineer",
to_agent_ids=all_members,
permissions={
"read": True,
"inference": True,
"evaluate": True,
"retrain": False
}
)
print(f"Shared model with {len(all_members)} team members")
# Send notification about new resources
await communication_hub.broadcast_message(
sender_id="data_lead",
community_id=community.community_id,
content="New dataset and model resources are now available for the team!",
message_type=MessageType.NOTIFICATION
)
asyncio.run(resource_sharing_example())
Example 5: Collaborative Workflow Execution
async def collaborative_workflow_example():
"""Demonstrate multi-agent collaborative workflow execution."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
workflow_engine = CollaborativeWorkflowEngine(community_manager)
communication_hub = CommunicationHub(community_manager)
# Create a research community
community = await community_manager.create_community(
name="Research Collaboration",
description="Multi-agent research workflow",
governance_type=GovernanceType.CONSENSUS,
creator_agent_id="research_coordinator",
capacity=6
)
# Add research team members
researchers = ["research_coordinator", "data_scientist", "ml_engineer", "domain_expert"]
for researcher_id in researchers:
await community_manager.add_member(
community.community_id, researcher_id, CommunityRole.MEMBER
)
# Start a collaborative research session
session = await workflow_engine.start_collaboration_session(
community_id=community.community_id,
session_name="Market Analysis Research",
participants=researchers,
workflow_config={
"workflow_type": "research_analysis",
"steps": [
{
"name": "data_collection",
"executor": "data_scientist",
"timeout_minutes": 30,
"dependencies": []
},
{
"name": "data_preprocessing",
"executor": "data_scientist",
"timeout_minutes": 20,
"dependencies": ["data_collection"]
},
{
"name": "model_training",
"executor": "ml_engineer",
"timeout_minutes": 45,
"dependencies": ["data_preprocessing"]
},
{
"name": "domain_analysis",
"executor": "domain_expert",
"timeout_minutes": 25,
"dependencies": ["model_training"]
},
{
"name": "report_generation",
"executor": "research_coordinator",
"timeout_minutes": 15,
"dependencies": ["domain_analysis"]
}
],
"parallel_execution": True,
"checkpoint_frequency": 5
}
)
print(f"Started collaboration session: {session.session_name}")
# Execute workflow steps
try:
# Step 1: Data Collection
data_result = await workflow_engine.execute_workflow_step(
session_id=session.session_id,
step_name="data_collection",
executor_id="data_scientist",
input_data={
"data_sources": ["market_data_2024", "competitor_analysis"],
"collection_method": "api_and_web_scraping"
}
)
print(f"Data collection completed: {data_result['records_collected']} records")
# Step 2: Data Preprocessing
processed_data = await workflow_engine.execute_workflow_step(
session_id=session.session_id,
step_name="data_preprocessing",
executor_id="data_scientist",
input_data={
"raw_data": data_result,
"cleaning_rules": ["remove_duplicates", "handle_missing_values"],
"feature_engineering": ["normalization", "encoding"]
}
)
print(f"Data preprocessing completed: {processed_data['features_created']} features")
# Step 3: Model Training (can run in parallel with domain analysis)
model_task = asyncio.create_task(
workflow_engine.execute_workflow_step(
session_id=session.session_id,
step_name="model_training",
executor_id="ml_engineer",
input_data={
"processed_data": processed_data,
"algorithms": ["random_forest", "gradient_boosting"],
"validation_strategy": "cross_validation"
}
)
)
# Step 4: Domain Analysis (parallel execution)
domain_task = asyncio.create_task(
workflow_engine.execute_workflow_step(
session_id=session.session_id,
step_name="domain_analysis",
executor_id="domain_expert",
input_data={
"market_context": "technology_sector",
"analysis_framework": "swot_analysis",
"stakeholders": ["customers", "competitors", "regulators"]
}
)
)
# Wait for parallel tasks to complete
model_result, domain_result = await asyncio.gather(model_task, domain_task)
print(f"Model training completed: {model_result['best_algorithm']} (accuracy: {model_result['accuracy']})")
print(f"Domain analysis completed: {domain_result['key_insights']} insights")
# Step 5: Report Generation
final_report = await workflow_engine.execute_workflow_step(
session_id=session.session_id,
step_name="report_generation",
executor_id="research_coordinator",
input_data={
"model_results": model_result,
"domain_insights": domain_result,
"report_format": "executive_summary",
"include_visualizations": True
}
)
print(f"Research report generated: {final_report['report_path']}")
# Notify team of completion
await communication_hub.broadcast_message(
sender_id="research_coordinator",
community_id=community.community_id,
content=f"Research session '{session.session_name}' completed successfully!",
message_type=MessageType.NOTIFICATION
)
except Exception as e:
print(f"Workflow execution failed: {e}")
# Handle workflow failure and cleanup
asyncio.run(collaborative_workflow_example())
Integration Examples
Example 6: Custom Agent Adapter
class CustomResearchAgent(AgentAdapter):
"""Custom agent adapter for research-specific tasks."""
def __init__(self, agent_id: str, specialization: str, llm_client=None):
self.agent_id = agent_id
self.specialization = specialization
self.llm_client = llm_client
self.research_tools = ["data_analysis", "literature_review", "experiment_design"]
async def process_message(self, message: Message) -> Any:
"""Process incoming messages based on type and content."""
if message.message_type == MessageType.REQUEST:
return await self.handle_research_request(message.content)
elif message.message_type == MessageType.SHARE:
return await self.handle_resource_share(message.content)
else:
return await self.handle_general_message(message.content)
async def get_capabilities(self) -> List[AgentCapability]:
"""Return agent capabilities."""
return [
AgentCapability(
name="data_analysis",
description="Statistical analysis and data visualization",
parameters={
"methods": ["descriptive", "inferential", "predictive"],
"tools": ["pandas", "numpy", "matplotlib", "seaborn"]
}
),
AgentCapability(
name="literature_review",
description="Academic literature analysis and synthesis",
parameters={
"sources": ["academic_papers", "conference_proceedings", "journals"],
"languages": ["english", "chinese", "spanish"]
}
),
AgentCapability(
name="experiment_design",
description="Design and analyze research experiments",
parameters={
"types": ["controlled", "observational", "quasi_experimental"],
"statistical_tests": ["t_test", "anova", "chi_square"]
}
)
]
async def handle_research_request(self, content: Any) -> Any:
"""Handle research-specific requests."""
request_type = content.get("type")
if request_type == "data_analysis":
return await self.perform_data_analysis(content.get("data"))
elif request_type == "literature_review":
return await self.perform_literature_review(content.get("topic"))
elif request_type == "experiment_design":
return await self.design_experiment(content.get("hypothesis"))
else:
return {"error": f"Unknown request type: {request_type}"}
async def perform_data_analysis(self, data: Any) -> Dict[str, Any]:
"""Perform data analysis using specialized tools."""
# Simulate data analysis
analysis_result = {
"summary_statistics": {
"mean": 42.5,
"std": 12.3,
"min": 15.2,
"max": 89.7
},
"correlations": {
"feature_a_feature_b": 0.73,
"feature_a_feature_c": -0.45
},
"insights": [
"Strong positive correlation between feature A and B",
"Negative correlation suggests inverse relationship"
]
}
return analysis_result
async def perform_literature_review(self, topic: str) -> Dict[str, Any]:
"""Perform literature review on given topic."""
# Simulate literature review
review_result = {
"topic": topic,
"papers_found": 45,
"key_papers": [
{"title": "Recent Advances in AI", "authors": "Smith et al.", "year": 2023},
{"title": "Machine Learning Applications", "authors": "Johnson et al.", "year": 2023}
],
"research_gaps": [
"Limited research on real-time applications",
"Need for more diverse datasets"
],
"recommendations": [
"Focus on practical implementations",
"Consider ethical implications"
]
}
return review_result
async def design_experiment(self, hypothesis: str) -> Dict[str, Any]:
"""Design an experiment to test the hypothesis."""
# Simulate experiment design
experiment_design = {
"hypothesis": hypothesis,
"experiment_type": "controlled_experiment",
"variables": {
"independent": ["treatment_group", "control_group"],
"dependent": ["performance_metric", "user_satisfaction"]
},
"sample_size": 100,
"duration_weeks": 4,
"statistical_tests": ["t_test", "chi_square"],
"expected_outcomes": [
"Significant difference between groups",
"Effect size > 0.5"
]
}
return experiment_design
# Usage example
async def custom_agent_example():
"""Demonstrate custom agent adapter usage."""
# Create custom research agent
research_agent = CustomResearchAgent(
agent_id="research_agent_001",
specialization="machine_learning",
llm_client=None # Would be actual LLM client
)
# Register with adapter registry
registry = AgentAdapterRegistry()
registry.register_adapter("research_agent_001", research_agent)
# Get agent capabilities
capabilities = await research_agent.get_capabilities()
print("Agent capabilities:")
for cap in capabilities:
print(f"- {cap.name}: {cap.description}")
# Test message processing
test_message = Message(
sender_id="coordinator",
recipient_ids=["research_agent_001"],
message_type=MessageType.REQUEST,
content={
"type": "data_analysis",
"data": {"dataset": "customer_behavior", "size": 10000}
}
)
result = await research_agent.process_message(test_message)
print(f"Analysis result: {result}")
asyncio.run(custom_agent_example())
Real-World Scenarios
Example 7: Academic Research Collaboration
async def academic_research_scenario():
"""Simulate an academic research collaboration scenario."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
decision_engine = DecisionEngine(community_manager)
resource_manager = ResourceManager(community_manager)
workflow_engine = CollaborativeWorkflowEngine(community_manager)
# Create research lab community
research_lab = await community_manager.create_community(
name="AI Ethics Research Lab",
description="Interdisciplinary research on AI ethics and fairness",
governance_type=GovernanceType.DEMOCRATIC,
creator_agent_id="lab_director",
capacity=15
)
# Add interdisciplinary team members
team_members = [
("lab_director", CommunityRole.ADMIN, {"field": "computer_science", "expertise": "AI_ethics"}),
("ethics_professor", CommunityRole.MODERATOR, {"field": "philosophy", "expertise": "moral_philosophy"}),
("law_professor", CommunityRole.MODERATOR, {"field": "law", "expertise": "technology_law"}),
("psychologist", CommunityRole.MEMBER, {"field": "psychology", "expertise": "human_behavior"}),
("data_scientist", CommunityRole.MEMBER, {"field": "data_science", "expertise": "ML_fairness"}),
("sociologist", CommunityRole.MEMBER, {"field": "sociology", "expertise": "social_impact"}),
("graduate_student1", CommunityRole.MEMBER, {"field": "computer_science", "expertise": "NLP"}),
("graduate_student2", CommunityRole.MEMBER, {"field": "philosophy", "expertise": "ethics"}),
]
for member_id, role, metadata in team_members:
await community_manager.add_member(
research_lab.community_id, member_id, role, metadata
)
# Propose research direction decision
research_decision = await decision_engine.propose_decision(
community_id=research_lab.community_id,
proposer_id="lab_director",
title="Choose Primary Research Focus for Next Year",
description="Which area of AI ethics should be our primary research focus?",
options=[
"Algorithmic Bias in Hiring Systems",
"Privacy in AI-Powered Healthcare",
"Autonomous Vehicle Decision Making",
"AI in Criminal Justice Systems"
],
algorithm=ConsensusAlgorithm.WEIGHTED_VOTING
)
# Cast weighted votes based on expertise
votes = [
("lab_director", "Algorithmic Bias in Hiring Systems", 2.0),
("ethics_professor", "AI in Criminal Justice Systems", 2.0),
("law_professor", "Privacy in AI-Powered Healthcare", 2.0),
("psychologist", "Algorithmic Bias in Hiring Systems", 1.5),
("data_scientist", "Algorithmic Bias in Hiring Systems", 2.0),
("sociologist", "AI in Criminal Justice Systems", 1.5),
("graduate_student1", "Autonomous Vehicle Decision Making", 1.0),
("graduate_student2", "AI in Criminal Justice Systems", 1.0),
]
for voter_id, choice, weight in votes:
await decision_engine.cast_vote(research_decision.decision_id, voter_id, choice, weight)
# Evaluate decision
passed, details = await decision_engine.evaluate_decision(
research_decision.decision_id, research_lab.community_id
)
winning_topic = details.get('winning_option')
print(f"Research focus chosen: {winning_topic}")
# Create shared research resources
literature_review = await resource_manager.create_resource(
community_id=research_lab.community_id,
creator_id="ethics_professor",
name="AI Ethics Literature Review 2024",
resource_type=ResourceType.KNOWLEDGE,
data={
"papers_reviewed": 150,
"key_themes": ["fairness", "transparency", "accountability", "privacy"],
"research_gaps": ["cross-cultural studies", "long-term impact assessment"],
"bibliography": "ai_ethics_bibliography_2024.bib"
},
metadata={
"review_period": "2024",
"languages": ["english", "chinese", "spanish"],
"update_frequency": "quarterly"
}
)
# Start collaborative research session
research_session = await workflow_engine.start_collaboration_session(
community_id=research_lab.community_id,
session_name=f"Research on {winning_topic}",
participants=[member[0] for member in team_members],
workflow_config={
"workflow_type": "academic_research",
"phases": [
"literature_review",
"hypothesis_development",
"experimental_design",
"data_collection",
"analysis",
"paper_writing"
],
"timeline_months": 12,
"milestones": [
"Literature review complete",
"Research proposal approved",
"Data collection finished",
"Analysis complete",
"Paper submitted"
]
}
)
print(f"Research session started: {research_session.session_name}")
return research_lab, research_session
asyncio.run(academic_research_scenario())
Example 8: Corporate Innovation Team
async def corporate_innovation_scenario():
"""Simulate a corporate innovation team scenario."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
decision_engine = DecisionEngine(community_manager)
resource_manager = ResourceManager(community_manager)
workflow_engine = CollaborativeWorkflowEngine(community_manager)
# Create innovation team
innovation_team = await community_manager.create_community(
name="Product Innovation Team",
description="Cross-functional team for product innovation and development",
governance_type=GovernanceType.HIERARCHICAL,
creator_agent_id="vp_product",
capacity=12
)
# Add cross-functional team members
team_members = [
("vp_product", CommunityRole.ADMIN, {"department": "product", "level": "executive"}),
("product_manager", CommunityRole.MODERATOR, {"department": "product", "level": "senior"}),
("engineering_lead", CommunityRole.MODERATOR, {"department": "engineering", "level": "senior"}),
("design_lead", CommunityRole.MODERATOR, {"department": "design", "level": "senior"}),
("marketing_manager", CommunityRole.MEMBER, {"department": "marketing", "level": "senior"}),
("sales_representative", CommunityRole.MEMBER, {"department": "sales", "level": "senior"}),
("data_analyst", CommunityRole.MEMBER, {"department": "analytics", "level": "mid"}),
("ux_researcher", CommunityRole.MEMBER, {"department": "design", "level": "mid"}),
("backend_engineer", CommunityRole.MEMBER, {"department": "engineering", "level": "mid"}),
("frontend_engineer", CommunityRole.MEMBER, {"department": "engineering", "level": "mid"}),
]
for member_id, role, metadata in team_members:
await community_manager.add_member(
innovation_team.community_id, member_id, role, metadata
)
# Propose product feature decision
feature_decision = await decision_engine.propose_decision(
community_id=innovation_team.community_id,
proposer_id="product_manager",
title="Q2 Product Feature Priority",
description="Which feature should we prioritize for Q2 development?",
options=[
"AI-Powered Recommendations",
"Real-time Collaboration Tools",
"Advanced Analytics Dashboard",
"Mobile App Redesign"
],
algorithm=ConsensusAlgorithm.SUPERMAJORITY
)
# Cast votes with business context
votes = [
("vp_product", "AI-Powered Recommendations", 3.0), # Executive weight
("product_manager", "Real-time Collaboration Tools", 2.0),
("engineering_lead", "Advanced Analytics Dashboard", 2.0),
("design_lead", "Mobile App Redesign", 2.0),
("marketing_manager", "AI-Powered Recommendations", 1.5),
("sales_representative", "Real-time Collaboration Tools", 1.5),
("data_analyst", "Advanced Analytics Dashboard", 2.0),
("ux_researcher", "Mobile App Redesign", 1.5),
("backend_engineer", "AI-Powered Recommendations", 1.0),
("frontend_engineer", "Mobile App Redesign", 1.0),
]
for voter_id, choice, weight in votes:
await decision_engine.cast_vote(feature_decision.decision_id, voter_id, choice, weight)
# Evaluate decision
passed, details = await decision_engine.evaluate_decision(
feature_decision.decision_id, innovation_team.community_id
)
chosen_feature = details.get('winning_option')
print(f"Chosen feature for Q2: {chosen_feature}")
# Create shared product resources
market_research = await resource_manager.create_resource(
community_id=innovation_team.community_id,
creator_id="marketing_manager",
name="Market Research Q1 2024",
resource_type=ResourceType.DATA,
data={
"survey_responses": 5000,
"key_insights": [
"Users want more personalized experiences",
"Mobile usage increased 40%",
"Collaboration tools are highly requested"
],
"competitor_analysis": {
"competitor_a": {"market_share": 0.35, "strengths": ["AI", "UX"]},
"competitor_b": {"market_share": 0.25, "strengths": ["price", "features"]}
}
},
metadata={
"research_period": "Q1_2024",
"methodology": "online_survey",
"confidence_level": 0.95
}
)
# Start product development session
dev_session = await workflow_engine.start_collaboration_session(
community_id=innovation_team.community_id,
session_name=f"Development of {chosen_feature}",
participants=[member[0] for member in team_members],
workflow_config={
"workflow_type": "product_development",
"sprint_duration_weeks": 2,
"total_sprints": 6,
"phases": [
"requirements_gathering",
"design_and_prototyping",
"development",
"testing",
"deployment",
"monitoring"
],
"success_metrics": [
"user_adoption_rate",
"performance_improvement",
"customer_satisfaction"
]
}
)
print(f"Product development session started: {dev_session.session_name}")
return innovation_team, dev_session
asyncio.run(corporate_innovation_scenario())
Performance Examples
Example 9: High-Performance Community Operations
async def performance_optimization_example():
"""Demonstrate performance optimization techniques."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
decision_engine = DecisionEngine(community_manager)
resource_manager = ResourceManager(community_manager)
# Create a large community
large_community = await community_manager.create_community(
name="Large Scale Research Community",
description="High-performance community for large-scale research",
governance_type=GovernanceType.DEMOCRATIC,
creator_agent_id="admin",
capacity=100
)
# Batch add members for performance
member_batch = []
for i in range(50): # Add 50 members
member_batch.append({
"agent_id": f"researcher_{i:03d}",
"role": CommunityRole.MEMBER,
"metadata": {"specialization": f"field_{i % 10}"}
})
# Add members in parallel
add_tasks = [
community_manager.add_member(
large_community.community_id,
member["agent_id"],
member["role"],
member["metadata"]
)
for member in member_batch
]
start_time = asyncio.get_event_loop().time()
await asyncio.gather(*add_tasks)
end_time = asyncio.get_event_loop().time()
print(f"Added {len(member_batch)} members in {end_time - start_time:.2f} seconds")
# Batch create resources
resource_batch = []
for i in range(20): # Create 20 resources
resource_batch.append({
"name": f"Dataset_{i:03d}",
"resource_type": ResourceType.DATA,
"data": {"size": f"{i * 100}MB", "records": i * 1000},
"creator_id": f"researcher_{i % 10:03d}"
})
# Create resources in parallel
create_tasks = [
resource_manager.create_resource(
community_id=large_community.community_id,
creator_id=resource["creator_id"],
name=resource["name"],
resource_type=resource["resource_type"],
data=resource["data"]
)
for resource in resource_batch
]
start_time = asyncio.get_event_loop().time()
resources = await asyncio.gather(*create_tasks)
end_time = asyncio.get_event_loop().time()
print(f"Created {len(resources)} resources in {end_time - start_time:.2f} seconds")
# Batch decision voting
decision = await decision_engine.propose_decision(
community_id=large_community.community_id,
proposer_id="admin",
title="Research Direction Vote",
description="Choose the primary research direction",
options=["AI", "ML", "Data Science", "Robotics"],
algorithm=ConsensusAlgorithm.SIMPLE_MAJORITY
)
# Cast votes in parallel
vote_tasks = []
for i in range(50):
choice = ["AI", "ML", "Data Science", "Robotics"][i % 4]
vote_tasks.append(
decision_engine.cast_vote(
decision.decision_id,
f"researcher_{i:03d}",
choice
)
)
start_time = asyncio.get_event_loop().time()
await asyncio.gather(*vote_tasks)
end_time = asyncio.get_event_loop().time()
print(f"Cast {len(vote_tasks)} votes in {end_time - start_time:.2f} seconds")
# Evaluate decision
passed, details = await decision_engine.evaluate_decision(
decision.decision_id, large_community.community_id
)
print(f"Decision result: {details.get('winning_option')} (passed: {passed})")
asyncio.run(performance_optimization_example())
Error Handling Examples
Example 10: Comprehensive Error Handling
async def error_handling_example():
"""Demonstrate comprehensive error handling patterns."""
integration = CommunityIntegration()
community_manager = CommunityManager(integration)
decision_engine = DecisionEngine(community_manager)
resource_manager = ResourceManager(community_manager)
# Create a test community
community = await community_manager.create_community(
name="Error Handling Test Community",
description="Community for testing error scenarios",
governance_type=GovernanceType.DEMOCRATIC,
creator_agent_id="test_admin",
capacity=5
)
# Test 1: Community not found error
try:
await community_manager.get_community("nonexistent_community")
except CommunityNotFoundError as e:
print(f"Caught expected error: {e}")
# Test 2: Member already exists error
await community_manager.add_member(
community.community_id, "test_member", CommunityRole.MEMBER
)
try:
await community_manager.add_member(
community.community_id, "test_member", CommunityRole.MEMBER
)
except MembershipError as e:
print(f"Caught expected error: {e}")
# Test 3: Community capacity exceeded
try:
# Add more members than capacity allows
for i in range(10):
await community_manager.add_member(
community.community_id, f"member_{i}", CommunityRole.MEMBER
)
except CommunityCapacityError as e:
print(f"Caught expected error: {e}")
# Test 4: Resource access denied
resource = await resource_manager.create_resource(
community_id=community.community_id,
creator_id="test_admin",
name="Private Resource",
resource_type=ResourceType.DATA,
data={"sensitive": "data"}
)
try:
await resource_manager.get_resource(
resource.resource_id, "unauthorized_agent"
)
except AccessDeniedError as e:
print(f"Caught expected error: {e}")
# Test 5: Decision not found
try:
await decision_engine.evaluate_decision(
"nonexistent_decision", community.community_id
)
except DecisionNotFoundError as e:
print(f"Caught expected error: {e}")
# Test 6: Quorum not met
decision = await decision_engine.propose_decision(
community_id=community.community_id,
proposer_id="test_admin",
title="Test Decision",
description="A test decision",
options=["Yes", "No"],
algorithm=ConsensusAlgorithm.UNANIMOUS
)
# Cast only one vote (unanimous requires all)
await decision_engine.cast_vote(
decision.decision_id, "test_member", "Yes"
)
try:
await decision_engine.evaluate_decision(
decision.decision_id, community.community_id
)
except QuorumNotMetError as e:
print(f"Caught expected error: {e}")
# Test 7: Graceful error recovery
async def safe_community_operation():
"""Demonstrate safe error handling with recovery."""
try:
# Attempt risky operation
result = await community_manager.add_member(
"nonexistent_community", "new_member", CommunityRole.MEMBER
)
return result
except CommunityNotFoundError:
print("Community not found, creating new one...")
# Recovery: create the community first
new_community = await community_manager.create_community(
name="Recovery Community",
description="Created during error recovery",
governance_type=GovernanceType.DEMOCRATIC,
creator_agent_id="recovery_admin",
capacity=10
)
# Retry the operation
return await community_manager.add_member(
new_community.community_id, "new_member", CommunityRole.MEMBER
)
except Exception as e:
print(f"Unexpected error: {e}")
return None
result = await safe_community_operation()
if result:
print(f"Operation succeeded: {result.agent_id}")
# Test 8: Retry mechanism with exponential backoff
async def retry_with_backoff(operation, max_retries=3):
"""Retry operation with exponential backoff."""
for attempt in range(max_retries):
try:
return await operation()
except Exception as e:
if attempt == max_retries - 1:
raise e
wait_time = 2 ** attempt # Exponential backoff
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
# Example usage of retry mechanism
async def unreliable_operation():
"""Simulate an unreliable operation."""
import random
if random.random() < 0.7: # 70% failure rate
raise Exception("Simulated failure")
return "Success!"
try:
result = await retry_with_backoff(unreliable_operation)
print(f"Retry succeeded: {result}")
except Exception as e:
print(f"All retries failed: {e}")
asyncio.run(error_handling_example())
These examples demonstrate the comprehensive capabilities of the DOMAIN_COMMUNITY module, from basic operations to advanced use cases, performance optimization, and robust error handling. Each example is designed to be practical and applicable to real-world scenarios.