Source code for aiecs.application.knowledge_graph.reasoning.reasoning_engine

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
"""
Reasoning Engine

Multi-hop reasoning over knowledge graphs with evidence collection and answer generation.
"""

import uuid
import time
from typing import List, Optional, Dict, Any, Tuple
from aiecs.infrastructure.graph_storage.base import GraphStore
from aiecs.domain.knowledge_graph.models.path import Path
from aiecs.domain.knowledge_graph.models.evidence import (
    Evidence,
    EvidenceType,
    ReasoningResult,
)
from aiecs.domain.knowledge_graph.models.query_plan import QueryPlan, QueryStep
from aiecs.domain.knowledge_graph.models.query import QueryType
from aiecs.application.knowledge_graph.traversal.enhanced_traversal import (
    EnhancedTraversal,
)
from aiecs.application.knowledge_graph.traversal.path_scorer import PathScorer
from aiecs.application.knowledge_graph.reasoning.query_planner import (
    QueryPlanner,
)


[docs] class ReasoningEngine: """ Multi-Hop Reasoning Engine Executes query plans, collects evidence, and generates answers for complex multi-hop queries over knowledge graphs. Features: - Execute query plans from QueryPlanner - Multi-hop path finding - Evidence collection and scoring - Path ranking by relevance - Answer generation from evidence Example: ```python engine = ReasoningEngine(graph_store) # Reason over a query result = await engine.reason( query="What companies does Alice know people at?", context={"start_entity_id": "person_alice"} ) print(f"Answer: {result.answer}") print(f"Confidence: {result.confidence}") print(f"Evidence: {result.evidence_count} pieces") ``` """
[docs] def __init__( self, graph_store: GraphStore, query_planner: Optional[QueryPlanner] = None, ): """ Initialize reasoning engine Args: graph_store: Graph storage backend query_planner: Query planner (creates one if not provided) """ self.graph_store = graph_store self.query_planner = query_planner or QueryPlanner(graph_store) self.traversal = EnhancedTraversal(graph_store) self.path_scorer = PathScorer()
[docs] async def reason( self, query: str, context: Optional[Dict[str, Any]] = None, max_hops: int = 3, max_evidence: int = 20, ) -> ReasoningResult: """ Perform multi-hop reasoning on a query Args: query: Natural language query context: Query context (entity IDs, embeddings, etc.) max_hops: Maximum number of hops for traversal max_evidence: Maximum number of evidence pieces to collect Returns: Reasoning result with evidence and answer """ start_time = time.time() context = context or {} trace = [] # Step 1: Plan the query trace.append(f"Planning query: {query}") plan = self.query_planner.plan_query(query, context) trace.append(f"Created plan with {len(plan.steps)} steps") # Step 2: Execute plan and collect evidence trace.append("Executing query plan...") evidence = await self._execute_plan_with_evidence(plan, trace) # Fallback: If no evidence found but start_entity_id is provided, try # direct traversal if not evidence and context.get("start_entity_id"): import logging logger = logging.getLogger(__name__) trace.append(f"WARNING: Query plan returned no evidence. Plan had {len(plan.steps)} steps.") trace.append(f"Plan steps: {[s.step_id + ':' + s.operation.value for s in plan.steps]}") logger.warning(f"Query plan returned no evidence. " f"Plan ID: {plan.plan_id}, Steps: {len(plan.steps)}, " f"Query: {query}, Context: {context}") trace.append(f"FALLBACK: Trying direct traversal from {context['start_entity_id']}") start_id = context["start_entity_id"] target_id = context.get("target_entity_id") try: paths = await self.find_multi_hop_paths( start_entity_id=start_id, target_entity_id=target_id, max_hops=max_hops, relation_types=context.get("relation_types"), max_paths=max_evidence, ) if paths: path_evidence = await self.collect_evidence_from_paths(paths, source="direct_traversal_fallback") evidence.extend(path_evidence) trace.append(f"FALLBACK SUCCESS: Found {len(path_evidence)} evidence pieces from direct traversal") logger.info(f"Fallback traversal succeeded: {len(path_evidence)} evidence pieces from {start_id}") else: trace.append(f"FALLBACK FAILED: No paths found from {start_id}") logger.warning(f"Fallback traversal found no paths from {start_id}") except Exception as e: trace.append(f"FALLBACK ERROR: {str(e)}") logger.error(f"Fallback traversal failed: {str(e)}", exc_info=True) # Step 3: Rank and filter evidence trace.append(f"Collected {len(evidence)} pieces of evidence") evidence = self._rank_and_filter_evidence(evidence, max_evidence) trace.append(f"Filtered to top {len(evidence)} pieces") # Step 4: Generate answer trace.append("Generating answer from evidence...") answer, confidence = self._generate_answer(query, evidence) execution_time = (time.time() - start_time) * 1000 return ReasoningResult( query=query, evidence=evidence, answer=answer, confidence=confidence, reasoning_trace=trace, execution_time_ms=execution_time, metadata={"plan_id": plan.plan_id, "num_steps": len(plan.steps)}, )
[docs] async def find_multi_hop_paths( self, start_entity_id: str, target_entity_id: Optional[str] = None, max_hops: int = 3, relation_types: Optional[List[str]] = None, max_paths: int = 10, ) -> List[Path]: """ Find multi-hop paths between entities Args: start_entity_id: Starting entity ID target_entity_id: Target entity ID (None for all reachable) max_hops: Maximum number of hops relation_types: Allowed relation types (None for all) max_paths: Maximum number of paths to return Returns: List of paths found """ # Use graph store's traverse method paths = await self.graph_store.traverse( start_entity_id=start_entity_id, relation_type=None, # Will filter later if needed max_depth=max_hops, max_results=max_paths * 2, # Get more, then filter ) # Filter by target if specified if target_entity_id: paths = [path for path in paths if path.nodes[-1].id == target_entity_id] # Filter by relation types if specified if relation_types: paths = [path for path in paths if all(rel.relation_type in relation_types for rel in path.edges)] return paths[:max_paths]
[docs] async def collect_evidence_from_paths(self, paths: List[Path], source: str = "path_finding") -> List[Evidence]: """ Collect evidence from paths Args: paths: List of paths to extract evidence from source: Source identifier for the evidence Returns: List of evidence pieces """ evidence_list = [] for i, path in enumerate(paths): # Calculate confidence based on path properties confidence = self._calculate_path_confidence(path) # Calculate relevance (for now, use path length as proxy) relevance = 1.0 / max(1, len(path.nodes) - 1) # Create explanation explanation = self._create_path_explanation(path) evidence = Evidence( evidence_id=f"ev_{uuid.uuid4().hex[:8]}", evidence_type=EvidenceType.PATH, entities=path.nodes, relations=path.edges, paths=[path], confidence=confidence, relevance_score=relevance, explanation=explanation, source=source, metadata={"path_index": i, "path_length": len(path.nodes)}, ) evidence_list.append(evidence) return evidence_list
[docs] def rank_evidence(self, evidence: List[Evidence], ranking_method: str = "combined_score") -> List[Evidence]: """ Rank evidence by relevance Args: evidence: List of evidence to rank ranking_method: Method to use for ranking - "combined_score": confidence * relevance - "confidence": confidence only - "relevance": relevance only Returns: Ranked evidence list """ if ranking_method == "combined_score": return sorted(evidence, key=lambda e: e.combined_score, reverse=True) elif ranking_method == "confidence": return sorted(evidence, key=lambda e: e.confidence, reverse=True) elif ranking_method == "relevance": return sorted(evidence, key=lambda e: e.relevance_score, reverse=True) else: return evidence
def _calculate_path_confidence(self, path: Path) -> float: """Calculate confidence score for a path""" if not path.edges: return 1.0 # Use average weight of relations as confidence proxy weights = [rel.weight for rel in path.edges if rel.weight is not None] if not weights: return 0.5 return sum(weights) / len(weights) def _create_path_explanation(self, path: Path) -> str: """Create human-readable explanation of a path""" if len(path.nodes) == 1: entity = path.nodes[0] return f"Entity: {entity.properties.get('name', entity.id)} ({entity.entity_type})" parts = [] for i, entity in enumerate(path.nodes): entity_name = entity.properties.get("name", entity.id) entity_type = entity.entity_type parts.append(f"{entity_name} ({entity_type})") if i < len(path.edges): relation = path.edges[i] parts.append(f" --[{relation.relation_type}]--> ") return "".join(parts) async def _execute_plan_with_evidence(self, plan: QueryPlan, trace: List[str]) -> List[Evidence]: """Execute query plan and collect evidence""" import logging logger = logging.getLogger(__name__) all_evidence = [] completed_steps = set() step_results: Dict[str, Any] = {} # Get execution order execution_order = plan.get_execution_order() trace.append(f"Plan has {len(plan.steps)} steps, execution order: {execution_order}") for level, step_ids in enumerate(execution_order): trace.append(f"Executing level {level}: {step_ids}") # Execute steps in this level (could be parallelized) for step_id in step_ids: try: step = next(s for s in plan.steps if s.step_id == step_id) trace.append(f" Executing {step_id}: {step.operation.value} - {step.description}") # Execute step step_evidence = await self._execute_step(step, step_results) all_evidence.extend(step_evidence) # Store results for dependent steps step_results[step_id] = step_evidence completed_steps.add(step_id) trace.append(f" {step_id}: Collected {len(step_evidence)} evidence") logger.debug(f"Step {step_id} completed: {len(step_evidence)} evidence pieces") if len(step_evidence) == 0: trace.append(f" WARNING: {step_id} returned no evidence") logger.warning( f"Step {step_id} ({step.operation.value}) returned no evidence. " f"Query: {step.query.query_type}, " f"Entity ID: {getattr(step.query, 'entity_id', None)}, " f"Source: {getattr(step.query, 'source_entity_id', None)}" ) except Exception as e: error_msg = f"Error executing step {step_id}: {str(e)}" trace.append(f" ERROR: {error_msg}") logger.error(error_msg, exc_info=True) # Continue with other steps even if one fails return all_evidence async def _execute_step(self, step: QueryStep, previous_results: Dict[str, Any]) -> List[Evidence]: """Execute a single query step""" query = step.query evidence = [] # Entity lookup if query.query_type == QueryType.ENTITY_LOOKUP: if query.entity_id: entity = await self.graph_store.get_entity(query.entity_id) if entity: evidence.append( Evidence( evidence_id=f"ev_{uuid.uuid4().hex[:8]}", evidence_type=EvidenceType.ENTITY, entities=[entity], confidence=1.0, relevance_score=1.0, explanation=f"Found entity: {entity.id}", source=step.step_id, ) ) # Vector search elif query.query_type == QueryType.VECTOR_SEARCH: if query.embedding: results = await self.graph_store.vector_search( query_embedding=query.embedding, entity_type=query.entity_type, max_results=query.max_results, score_threshold=query.score_threshold, ) for entity, score in results: evidence.append( Evidence( evidence_id=f"ev_{uuid.uuid4().hex[:8]}", evidence_type=EvidenceType.ENTITY, entities=[entity], confidence=score, relevance_score=score, explanation=f"Similar entity: {entity.id} (score: {score:.2f})", source=step.step_id, ) ) # Traversal elif query.query_type == QueryType.TRAVERSAL: import logging logger = logging.getLogger(__name__) # Get starting entities from previous steps or query start_ids = [] if query.entity_id: start_ids = [query.entity_id] logger.debug(f"TRAVERSAL: Using entity_id from query: {query.entity_id}") elif step.depends_on: # Get entities from dependent steps for dep_id in step.depends_on: if dep_id in previous_results: dep_evidence = previous_results[dep_id] extracted_ids = [e.id for ev in dep_evidence for e in ev.entities] start_ids.extend(extracted_ids) logger.debug(f"TRAVERSAL: Extracted {len(extracted_ids)} entity IDs from step {dep_id}") else: logger.warning(f"TRAVERSAL: Dependent step {dep_id} not found in previous_results") else: logger.warning("TRAVERSAL: No entity_id and no dependencies. Cannot traverse.") if not start_ids: logger.warning( f"TRAVERSAL step {step.step_id} has no starting entities. " f"Query entity_id: {getattr(query, 'entity_id', None)}, " f"Dependencies: {step.depends_on}, " f"Previous results keys: {list(previous_results.keys())}" ) else: # Traverse from each starting entity # Limit starting points for start_id in start_ids[: query.max_results]: try: paths = await self.graph_store.traverse( start_entity_id=start_id, relation_type=query.relation_type, max_depth=query.max_depth, max_results=query.max_results, ) logger.debug(f"TRAVERSAL: Found {len(paths)} paths from {start_id}") # Convert paths to evidence path_evidence = await self.collect_evidence_from_paths(paths, source=step.step_id) evidence.extend(path_evidence) logger.debug(f"TRAVERSAL: Collected {len(path_evidence)} evidence from {start_id}") except Exception as e: logger.error( f"TRAVERSAL: Error traversing from {start_id}: {str(e)}", exc_info=True, ) # Path finding elif query.query_type == QueryType.PATH_FINDING: if query.source_entity_id and query.target_entity_id: paths = await self.find_multi_hop_paths( start_entity_id=query.source_entity_id, target_entity_id=query.target_entity_id, max_hops=query.max_depth, max_paths=query.max_results, ) path_evidence = await self.collect_evidence_from_paths(paths, source=step.step_id) evidence.extend(path_evidence) return evidence def _rank_and_filter_evidence(self, evidence: List[Evidence], max_evidence: int) -> List[Evidence]: """Rank and filter evidence to top N""" # Rank by combined score ranked = self.rank_evidence(evidence, ranking_method="combined_score") # Filter to top N return ranked[:max_evidence] def _generate_answer(self, query: str, evidence: List[Evidence]) -> Tuple[str, float]: """ Generate answer from evidence Args: query: Original query evidence: Collected evidence Returns: (answer, confidence) tuple """ if not evidence: return "No evidence found to answer the query.", 0.0 # Calculate overall confidence if evidence: confidence = sum(e.combined_score for e in evidence) / len(evidence) else: confidence = 0.0 # Generate answer based on evidence type top_evidence = evidence[:5] # Top 5 pieces # Collect unique entities from evidence entity_ids = set() entity_names = [] for ev in top_evidence: for entity in ev.entities: if entity.id not in entity_ids: entity_ids.add(entity.id) name = entity.properties.get("name", entity.id) entity_type = entity.entity_type entity_names.append(f"{name} ({entity_type})") # Build answer if len(entity_names) == 0: answer = "No relevant entities found." elif len(entity_names) == 1: answer = f"Found: {entity_names[0]}" elif len(entity_names) <= 3: answer = f"Found: {', '.join(entity_names)}" else: answer = f"Found {len(entity_names)} entities: {', '.join(entity_names[:3])}, and {len(entity_names) - 3} more" # Add path information if available path_count = sum(1 for ev in top_evidence if ev.evidence_type == EvidenceType.PATH) if path_count > 0: answer += f" (through {path_count} connection{'s' if path_count != 1 else ''})" return answer, confidence