Examples

Table of Contents

Basic Examples

Example 1: Creating a Research Community

import asyncio
from aiecs.domain import (
    CommunityBuilder, CommunityManager, DecisionEngine,
    CommunicationHub, ResourceManager, CollaborativeWorkflowEngine,
    GovernanceType, CommunityRole, ConsensusAlgorithm, ResourceType
)

async def create_research_community():
    """Create a research community with democratic governance."""
    
    # Initialize components
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    decision_engine = DecisionEngine(community_manager)
    communication_hub = CommunicationHub(community_manager)
    resource_manager = ResourceManager(community_manager)
    workflow_engine = CollaborativeWorkflowEngine(community_manager)
    
    # Create community using builder
    community = (CommunityBuilder(integration)
        .with_name("AI Research Lab")
        .with_description("Collaborative AI research and development community")
        .with_governance(GovernanceType.DEMOCRATIC)
        .with_capacity(20)
        .with_template("research_team", {
            "default_roles": ["researcher", "reviewer", "coordinator"],
            "auto_approve_members": False,
            "voting_threshold": 0.6
        })
        .build())
    
    print(f"Created community: {community.name} (ID: {community.community_id})")
    return community

# Run the example
if __name__ == "__main__":
    asyncio.run(create_research_community())

Example 2: Adding Members and Managing Roles

async def manage_community_members():
    """Demonstrate member management operations."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    
    # Create a community
    community = await community_manager.create_community(
        name="Development Team",
        description="Software development collaboration",
        governance_type=GovernanceType.HIERARCHICAL,
        creator_agent_id="admin_agent",
        capacity=10
    )
    
    # Add members with different roles
    members = [
        ("lead_dev", CommunityRole.ADMIN, {"expertise": "full-stack", "experience": "5_years"}),
        ("senior_dev", CommunityRole.MODERATOR, {"expertise": "backend", "experience": "3_years"}),
        ("junior_dev", CommunityRole.MEMBER, {"expertise": "frontend", "experience": "1_year"}),
        ("qa_engineer", CommunityRole.MEMBER, {"expertise": "testing", "experience": "2_years"}),
        ("observer", CommunityRole.OBSERVER, {"expertise": "product", "experience": "4_years"})
    ]
    
    for agent_id, role, metadata in members:
        member = await community_manager.add_member(
            community.community_id, agent_id, role, metadata
        )
        print(f"Added {agent_id} as {role.value}")
    
    # Update a member's role
    await community_manager.update_member_role(
        community.community_id, "junior_dev", CommunityRole.MODERATOR
    )
    print("Promoted junior_dev to moderator")
    
    # List all members
    community = await community_manager.get_community(community.community_id)
    for member in community.members:
        print(f"Member: {member.agent_id}, Role: {member.role.value}")

asyncio.run(manage_community_members())

Example 3: Decision Making Process

async def decision_making_example():
    """Demonstrate the complete decision-making process."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    decision_engine = DecisionEngine(community_manager)
    
    # Create community and add members
    community = await community_manager.create_community(
        name="Product Team",
        description="Product development decisions",
        governance_type=GovernanceType.DEMOCRATIC,
        creator_agent_id="product_manager",
        capacity=8
    )
    
    # Add team members
    team_members = ["product_manager", "designer", "developer1", "developer2", "qa_lead"]
    for member_id in team_members:
        await community_manager.add_member(
            community.community_id, member_id, CommunityRole.MEMBER
        )
    
    # Propose a decision
    decision = await decision_engine.propose_decision(
        community_id=community.community_id,
        proposer_id="product_manager",
        title="Choose Next Feature Priority",
        description="Which feature should we prioritize for the next sprint?",
        options=["User Authentication", "Dashboard Redesign", "API Integration", "Mobile App"],
        algorithm=ConsensusAlgorithm.SUPERMAJORITY
    )
    
    print(f"Decision proposed: {decision.title}")
    print(f"Options: {decision.options}")
    
    # Cast votes
    votes = [
        ("product_manager", "User Authentication", 2.0),  # Weighted vote
        ("designer", "Dashboard Redesign", 1.0),
        ("developer1", "API Integration", 1.0),
        ("developer2", "API Integration", 1.0),
        ("qa_lead", "User Authentication", 1.0)
    ]
    
    for voter_id, choice, weight in votes:
        await decision_engine.cast_vote(decision.decision_id, voter_id, choice, weight)
        print(f"{voter_id} voted for {choice} (weight: {weight})")
    
    # Evaluate the decision
    passed, details = await decision_engine.evaluate_decision(
        decision.decision_id, community.community_id, ConsensusAlgorithm.SUPERMAJORITY
    )
    
    print(f"\nDecision Result:")
    print(f"Passed: {passed}")
    print(f"Details: {details}")
    
    if passed:
        winning_option = details.get('winning_option')
        print(f"Winning option: {winning_option}")

