Application API
This section documents the application layer components.
Application Executors
Application executors module
Contains service executors and application-level coordination.
- class aiecs.application.executors.OperationExecutor[source]
Bases: object
Core logic for handling operation execution
- async execute_operation(operation_spec, params)[source]
Execute a single operation (tool_name.operation_name)
- async execute_operations_sequence(operations, user_id, task_id, stop_on_failure=False, save_callback=None)[source]
Execute a sequence of operations in order, optionally stopping at the first failure
- async batch_tool_calls(tool_calls, tool_executor_func=None)[source]
Execute batch tool calls with rate limiting
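How `batch_tool_calls` applies rate limiting is not described in this reference. As an illustration only, the sketch below throttles a batch by capping the number of calls in flight with `asyncio.Semaphore`, which is a concurrency cap rather than a calls-per-second limit. `run_batch`, `fake_tool`, and `max_concurrent` are hypothetical names and are not part of AIECS.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_batch(
    tool_calls: List[Dict[str, Any]],
    executor: Callable[[Dict[str, Any]], Awaitable[Any]],
    max_concurrent: int = 2,
) -> List[Any]:
    """Run all calls, allowing at most max_concurrent in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(call: Dict[str, Any]) -> Any:
        async with semaphore:
            return await executor(call)

    # gather returns results in the same order as the input list
    return await asyncio.gather(*(guarded(call) for call in tool_calls))


async def fake_tool(call: Dict[str, Any]) -> str:
    """Hypothetical stand-in for a real tool executor."""
    await asyncio.sleep(0.01)
    return f"{call['tool']}:{call['value']}"


calls = [{"tool": "echo", "value": i} for i in range(5)]
batch_results = asyncio.run(run_batch(calls, fake_tool))
print(batch_results)
```

Because `asyncio.gather` preserves input order, each result can be matched back to its call by position.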
Knowledge Graph
Note
The knowledge graph module contains multiple submodules. See individual submodule documentation for details.
Knowledge Graph Builder Pipeline
Orchestrates document-to-graph conversion workflow.
- class aiecs.application.knowledge_graph.builder.GraphBuilder[source]
Bases: object
Main pipeline for building knowledge graphs from text
The pipeline:
1. Extract entities from text
2. Deduplicate entities
3. Link entities to existing graph
4. Extract relations between entities
5. Validate relations
6. Deduplicate relations
7. Store entities and relations in graph
Features:
- Async/parallel processing
- Progress callbacks
- Error handling and recovery
- Provenance tracking
- Configurable components
Example:
    # Initialize components
    entity_extractor = LLMEntityExtractor(schema)
    relation_extractor = LLMRelationExtractor(schema)

    # Create builder
    builder = GraphBuilder(
        graph_store=store,
        entity_extractor=entity_extractor,
        relation_extractor=relation_extractor,
        schema=schema
    )

    # Build graph from text
    result = await builder.build_from_text(
        text="Alice works at Tech Corp.",
        source="document_1.pdf"
    )
    print(f"Added {result.entities_added} entities, {result.relations_added} relations")
- __init__(graph_store, entity_extractor, relation_extractor, schema=None, enable_deduplication=True, enable_linking=True, enable_validation=True, progress_callback=None, embedding_client=None)[source]
Initialize graph builder
- Parameters:
graph_store (GraphStore) – Graph storage to save entities/relations
entity_extractor (EntityExtractor) – Entity extractor to use
relation_extractor (RelationExtractor) – Relation extractor to use
schema (GraphSchema | None) – Optional schema for validation
enable_deduplication (bool) – Enable entity/relation deduplication
enable_linking (bool) – Enable linking to existing entities
enable_validation (bool) – Enable relation validation
progress_callback (Callable[[str, float], None] | None) – Optional callback for progress updates (message, progress_pct)
embedding_client (LLMClientProtocol | None) – Optional custom LLM client for generating embeddings
- static from_config(graph_store, entity_extractor, relation_extractor, schema=None, enable_deduplication=True, enable_linking=True, enable_validation=True, progress_callback=None)[source]
Create GraphBuilder with embedding client resolved from configuration
This factory method automatically resolves the embedding client from the global Settings configuration using LLMClientFactory.
- Parameters:
graph_store (GraphStore) – Graph storage to save entities/relations
entity_extractor (EntityExtractor) – Entity extractor to use
relation_extractor (RelationExtractor) – Relation extractor to use
schema (GraphSchema | None) – Optional schema for validation
enable_deduplication (bool) – Enable entity/relation deduplication
enable_linking (bool) – Enable linking to existing entities
enable_validation (bool) – Enable relation validation
progress_callback (Callable[[str, float], None] | None) – Optional callback for progress updates
- Returns:
GraphBuilder instance with configured embedding client
- Return type:
GraphBuilder
Example:
    import os

    from aiecs.config import get_settings
    from aiecs.llm.factory import LLMClientFactory

    # Register custom embedding provider
    LLMClientFactory.register_custom_provider("my_embedder", my_client)

    # Set environment variable
    os.environ["KG_EMBEDDING_PROVIDER"] = "my_embedder"

    # Create builder with auto-resolved embedding client
    builder = GraphBuilder.from_config(
        graph_store=store,
        entity_extractor=extractor,
        relation_extractor=rel_extractor
    )
- class aiecs.application.knowledge_graph.builder.DocumentGraphBuilder[source]
Bases: object
Build knowledge graphs from documents
Supports multiple document formats:
- PDF
- DOCX (Microsoft Word)
- TXT (Plain text)
- And more via AIECS DocumentParserTool
For large documents, automatically chunks text into manageable pieces.
Example
```python
builder = DocumentGraphBuilder(
    graph_builder=graph_builder,
    chunk_size=1000
)

result = await builder.build_from_document("research_paper.pdf")

print(f"Processed {result.total_chunks} chunks")
print(f"Added {result.total_entities_added} entities")
print(f"Added {result.total_relations_added} relations")
```
- __init__(graph_builder, chunk_size=2000, chunk_overlap=200, enable_chunking=True, parallel_chunks=True, max_parallel_chunks=3)[source]
Initialize document graph builder
- Parameters:
graph_builder (GraphBuilder) – GraphBuilder instance for text processing
chunk_size (int) – Size of text chunks (in characters)
chunk_overlap (int) – Overlap between chunks
enable_chunking (bool) – Whether to chunk large documents
parallel_chunks (bool) – Process chunks in parallel
max_parallel_chunks (int) – Maximum parallel chunk processing
- async build_from_document(document_path, metadata=None)[source]
Build knowledge graph from a document
- class aiecs.application.knowledge_graph.builder.TextChunker[source]
Bases: object
Split large texts into smaller chunks
Strategies:
- Fixed size chunking (by character or token count)
- Sentence-aware chunking (don’t break sentences)
- Paragraph-aware chunking (preserve paragraphs)
- Overlapping chunks (for context preservation)
Example
```python
chunker = TextChunker(chunk_size=1000, overlap=100)
chunks = chunker.chunk_text(long_document)

for chunk in chunks:
    # Process each chunk separately
    result = await process(chunk.text)
```
- __init__(chunk_size=1000, overlap=100, respect_sentences=True, respect_paragraphs=False, min_chunk_size=100)[source]
Initialize text chunker
- Parameters:
chunk_size (int) – Target size for each chunk (in characters)
overlap (int) – Number of characters to overlap between chunks
respect_sentences (bool) – Try to break at sentence boundaries
respect_paragraphs (bool) – Try to break at paragraph boundaries
min_chunk_size (int) – Minimum chunk size (don’t create tiny chunks)
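The interaction between `chunk_size` and `overlap` can be pictured with a minimal fixed-size chunker. This is a sketch under simple assumptions (character-based splitting, no sentence or paragraph awareness, no minimum chunk size). It is not the `TextChunker` implementation, and `chunk_fixed` is a hypothetical helper.

```python
from typing import List


def chunk_fixed(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into fixed-size character chunks that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last chunk already reaches the end of the text
    return chunks


sample_chunks = chunk_fixed("abcdefghij", chunk_size=4, overlap=1)
print(sample_chunks)  # ['abcd', 'defg', 'ghij']
```

Each chunk starts `chunk_size - overlap` characters after the previous one, so neighboring chunks repeat the last `overlap` characters for context.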
- class aiecs.application.knowledge_graph.builder.SchemaMapping[source]
Bases: BaseModel
Schema mapping configuration
Defines how structured data (CSV, JSON) maps to knowledge graph entities and relations.
- entity_mappings: List[EntityMapping]
- relation_mappings: List[RelationMapping]
- get_entity_mapping(entity_type)[source]
Get entity mapping by entity type name
- Parameters:
entity_type (str) – Entity type name
- Returns:
Entity mapping or None if not found
- Return type:
EntityMapping | None
- get_relation_mapping(relation_type)[source]
Get relation mapping by relation type name
- Parameters:
relation_type (str) – Relation type name
- Returns:
Relation mapping or None if not found
- Return type:
RelationMapping | None
- get_aggregations(entity_type)[source]
Get aggregation configuration for entity type
- Parameters:
entity_type (str) – Entity type name
- Returns:
EntityAggregation or None if not found
- Return type:
EntityAggregation | None
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.application.knowledge_graph.builder.EntityMapping[source]
Bases: BaseModel
Entity mapping configuration
Maps source data columns to an entity type with property transformations.
- transformations: List[PropertyTransformation]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.application.knowledge_graph.builder.RelationMapping[source]
Bases: BaseModel
Relation mapping configuration
Maps source data columns to a relation type between entities.
- transformations: List[PropertyTransformation]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.application.knowledge_graph.builder.PropertyTransformation[source]
Bases: BaseModel
Property transformation configuration
Defines how a source column/value is transformed into a target property.
- transformation_type: TransformationType
- classmethod validate_transformation_type(v)[source]
Validate transformation type
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.application.knowledge_graph.builder.TransformationType[source]
Types of property transformations
- RENAME = 'rename'
- TYPE_CAST = 'type_cast'
- COMPUTE = 'compute'
- CONSTANT = 'constant'
- SKIP = 'skip'
- __new__(value)
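The exact semantics of each transformation type are not spelled out in this reference. The sketch below shows one plausible reading of the five enum values, applied to a single source row. `apply_transformation` and its parameters are hypothetical and do not reflect the `PropertyTransformation` fields.

```python
from typing import Any, Callable, Dict, Optional, Tuple


def apply_transformation(
    kind: str,
    row: Dict[str, Any],
    source: Optional[str],
    target: str,
    cast: Optional[Callable[[Any], Any]] = None,
    compute: Optional[Callable[[Dict[str, Any]], Any]] = None,
    constant: Any = None,
) -> Optional[Tuple[str, Any]]:
    """Return (target_property, value), or None when the property is skipped."""
    if kind == "rename":
        return target, row[source]        # copy the value under a new name
    if kind == "type_cast":
        return target, cast(row[source])  # convert the value's type
    if kind == "compute":
        return target, compute(row)       # derive the value from the whole row
    if kind == "constant":
        return target, constant           # ignore the row entirely
    if kind == "skip":
        return None                       # drop the column
    raise ValueError(f"unknown transformation type: {kind}")


row = {"full_name": "Alice", "age": "30", "tmp": "x"}
steps = [
    apply_transformation("rename", row, "full_name", "name"),
    apply_transformation("type_cast", row, "age", "age", cast=int),
    apply_transformation("compute", row, None, "is_adult", compute=lambda r: int(r["age"]) >= 18),
    apply_transformation("constant", row, None, "source", constant="csv"),
    apply_transformation("skip", row, "tmp", "tmp"),
]
properties = dict(step for step in steps if step is not None)
print(properties)
```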
- class aiecs.application.knowledge_graph.builder.StructuredDataPipeline[source]
Bases: object
Pipeline for importing structured data (CSV, JSON, SPSS, Excel) into knowledge graphs
Uses SchemaMapping to map source data columns to entity and relation types. Supports batch processing, progress tracking, and error handling.
Example
```python
# Define schema mapping
mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["id", "name", "age"],
            entity_type="Person",
            property_mapping={"id": "id", "name": "name", "age": "age"}
        )
    ],
    relation_mappings=[
        RelationMapping(
            source_columns=["person_id", "company_id"],
            relation_type="WORKS_FOR",
            source_entity_column="person_id",
            target_entity_column="company_id"
        )
    ]
)

# Create pipeline
pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store
)

# Import CSV
result = await pipeline.import_from_csv("employees.csv")
print(f"Added {result.entities_added} entities, {result.relations_added} relations")
```
- __init__(mapping, graph_store, batch_size=100, progress_callback=None, skip_errors=True, enable_parallel=False, max_workers=None, auto_tune_batch_size=False, enable_streaming=False, use_bulk_writes=True, track_performance=True)[source]
Initialize structured data pipeline
- Parameters:
mapping (SchemaMapping) – Schema mapping configuration
graph_store (GraphStore) – Graph storage to save entities/relations
batch_size (int) – Number of rows to process in each batch (ignored if auto_tune_batch_size=True)
progress_callback (Callable[[str, float], None] | None) – Optional callback for progress updates (message, progress_pct)
skip_errors (bool) – Whether to skip rows with errors and continue processing
enable_parallel (bool) – Enable parallel batch processing for faster imports
max_workers (int | None) – Maximum number of parallel workers (default: CPU count - 1)
auto_tune_batch_size (bool) – Automatically tune batch size based on system resources
enable_streaming (bool) – Enable streaming mode for large files (memory-efficient)
use_bulk_writes (bool) – Use bulk write operations for better performance
track_performance (bool) – Track detailed performance metrics
- static infer_schema_from_csv(file_path, encoding='utf-8', sample_size=1000)[source]
Infer schema mapping from CSV file
Analyzes CSV structure and content to automatically generate schema mappings.
- Returns:
InferredSchema with entity and relation mappings
- Return type:
InferredSchema
Example
```python
# Infer schema from CSV
inferred = StructuredDataPipeline.infer_schema_from_csv("data.csv")

# Review and modify if needed
print(f"Inferred entity types: {[em.entity_type for em in inferred.entity_mappings]}")
print(f"Warnings: {inferred.warnings}")

# Use inferred schema
mapping = inferred.to_schema_mapping()
pipeline = StructuredDataPipeline(mapping, graph_store)
```
- static infer_schema_from_spss(file_path, encoding='utf-8', sample_size=1000)[source]
Infer schema mapping from SPSS file
Uses SPSS variable labels and value labels to generate schema mappings.
- static infer_schema_from_dataframe(df, entity_type_hint=None, metadata=None, sample_size=1000)[source]
Infer schema mapping from pandas DataFrame
- Returns:
InferredSchema with entity and relation mappings
- Return type:
InferredSchema
- static create_with_auto_reshape(file_path, graph_store, entity_type_hint=None, reshape_threshold=50, **kwargs)[source]
Create pipeline with automatic reshaping for wide format data
Detects wide format data and automatically reshapes to normalized structure before creating the pipeline.
- Parameters:
file_path (str | Path) – Path to data file (CSV, SPSS, Excel)
graph_store (GraphStore) – Graph storage to save entities/relations
entity_type_hint (str | None) – Optional hint for entity type name
reshape_threshold (int) – Minimum columns to trigger reshaping (default: 50)
**kwargs – Additional arguments for StructuredDataPipeline
- Returns:
StructuredDataPipeline configured for the data
- Return type:
StructuredDataPipeline
- async import_from_csv(file_path, encoding='utf-8', delimiter=',', header=True)[source]
Import data from CSV file
- async import_from_json(file_path, encoding='utf-8', array_key=None)[source]
Import data from JSON file
Supports:
- Array of objects: [{"id": 1, "name": "Alice"}, ...]
- Object with array: {"items": [{"id": 1, ...}, ...]}
- Single object: {"id": 1, "name": "Alice"}
- Returns:
ImportResult with statistics
- Return type:
ImportResult
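The three JSON shapes listed for `import_from_json` can all be reduced to a list of row dictionaries before mapping. The sketch below shows one way to do that with the standard library. `rows_from_json` is a hypothetical helper, and the way it uses `array_key` is an assumption about what that parameter means.

```python
import json
from typing import Any, Dict, List, Optional


def rows_from_json(text: str, array_key: Optional[str] = None) -> List[Dict[str, Any]]:
    """Normalize the three JSON shapes into a list of row dicts."""
    data = json.loads(text)
    if isinstance(data, list):
        return data                  # array of objects
    if isinstance(data, dict):
        if array_key is not None:
            return data[array_key]   # object wrapping an array
        return [data]                # single object becomes a one-row list
    raise ValueError("expected a JSON array or object")


print(rows_from_json('[{"id": 1}, {"id": 2}]'))
print(rows_from_json('{"items": [{"id": 1}]}', array_key="items"))
print(rows_from_json('{"id": 1, "name": "Alice"}'))
```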
- async import_from_csv_streaming(file_path, encoding='utf-8', delimiter=',', chunk_size=10000)[source]
Import data from CSV file using streaming mode.
Memory-efficient import for large files (>1GB). Reads file in chunks without loading entire file into memory.
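Reading a file in chunks can be sketched with the standard `csv` module. The example below is a generic illustration of the idea, not the pipeline's own reader. `iter_csv_chunks` is a hypothetical helper, and an in-memory `io.StringIO` stands in for a large file on disk.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterator, List, TextIO


def iter_csv_chunks(handle: TextIO, chunk_size: int = 10000) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of at most chunk_size rows without reading the whole file."""
    reader = csv.DictReader(handle)
    while True:
        # islice pulls only the next chunk_size rows from the reader
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


# io.StringIO stands in for a file opened with open(path, newline="")
demo = io.StringIO("id,name\n1,Alice\n2,Bob\n3,Carol\n")
chunk_lengths = [len(chunk) for chunk in iter_csv_chunks(demo, chunk_size=2)]
print(chunk_lengths)  # [2, 1]
```

Only one chunk of rows is held in memory at a time, so peak memory depends on `chunk_size` rather than on file size.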
- async import_from_spss(file_path, encoding='utf-8', preserve_metadata=True)[source]
Import data from SPSS file (.sav, .por)
Uses pyreadstat library to read SPSS files and extract metadata. SPSS variable labels and value labels are preserved as entity properties.
- Returns:
ImportResult with statistics
- Return type:
ImportResult
- async import_from_excel(file_path, sheet_name=0, encoding='utf-8', header=True)[source]
Import data from Excel file (.xlsx, .xls)
Supports importing from specific sheets or all sheets.
- Returns:
ImportResult with statistics
- Return type:
ImportResult
- async reshape_and_import_csv(file_path, id_vars=None, value_vars=None, var_name='variable', value_name='value', entity_type_hint=None, encoding='utf-8')[source]
Reshape wide format CSV to normalized structure and import
Automatically converts wide format data (many columns) to long format (normalized structure) before importing into the graph.
- Parameters:
id_vars (List[str] | None) – Columns to use as identifiers (auto-detected if None)
value_vars (List[str] | None) – Columns to unpivot (auto-detected if None)
var_name (str) – Name for variable column (default: ‘variable’)
value_name (str) – Name for value column (default: ‘value’)
entity_type_hint (str | None) – Optional hint for entity type name
encoding (str) – File encoding (default: utf-8)
- Returns:
ImportResult with statistics
- Return type:
ImportResult
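The wide-to-long reshape controlled by `id_vars`, `value_vars`, `var_name`, and `value_name` follows the familiar unpivot (melt) pattern. A minimal pure-Python sketch of that pattern follows. `melt_rows` is a hypothetical helper, it skips the auto-detection that the method performs when these arguments are None, and the column names are made up for illustration.

```python
from typing import Any, Dict, List


def melt_rows(
    rows: List[Dict[str, Any]],
    id_vars: List[str],
    value_vars: List[str],
    var_name: str = "variable",
    value_name: str = "value",
) -> List[Dict[str, Any]]:
    """Unpivot wide rows: one output row per (input row, value column) pair."""
    long_rows = []
    for row in rows:
        ids = {key: row[key] for key in id_vars}
        for column in value_vars:
            long_rows.append({**ids, var_name: column, value_name: row[column]})
    return long_rows


wide = [{"person_id": 1, "q1": 5, "q2": 3}]
long_rows = melt_rows(wide, id_vars=["person_id"], value_vars=["q1", "q2"])
print(long_rows)
```

One wide row with two value columns becomes two long rows that share the same identifier.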
- class aiecs.application.knowledge_graph.builder.ImportResult[source]
Bases: object
Result of structured data import operation
- quality_report
Data quality validation report (if validation enabled)
- Type:
aiecs.application.knowledge_graph.builder.data_quality.QualityReport | None
- start_time
When import started
- Type:
datetime.datetime | None
- end_time
When import ended
- Type:
datetime.datetime | None
- performance_metrics
Detailed performance metrics (if tracking enabled)
- Type:
aiecs.application.knowledge_graph.builder.import_optimizer.PerformanceMetrics | None
- __init__(success=True, entities_added=0, relations_added=0, rows_processed=0, rows_failed=0, errors=<factory>, warnings=<factory>, quality_report=None, start_time=None, end_time=None, duration_seconds=0.0, performance_metrics=None)
Knowledge Graph Reasoning Module
Provides query planning, multi-hop reasoning, and inference capabilities.
- class aiecs.application.knowledge_graph.reasoning.QueryPlanner[source]
Bases: object
Query Planning Engine
Translates natural language queries into structured, optimized execution plans.
Features:
- Natural language to graph query translation
- Query decomposition (complex queries → multiple steps)
- Query optimization (reorder operations for efficiency)
- Cost estimation
Example:
    planner = QueryPlanner(graph_store)

    # Plan a complex query
    plan = planner.plan_query(
        "Who works at companies that Alice knows people at?"
    )

    # Optimize the plan
    optimized_plan = planner.optimize_plan(
        plan,
        strategy=OptimizationStrategy.MINIMIZE_COST
    )
- __init__(graph_store, enable_advanced_optimization=True, schema=None)[source]
Initialize query planner
- plan_query(natural_language_query, context=None)[source]
Create an execution plan from natural language query
- Returns:
Query execution plan
- Return type:
QueryPlan
Example:
    plan = planner.plan_query(
        "Find papers similar to 'Deep Learning' and their authors"
    )
- optimize_plan(plan, strategy=OptimizationStrategy.BALANCED)[source]
Optimize query execution plan
- Parameters:
plan (QueryPlan) – Original query plan
strategy (OptimizationStrategy) – Optimization strategy
- Returns:
Optimized query plan
- Return type:
QueryPlan
Example:
    optimized = planner.optimize_plan(
        plan,
        strategy=OptimizationStrategy.MINIMIZE_COST
    )
- translate_to_graph_query(natural_language_query, context=None)[source]
Translate natural language to a single graph query
For simple queries that don’t need decomposition.
- Returns:
Single graph query
- Return type:
GraphQuery
Example:
    query = planner.translate_to_graph_query(
        "Find entities similar to X",
        context={"query_embedding": [0.1, 0.2, ...]}
    )
- update_statistics()[source]
Update query statistics from graph store
Call this periodically to keep optimizer statistics up-to-date
- Return type:
None
- record_execution_time(execution_time_ms)[source]
Record query execution time for statistics
- Parameters:
execution_time_ms (float) – Execution time in milliseconds
- Return type:
None
- plan_logic_query(logic_query)[source]
Create execution plan from logic query DSL
This method parses a logic query (e.g., “Find(Person) WHERE age > 30”) and converts it directly to a QueryPlan.
- Parameters:
logic_query (str) – Logic query string in DSL format
- Returns:
QueryPlan if successful, List[ParserError] if errors occurred
- Return type:
QueryPlan | List[ParserError]
Example:
    plan = planner.plan_logic_query("Find(Person) WHERE age > 30")
    if isinstance(plan, list):
        # Parsing errors
        for error in plan:
            print(f"Error at line {error.line}: {error.message}")
    else:
        # Success - execute the plan
        result = await graph_store.execute_plan(plan)
- class aiecs.application.knowledge_graph.reasoning.ReasoningEngine[source]
Bases: object
Multi-Hop Reasoning Engine
Executes query plans, collects evidence, and generates answers for complex multi-hop queries over knowledge graphs.
Features:
- Execute query plans from QueryPlanner
- Multi-hop path finding
- Evidence collection and scoring
- Path ranking by relevance
- Answer generation from evidence
Example
```python
engine = ReasoningEngine(graph_store)

# Reason over a query
result = await engine.reason(
    query="What companies does Alice know people at?",
    context={"start_entity_id": "person_alice"}
)

print(f"Answer: {result.answer}")
print(f"Confidence: {result.confidence}")
print(f"Evidence: {result.evidence_count} pieces")
```
- __init__(graph_store, query_planner=None)[source]
Initialize reasoning engine
- Parameters:
graph_store (GraphStore) – Graph storage backend
query_planner (QueryPlanner | None) – Query planner (creates one if not provided)
- async reason(query, context=None, max_hops=3, max_evidence=20)[source]
Perform multi-hop reasoning on a query
- Returns:
Reasoning result with evidence and answer
- Return type:
ReasoningResult
- async find_multi_hop_paths(start_entity_id, target_entity_id=None, max_hops=3, relation_types=None, max_paths=10)[source]
Find multi-hop paths between entities
- Returns:
List of paths found
- Return type:
List[Path]
- async collect_evidence_from_paths(paths, source='path_finding')[source]
Collect evidence from paths
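Multi-hop path finding bounded by `max_hops` and `max_paths` can be illustrated with a breadth-first search over an in-memory adjacency list. This is a generic sketch, not the engine's traversal code. `find_paths`, the `Graph` alias, and the toy data are hypothetical, and filtering by `relation_types` is left out.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

# Adjacency list: entity id -> list of (relation type, neighbor id)
Graph = Dict[str, List[Tuple[str, str]]]


def find_paths(
    graph: Graph,
    start: str,
    target: Optional[str] = None,
    max_hops: int = 3,
    max_paths: int = 10,
) -> List[List[str]]:
    """Breadth-first search for cycle-free paths of at most max_hops edges."""
    paths: List[List[str]] = []
    queue = deque([[start]])
    while queue and len(paths) < max_paths:
        path = queue.popleft()
        hops = len(path) - 1
        # Without a target, every path of one or more hops counts as a result
        if hops > 0 and (target is None or path[-1] == target):
            paths.append(path)
        if hops == max_hops:
            continue  # do not extend paths that already use all allowed hops
        for _relation, neighbor in graph.get(path[-1], []):
            if neighbor not in path:  # avoid revisiting entities
                queue.append(path + [neighbor])
    return paths


toy_graph: Graph = {
    "alice": [("KNOWS", "bob")],
    "bob": [("WORKS_FOR", "acme")],
}
found = find_paths(toy_graph, "alice", target="acme", max_hops=2)
print(found)  # [['alice', 'bob', 'acme']]
```

Breadth-first order returns shorter paths first, so `max_paths` cuts off the longest candidates.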
- class aiecs.application.knowledge_graph.reasoning.InferenceEngine[source]
Bases: object
Rule-Based Inference Engine
Applies logical inference rules to infer new relations from existing ones.
Features:
- Transitive inference (A->B, B->C => A->C)
- Symmetric inference (A->B => B->A)
- Custom inference rules
- Result caching
- Explainability (trace inference steps)
Example
```python
engine = InferenceEngine(graph_store)

# Add rules
engine.add_rule(InferenceRule(
    rule_id="transitive_works_for",
    rule_type=RuleType.TRANSITIVE,
    relation_type="WORKS_FOR"
))

# Infer relations
result = await engine.infer_relations(
    relation_type="WORKS_FOR",
    max_steps=3
)

print(f"Inferred {len(result.inferred_relations)} relations")
print(result.get_explanation_string())
```
- __init__(graph_store, cache=None)[source]
Initialize inference engine
- Parameters:
graph_store (GraphStore) – Graph storage backend
cache (InferenceCache | None) – Optional inference cache (creates one if not provided)
- add_rule(rule)[source]
Add an inference rule
- Parameters:
rule (InferenceRule) – Inference rule to add
- Return type:
None
- remove_rule(rule_id)[source]
Remove an inference rule
- Parameters:
rule_id (str) – ID of rule to remove
- Return type:
None
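The transitive and symmetric rules listed for `InferenceEngine` can be shown on plain sets of edges. The sketch below is a self-contained illustration of the two rules and does not use the AIECS classes. `infer_transitive`, `infer_symmetric`, and the sample names are hypothetical, and treating `max_steps` as a cap on rule-application rounds is an assumption.

```python
from typing import Set, Tuple

Edge = Tuple[str, str]


def infer_transitive(edges: Set[Edge], max_steps: int = 3) -> Set[Edge]:
    """Return new edges implied by A->B, B->C => A->C, applied up to max_steps rounds."""
    known = set(edges)
    for _ in range(max_steps):
        derived = {
            (a, d)
            for (a, b) in known
            for (c, d) in known
            if b == c and a != d and (a, d) not in known
        }
        if not derived:
            break  # fixed point reached, nothing new to infer
        known |= derived
    return known - edges


def infer_symmetric(edges: Set[Edge]) -> Set[Edge]:
    """Return new edges implied by A->B => B->A."""
    return {(b, a) for (a, b) in edges} - edges


reports_to = {("ann", "bo"), ("bo", "cy"), ("cy", "di")}
print(sorted(infer_transitive(reports_to)))
print(sorted(infer_symmetric({("ann", "bo")})))
```

Both functions return only the edges that were not already present.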
- class aiecs.application.knowledge_graph.reasoning.InferenceCache[source]
Bases: object
Cache for inference results
Stores previously computed inference results to avoid recomputation.
- class aiecs.application.knowledge_graph.reasoning.EvidenceSynthesizer[source]
Bases: object
Evidence Synthesizer
Combines evidence from multiple sources to create more robust conclusions.
Features:
- Merge overlapping evidence
- Calculate combined confidence
- Detect contradictions
- Synthesize explanations
Example
```python
synthesizer = EvidenceSynthesizer()

# Combine evidence from different sources
combined = synthesizer.synthesize_evidence([ev1, ev2, ev3])

# Get most reliable evidence
reliable = synthesizer.filter_by_confidence(combined, threshold=0.7)
```
- __init__(confidence_threshold=0.5, contradiction_threshold=0.3)[source]
Initialize evidence synthesizer
- synthesize_evidence(evidence_list, method='weighted_average')[source]
Synthesize evidence from multiple sources
- filter_by_confidence(evidence_list, threshold=None)[source]
Filter evidence by confidence threshold
Knowledge Graph Search Application Layer
Advanced search strategies including hybrid search and text similarity utilities.
- class aiecs.application.knowledge_graph.search.HybridSearchStrategy[source]
Bases: object
Hybrid Search Strategy
Combines vector similarity search with graph structure traversal to provide enhanced search results that leverage both semantic similarity and structural relationships.
Search Modes:
- VECTOR_ONLY: Pure vector similarity search
- GRAPH_ONLY: Pure graph traversal from seed entities
- HYBRID: Combines both approaches with weighted scoring
Example
```python
strategy = HybridSearchStrategy(graph_store)

config = HybridSearchConfig(
    mode=SearchMode.HYBRID,
    vector_weight=0.6,
    graph_weight=0.4,
    max_results=10,
    expand_results=True
)

results = await strategy.search(
    query_embedding=[0.1, 0.2, ...],
    config=config
)

for entity, score in results:
    print(f"{entity.id}: {score:.3f}")
```
- __init__(graph_store)[source]
Initialize hybrid search strategy
- Parameters:
graph_store (GraphStore) – Graph storage backend
- async search(query_embedding, config=None, seed_entity_ids=None)[source]
Perform hybrid search
- Parameters:
config (HybridSearchConfig | None) – Search configuration (uses defaults if None)
seed_entity_ids (List[str] | None) – Optional seed entities for graph traversal
- Returns:
List of (entity, score) tuples sorted by score descending
- async search_with_expansion(query_embedding, config=None, include_paths=False)[source]
Search with result expansion and optional path tracking
- Parameters:
config (HybridSearchConfig | None) – Search configuration
include_paths (bool) – Whether to include paths to results
- Returns:
Tuple of (results, paths) where paths is None if not requested
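The weighted scoring behind the HYBRID mode is not detailed in this reference. One common formulation is a weighted sum of the vector score and the graph score for each entity, sketched below. `hybrid_scores` is a hypothetical helper, and treating a missing score as 0.0 is an assumption of the sketch.

```python
from typing import Dict, List, Tuple


def hybrid_scores(
    vector_scores: Dict[str, float],
    graph_scores: Dict[str, float],
    vector_weight: float = 0.6,
    graph_weight: float = 0.4,
    max_results: int = 10,
) -> List[Tuple[str, float]]:
    """Weighted sum of two score maps; a missing score counts as 0.0."""
    entity_ids = set(vector_scores) | set(graph_scores)
    combined = {
        entity_id: vector_weight * vector_scores.get(entity_id, 0.0)
        + graph_weight * graph_scores.get(entity_id, 0.0)
        for entity_id in entity_ids
    }
    # Highest combined score first, truncated to max_results
    ranked = sorted(combined.items(), key=lambda item: item[1], reverse=True)
    return ranked[:max_results]


ranked = hybrid_scores({"a": 1.0, "b": 0.5}, {"b": 1.0, "c": 1.0})
print([entity_id for entity_id, _ in ranked])  # ['b', 'a', 'c']
```

Entity `b` ranks first because it scores in both channels, even though `a` has the highest vector score alone.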
- class aiecs.application.knowledge_graph.search.HybridSearchConfig[source]
Bases: BaseModel
Configuration for hybrid search
- mode: SearchMode
Search mode (vector_only, graph_only, hybrid)
- model_config: ClassVar[ConfigDict] = {'use_enum_values': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class aiecs.application.knowledge_graph.search.SearchMode[source]
Search mode for hybrid search
- VECTOR_ONLY = 'vector_only'
- GRAPH_ONLY = 'graph_only'
- HYBRID = 'hybrid'
- __new__(value)
- class aiecs.application.knowledge_graph.search.TextSimilarity[source]
Bases: object
Convenience class for text similarity operations
Provides a unified interface for various text similarity methods.
Example:
    similarity = TextSimilarity()

    # Jaccard similarity
    score = similarity.jaccard("hello world", "world hello")

    # Cosine similarity
    score = similarity.cosine("machine learning", "deep learning")

    # Levenshtein distance
    distance = similarity.levenshtein("kitten", "sitting")

    # Fuzzy matching
    matches = similarity.fuzzy_match(
        "python",
        ["python3", "pyton", "java", "pythn"],
        threshold=0.7
    )
- class aiecs.application.knowledge_graph.search.BM25Scorer[source]
Bases: object
BM25 (Best Matching 25) scorer for text similarity
BM25 is a ranking function used to estimate the relevance of documents to a given search query. It’s an improvement over TF-IDF.
Example:
    scorer = BM25Scorer(corpus=[
        "The quick brown fox jumps over the lazy dog",
        "A quick brown dog jumps over a lazy fox",
        "The lazy dog sleeps all day"
    ])

    scores = scorer.score("quick brown fox")
    # Returns scores for each document in corpus
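For readers unfamiliar with BM25, the sketch below computes Okapi BM25 scores from scratch for the same three-document corpus. It is not the `BM25Scorer` implementation. The whitespace tokenizer, the `k1=1.5` and `b=0.75` defaults, and the smoothed IDF variant are assumptions chosen for the illustration, and `bm25_scores` is a hypothetical function that expects a non-empty corpus.

```python
import math
from collections import Counter
from typing import List


def bm25_scores(query: str, corpus: List[str], k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Score every document in a non-empty corpus against query with Okapi BM25."""
    docs = [doc.lower().split() for doc in corpus]
    n_docs = len(docs)
    avg_len = sum(len(doc) for doc in docs) / n_docs
    # Document frequency: number of documents containing each term
    doc_freq = Counter(term for doc in docs for term in set(doc))
    scores = []
    for doc in docs:
        term_freq = Counter(doc)
        score = 0.0
        for term in query.lower().split():
            tf = term_freq[term]
            if tf == 0:
                continue
            n_t = doc_freq[term]
            # Smoothed IDF that stays non-negative for very common terms
            idf = math.log((n_docs - n_t + 0.5) / (n_t + 0.5) + 1.0)
            # Length normalization: longer documents need more matches
            norm = tf + k1 * (1.0 - b + b * len(doc) / avg_len)
            score += idf * tf * (k1 + 1.0) / norm
        scores.append(score)
    return scores


corpus = [
    "The quick brown fox jumps over the lazy dog",
    "A quick brown dog jumps over a lazy fox",
    "The lazy dog sleeps all day",
]
scores = bm25_scores("quick brown fox", corpus)
print(scores)
```

The first two documents contain all three query terms once and have the same length, so they receive equal scores. The third contains none of the terms and scores 0.0.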
- aiecs.application.knowledge_graph.search.jaccard_similarity(set1, set2)[source]
Calculate Jaccard similarity between two sets
Jaccard similarity = (size of intersection) / (size of union)
- aiecs.application.knowledge_graph.search.jaccard_similarity_text(text1, text2, tokenizer=None)[source]
Calculate Jaccard similarity between two text strings
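The Jaccard formula above translates directly into set operations. The sketch below is a minimal standalone version. The lowercase whitespace tokenizer and the convention that two empty sets score 1.0 are assumptions, and `jaccard` and `jaccard_text` are hypothetical names rather than the library functions.

```python
from typing import Set


def jaccard(set1: Set[str], set2: Set[str]) -> float:
    """|intersection| / |union|; two empty sets are treated as identical here."""
    if not set1 and not set2:
        return 1.0
    return len(set1 & set2) / len(set1 | set2)


def jaccard_text(text1: str, text2: str) -> float:
    """Jaccard similarity over lowercase whitespace tokens."""
    return jaccard(set(text1.lower().split()), set(text2.lower().split()))


print(jaccard_text("hello world", "world hello"))  # 1.0
print(jaccard({"a", "b", "c"}, {"b", "c", "d"}))   # 0.5
```

Word order does not affect the result, because both texts are reduced to sets of tokens.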
- aiecs.application.knowledge_graph.search.cosine_similarity_text(text1, text2, tokenizer=None)[source]
Calculate cosine similarity between two text strings
Cosine similarity measures the cosine of the angle between two vectors in a multi-dimensional space. For text, vectors are TF-IDF representations.
- aiecs.application.knowledge_graph.search.levenshtein_distance(s1, s2)[source]
Calculate Levenshtein distance (edit distance) between two strings
Levenshtein distance is the minimum number of single-character edits (insertions, deletions, or substitutions) required to change one string into another.
- aiecs.application.knowledge_graph.search.normalized_levenshtein_similarity(s1, s2)[source]
Calculate normalized Levenshtein similarity (0.0 to 1.0)
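Edit distance is usually computed with dynamic programming. The sketch below keeps only one row of the table at a time. Normalizing by the length of the longer string is one common convention and is assumed here, since the reference does not state which one the library uses. `levenshtein` and `normalized_similarity` are hypothetical names.

```python
def levenshtein(s1: str, s2: str) -> int:
    """Edit distance via dynamic programming, keeping one row at a time."""
    previous = list(range(len(s2) + 1))  # distance from "" to each prefix of s2
    for i, ch1 in enumerate(s1, start=1):
        current = [i]  # distance from s1[:i] to ""
        for j, ch2 in enumerate(s2, start=1):
            current.append(min(
                previous[j] + 1,                 # deletion
                current[j - 1] + 1,              # insertion
                previous[j - 1] + (ch1 != ch2),  # substitution (free on a match)
            ))
        previous = current
    return previous[-1]


def normalized_similarity(s1: str, s2: str) -> float:
    """1 - distance / max length, so identical strings score 1.0."""
    longest = max(len(s1), len(s2))
    return 1.0 if longest == 0 else 1.0 - levenshtein(s1, s2) / longest


print(levenshtein("kitten", "sitting"))  # 3
```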
- aiecs.application.knowledge_graph.search.fuzzy_match(query, candidates, threshold=0.6, method='jaccard')[source]
Find fuzzy matches for a query string in a list of candidates
- Returns:
List of (candidate, similarity_score) tuples above threshold, sorted by score descending
- Return type:
List[Tuple[str, float]]
- class aiecs.application.knowledge_graph.search.RerankerStrategy[source]
Bases: ABC
Abstract base class for reranking strategies
Each strategy computes a relevance score for entities given a query. Strategies can be combined using different combination methods.
Example:
    class TextSimilarityReranker(RerankerStrategy):
        async def score(
            self,
            query: str,
            entities: List[Entity]
        ) -> List[float]:
            # Compute BM25 scores
            return scores
- class aiecs.application.knowledge_graph.search.ResultReranker[source]
Bases: object
Result Reranker orchestrator
Combines multiple reranking strategies to improve search result relevance.
Example:
    # Create strategies
    text_reranker = TextSimilarityReranker()
    semantic_reranker = SemanticReranker()

    # Create reranker
    reranker = ResultReranker(
        strategies=[text_reranker, semantic_reranker],
        combination_method=ScoreCombinationMethod.WEIGHTED_AVERAGE,
        weights={"text": 0.6, "semantic": 0.4}
    )

    # Rerank results
    reranked = await reranker.rerank(
        query="machine learning",
        entities=search_results
    )
- __init__(strategies, combination_method=ScoreCombinationMethod.WEIGHTED_AVERAGE, weights=None, normalize_scores=True, normalization_method='min_max')[source]
Initialize ResultReranker
- Parameters:
strategies (List[RerankerStrategy]) – List of reranking strategies
combination_method (ScoreCombinationMethod) – Method for combining scores
weights (Dict[str, float] | None) – Optional weights for strategies (for weighted_average)
normalize_scores (bool) – Whether to normalize scores before combining
normalization_method (str) – Normalization method (“min_max”, “z_score”, “softmax”)
- class aiecs.application.knowledge_graph.search.ScoreCombinationMethod[source]
Methods for combining scores from multiple reranking strategies
- WEIGHTED_AVERAGE = 'weighted_average'
- RRF = 'rrf'
- MAX = 'max'
- MIN = 'min'
- __new__(value)
- aiecs.application.knowledge_graph.search.normalize_scores(scores, method='min_max')[source]
Normalize scores to [0.0, 1.0] range
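Min-max normalization, the default `method`, rescales scores linearly onto [0.0, 1.0]. A minimal sketch follows. `min_max_normalize` is a hypothetical helper, and returning 1.0 for every item when all scores are equal is a choice made for this sketch, not documented library behavior.

```python
from typing import List


def min_max_normalize(scores: List[float]) -> List[float]:
    """Rescale scores linearly so the minimum maps to 0.0 and the maximum to 1.0."""
    if not scores:
        return []
    low, high = min(scores), max(scores)
    if high == low:
        # All scores equal: returning 1.0 for each is a choice made for this sketch
        return [1.0 for _ in scores]
    return [(score - low) / (high - low) for score in scores]


print(min_max_normalize([2.0, 4.0, 6.0]))  # [0.0, 0.5, 1.0]
```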
- aiecs.application.knowledge_graph.search.combine_scores(score_dicts, method=ScoreCombinationMethod.WEIGHTED_AVERAGE, weights=None)[source]
Combine scores from multiple strategies
- Returns:
Combined scores as {entity_id: combined_score}
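Two of the combination methods, weighted average and reciprocal rank fusion (RRF), can be sketched as follows. The input shape `{strategy_name: {entity_id: score}}`, the RRF constant `k=60`, and the function names are assumptions made for this illustration. They are not taken from the `combine_scores` implementation.

```python
from typing import Dict

ScoreDicts = Dict[str, Dict[str, float]]


def weighted_average(score_dicts: ScoreDicts, weights: Dict[str, float]) -> Dict[str, float]:
    """Weighted mean per entity; a strategy that lacks an entity contributes 0.0."""
    total = sum(weights[name] for name in score_dicts)
    entity_ids = {eid for scores in score_dicts.values() for eid in scores}
    return {
        eid: sum(weights[name] * scores.get(eid, 0.0) for name, scores in score_dicts.items()) / total
        for eid in entity_ids
    }


def reciprocal_rank_fusion(score_dicts: ScoreDicts, k: int = 60) -> Dict[str, float]:
    """Sum 1 / (k + rank) over strategies, where rank 1 is each strategy's best entity."""
    fused: Dict[str, float] = {}
    for scores in score_dicts.values():
        ranking = sorted(scores, key=scores.get, reverse=True)
        for rank, eid in enumerate(ranking, start=1):
            fused[eid] = fused.get(eid, 0.0) + 1.0 / (k + rank)
    return fused


strategy_scores = {
    "text": {"e1": 0.9, "e2": 0.1},
    "semantic": {"e1": 0.5, "e2": 0.7},
}
averaged = weighted_average(strategy_scores, {"text": 0.6, "semantic": 0.4})
fused = reciprocal_rank_fusion(strategy_scores)
print(averaged)
print(fused)
```

RRF uses only rank positions, so the two entities tie here (each is first in one strategy and second in the other), while the weighted average still separates them by score magnitude.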
- class aiecs.application.knowledge_graph.search.TextSimilarityReranker[source]
Bases: RerankerStrategy
Text similarity reranker using BM25 and Jaccard similarity
Combines BM25 (term-based relevance) and Jaccard (set overlap) scores to rerank entities based on text similarity to query.
Example:
    reranker = TextSimilarityReranker(
        bm25_weight=0.7,
        jaccard_weight=0.3
    )

    scores = await reranker.score("machine learning", entities)
- class aiecs.application.knowledge_graph.search.SemanticReranker[source]
Bases: RerankerStrategy
Semantic reranker using vector cosine similarity
Uses entity embeddings to compute semantic similarity to query embedding.
Example:
    reranker = SemanticReranker()

    scores = await reranker.score(
        query="machine learning",
        entities=entities,
        query_embedding=[0.1, 0.2, ...]
    )
- class aiecs.application.knowledge_graph.search.StructuralReranker[source]
Bases: RerankerStrategy
Structural reranker using graph centrality and PageRank
Scores entities based on their structural importance in the graph. Uses PageRank scores and degree centrality.
Example:
    reranker = StructuralReranker(graph_store)

    scores = await reranker.score("query", entities)
- class aiecs.application.knowledge_graph.search.HybridReranker[source]
Bases: RerankerStrategy
Hybrid reranker combining multiple signals
Combines text similarity, semantic similarity, and structural importance into a single score.
Example:
    reranker = HybridReranker(
        graph_store=store,
        text_weight=0.4,
        semantic_weight=0.4,
        structural_weight=0.2
    )

    scores = await reranker.score(
        query="machine learning",
        entities=entities,
        query_embedding=[0.1, 0.2, ...]
    )
- __init__(graph_store, text_weight=0.4, semantic_weight=0.4, structural_weight=0.2)[source]
Initialize HybridReranker
- class aiecs.application.knowledge_graph.search.CrossEncoderReranker[source]
Bases: RerankerStrategy
Cross-encoder reranker using transformer models (optional)
Uses a cross-encoder model to compute semantic relevance between query and entity text. More accurate but slower than bi-encoder.
Note: This is a placeholder implementation. For production use, integrate with a cross-encoder model library (e.g., sentence-transformers).
Example:
    reranker = CrossEncoderReranker(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")

    scores = await reranker.score("machine learning", entities)