asyncio.run(decision_making_example())

Advanced Use Cases

Example 4: Resource Sharing and Collaboration

async def resource_sharing_example():
    """Demonstrate resource sharing and collaborative work."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    resource_manager = ResourceManager(community_manager)
    communication_hub = CommunicationHub(community_manager)
    
    # Create a data science community
    community = await community_manager.create_community(
        name="Data Science Team",
        description="Collaborative data science and ML research",
        governance_type=GovernanceType.DEMOCRATIC,
        creator_agent_id="data_lead",
        capacity=12
    )
    
    # Add team members
    team_members = ["data_lead", "ml_engineer", "data_analyst", "researcher"]
    for member_id in team_members:
        await community_manager.add_member(
            community.community_id, member_id, CommunityRole.MEMBER
        )
    
    # Create and share a dataset
    dataset_resource = await resource_manager.create_resource(
        community_id=community.community_id,
        creator_id="data_lead",
        name="Customer Behavior Dataset",
        resource_type=ResourceType.DATA,
        data={
            "file_path": "/data/customer_behavior_2024.parquet",
            "size": "1.2GB",
            "records": 100000,
            "features": 45,
            "schema": {
                "user_id": "string",
                "timestamp": "datetime",
                "action": "string",
                "value": "float"
            }
        },
        metadata={
            "description": "Customer interaction data for Q1 2024",
            "version": "1.0",
            "privacy_level": "internal",
            "retention_days": 365
        }
    )
    
    print(f"Created dataset resource: {dataset_resource.name}")
    
    # Share with specific team members
    await resource_manager.share_resource(
        resource_id=dataset_resource.resource_id,
        from_agent_id="data_lead",
        to_agent_ids=["ml_engineer", "data_analyst"],
        permissions={
            "read": True,
            "write": False,
            "share": False,
            "download": True
        }
    )
    
    # Create a trained model resource
    model_resource = await resource_manager.create_resource(
        community_id=community.community_id,
        creator_id="ml_engineer",
        name="Customer Churn Prediction Model",
        resource_type=ResourceType.MODEL,
        data={
            "model_path": "/models/churn_prediction_v2.pkl",
            "algorithm": "Random Forest",
            "accuracy": 0.89,
            "precision": 0.87,
            "recall": 0.91,
            "training_data": "customer_behavior_2024",
            "features_used": ["action_frequency", "session_duration", "page_views"]
        },
        metadata={
            "framework": "scikit-learn",
            "python_version": "3.9",
            "dependencies": ["pandas", "numpy", "scikit-learn"],
            "model_version": "2.0"
        }
    )
    
    # Share model with all team members
    all_members = [member.agent_id for member in community.members]
    await resource_manager.share_resource(
        resource_id=model_resource.resource_id,
        from_agent_id="ml_engineer",
        to_agent_ids=all_members,
        permissions={
            "read": True,
            "inference": True,
            "evaluate": True,
            "retrain": False
        }
    )
    
    print(f"Shared model with {len(all_members)} team members")
    
    # Send notification about new resources
    await communication_hub.broadcast_message(
        sender_id="data_lead",
        community_id=community.community_id,
        content="New dataset and model resources are now available for the team!",
        message_type=MessageType.NOTIFICATION
    )

asyncio.run(resource_sharing_example())

Example 5: Collaborative Workflow Execution

async def collaborative_workflow_example():
    """Demonstrate multi-agent collaborative workflow execution."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    workflow_engine = CollaborativeWorkflowEngine(community_manager)
    communication_hub = CommunicationHub(community_manager)
    
    # Create a research community
    community = await community_manager.create_community(
        name="Research Collaboration",
        description="Multi-agent research workflow",
        governance_type=GovernanceType.CONSENSUS,
        creator_agent_id="research_coordinator",
        capacity=6
    )
    
    # Add research team members
    researchers = ["research_coordinator", "data_scientist", "ml_engineer", "domain_expert"]
    for researcher_id in researchers:
        await community_manager.add_member(
            community.community_id, researcher_id, CommunityRole.MEMBER
        )
    
    # Start a collaborative research session
    session = await workflow_engine.start_collaboration_session(
        community_id=community.community_id,
        session_name="Market Analysis Research",
        participants=researchers,
        workflow_config={
            "workflow_type": "research_analysis",
            "steps": [
                {
                    "name": "data_collection",
                    "executor": "data_scientist",
                    "timeout_minutes": 30,
                    "dependencies": []
                },
                {
                    "name": "data_preprocessing",
                    "executor": "data_scientist",
                    "timeout_minutes": 20,
                    "dependencies": ["data_collection"]
                },
                {
                    "name": "model_training",
                    "executor": "ml_engineer",
                    "timeout_minutes": 45,
                    "dependencies": ["data_preprocessing"]
                },
                {
                    "name": "domain_analysis",
                    "executor": "domain_expert",
                    "timeout_minutes": 25,
                    "dependencies": ["model_training"]
                },
                {
                    "name": "report_generation",
                    "executor": "research_coordinator",
                    "timeout_minutes": 15,
                    "dependencies": ["domain_analysis"]
                }
            ],
            "parallel_execution": True,
            "checkpoint_frequency": 5
        }
    )
    
    print(f"Started collaboration session: {session.session_name}")
    
    # Execute workflow steps
    try:
        # Step 1: Data Collection
        data_result = await workflow_engine.execute_workflow_step(
            session_id=session.session_id,
            step_name="data_collection",
            executor_id="data_scientist",
            input_data={
                "data_sources": ["market_data_2024", "competitor_analysis"],
                "collection_method": "api_and_web_scraping"
            }
        )
        print(f"Data collection completed: {data_result['records_collected']} records")
        
        # Step 2: Data Preprocessing
        processed_data = await workflow_engine.execute_workflow_step(
            session_id=session.session_id,
            step_name="data_preprocessing",
            executor_id="data_scientist",
            input_data={
                "raw_data": data_result,
                "cleaning_rules": ["remove_duplicates", "handle_missing_values"],
                "feature_engineering": ["normalization", "encoding"]
            }
        )
        print(f"Data preprocessing completed: {processed_data['features_created']} features")
        
        # Step 3: Model Training (can run in parallel with domain analysis)
        model_task = asyncio.create_task(
            workflow_engine.execute_workflow_step(
                session_id=session.session_id,
                step_name="model_training",
                executor_id="ml_engineer",
                input_data={
                    "processed_data": processed_data,
                    "algorithms": ["random_forest", "gradient_boosting"],
                    "validation_strategy": "cross_validation"
                }
            )
        )
        
        # Step 4: Domain Analysis (parallel execution)
        domain_task = asyncio.create_task(
            workflow_engine.execute_workflow_step(
                session_id=session.session_id,
                step_name="domain_analysis",
                executor_id="domain_expert",
                input_data={
                    "market_context": "technology_sector",
                    "analysis_framework": "swot_analysis",
                    "stakeholders": ["customers", "competitors", "regulators"]
                }
            )
        )
        
        # Wait for parallel tasks to complete
        model_result, domain_result = await asyncio.gather(model_task, domain_task)
        
        print(f"Model training completed: {model_result['best_algorithm']} (accuracy: {model_result['accuracy']})")
        print(f"Domain analysis completed: {domain_result['key_insights']} insights")
        
        # Step 5: Report Generation
        final_report = await workflow_engine.execute_workflow_step(
            session_id=session.session_id,
            step_name="report_generation",
            executor_id="research_coordinator",
            input_data={
                "model_results": model_result,
                "domain_insights": domain_result,
                "report_format": "executive_summary",
                "include_visualizations": True
            }
        )
        
        print(f"Research report generated: {final_report['report_path']}")
        
        # Notify team of completion
        await communication_hub.broadcast_message(
            sender_id="research_coordinator",
            community_id=community.community_id,
            content=f"Research session '{session.session_name}' completed successfully!",
            message_type=MessageType.NOTIFICATION
        )
        
    except Exception as e:
        print(f"Workflow execution failed: {e}")
        # Handle workflow failure and cleanup

asyncio.run(collaborative_workflow_example())

Integration Examples

Example 6: Custom Agent Adapter

class CustomResearchAgent(AgentAdapter):
    """Custom agent adapter for research-specific tasks."""
    
    def __init__(self, agent_id: str, specialization: str, llm_client=None):
        self.agent_id = agent_id
        self.specialization = specialization
        self.llm_client = llm_client
        self.research_tools = ["data_analysis", "literature_review", "experiment_design"]
    
    async def process_message(self, message: Message) -> Any:
        """Process incoming messages based on type and content."""
        if message.message_type == MessageType.REQUEST:
            return await self.handle_research_request(message.content)
        elif message.message_type == MessageType.SHARE:
            return await self.handle_resource_share(message.content)
        else:
            return await self.handle_general_message(message.content)
    
    async def get_capabilities(self) -> List[AgentCapability]:
        """Return agent capabilities."""
        return [
            AgentCapability(
                name="data_analysis",
                description="Statistical analysis and data visualization",
                parameters={
                    "methods": ["descriptive", "inferential", "predictive"],
                    "tools": ["pandas", "numpy", "matplotlib", "seaborn"]
                }
            ),
            AgentCapability(
                name="literature_review",
                description="Academic literature analysis and synthesis",
                parameters={
                    "sources": ["academic_papers", "conference_proceedings", "journals"],
                    "languages": ["english", "chinese", "spanish"]
                }
            ),
            AgentCapability(
                name="experiment_design",
                description="Design and analyze research experiments",
                parameters={
                    "types": ["controlled", "observational", "quasi_experimental"],
                    "statistical_tests": ["t_test", "anova", "chi_square"]
                }
            )
        ]
    
    async def handle_research_request(self, content: Any) -> Any:
        """Handle research-specific requests."""
        request_type = content.get("type")
        
        if request_type == "data_analysis":
            return await self.perform_data_analysis(content.get("data"))
        elif request_type == "literature_review":
            return await self.perform_literature_review(content.get("topic"))
        elif request_type == "experiment_design":
            return await self.design_experiment(content.get("hypothesis"))
        else:
            return {"error": f"Unknown request type: {request_type}"}
    
    async def perform_data_analysis(self, data: Any) -> Dict[str, Any]:
        """Perform data analysis using specialized tools."""
        # Simulate data analysis
        analysis_result = {
            "summary_statistics": {
                "mean": 42.5,
                "std": 12.3,
                "min": 15.2,
                "max": 89.7
            },
            "correlations": {
                "feature_a_feature_b": 0.73,
                "feature_a_feature_c": -0.45
            },
            "insights": [
                "Strong positive correlation between feature A and B",
                "Negative correlation suggests inverse relationship"
            ]
        }
        return analysis_result
    
    async def perform_literature_review(self, topic: str) -> Dict[str, Any]:
        """Perform literature review on given topic."""
        # Simulate literature review
        review_result = {
            "topic": topic,
            "papers_found": 45,
            "key_papers": [
                {"title": "Recent Advances in AI", "authors": "Smith et al.", "year": 2023},
                {"title": "Machine Learning Applications", "authors": "Johnson et al.", "year": 2023}
            ],
            "research_gaps": [
                "Limited research on real-time applications",
                "Need for more diverse datasets"
            ],
            "recommendations": [
                "Focus on practical implementations",
                "Consider ethical implications"
            ]
        }
        return review_result
    
    async def design_experiment(self, hypothesis: str) -> Dict[str, Any]:
        """Design an experiment to test the hypothesis."""
        # Simulate experiment design
        experiment_design = {
            "hypothesis": hypothesis,
            "experiment_type": "controlled_experiment",
            "variables": {
                "independent": ["treatment_group", "control_group"],
                "dependent": ["performance_metric", "user_satisfaction"]
            },
            "sample_size": 100,
            "duration_weeks": 4,
            "statistical_tests": ["t_test", "chi_square"],
            "expected_outcomes": [
                "Significant difference between groups",
                "Effect size > 0.5"
            ]
        }
        return experiment_design

# Usage example
async def custom_agent_example():
    """Demonstrate custom agent adapter usage."""
    
    # Create custom research agent
    research_agent = CustomResearchAgent(
        agent_id="research_agent_001",
        specialization="machine_learning",
        llm_client=None  # Would be actual LLM client
    )
    
    # Register with adapter registry
    registry = AgentAdapterRegistry()
    registry.register_adapter("research_agent_001", research_agent)
    
    # Get agent capabilities
    capabilities = await research_agent.get_capabilities()
    print("Agent capabilities:")
    for cap in capabilities:
        print(f"- {cap.name}: {cap.description}")
    
    # Test message processing
    test_message = Message(
        sender_id="coordinator",
        recipient_ids=["research_agent_001"],
        message_type=MessageType.REQUEST,
        content={
            "type": "data_analysis",
            "data": {"dataset": "customer_behavior", "size": 10000}
        }
    )
    
    result = await research_agent.process_message(test_message)
    print(f"Analysis result: {result}")

asyncio.run(custom_agent_example())

Real-World Scenarios

Example 7: Academic Research Collaboration

async def academic_research_scenario():
    """Simulate an academic research collaboration scenario."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    decision_engine = DecisionEngine(community_manager)
    resource_manager = ResourceManager(community_manager)
    workflow_engine = CollaborativeWorkflowEngine(community_manager)
    
    # Create research lab community
    research_lab = await community_manager.create_community(
        name="AI Ethics Research Lab",
        description="Interdisciplinary research on AI ethics and fairness",
        governance_type=GovernanceType.DEMOCRATIC,
        creator_agent_id="lab_director",
        capacity=15
    )
    
    # Add interdisciplinary team members
    team_members = [
        ("lab_director", CommunityRole.ADMIN, {"field": "computer_science", "expertise": "AI_ethics"}),
        ("ethics_professor", CommunityRole.MODERATOR, {"field": "philosophy", "expertise": "moral_philosophy"}),
        ("law_professor", CommunityRole.MODERATOR, {"field": "law", "expertise": "technology_law"}),
        ("psychologist", CommunityRole.MEMBER, {"field": "psychology", "expertise": "human_behavior"}),
        ("data_scientist", CommunityRole.MEMBER, {"field": "data_science", "expertise": "ML_fairness"}),
        ("sociologist", CommunityRole.MEMBER, {"field": "sociology", "expertise": "social_impact"}),
        ("graduate_student1", CommunityRole.MEMBER, {"field": "computer_science", "expertise": "NLP"}),
        ("graduate_student2", CommunityRole.MEMBER, {"field": "philosophy", "expertise": "ethics"}),
    ]
    
    for member_id, role, metadata in team_members:
        await community_manager.add_member(
            research_lab.community_id, member_id, role, metadata
        )
    
    # Propose research direction decision
    research_decision = await decision_engine.propose_decision(
        community_id=research_lab.community_id,
        proposer_id="lab_director",
        title="Choose Primary Research Focus for Next Year",
        description="Which area of AI ethics should be our primary research focus?",
        options=[
            "Algorithmic Bias in Hiring Systems",
            "Privacy in AI-Powered Healthcare",
            "Autonomous Vehicle Decision Making",
            "AI in Criminal Justice Systems"
        ],
        algorithm=ConsensusAlgorithm.WEIGHTED_VOTING
    )
    
    # Cast weighted votes based on expertise
    votes = [
        ("lab_director", "Algorithmic Bias in Hiring Systems", 2.0),
        ("ethics_professor", "AI in Criminal Justice Systems", 2.0),
        ("law_professor", "Privacy in AI-Powered Healthcare", 2.0),
        ("psychologist", "Algorithmic Bias in Hiring Systems", 1.5),
        ("data_scientist", "Algorithmic Bias in Hiring Systems", 2.0),
        ("sociologist", "AI in Criminal Justice Systems", 1.5),
        ("graduate_student1", "Autonomous Vehicle Decision Making", 1.0),
        ("graduate_student2", "AI in Criminal Justice Systems", 1.0),
    ]
    
    for voter_id, choice, weight in votes:
        await decision_engine.cast_vote(research_decision.decision_id, voter_id, choice, weight)
    
    # Evaluate decision
    passed, details = await decision_engine.evaluate_decision(
        research_decision.decision_id, research_lab.community_id
    )
    
    winning_topic = details.get('winning_option')
    print(f"Research focus chosen: {winning_topic}")
    
    # Create shared research resources
    literature_review = await resource_manager.create_resource(
        community_id=research_lab.community_id,
        creator_id="ethics_professor",
        name="AI Ethics Literature Review 2024",
        resource_type=ResourceType.KNOWLEDGE,
        data={
            "papers_reviewed": 150,
            "key_themes": ["fairness", "transparency", "accountability", "privacy"],
            "research_gaps": ["cross-cultural studies", "long-term impact assessment"],
            "bibliography": "ai_ethics_bibliography_2024.bib"
        },
        metadata={
            "review_period": "2024",
            "languages": ["english", "chinese", "spanish"],
            "update_frequency": "quarterly"
        }
    )
    
    # Start collaborative research session
    research_session = await workflow_engine.start_collaboration_session(
        community_id=research_lab.community_id,
        session_name=f"Research on {winning_topic}",
        participants=[member[0] for member in team_members],
        workflow_config={
            "workflow_type": "academic_research",
            "phases": [
                "literature_review",
                "hypothesis_development",
                "experimental_design",
                "data_collection",
                "analysis",
                "paper_writing"
            ],
            "timeline_months": 12,
            "milestones": [
                "Literature review complete",
                "Research proposal approved",
                "Data collection finished",
                "Analysis complete",
                "Paper submitted"
            ]
        }
    )
    
    print(f"Research session started: {research_session.session_name}")
    return research_lab, research_session

asyncio.run(academic_research_scenario())

Example 8: Corporate Innovation Team

async def corporate_innovation_scenario():
    """Simulate a corporate innovation team scenario."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    decision_engine = DecisionEngine(community_manager)
    resource_manager = ResourceManager(community_manager)
    workflow_engine = CollaborativeWorkflowEngine(community_manager)
    
    # Create innovation team
    innovation_team = await community_manager.create_community(
        name="Product Innovation Team",
        description="Cross-functional team for product innovation and development",
        governance_type=GovernanceType.HIERARCHICAL,
        creator_agent_id="vp_product",
        capacity=12
    )
    
    # Add cross-functional team members
    team_members = [
        ("vp_product", CommunityRole.ADMIN, {"department": "product", "level": "executive"}),
        ("product_manager", CommunityRole.MODERATOR, {"department": "product", "level": "senior"}),
        ("engineering_lead", CommunityRole.MODERATOR, {"department": "engineering", "level": "senior"}),
        ("design_lead", CommunityRole.MODERATOR, {"department": "design", "level": "senior"}),
        ("marketing_manager", CommunityRole.MEMBER, {"department": "marketing", "level": "senior"}),
        ("sales_representative", CommunityRole.MEMBER, {"department": "sales", "level": "senior"}),
        ("data_analyst", CommunityRole.MEMBER, {"department": "analytics", "level": "mid"}),
        ("ux_researcher", CommunityRole.MEMBER, {"department": "design", "level": "mid"}),
        ("backend_engineer", CommunityRole.MEMBER, {"department": "engineering", "level": "mid"}),
        ("frontend_engineer", CommunityRole.MEMBER, {"department": "engineering", "level": "mid"}),
    ]
    
    for member_id, role, metadata in team_members:
        await community_manager.add_member(
            innovation_team.community_id, member_id, role, metadata
        )
    
    # Propose product feature decision
    feature_decision = await decision_engine.propose_decision(
        community_id=innovation_team.community_id,
        proposer_id="product_manager",
        title="Q2 Product Feature Priority",
        description="Which feature should we prioritize for Q2 development?",
        options=[
            "AI-Powered Recommendations",
            "Real-time Collaboration Tools",
            "Advanced Analytics Dashboard",
            "Mobile App Redesign"
        ],
        algorithm=ConsensusAlgorithm.SUPERMAJORITY
    )
    
    # Cast votes with business context
    votes = [
        ("vp_product", "AI-Powered Recommendations", 3.0),  # Executive weight
        ("product_manager", "Real-time Collaboration Tools", 2.0),
        ("engineering_lead", "Advanced Analytics Dashboard", 2.0),
        ("design_lead", "Mobile App Redesign", 2.0),
        ("marketing_manager", "AI-Powered Recommendations", 1.5),
        ("sales_representative", "Real-time Collaboration Tools", 1.5),
        ("data_analyst", "Advanced Analytics Dashboard", 2.0),
        ("ux_researcher", "Mobile App Redesign", 1.5),
        ("backend_engineer", "AI-Powered Recommendations", 1.0),
        ("frontend_engineer", "Mobile App Redesign", 1.0),
    ]
    
    for voter_id, choice, weight in votes:
        await decision_engine.cast_vote(feature_decision.decision_id, voter_id, choice, weight)
    
    # Evaluate decision
    passed, details = await decision_engine.evaluate_decision(
        feature_decision.decision_id, innovation_team.community_id
    )
    
    chosen_feature = details.get('winning_option')
    print(f"Chosen feature for Q2: {chosen_feature}")
    
    # Create shared product resources
    market_research = await resource_manager.create_resource(
        community_id=innovation_team.community_id,
        creator_id="marketing_manager",
        name="Market Research Q1 2024",
        resource_type=ResourceType.DATA,
        data={
            "survey_responses": 5000,
            "key_insights": [
                "Users want more personalized experiences",
                "Mobile usage increased 40%",
                "Collaboration tools are highly requested"
            ],
            "competitor_analysis": {
                "competitor_a": {"market_share": 0.35, "strengths": ["AI", "UX"]},
                "competitor_b": {"market_share": 0.25, "strengths": ["price", "features"]}
            }
        },
        metadata={
            "research_period": "Q1_2024",
            "methodology": "online_survey",
            "confidence_level": 0.95
        }
    )
    
    # Start product development session
    dev_session = await workflow_engine.start_collaboration_session(
        community_id=innovation_team.community_id,
        session_name=f"Development of {chosen_feature}",
        participants=[member[0] for member in team_members],
        workflow_config={
            "workflow_type": "product_development",
            "sprint_duration_weeks": 2,
            "total_sprints": 6,
            "phases": [
                "requirements_gathering",
                "design_and_prototyping",
                "development",
                "testing",
                "deployment",
                "monitoring"
            ],
            "success_metrics": [
                "user_adoption_rate",
                "performance_improvement",
                "customer_satisfaction"
            ]
        }
    )
    
    print(f"Product development session started: {dev_session.session_name}")
    return innovation_team, dev_session

asyncio.run(corporate_innovation_scenario())

Performance Examples

Example 9: High-Performance Community Operations

async def performance_optimization_example():
    """Demonstrate performance optimization techniques."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    decision_engine = DecisionEngine(community_manager)
    resource_manager = ResourceManager(community_manager)
    
    # Create a large community
    large_community = await community_manager.create_community(
        name="Large Scale Research Community",
        description="High-performance community for large-scale research",
        governance_type=GovernanceType.DEMOCRATIC,
        creator_agent_id="admin",
        capacity=100
    )
    
    # Batch add members for performance
    member_batch = []
    for i in range(50):  # Add 50 members
        member_batch.append({
            "agent_id": f"researcher_{i:03d}",
            "role": CommunityRole.MEMBER,
            "metadata": {"specialization": f"field_{i % 10}"}
        })
    
    # Add members in parallel
    add_tasks = [
        community_manager.add_member(
            large_community.community_id,
            member["agent_id"],
            member["role"],
            member["metadata"]
        )
        for member in member_batch
    ]
    
    start_time = asyncio.get_event_loop().time()
    await asyncio.gather(*add_tasks)
    end_time = asyncio.get_event_loop().time()
    
    print(f"Added {len(member_batch)} members in {end_time - start_time:.2f} seconds")
    
    # Batch create resources
    resource_batch = []
    for i in range(20):  # Create 20 resources
        resource_batch.append({
            "name": f"Dataset_{i:03d}",
            "resource_type": ResourceType.DATA,
            "data": {"size": f"{i * 100}MB", "records": i * 1000},
            "creator_id": f"researcher_{i % 10:03d}"
        })
    
    # Create resources in parallel
    create_tasks = [
        resource_manager.create_resource(
            community_id=large_community.community_id,
            creator_id=resource["creator_id"],
            name=resource["name"],
            resource_type=resource["resource_type"],
            data=resource["data"]
        )
        for resource in resource_batch
    ]
    
    start_time = asyncio.get_event_loop().time()
    resources = await asyncio.gather(*create_tasks)
    end_time = asyncio.get_event_loop().time()
    
    print(f"Created {len(resources)} resources in {end_time - start_time:.2f} seconds")
    
    # Batch decision voting
    decision = await decision_engine.propose_decision(
        community_id=large_community.community_id,
        proposer_id="admin",
        title="Research Direction Vote",
        description="Choose the primary research direction",
        options=["AI", "ML", "Data Science", "Robotics"],
        algorithm=ConsensusAlgorithm.SIMPLE_MAJORITY
    )
    
    # Cast votes in parallel
    vote_tasks = []
    for i in range(50):
        choice = ["AI", "ML", "Data Science", "Robotics"][i % 4]
        vote_tasks.append(
            decision_engine.cast_vote(
                decision.decision_id,
                f"researcher_{i:03d}",
                choice
            )
        )
    
    start_time = asyncio.get_event_loop().time()
    await asyncio.gather(*vote_tasks)
    end_time = asyncio.get_event_loop().time()
    
    print(f"Cast {len(vote_tasks)} votes in {end_time - start_time:.2f} seconds")
    
    # Evaluate decision
    passed, details = await decision_engine.evaluate_decision(
        decision.decision_id, large_community.community_id
    )
    
    print(f"Decision result: {details.get('winning_option')} (passed: {passed})")

asyncio.run(performance_optimization_example())

Error Handling Examples

Example 10: Comprehensive Error Handling

async def error_handling_example():
    """Demonstrate comprehensive error handling patterns."""
    
    integration = CommunityIntegration()
    community_manager = CommunityManager(integration)
    decision_engine = DecisionEngine(community_manager)
    resource_manager = ResourceManager(community_manager)
    
    # Create a test community
    community = await community_manager.create_community(
        name="Error Handling Test Community",
        description="Community for testing error scenarios",
        governance_type=GovernanceType.DEMOCRATIC,
        creator_agent_id="test_admin",
        capacity=5
    )
    
    # Test 1: Community not found error
    try:
        await community_manager.get_community("nonexistent_community")
    except CommunityNotFoundError as e:
        print(f"Caught expected error: {e}")
    
    # Test 2: Member already exists error
    await community_manager.add_member(
        community.community_id, "test_member", CommunityRole.MEMBER
    )
    
    try:
        await community_manager.add_member(
            community.community_id, "test_member", CommunityRole.MEMBER
        )
    except MembershipError as e:
        print(f"Caught expected error: {e}")
    
    # Test 3: Community capacity exceeded
    try:
        # Add more members than capacity allows
        for i in range(10):
            await community_manager.add_member(
                community.community_id, f"member_{i}", CommunityRole.MEMBER
            )
    except CommunityCapacityError as e:
        print(f"Caught expected error: {e}")
    
    # Test 4: Resource access denied
    resource = await resource_manager.create_resource(
        community_id=community.community_id,
        creator_id="test_admin",
        name="Private Resource",
        resource_type=ResourceType.DATA,
        data={"sensitive": "data"}
    )
    
    try:
        await resource_manager.get_resource(
            resource.resource_id, "unauthorized_agent"
        )
    except AccessDeniedError as e:
        print(f"Caught expected error: {e}")
    
    # Test 5: Decision not found
    try:
        await decision_engine.evaluate_decision(
            "nonexistent_decision", community.community_id
        )
    except DecisionNotFoundError as e:
        print(f"Caught expected error: {e}")
    
    # Test 6: Quorum not met
    decision = await decision_engine.propose_decision(
        community_id=community.community_id,
        proposer_id="test_admin",
        title="Test Decision",
        description="A test decision",
        options=["Yes", "No"],
        algorithm=ConsensusAlgorithm.UNANIMOUS
    )
    
    # Cast only one vote (unanimous requires all)
    await decision_engine.cast_vote(
        decision.decision_id, "test_member", "Yes"
    )
    
    try:
        await decision_engine.evaluate_decision(
            decision.decision_id, community.community_id
        )
    except QuorumNotMetError as e:
        print(f"Caught expected error: {e}")
    
    # Test 7: Graceful error recovery
    async def safe_community_operation():
        """Demonstrate safe error handling with recovery."""
        try:
            # Attempt risky operation
            result = await community_manager.add_member(
                "nonexistent_community", "new_member", CommunityRole.MEMBER
            )
            return result
        except CommunityNotFoundError:
            print("Community not found, creating new one...")
            # Recovery: create the community first
            new_community = await community_manager.create_community(
                name="Recovery Community",
                description="Created during error recovery",
                governance_type=GovernanceType.DEMOCRATIC,
                creator_agent_id="recovery_admin",
                capacity=10
            )
            # Retry the operation
            return await community_manager.add_member(
                new_community.community_id, "new_member", CommunityRole.MEMBER
            )
        except Exception as e:
            print(f"Unexpected error: {e}")
            return None
    
    result = await safe_community_operation()
    if result:
        print(f"Operation succeeded: {result.agent_id}")
    
    # Test 8: Retry mechanism with exponential backoff
    async def retry_with_backoff(operation, max_retries=3):
        """Retry operation with exponential backoff."""
        for attempt in range(max_retries):
            try:
                return await operation()
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
    
    # Example usage of retry mechanism
    async def unreliable_operation():
        """Simulate an unreliable operation."""
        import random
        if random.random() < 0.7:  # 70% failure rate
            raise Exception("Simulated failure")
        return "Success!"
    
    try:
        result = await retry_with_backoff(unreliable_operation)
        print(f"Retry succeeded: {result}")
    except Exception as e:
        print(f"All retries failed: {e}")

asyncio.run(error_handling_example())

These examples demonstrate the comprehensive capabilities of the DOMAIN_COMMUNITY module, from basic operations to advanced use cases, performance optimization, and robust error handling. Each example is designed to be practical and applicable to real-world scenarios.