Application API

This section documents the application layer components.

Application Executors

Application executors module

Contains service executors and application-level coordination.

class aiecs.application.executors.OperationExecutor[source]

Bases: object

Core logic for handling operation execution

__init__(tool_executor, execution_utils, config)[source]
Parameters:
  • tool_executor (ToolExecutor)

  • execution_utils (ExecutionUtils)

  • config (Dict[str, Any])

async execute_operation(operation_spec, params)[source]

Execute a single operation (tool_name.operation_name)

Parameters:
  • operation_spec

  • params

Return type:

Any

async batch_execute_operations(operations)[source]

Batch execute operations with rate limiting

Parameters:

operations (List[Dict[str, Any]])

Return type:

List[Any]
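
The rate-limiting mechanism is not described in this reference. The following is a minimal sketch of one common approach, a concurrency cap built on `asyncio.Semaphore`. The names `run_batch`, `fake_operation`, the `limit` parameter, and the `"operation"` dictionary key are hypothetical and are not part of aiecs.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_batch(
    operations: List[Dict[str, Any]],
    execute: Callable[[Dict[str, Any]], Awaitable[Any]],
    limit: int = 2,
) -> List[Any]:
    """Run operations concurrently, at most `limit` at a time (hypothetical helper)."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(op: Dict[str, Any]) -> Any:
        # Only `limit` coroutines can hold the semaphore at once
        async with semaphore:
            return await execute(op)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(guarded(op) for op in operations))


async def fake_operation(op: Dict[str, Any]) -> Any:
    """Stand-in for a real tool call: echoes the operation name."""
    await asyncio.sleep(0)
    return f"done:{op['operation']}"


ops = [{"operation": "tool_a.read"}, {"operation": "tool_b.write"}]
results = asyncio.run(run_batch(ops, fake_operation))
```

A semaphore bounds concurrency rather than requests per second; a time-based limiter would be needed if the real executor enforces a rate.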

async execute_operations_sequence(operations, user_id, task_id, stop_on_failure=False, save_callback=None)[source]

Execute a sequence of operations in order, optionally stopping at the first failure

Parameters:
  • operations

  • user_id

  • task_id

  • stop_on_failure

  • save_callback

Return type:

List[TaskStepResult]
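
The effect of `stop_on_failure` can be illustrated with a small self-contained sketch. `StepResult` is a hypothetical stand-in for `TaskStepResult`, and `run_sequence`, `flaky`, and the `"operation"` key are assumptions made for illustration only; the real method also takes `user_id`, `task_id`, and `save_callback`, which are omitted here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Optional


@dataclass
class StepResult:
    """Hypothetical stand-in for TaskStepResult."""
    step: str
    completed: bool
    result: Any = None
    error: Optional[str] = None


async def run_sequence(
    operations: List[Dict[str, Any]],
    execute: Callable[[Dict[str, Any]], Awaitable[Any]],
    stop_on_failure: bool = False,
) -> List[StepResult]:
    """Run operations one after another, recording a result per step."""
    results: List[StepResult] = []
    for op in operations:
        try:
            value = await execute(op)
            results.append(StepResult(op["operation"], True, result=value))
        except Exception as exc:
            results.append(StepResult(op["operation"], False, error=str(exc)))
            if stop_on_failure:
                break  # remaining operations are not attempted
    return results


async def flaky(op: Dict[str, Any]) -> str:
    """Stand-in executor that fails for one specific operation."""
    if op["operation"] == "b.fail":
        raise ValueError("boom")
    return "ok"


ops = [{"operation": "a.run"}, {"operation": "b.fail"}, {"operation": "c.run"}]
stopped = asyncio.run(run_sequence(ops, flaky, stop_on_failure=True))
continued = asyncio.run(run_sequence(ops, flaky, stop_on_failure=False))
```

With `stop_on_failure=True` the returned list is shorter than the input, so callers should not assume one result per operation.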

async batch_tool_calls(tool_calls, tool_executor_func=None)[source]

Execute batch tool calls with rate limiting

Parameters:

tool_calls (List[Dict])

Return type:

List[Any]

extract_tool_calls(description, input_data, context)[source]

Extract tool calls from description

Parameters:
  • description

  • input_data

  • context

Return type:

List[Dict]

async execute_parallel_operations(operations)[source]

Execute multiple operations in parallel

Parameters:

operations (List[Dict[str, Any]])

Return type:

List[TaskStepResult]

get_tool_instance(tool_name)[source]

Get tool instance

Parameters:

tool_name (str)

clear_tool_cache()[source]

Clear tool instance cache

get_stats()[source]

Get operation executor statistics

Return type:

Dict[str, Any]

Knowledge Graph

Note

The knowledge graph module contains multiple submodules. See individual submodule documentation for details.

Knowledge Graph Builder Pipeline

Orchestrates document-to-graph conversion workflow.

class aiecs.application.knowledge_graph.builder.GraphBuilder[source]

Bases: object

Main pipeline for building knowledge graphs from text

The pipeline:

  1. Extract entities from text
  2. Deduplicate entities
  3. Link entities to existing graph
  4. Extract relations between entities
  5. Validate relations
  6. Deduplicate relations
  7. Store entities and relations in graph

Features:

  • Async/parallel processing
  • Progress callbacks
  • Error handling and recovery
  • Provenance tracking
  • Configurable components

Example:

# Initialize components
entity_extractor = LLMEntityExtractor(schema)
relation_extractor = LLMRelationExtractor(schema)

# Create builder
builder = GraphBuilder(
    graph_store=store,
    entity_extractor=entity_extractor,
    relation_extractor=relation_extractor,
    schema=schema
)

# Build graph from text
result = await builder.build_from_text(
    text="Alice works at Tech Corp.",
    source="document_1.pdf"
)

print(f"Added {result.entities_added} entities, {result.relations_added} relations")
__init__(graph_store, entity_extractor, relation_extractor, schema=None, enable_deduplication=True, enable_linking=True, enable_validation=True, progress_callback=None, embedding_client=None)[source]

Initialize graph builder

Parameters:
  • graph_store (GraphStore) – Graph storage to save entities/relations

  • entity_extractor (EntityExtractor) – Entity extractor to use

  • relation_extractor (RelationExtractor) – Relation extractor to use

  • schema (GraphSchema | None) – Optional schema for validation

  • enable_deduplication (bool) – Enable entity/relation deduplication

  • enable_linking (bool) – Enable linking to existing entities

  • enable_validation (bool) – Enable relation validation

  • progress_callback (Callable[[str, float], None] | None) – Optional callback for progress updates (message, progress_pct)

  • embedding_client (LLMClientProtocol | None) – Optional custom LLM client for generating embeddings
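
A progress callback only needs to match the documented `Callable[[str, float], None]` shape. The sketch below shows a callback that collects updates in a list. The stage names and percentage values are illustrative assumptions; this reference does not state which messages the builder emits or the scale of `progress_pct`.

```python
from typing import List, Tuple

progress_log: List[Tuple[str, float]] = []


def on_progress(message: str, progress_pct: float) -> None:
    """Collect progress updates; a real callback might log or update a UI."""
    progress_log.append((message, progress_pct))


# Simulated updates. The messages and values below are made up for illustration.
on_progress("Extracting entities", 10.0)
on_progress("Storing results", 100.0)
```

Such a function would be passed as `progress_callback=on_progress` when constructing the builder.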

static from_config(graph_store, entity_extractor, relation_extractor, schema=None, enable_deduplication=True, enable_linking=True, enable_validation=True, progress_callback=None)[source]

Create GraphBuilder with embedding client resolved from configuration

This factory method automatically resolves the embedding client from the global Settings configuration using LLMClientFactory.

Parameters:
  • graph_store (GraphStore) – Graph storage to save entities/relations

  • entity_extractor (EntityExtractor) – Entity extractor to use

  • relation_extractor (RelationExtractor) – Relation extractor to use

  • schema (GraphSchema | None) – Optional schema for validation

  • enable_deduplication (bool) – Enable entity/relation deduplication

  • enable_linking (bool) – Enable linking to existing entities

  • enable_validation (bool) – Enable relation validation

  • progress_callback (Callable[[str, float], None] | None) – Optional callback for progress updates

Returns:

GraphBuilder instance with configured embedding client

Return type:

GraphBuilder

Example:

import os

from aiecs.config import get_settings
from aiecs.llm.factory import LLMClientFactory

# Register custom embedding provider
LLMClientFactory.register_custom_provider("my_embedder", my_client)

# Set environment variable
os.environ["KG_EMBEDDING_PROVIDER"] = "my_embedder"

# Create builder with auto-resolved embedding client
builder = GraphBuilder.from_config(
    graph_store=store,
    entity_extractor=extractor,
    relation_extractor=rel_extractor
)
async build_from_text(text, source=None, metadata=None)[source]

Build knowledge graph from text

Parameters:
  • text (str) – Input text to process

  • source (str | None) – Optional source identifier (document name, URL, etc.)

  • metadata (Dict[str, Any] | None) – Optional metadata to attach to entities/relations

Returns:

BuildResult with statistics and errors

Return type:

BuildResult

async build_batch(texts, sources=None, parallel=True, max_parallel=5)[source]

Build graph from multiple texts in batch

Parameters:
  • texts (List[str]) – List of texts to process

  • sources (List[str] | None) – Optional list of source identifiers (same length as texts)

  • parallel (bool) – Process in parallel (default: True)

  • max_parallel (int) – Maximum parallel tasks (default: 5)

Returns:

List of BuildResult objects (one per text)

Return type:

List[BuildResult]

class aiecs.application.knowledge_graph.builder.DocumentGraphBuilder[source]

Bases: object

Build knowledge graphs from documents

Supports multiple document formats:

  • PDF
  • DOCX (Microsoft Word)
  • TXT (Plain text)
  • And more via AIECS DocumentParserTool

For large documents, automatically chunks text into manageable pieces.

Example

```python
builder = DocumentGraphBuilder(
    graph_builder=graph_builder,
    chunk_size=1000
)

result = await builder.build_from_document("research_paper.pdf")

print(f"Processed {result.total_chunks} chunks")
print(f"Added {result.total_entities_added} entities")
print(f"Added {result.total_relations_added} relations")
```

__init__(graph_builder, chunk_size=2000, chunk_overlap=200, enable_chunking=True, parallel_chunks=True, max_parallel_chunks=3)[source]

Initialize document graph builder

Parameters:
  • graph_builder (GraphBuilder) – GraphBuilder instance for text processing

  • chunk_size (int) – Size of text chunks (in characters)

  • chunk_overlap (int) – Overlap between chunks

  • enable_chunking (bool) – Whether to chunk large documents

  • parallel_chunks (bool) – Process chunks in parallel

  • max_parallel_chunks (int) – Maximum parallel chunk processing

async build_from_document(document_path, metadata=None)[source]

Build knowledge graph from a document

Parameters:
  • document_path (str | Path) – Path to document file

  • metadata (Dict[str, Any] | None) – Optional metadata to attach to extracted entities/relations

Returns:

DocumentBuildResult with statistics

Return type:

DocumentBuildResult

async build_from_documents(document_paths, parallel=True, max_parallel=3)[source]

Build knowledge graph from multiple documents

Parameters:
  • document_paths (List[str | Path]) – List of document paths

  • parallel (bool) – Process documents in parallel

  • max_parallel (int) – Maximum parallel documents

Returns:

List of DocumentBuildResult objects

Return type:

List[DocumentBuildResult]

class aiecs.application.knowledge_graph.builder.TextChunker[source]

Bases: object

Split large texts into smaller chunks

Strategies:

  • Fixed size chunking (by character or token count)
  • Sentence-aware chunking (don’t break sentences)
  • Paragraph-aware chunking (preserve paragraphs)
  • Overlapping chunks (for context preservation)

Example

```python
chunker = TextChunker(chunk_size=1000, overlap=100)
chunks = chunker.chunk_text(long_document)

for chunk in chunks:
    # Process each chunk separately
    result = await process(chunk.text)
```

__init__(chunk_size=1000, overlap=100, respect_sentences=True, respect_paragraphs=False, min_chunk_size=100)[source]

Initialize text chunker

Parameters:
  • chunk_size (int) – Target size for each chunk (in characters)

  • overlap (int) – Number of characters to overlap between chunks

  • respect_sentences (bool) – Try to break at sentence boundaries

  • respect_paragraphs (bool) – Try to break at paragraph boundaries

  • min_chunk_size (int) – Minimum chunk size (don’t create tiny chunks)

chunk_text(text, metadata=None)[source]

Split text into chunks

Parameters:
  • text (str) – Text to chunk

  • metadata (Dict[str, Any] | None) – Optional metadata to attach to chunks

Returns:

List of TextChunk objects

Return type:

List[TextChunk]
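
To make the interaction between `chunk_size` and `overlap` concrete, here is a minimal sketch of fixed-size character chunking with overlap. It is not the TextChunker implementation: it ignores sentence and paragraph boundaries and `min_chunk_size`, returns plain strings rather than `TextChunk` objects, and the function name `chunk_with_overlap` is hypothetical.

```python
from typing import List


def chunk_with_overlap(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into fixed-size character windows that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    step = chunk_size - overlap  # each window starts `step` characters after the previous one
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


chunks = chunk_with_overlap("abcdefghij", chunk_size=4, overlap=1)
```

Each chunk repeats the last `overlap` characters of the previous one, which is what preserves context across chunk boundaries.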

class aiecs.application.knowledge_graph.builder.SchemaMapping[source]

Bases: BaseModel

Schema mapping configuration

Defines how structured data (CSV, JSON) maps to knowledge graph entities and relations.

entity_mappings: List[EntityMapping]
relation_mappings: List[RelationMapping]
aggregations: List[EntityAggregation]
validation_config: Dict[str, Any] | None
description: str | None
validate_mapping()[source]

Validate mapping consistency

Returns:

List of validation error messages (empty if valid)

Return type:

List[str]

is_valid()[source]

Check if mapping is valid

Returns:

True if mapping is valid

Return type:

bool

get_entity_mapping(entity_type)[source]

Get entity mapping by entity type name

Parameters:

entity_type (str) – Entity type name

Returns:

Entity mapping or None if not found

Return type:

EntityMapping | None

get_relation_mapping(relation_type)[source]

Get relation mapping by relation type name

Parameters:

relation_type (str) – Relation type name

Returns:

Relation mapping or None if not found

Return type:

RelationMapping | None

get_aggregations(entity_type)[source]

Get aggregation configuration for entity type

Parameters:

entity_type (str) – Entity type name

Returns:

EntityAggregation or None if not found

Return type:

EntityAggregation | None

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aiecs.application.knowledge_graph.builder.EntityMapping[source]

Bases: BaseModel

Entity mapping configuration

Maps source data columns to an entity type with property transformations.

source_columns: List[str]
entity_type: str
property_mapping: Dict[str, str]
transformations: List[PropertyTransformation]
id_column: str | None
classmethod validate_source_columns(v)[source]

Validate source columns are not empty

Parameters:

v (List[str])

Return type:

List[str]

map_row_to_entity(row, entity_id=None)[source]

Map a data row to entity properties

Parameters:
  • row (Dict[str, Any]) – Dictionary of column name -> value

  • entity_id (str | None) – Optional entity ID (if not provided, will use id_column or generate)

Returns:

Dictionary with entity properties

Return type:

Dict[str, Any]
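
The row-to-entity mapping can be pictured with a small sketch. Everything here is an assumption for illustration: the function `map_row`, the direction of `property_mapping` (column name to property name), the shape of the returned dictionary, and the use of a random UUID when no ID is available are not confirmed by this reference.

```python
import uuid
from typing import Any, Dict, Optional


def map_row(
    row: Dict[str, Any],
    entity_type: str,
    property_mapping: Dict[str, str],
    id_column: Optional[str] = None,
    entity_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Rename row columns to property names and resolve an entity ID."""
    properties = {
        prop: row[column]
        for column, prop in property_mapping.items()
        if column in row
    }
    if entity_id is None:
        if id_column is not None and id_column in row:
            entity_id = str(row[id_column])
        else:
            # Assumption: generated IDs are random UUIDs
            entity_id = str(uuid.uuid4())
    return {"id": entity_id, "type": entity_type, "properties": properties}


entity = map_row(
    {"emp_id": 7, "full_name": "Alice"},
    entity_type="Person",
    property_mapping={"full_name": "name"},
    id_column="emp_id",
)
```

The ID resolution order mirrors the documented behaviour: an explicit `entity_id` wins, then `id_column`, then a generated value.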

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aiecs.application.knowledge_graph.builder.RelationMapping[source]

Bases: BaseModel

Relation mapping configuration

Maps source data columns to a relation type between entities.

source_columns: List[str]
relation_type: str
source_entity_column: str
target_entity_column: str
property_mapping: Dict[str, str]
transformations: List[PropertyTransformation]
classmethod validate_source_columns(v)[source]

Validate source columns are not empty

Parameters:

v (List[str])

Return type:

List[str]

classmethod validate_entity_columns(v)[source]

Validate entity column names are provided

Parameters:

v (str)

Return type:

str

map_row_to_relation(row)[source]

Map a data row to relation properties

Parameters:

row (Dict[str, Any]) – Dictionary of column name -> value

Returns:

Dictionary with relation properties (source_id, target_id, type, properties)

Return type:

Dict[str, Any]
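
As an illustration of the documented return shape (`source_id`, `target_id`, `type`, `properties`), the sketch below builds such a dictionary from a row. The function `row_to_relation` is hypothetical, and the column-to-property direction of `property_mapping` and the string conversion of IDs are assumptions.

```python
from typing import Any, Dict


def row_to_relation(
    row: Dict[str, Any],
    relation_type: str,
    source_entity_column: str,
    target_entity_column: str,
    property_mapping: Dict[str, str],
) -> Dict[str, Any]:
    """Build a relation dictionary from one data row."""
    return {
        "source_id": str(row[source_entity_column]),
        "target_id": str(row[target_entity_column]),
        "type": relation_type,
        # Only columns listed in property_mapping become relation properties
        "properties": {
            prop: row[col] for col, prop in property_mapping.items() if col in row
        },
    }


relation = row_to_relation(
    {"person_id": "p1", "company_id": "c9", "since": 2020},
    relation_type="WORKS_FOR",
    source_entity_column="person_id",
    target_entity_column="company_id",
    property_mapping={"since": "start_year"},
)
```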

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aiecs.application.knowledge_graph.builder.PropertyTransformation[source]

Bases: BaseModel

Property transformation configuration

Defines how a source column/value is transformed into a target property.

transformation_type: TransformationType
source_column: str | None
target_property: str
target_type: PropertyType | None
constant_value: Any | None
compute_function: str | None
compute_args: List[str] | None
classmethod validate_transformation_type(v)[source]

Validate transformation type

Parameters:

v (TransformationType)

Return type:

TransformationType

apply(row)[source]

Apply transformation to a data row

Parameters:

row (Dict[str, Any]) – Dictionary of column name -> value

Returns:

Transformed value for target property

Return type:

Any
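
The semantics of each transformation type are not spelled out in this reference. The sketch below shows one plausible reading for four of the five documented types; `COMPUTE` is left out because how `compute_function` is resolved is unknown. The `Kind` enum, the `apply_transformation` function, and the `cast` parameter are hypothetical names, and the behaviour of `SKIP` (returning `None`) is an assumption.

```python
from enum import Enum
from typing import Any, Callable, Dict, Optional


class Kind(str, Enum):
    """Subset of the documented TransformationType values."""
    RENAME = "rename"
    TYPE_CAST = "type_cast"
    CONSTANT = "constant"
    SKIP = "skip"


def apply_transformation(
    kind: Kind,
    row: Dict[str, Any],
    source_column: Optional[str] = None,
    cast: Optional[Callable[[Any], Any]] = None,
    constant_value: Any = None,
) -> Any:
    """Return the value that would be stored in the target property."""
    if kind is Kind.CONSTANT:
        return constant_value  # the row is ignored entirely
    if kind is Kind.SKIP:
        return None  # assumption: skipped properties yield no value
    value = row.get(source_column)
    if kind is Kind.TYPE_CAST and cast is not None and value is not None:
        return cast(value)
    return value  # RENAME: the value passes through unchanged


age = apply_transformation(Kind.TYPE_CAST, {"age": "42"}, source_column="age", cast=int)
```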

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aiecs.application.knowledge_graph.builder.TransformationType[source]

Bases: str, Enum

Types of property transformations

RENAME = 'rename'
TYPE_CAST = 'type_cast'
COMPUTE = 'compute'
CONSTANT = 'constant'
SKIP = 'skip'
__new__(value)
class aiecs.application.knowledge_graph.builder.StructuredDataPipeline[source]

Bases: object

Pipeline for importing structured data (CSV, JSON, SPSS, Excel) into knowledge graphs

Uses SchemaMapping to map source data columns to entity and relation types. Supports batch processing, progress tracking, and error handling.

Example

```python
# Define schema mapping
mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["id", "name", "age"],
            entity_type="Person",
            property_mapping={"id": "id", "name": "name", "age": "age"}
        )
    ],
    relation_mappings=[
        RelationMapping(
            source_columns=["person_id", "company_id"],
            relation_type="WORKS_FOR",
            source_entity_column="person_id",
            target_entity_column="company_id"
        )
    ]
)

# Create pipeline
pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store
)

# Import CSV
result = await pipeline.import_from_csv("employees.csv")
print(f"Added {result.entities_added} entities, {result.relations_added} relations")
```

__init__(mapping, graph_store, batch_size=100, progress_callback=None, skip_errors=True, enable_parallel=False, max_workers=None, auto_tune_batch_size=False, enable_streaming=False, use_bulk_writes=True, track_performance=True)[source]

Initialize structured data pipeline

Parameters:
  • mapping (SchemaMapping) – Schema mapping configuration

  • graph_store (GraphStore) – Graph storage to save entities/relations

  • batch_size (int) – Number of rows to process in each batch (ignored if auto_tune_batch_size=True)

  • progress_callback (Callable[[str, float], None] | None) – Optional callback for progress updates (message, progress_pct)

  • skip_errors (bool) – Whether to skip rows with errors and continue processing

  • enable_parallel (bool) – Enable parallel batch processing for faster imports

  • max_workers (int | None) – Maximum number of parallel workers (default: CPU count - 1)

  • auto_tune_batch_size (bool) – Automatically tune batch size based on system resources

  • enable_streaming (bool) – Enable streaming mode for large files (memory-efficient)

  • use_bulk_writes (bool) – Use bulk write operations for better performance

  • track_performance (bool) – Track detailed performance metrics

static infer_schema_from_csv(file_path, encoding='utf-8', sample_size=1000)[source]

Infer schema mapping from CSV file

Analyzes CSV structure and content to automatically generate schema mappings.

Parameters:
  • file_path (str | Path) – Path to CSV file

  • encoding (str) – File encoding (default: utf-8)

  • sample_size (int) – Number of rows to sample for inference (default: 1000)

Returns:

InferredSchema with entity and relation mappings

Return type:

InferredSchema

Example

```python
# Infer schema from CSV
inferred = StructuredDataPipeline.infer_schema_from_csv("data.csv")

# Review and modify if needed
print(f"Inferred entity types: {[em.entity_type for em in inferred.entity_mappings]}")
print(f"Warnings: {inferred.warnings}")

# Use inferred schema
mapping = inferred.to_schema_mapping()
pipeline = StructuredDataPipeline(mapping, graph_store)
```

static infer_schema_from_spss(file_path, encoding='utf-8', sample_size=1000)[source]

Infer schema mapping from SPSS file

Uses SPSS variable labels and value labels to generate schema mappings.

Parameters:
  • file_path (str | Path) – Path to SPSS file

  • encoding (str) – File encoding (default: utf-8)

  • sample_size (int) – Number of rows to sample for inference (default: 1000)

Returns:

InferredSchema with entity and relation mappings

Return type:

InferredSchema

static infer_schema_from_dataframe(df, entity_type_hint=None, metadata=None, sample_size=1000)[source]

Infer schema mapping from pandas DataFrame

Parameters:
  • df (pd.DataFrame) – DataFrame to analyze

  • entity_type_hint (str | None) – Optional hint for entity type name

  • metadata (Dict[str, Any] | None) – Optional metadata (e.g., SPSS variable labels)

  • sample_size (int) – Number of rows to sample for inference (default: 1000)

Returns:

InferredSchema with entity and relation mappings

Return type:

InferredSchema

static create_with_auto_reshape(file_path, graph_store, entity_type_hint=None, reshape_threshold=50, **kwargs)[source]

Create pipeline with automatic reshaping for wide format data

Detects wide format data and automatically reshapes to normalized structure before creating the pipeline.

Parameters:
  • file_path (str | Path) – Path to data file (CSV, SPSS, Excel)

  • graph_store (GraphStore) – Graph storage to save entities/relations

  • entity_type_hint (str | None) – Optional hint for entity type name

  • reshape_threshold (int) – Minimum columns to trigger reshaping (default: 50)

  • **kwargs – Additional arguments for StructuredDataPipeline

Returns:

StructuredDataPipeline configured for the data

Return type:

StructuredDataPipeline

Example

```python
# Automatically detect and reshape wide format data
pipeline = StructuredDataPipeline.create_with_auto_reshape(
    "wide_data.csv",
    graph_store,
    entity_type_hint="Sample"
)

# Import reshaped data
result = await pipeline.import_from_csv("wide_data.csv")
```

async import_from_csv(file_path, encoding='utf-8', delimiter=',', header=True)[source]

Import data from CSV file

Parameters:
  • file_path (str | Path) – Path to CSV file

  • encoding (str) – File encoding (default: utf-8)

  • delimiter (str) – CSV delimiter (default: comma)

  • header (bool) – Whether file has header row (default: True)

Returns:

ImportResult with statistics

Return type:

ImportResult

async import_from_json(file_path, encoding='utf-8', array_key=None)[source]

Import data from JSON file

Supports:

  • Array of objects: [{"id": 1, "name": "Alice"}, …]
  • Object with array: {"items": [{"id": 1, …}, …]}
  • Single object: {"id": 1, "name": "Alice"}

Parameters:
  • file_path (str | Path) – Path to JSON file

  • encoding (str) – File encoding (default: utf-8)

  • array_key (str | None) – If JSON is object with array, key containing the array

Returns:

ImportResult with statistics

Return type:

ImportResult
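
The three supported JSON shapes can all be reduced to a list of row dictionaries before mapping. The sketch below shows one way to do that with the standard `json` module. The function `rows_from_json` is hypothetical, and treating an object without `array_key` as a single row is an assumption about how the pipeline distinguishes the last two shapes.

```python
import json
from typing import Any, Dict, List, Optional


def rows_from_json(text: str, array_key: Optional[str] = None) -> List[Dict[str, Any]]:
    """Normalize the supported JSON shapes into a list of row dictionaries."""
    data = json.loads(text)
    if isinstance(data, list):
        return data  # array of objects
    if isinstance(data, dict):
        if array_key is not None:
            return list(data[array_key])  # object wrapping an array
        return [data]  # single object becomes a one-row list
    raise ValueError("expected a JSON array or object")


rows = rows_from_json('{"items": [{"id": 1}, {"id": 2}]}', array_key="items")
```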

async import_from_csv_streaming(file_path, encoding='utf-8', delimiter=',', chunk_size=10000)[source]

Import data from CSV file using streaming mode.

Memory-efficient import for large files (>1GB). Reads file in chunks without loading entire file into memory.

Parameters:
  • file_path (str | Path) – Path to CSV file

  • encoding (str) – File encoding (default: utf-8)

  • delimiter (str) – CSV delimiter (default: comma)

  • chunk_size (int) – Number of rows per chunk (default: 10000)

Returns:

ImportResult with statistics and performance metrics

Return type:

ImportResult
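
Chunked reading is what keeps memory use flat for large files. The sketch below shows the general idea with the standard `csv` module and `itertools.islice`; it is not the pipeline's implementation, and `iter_csv_chunks` is a hypothetical name. An in-memory `StringIO` stands in for a file on disk.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterator, List, TextIO


def iter_csv_chunks(
    handle: TextIO, chunk_size: int = 10000, delimiter: str = ","
) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of at most `chunk_size` rows without reading the whole file."""
    reader = csv.DictReader(handle, delimiter=delimiter)
    while True:
        # islice pulls only the next chunk_size rows from the reader
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


sample = io.StringIO("id,name\n1,Alice\n2,Bob\n3,Carol\n")
chunk_sizes = [len(chunk) for chunk in iter_csv_chunks(sample, chunk_size=2)]
```

Only one chunk is held in memory at a time, so peak usage depends on `chunk_size` rather than file size.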

async import_from_spss(file_path, encoding='utf-8', preserve_metadata=True)[source]

Import data from SPSS file (.sav, .por)

Uses pyreadstat library to read SPSS files and extract metadata. SPSS variable labels and value labels are preserved as entity properties.

Parameters:
  • file_path (str | Path) – Path to SPSS file (.sav or .por)

  • encoding (str) – File encoding (default: utf-8)

  • preserve_metadata (bool) – Whether to preserve SPSS metadata (variable labels, value labels)

Returns:

ImportResult with statistics

Return type:

ImportResult

async import_from_excel(file_path, sheet_name=0, encoding='utf-8', header=True)[source]

Import data from Excel file (.xlsx, .xls)

Supports importing from specific sheets or all sheets.

Parameters:
  • file_path (str | Path) – Path to Excel file

  • sheet_name (str | int | None) – Sheet name (str), sheet index (int), or None for all sheets (default: 0 = first sheet)

  • encoding (str) – File encoding (default: utf-8)

  • header (bool) – Whether file has header row (default: True)

Returns:

ImportResult with statistics

Return type:

ImportResult

async reshape_and_import_csv(file_path, id_vars=None, value_vars=None, var_name='variable', value_name='value', entity_type_hint=None, encoding='utf-8')[source]

Reshape wide format CSV to normalized structure and import

Automatically converts wide format data (many columns) to long format (normalized structure) before importing into the graph.

Parameters:
  • file_path (str | Path) – Path to CSV file

  • id_vars (List[str] | None) – Columns to use as identifiers (auto-detected if None)

  • value_vars (List[str] | None) – Columns to unpivot (auto-detected if None)

  • var_name (str) – Name for variable column (default: ‘variable’)

  • value_name (str) – Name for value column (default: ‘value’)

  • entity_type_hint (str | None) – Optional hint for entity type name

  • encoding (str) – File encoding (default: utf-8)

Returns:

ImportResult with statistics

Return type:

ImportResult
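
The wide-to-long conversion described above is the same idea as an unpivot or "melt". A dependency-free sketch of that reshaping follows. `melt_rows` is a hypothetical helper, and treating every non-identifier column as a value column is an assumption about what auto-detection would choose.

```python
from typing import Any, Dict, List


def melt_rows(
    rows: List[Dict[str, Any]],
    id_vars: List[str],
    var_name: str = "variable",
    value_name: str = "value",
) -> List[Dict[str, Any]]:
    """Turn each non-identifier column of each row into its own long-format row."""
    long_rows: List[Dict[str, Any]] = []
    for row in rows:
        ids = {key: row[key] for key in id_vars}
        for column, value in row.items():
            if column in id_vars:
                continue
            long_rows.append({**ids, var_name: column, value_name: value})
    return long_rows


wide = [{"sample_id": "s1", "option1": 1, "option2": 0}]
long_rows = melt_rows(wide, id_vars=["sample_id"])
```

One wide row with N value columns becomes N long rows, each carrying the identifier columns.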

Example

```python
# Wide format: sample_id, option1, option2, ..., option200
# Will be reshaped to: sample_id, variable, value

result = await pipeline.reshape_and_import_csv(
    "wide_data.csv",
    id_vars=['sample_id'],
    var_name='option_name',
    value_name='option_value'
)
```

class aiecs.application.knowledge_graph.builder.ImportResult[source]

Bases: object

Result of structured data import operation

success

Whether import completed successfully

Type:

bool

entities_added

Number of entities added to graph

Type:

int

relations_added

Number of relations added to graph

Type:

int

rows_processed

Number of rows processed

Type:

int

rows_failed

Number of rows that failed to process

Type:

int

errors

List of errors encountered

Type:

List[str]

warnings

List of warnings

Type:

List[str]

quality_report

Data quality validation report (if validation enabled)

Type:

aiecs.application.knowledge_graph.builder.data_quality.QualityReport | None

start_time

When import started

Type:

datetime.datetime | None

end_time

When import ended

Type:

datetime.datetime | None

duration_seconds

Total duration in seconds

Type:

float

performance_metrics

Detailed performance metrics (if tracking enabled)

Type:

aiecs.application.knowledge_graph.builder.import_optimizer.PerformanceMetrics | None

success: bool = True
entities_added: int = 0
relations_added: int = 0
rows_processed: int = 0
rows_failed: int = 0
errors: List[str]
warnings: List[str]
quality_report: QualityReport | None = None
start_time: datetime | None = None
end_time: datetime | None = None
duration_seconds: float = 0.0
performance_metrics: PerformanceMetrics | None = None
__init__(success=True, entities_added=0, relations_added=0, rows_processed=0, rows_failed=0, errors=<factory>, warnings=<factory>, quality_report=None, start_time=None, end_time=None, duration_seconds=0.0, performance_metrics=None)
Parameters:
  • success (bool)

  • entities_added (int)

  • relations_added (int)

  • rows_processed (int)

  • rows_failed (int)

  • errors (List[str])

  • warnings (List[str])

  • quality_report (QualityReport | None)

  • start_time (datetime | None)

  • end_time (datetime | None)

  • duration_seconds (float)

  • performance_metrics (PerformanceMetrics | None)

Return type:

None

Knowledge Graph Reasoning Module

Provides query planning, multi-hop reasoning, and inference capabilities.

class aiecs.application.knowledge_graph.reasoning.QueryPlanner[source]

Bases: object

Query Planning Engine

Translates natural language queries into structured, optimized execution plans.

Features:

  • Natural language to graph query translation
  • Query decomposition (complex queries → multiple steps)
  • Query optimization (reorder operations for efficiency)
  • Cost estimation

Example:

planner = QueryPlanner(graph_store)

# Plan a complex query
plan = planner.plan_query(
    "Who works at companies that Alice knows people at?"
)

# Optimize the plan
optimized_plan = planner.optimize_plan(
    plan,
    strategy=OptimizationStrategy.MINIMIZE_COST
)
__init__(graph_store, enable_advanced_optimization=True, schema=None)[source]

Initialize query planner

Parameters:
  • graph_store (GraphStore) – Graph storage backend for queries

  • enable_advanced_optimization (bool) – Enable advanced query optimization (default: True)

  • schema (Any | None) – Optional schema manager for logic query validation

plan_query(natural_language_query, context=None)[source]

Create an execution plan from natural language query

Parameters:
  • natural_language_query (str) – Natural language query string

  • context (Dict[str, Any] | None) – Optional context (e.g., embeddings, entity IDs)

Returns:

Query execution plan

Return type:

QueryPlan

Example:

plan = planner.plan_query(
    "Find papers similar to 'Deep Learning' and their authors"
)
optimize_plan(plan, strategy=OptimizationStrategy.BALANCED)[source]

Optimize query execution plan

Parameters:
  • plan (QueryPlan) – Original query plan

  • strategy (OptimizationStrategy) – Optimization strategy

Returns:

Optimized query plan

Return type:

QueryPlan

Example:

optimized = planner.optimize_plan(
    plan,
    strategy=OptimizationStrategy.MINIMIZE_COST
)
translate_to_graph_query(natural_language_query, context=None)[source]

Translate natural language to a single graph query

For simple queries that don’t need decomposition.

Parameters:
  • natural_language_query (str) – Natural language query

  • context (Dict[str, Any] | None) – Query context (embeddings, entity IDs, etc.)

Returns:

Single graph query

Return type:

GraphQuery

Example:

query = planner.translate_to_graph_query(
    "Find entities similar to X",
    context={"query_embedding": [0.1, 0.2, ...]}
)
update_statistics()[source]

Update query statistics from graph store

Call this periodically to keep optimizer statistics up-to-date

Return type:

None

record_execution_time(execution_time_ms)[source]

Record query execution time for statistics

Parameters:

execution_time_ms (float) – Execution time in milliseconds

Return type:

None

get_optimizer_stats()[source]

Get optimizer statistics

Returns:

Dictionary with optimizer statistics

Return type:

Dict[str, Any]

plan_logic_query(logic_query)[source]

Create execution plan from logic query DSL

This method parses a logic query (e.g., “Find(Person) WHERE age > 30”) and converts it directly to a QueryPlan.

Parameters:

logic_query (str) – Logic query string in DSL format

Returns:

QueryPlan if successful, List[ParserError] if errors occurred

Return type:

QueryPlan | List[Any]

Example:

plan = planner.plan_logic_query("Find(Person) WHERE age > 30")

if isinstance(plan, list):
    # Parsing errors
    for error in plan:
        print(f"Error at line {error.line}: {error.message}")
else:
    # Success - execute the plan
    result = await graph_store.execute_plan(plan)
supports_logic_queries()[source]

Check if logic query support is available

Returns:

True if logic queries are supported, False otherwise

Return type:

bool

class aiecs.application.knowledge_graph.reasoning.ReasoningEngine[source]

Bases: object

Multi-Hop Reasoning Engine

Executes query plans, collects evidence, and generates answers for complex multi-hop queries over knowledge graphs.

Features:

  • Execute query plans from QueryPlanner
  • Multi-hop path finding
  • Evidence collection and scoring
  • Path ranking by relevance
  • Answer generation from evidence

Example

```python
engine = ReasoningEngine(graph_store)

# Reason over a query
result = await engine.reason(
    query="What companies does Alice know people at?",
    context={"start_entity_id": "person_alice"}
)

print(f"Answer: {result.answer}")
print(f"Confidence: {result.confidence}")
print(f"Evidence: {result.evidence_count} pieces")
```

__init__(graph_store, query_planner=None)[source]

Initialize reasoning engine

Parameters:
  • graph_store (GraphStore) – Graph storage backend

  • query_planner (QueryPlanner | None) – Query planner (creates one if not provided)

async reason(query, context=None, max_hops=3, max_evidence=20)[source]

Perform multi-hop reasoning on a query

Parameters:
  • query (str) – Natural language query

  • context (Dict[str, Any] | None) – Query context (entity IDs, embeddings, etc.)

  • max_hops (int) – Maximum number of hops for traversal

  • max_evidence (int) – Maximum number of evidence pieces to collect

Returns:

Reasoning result with evidence and answer

Return type:

ReasoningResult

async find_multi_hop_paths(start_entity_id, target_entity_id=None, max_hops=3, relation_types=None, max_paths=10)[source]

Find multi-hop paths between entities

Parameters:
  • start_entity_id (str) – Starting entity ID

  • target_entity_id (str | None) – Target entity ID (None for all reachable)

  • max_hops (int) – Maximum number of hops

  • relation_types (List[str] | None) – Allowed relation types (None for all)

  • max_paths (int) – Maximum number of paths to return

Returns:

List of paths found

Return type:

List[Path]
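
The traversal strategy used by the engine is not documented here. As a rough illustration of bounded multi-hop path finding, the sketch below runs a breadth-first search over a plain adjacency dictionary. The `find_paths` function and the graph representation are hypothetical; paths are returned as lists of entity IDs rather than `Path` objects, and relation types are ignored.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


def find_paths(
    graph: Dict[str, List[str]],
    start: str,
    target: Optional[str] = None,
    max_hops: int = 3,
    max_paths: int = 10,
) -> List[List[str]]:
    """Breadth-first search for simple paths of at most `max_hops` edges."""
    paths: List[List[str]] = []
    queue: Deque[List[str]] = deque([[start]])
    while queue and len(paths) < max_paths:
        path = queue.popleft()
        hops = len(path) - 1
        # Record the path if it ends at the target (or anywhere, when no target is given)
        if hops > 0 and (target is None or path[-1] == target):
            paths.append(path)
        if hops == max_hops:
            continue  # do not extend beyond the hop limit
        for neighbor in graph.get(path[-1], []):
            if neighbor not in path:  # avoid revisiting nodes (no cycles)
                queue.append(path + [neighbor])
    return paths


graph = {"alice": ["bob"], "bob": ["acme"], "acme": []}
paths = find_paths(graph, "alice", target="acme", max_hops=2)
```

Breadth-first order means shorter paths are found first, so `max_paths` truncation keeps the shortest ones.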

async collect_evidence_from_paths(paths, source='path_finding')[source]

Collect evidence from paths

Parameters:
  • paths (List[Path]) – List of paths to extract evidence from

  • source (str) – Source identifier for the evidence

Returns:

List of evidence pieces

Return type:

List[Evidence]

rank_evidence(evidence, ranking_method='combined_score')[source]

Rank evidence by relevance

Parameters:
  • evidence (List[Evidence]) – List of evidence to rank

  • ranking_method (str) – Method to use for ranking:
      - "combined_score": confidence * relevance
      - "confidence": confidence only
      - "relevance": relevance only

Returns:

Ranked evidence list

Return type:

List[Evidence]
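
The three ranking methods amount to sorting by different keys. A minimal sketch follows, assuming each evidence item exposes `confidence` and `relevance` scores. `EvidenceItem` is a hypothetical stand-in for `Evidence`, and descending order is an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class EvidenceItem:
    """Hypothetical stand-in for Evidence."""
    evidence_id: str
    confidence: float
    relevance: float


def rank(evidence: List[EvidenceItem], ranking_method: str = "combined_score") -> List[EvidenceItem]:
    """Sort evidence from highest to lowest score under the chosen method."""
    keys = {
        "combined_score": lambda e: e.confidence * e.relevance,
        "confidence": lambda e: e.confidence,
        "relevance": lambda e: e.relevance,
    }
    if ranking_method not in keys:
        raise ValueError(f"unknown ranking method: {ranking_method}")
    return sorted(evidence, key=keys[ranking_method], reverse=True)


items = [
    EvidenceItem("a", confidence=0.9, relevance=0.5),
    EvidenceItem("b", confidence=0.6, relevance=0.9),
    EvidenceItem("c", confidence=0.8, relevance=0.8),
]
ranked_ids = [e.evidence_id for e in rank(items)]
```

The combined score favours items that do well on both axes: item `c` ranks first even though it is not the top item on either score alone.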

class aiecs.application.knowledge_graph.reasoning.InferenceEngine[source]

Bases: object

Rule-Based Inference Engine

Applies logical inference rules to infer new relations from existing ones.

Features:

  • Transitive inference (A->B, B->C => A->C)
  • Symmetric inference (A->B => B->A)
  • Custom inference rules
  • Result caching
  • Explainability (trace inference steps)

Example

```python
engine = InferenceEngine(graph_store)

# Add rules
engine.add_rule(InferenceRule(
    rule_id="transitive_works_for",
    rule_type=RuleType.TRANSITIVE,
    relation_type="WORKS_FOR"
))

# Infer relations
result = await engine.infer_relations(
    relation_type="WORKS_FOR",
    max_steps=3
)

print(f"Inferred {len(result.inferred_relations)} relations")
print(result.get_explanation_string())
```

__init__(graph_store, cache=None)[source]

Initialize inference engine

Parameters:
  • graph_store (GraphStore) – Graph storage backend

  • cache (InferenceCache | None) – Optional inference cache (creates one if not provided)

add_rule(rule)[source]

Add an inference rule

Parameters:

rule (InferenceRule) – Inference rule to add

Return type:

None

remove_rule(rule_id)[source]

Remove an inference rule

Parameters:

rule_id (str) – ID of rule to remove

Return type:

None

get_rules(relation_type=None)[source]

Get inference rules

Parameters:

relation_type (str | None) – Filter by relation type (None = all)

Returns:

List of inference rules

Return type:

List[InferenceRule]

async infer_relations(relation_type, max_steps=10, source_id=None, target_id=None, use_cache=True)[source]

Infer relations using enabled rules

Parameters:
  • relation_type (str) – Relation type to infer

  • max_steps (int) – Maximum number of inference steps

  • source_id (str | None) – Optional source entity ID filter

  • target_id (str | None) – Optional target entity ID filter

  • use_cache (bool) – Whether to use cache

Returns:

Inference result with inferred relations and steps

Return type:

InferenceResult
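
How `max_steps` bounds transitive inference can be shown with a minimal sketch over plain (source, target) pairs for a single relation type. The function name and the edge representation are assumptions; the engine itself works against a `GraphStore` and records inference steps, which this sketch omits.

```python
from typing import Set, Tuple

Edge = Tuple[str, str]  # (source entity ID, target entity ID)


def infer_transitive_sketch(edges: Set[Edge], max_steps: int = 10) -> Set[Edge]:
    """Return edges implied by transitivity that are not already in `edges`."""
    known = set(edges)
    for _ in range(max_steps):
        # One inference step: join every A->B with every B->C to propose A->C.
        new_edges = {
            (a, d)
            for (a, b) in known
            for (c, d) in known
            if b == c and a != d and (a, d) not in known
        }
        if not new_edges:
            break  # fixed point reached before the step limit
        known |= new_edges
    return known - set(edges)


chain = {("a", "b"), ("b", "c"), ("c", "d")}
inferred = infer_transitive_sketch(chain)
```

For the chain a->b->c->d, one step yields a->c and b->d, and a second step is needed to reach a->d, so a lower `max_steps` returns fewer inferred relations.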

get_inference_trace(result)[source]

Get human-readable trace of inference steps

Parameters:

result (InferenceResult) – Inference result

Returns:

List of trace strings

Return type:

List[str]

class aiecs.application.knowledge_graph.reasoning.InferenceCache[source]

Bases: object

Cache for inference results

Stores previously computed inference results to avoid recomputation.

__init__(max_size=1000, ttl_seconds=None)[source]

Initialize inference cache

Parameters:
  • max_size (int) – Maximum number of cached entries

  • ttl_seconds (float | None) – Time-to-live in seconds (None = no expiration)

get(relation_type, source_id=None, target_id=None)[source]

Get cached inference result

Parameters:
  • relation_type (str) – Relation type

  • source_id (str | None) – Source entity ID

  • target_id (str | None) – Target entity ID

Returns:

Cached result or None

Return type:

InferenceResult | None

put(relation_type, result, source_id=None, target_id=None)[source]

Cache inference result

Parameters:
  • relation_type (str) – Relation type

  • result (InferenceResult) – Inference result to cache

  • source_id (str | None) – Source entity ID

  • target_id (str | None) – Target entity ID

Return type:

None

clear()[source]

Clear all cached results

Return type:

None

get_stats()[source]

Get cache statistics

Return type:

Dict[str, Any]
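
A size-bounded cache with optional expiry can be sketched as follows. The class name, the least-recently-used eviction policy, and the keys returned by `get_stats` are assumptions; the documentation above does not state which eviction policy or statistics `InferenceCache` uses.

```python
import time
from collections import OrderedDict
from typing import Any, Dict, Optional


class TTLCacheSketch:
    """Hypothetical cache keyed by (relation_type, source_id, target_id)."""

    def __init__(self, max_size: int = 1000, ttl_seconds: Optional[float] = None):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._entries: OrderedDict = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, relation_type: str, source_id: Optional[str] = None,
            target_id: Optional[str] = None) -> Any:
        key = (relation_type, source_id, target_id)
        entry = self._entries.get(key)
        if entry is not None:
            value, stored_at = entry
            expired = (self.ttl_seconds is not None
                       and time.monotonic() - stored_at > self.ttl_seconds)
            if not expired:
                self._entries.move_to_end(key)  # mark as recently used
                self.hits += 1
                return value
            del self._entries[key]  # drop the stale entry
        self.misses += 1
        return None

    def put(self, relation_type: str, result: Any, source_id: Optional[str] = None,
            target_id: Optional[str] = None) -> None:
        key = (relation_type, source_id, target_id)
        self._entries[key] = (result, time.monotonic())
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict the least recently used entry

    def get_stats(self) -> Dict[str, Any]:
        return {"size": len(self._entries), "hits": self.hits, "misses": self.misses}


cache = TTLCacheSketch(max_size=2)
cache.put("WORKS_FOR", "result-1")
cache.put("KNOWS", "result-2")
first = cache.get("WORKS_FOR")       # hit; WORKS_FOR becomes most recently used
cache.put("LOCATED_IN", "result-3")  # exceeds max_size, so KNOWS is evicted
evicted = cache.get("KNOWS")         # miss
```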

class aiecs.application.knowledge_graph.reasoning.EvidenceSynthesizer[source]

Bases: object

Evidence Synthesizer

Combines evidence from multiple sources to create more robust conclusions.

Features:

- Merge overlapping evidence
- Calculate combined confidence
- Detect contradictions
- Synthesize explanations

Example

```python
synthesizer = EvidenceSynthesizer()

# Combine evidence from different sources
combined = synthesizer.synthesize_evidence([ev1, ev2, ev3])

# Get most reliable evidence
reliable = synthesizer.filter_by_confidence(combined, threshold=0.7)
```

__init__(confidence_threshold=0.5, contradiction_threshold=0.3)[source]

Initialize evidence synthesizer

Parameters:
  • confidence_threshold (float) – Minimum confidence for evidence

  • contradiction_threshold (float) – Threshold for detecting contradictions

synthesize_evidence(evidence_list, method='weighted_average')[source]

Synthesize evidence from multiple sources

Parameters:
  • evidence_list (List[Evidence]) – List of evidence to synthesize

  • method (str) – Synthesis method (“weighted_average”, “max”, “voting”)

Returns:

Synthesized evidence list

Return type:

List[Evidence]

filter_by_confidence(evidence_list, threshold=None)[source]

Filter evidence by confidence threshold

Parameters:
  • evidence_list (List[Evidence]) – List of evidence to filter

  • threshold (float | None) – Confidence threshold (uses default if None)

Returns:

Filtered evidence list

Return type:

List[Evidence]

detect_contradictions(evidence_list)[source]

Detect contradictions in evidence

Parameters:

evidence_list (List[Evidence]) – List of evidence to check

Returns:

List of detected contradictions

Return type:

List[Dict[str, Any]]

estimate_overall_confidence(evidence_list)[source]

Estimate overall confidence from evidence list

Considers:

- Individual confidence scores
- Agreement across evidence
- Source diversity

Parameters:

evidence_list (List[Evidence]) – List of evidence

Returns:

Overall confidence score (0-1)

Return type:

float

rank_by_reliability(evidence_list)[source]

Rank evidence by reliability

Considers:

- Confidence score
- Relevance score
- Source credibility

Parameters:

evidence_list (List[Evidence]) – List of evidence to rank

Returns:

Ranked evidence list (most reliable first)

Return type:

List[Evidence]

Knowledge Graph Search Application Layer

Advanced search strategies including hybrid search and text similarity utilities.

class aiecs.application.knowledge_graph.search.HybridSearchStrategy[source]

Bases: object

Hybrid Search Strategy

Combines vector similarity search with graph structure traversal to provide enhanced search results that leverage both semantic similarity and structural relationships.

Search Modes:

- VECTOR_ONLY: Pure vector similarity search
- GRAPH_ONLY: Pure graph traversal from seed entities
- HYBRID: Combines both approaches with weighted scoring

Example

```python
strategy = HybridSearchStrategy(graph_store)

config = HybridSearchConfig(
    mode=SearchMode.HYBRID,
    vector_weight=0.6,
    graph_weight=0.4,
    max_results=10,
    expand_results=True
)

results = await strategy.search(
    query_embedding=[0.1, 0.2, ...],
    config=config
)

for entity, score in results:
    print(f"{entity.id}: {score:.3f}")
```

__init__(graph_store)[source]

Initialize hybrid search strategy

Parameters:

graph_store (GraphStore) – Graph storage backend

async search(query_embedding, config=None, seed_entity_ids=None)[source]

Perform hybrid search

Parameters:
  • query_embedding (List[float]) – Query vector embedding

  • config (HybridSearchConfig | None) – Search configuration (uses defaults if None)

  • seed_entity_ids (List[str] | None) – Optional seed entities for graph traversal

Returns:

List of (entity, score) tuples sorted by score descending

Return type:

List[Tuple[Entity, float]]
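
The weighted scoring used in HYBRID mode can be approximated by a linear combination of per-entity scores. This is a sketch under assumptions: the function name is hypothetical, entities are represented by ID strings, the default weights are taken from the example above rather than from the library's defaults, and an entity missing from one source is assumed to contribute 0.0 for that source.

```python
from typing import Dict, List, Tuple


def combine_hybrid_scores(vector_scores: Dict[str, float],
                          graph_scores: Dict[str, float],
                          vector_weight: float = 0.6,
                          graph_weight: float = 0.4,
                          min_combined_score: float = 0.0,
                          max_results: int = 10) -> List[Tuple[str, float]]:
    """Blend two score maps and return (entity_id, score) sorted descending."""
    entity_ids = set(vector_scores) | set(graph_scores)
    combined = {
        eid: vector_weight * vector_scores.get(eid, 0.0)
        + graph_weight * graph_scores.get(eid, 0.0)
        for eid in entity_ids
    }
    # Drop weak results, then sort by score and truncate.
    kept = [(eid, score) for eid, score in combined.items()
            if score >= min_combined_score]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:max_results]


hybrid_results = combine_hybrid_scores(
    vector_scores={"e1": 0.9, "e2": 0.5},
    graph_scores={"e2": 1.0, "e3": 0.5},
    min_combined_score=0.25,
)
```

Here "e2" outranks "e1" because it is supported by both signals, and "e3" falls below the combined-score threshold.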

async search_with_expansion(query_embedding, config=None, include_paths=False)[source]

Search with result expansion and optional path tracking

Parameters:
  • query_embedding (List[float]) – Query vector

  • config (HybridSearchConfig | None) – Search configuration

  • include_paths (bool) – Whether to include paths to results

Returns:

Tuple of (results, paths) where paths is None if not requested

Return type:

Tuple[List[Tuple[Entity, float]], List[Path] | None]

class aiecs.application.knowledge_graph.search.HybridSearchConfig[source]

Bases: BaseModel

Configuration for hybrid search

mode

Search mode (vector_only, graph_only, hybrid)

Type:

aiecs.application.knowledge_graph.search.hybrid_search.SearchMode

vector_weight

Weight for vector similarity scores (0.0-1.0)

Type:

float

graph_weight

Weight for graph structure scores (0.0-1.0)

Type:

float

max_results

Maximum number of results to return

Type:

int

vector_threshold

Minimum similarity threshold for vector search

Type:

float

max_graph_depth

Maximum depth for graph traversal

Type:

int

expand_results

Whether to expand vector results with graph neighbors

Type:

bool

min_combined_score

Minimum combined score threshold

Type:

float

mode: SearchMode
vector_weight: float
graph_weight: float
max_results: int
vector_threshold: float
max_graph_depth: int
expand_results: bool
min_combined_score: float
entity_type_filter: str | None
class Config[source]

Bases: object

use_enum_values = True
model_config: ClassVar[ConfigDict] = {'use_enum_values': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class aiecs.application.knowledge_graph.search.SearchMode[source]

Bases: str, Enum

Search mode for hybrid search

VECTOR_ONLY = 'vector_only'
GRAPH_ONLY = 'graph_only'
HYBRID = 'hybrid'
__new__(value)
class aiecs.application.knowledge_graph.search.TextSimilarity[source]

Bases: object

Convenience class for text similarity operations

Provides a unified interface for various text similarity methods.

Example:

similarity = TextSimilarity()

# Jaccard similarity
score = similarity.jaccard("hello world", "world hello")

# Cosine similarity
score = similarity.cosine("machine learning", "deep learning")

# Levenshtein distance
distance = similarity.levenshtein("kitten", "sitting")

# Fuzzy matching
matches = similarity.fuzzy_match(
    "python",
    ["python3", "pyton", "java", "pythn"],
    threshold=0.7
)
__init__(tokenizer=None)[source]

Initialize TextSimilarity

Parameters:

tokenizer (Callable[[str], List[str]] | None) – Optional tokenizer function for text processing

jaccard(text1, text2)[source]

Calculate Jaccard similarity between two texts

Parameters:
  • text1 (str)

  • text2 (str)

Return type:

float

cosine(text1, text2)[source]

Calculate cosine similarity between two texts

Parameters:
  • text1 (str)

  • text2 (str)

Return type:

float

levenshtein(text1, text2)[source]

Calculate Levenshtein distance between two texts

Parameters:
  • text1 (str)

  • text2 (str)

Return type:

int

levenshtein_similarity(text1, text2)[source]

Calculate normalized Levenshtein similarity

Parameters:
  • text1 (str)

  • text2 (str)

Return type:

float

fuzzy_match(query, candidates, threshold=0.6, method='jaccard')[source]

Find fuzzy matches for a query

Parameters:
  • query (str)

  • candidates (List[str])

  • threshold (float)

  • method (str)

Return type:

List[Tuple[str, float]]

bm25(corpus, k1=1.5, b=0.75)[source]

Create a BM25 scorer for a corpus

Parameters:
  • corpus (List[str])

  • k1 (float)

  • b (float)

Return type:

BM25Scorer

class aiecs.application.knowledge_graph.search.BM25Scorer[source]

Bases: object

BM25 (Best Matching 25) scorer for text similarity

BM25 is a ranking function used to estimate the relevance of documents to a given search query. It’s an improvement over TF-IDF.

Example:

scorer = BM25Scorer(corpus=[
    "The quick brown fox jumps over the lazy dog",
    "A quick brown dog jumps over a lazy fox",
    "The lazy dog sleeps all day"
])

scores = scorer.score("quick brown fox")
# Returns scores for each document in corpus
__init__(corpus, k1=1.5, b=0.75, tokenizer=None)[source]

Initialize BM25 scorer

Parameters:
  • corpus (List[str]) – List of documents to score against

  • k1 (float) – Term frequency saturation parameter (default: 1.5)

  • b (float) – Length normalization parameter (default: 0.75)

  • tokenizer (Callable[[str], List[str]] | None) – Optional tokenizer function (default: simple word split)

score(query)[source]

Score documents against query

Parameters:

query (str) – Query string

Returns:

List of BM25 scores for each document

Return type:

List[float]

get_top_n(query, n=10)[source]

Get top N documents by BM25 score

Parameters:
  • query (str) – Query string

  • n (int) – Number of top results to return

Returns:

List of (document_index, score) tuples, sorted by score descending

Return type:

List[Tuple[int, float]]
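
The roles of `k1` and `b` are easiest to see in the scoring formula itself. The following is a minimal sketch of the standard Okapi BM25 formula, not the `BM25Scorer` source: the function name, the lowercasing, the whitespace tokenization, and the smoothed IDF variant are assumptions, and a non-empty corpus is assumed.

```python
import math
from collections import Counter
from typing import List


def bm25_scores_sketch(corpus: List[str], query: str,
                       k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Score every document in a non-empty corpus against the query."""
    docs = [doc.lower().split() for doc in corpus]
    n_docs = len(docs)
    avg_len = sum(len(d) for d in docs) / n_docs
    # Document frequency: number of documents containing each term.
    doc_freq = Counter(term for d in docs for term in set(d))
    scores = []
    for d in docs:
        term_freq = Counter(d)
        score = 0.0
        for term in query.lower().split():
            tf = term_freq[term]
            if tf == 0:
                continue
            # Smoothed IDF; the +1.0 keeps the value positive for common terms.
            idf = math.log((n_docs - doc_freq[term] + 0.5) / (doc_freq[term] + 0.5) + 1.0)
            # k1 caps the gain from repeated terms; b scales the length penalty.
            norm = tf + k1 * (1 - b + b * len(d) / avg_len)
            score += idf * tf * (k1 + 1) / norm
        scores.append(score)
    return scores


bm25_demo = bm25_scores_sketch(
    [
        "The quick brown fox jumps over the lazy dog",
        "A quick brown dog jumps over a lazy fox",
        "The lazy dog sleeps all day",
    ],
    "quick brown fox",
)
```

Setting `b=0` disables length normalization entirely, while a larger `k1` lets repeated query terms keep adding to the score for longer before saturating.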

aiecs.application.knowledge_graph.search.jaccard_similarity(set1, set2)[source]

Calculate Jaccard similarity between two sets

Jaccard similarity = (size of intersection) / (size of union)

Parameters:
  • set1 (set) – First set

  • set2 (set) – Second set

Returns:

Jaccard similarity score (0.0 to 1.0)

Return type:

float
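
The formula above translates directly into set operations. In this sketch the function name is hypothetical, and returning 0.0 for two empty sets is an assumption; the library may handle that edge case differently.

```python
def jaccard_sketch(set1: set, set2: set) -> float:
    """Size of the intersection divided by size of the union."""
    if not set1 and not set2:
        return 0.0  # assumption: two empty sets score 0.0
    return len(set1 & set2) / len(set1 | set2)


same_words = jaccard_sketch(set("hello world".split()), set("world hello".split()))
partial_overlap = jaccard_sketch({"machine", "learning"}, {"deep", "learning"})
```

Word order does not matter, so the first pair scores 1.0; the second pair shares one of three distinct words.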

aiecs.application.knowledge_graph.search.jaccard_similarity_text(text1, text2, tokenizer=None)[source]

Calculate Jaccard similarity between two text strings

Parameters:
  • text1 (str) – First text string

  • text2 (str) – Second text string

  • tokenizer (Callable[[str], Any] | None) – Optional tokenizer function (default: word split)

Returns:

Jaccard similarity score (0.0 to 1.0)

Return type:

float

aiecs.application.knowledge_graph.search.cosine_similarity_text(text1, text2, tokenizer=None)[source]

Calculate cosine similarity between two text strings

Cosine similarity measures the cosine of the angle between two vectors in a multi-dimensional space. For text, vectors are TF-IDF representations.

Parameters:
  • text1 (str) – First text string

  • text2 (str) – Second text string

  • tokenizer (Callable[[str], List[str]] | None) – Optional tokenizer function (default: word split)

Returns:

Cosine similarity score (0.0 to 1.0)

Return type:

float
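
A simplified sketch of the computation is shown below. It uses raw term counts rather than the TF-IDF weighting described above, which is a deliberate simplification; the function name, lowercasing, and whitespace tokenization are likewise assumptions.

```python
import math
from collections import Counter


def cosine_sketch(text1: str, text2: str) -> float:
    """Cosine of the angle between two term-count vectors."""
    vec1 = Counter(text1.lower().split())
    vec2 = Counter(text2.lower().split())
    # Dot product over the terms of the first vector; missing terms count as 0.
    dot = sum(count * vec2[term] for term, count in vec1.items())
    norm1 = math.sqrt(sum(c * c for c in vec1.values()))
    norm2 = math.sqrt(sum(c * c for c in vec2.values()))
    if norm1 == 0.0 or norm2 == 0.0:
        return 0.0  # an empty text has no direction
    return dot / (norm1 * norm2)


cosine_demo = cosine_sketch("machine learning", "deep learning")
```

Because term counts are never negative, the result stays within 0.0 to 1.0, matching the documented range.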

aiecs.application.knowledge_graph.search.levenshtein_distance(s1, s2)[source]

Calculate Levenshtein distance (edit distance) between two strings

Levenshtein distance is the minimum number of single-character edits (insertions, deletions, or substitutions) required to change one string into another.

Parameters:
  • s1 (str) – First string

  • s2 (str) – Second string

Returns:

Levenshtein distance (0 = identical, higher = more different)

Return type:

int
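
The definition above corresponds to the classic dynamic-programming recurrence. The sketch below keeps only two rows of the table; the function name is hypothetical and the library's implementation may be organized differently.

```python
def levenshtein_sketch(s1: str, s2: str) -> int:
    """Minimum number of single-character insertions, deletions, or substitutions."""
    if len(s1) < len(s2):
        s1, s2 = s2, s1  # keep the shorter string in the inner loop
    previous = list(range(len(s2) + 1))  # distances from the empty prefix of s1
    for i, c1 in enumerate(s1, start=1):
        current = [i]  # distance from s1[:i] to the empty prefix of s2
        for j, c2 in enumerate(s2, start=1):
            insert_cost = current[j - 1] + 1
            delete_cost = previous[j] + 1
            substitute_cost = previous[j - 1] + (c1 != c2)
            current.append(min(insert_cost, delete_cost, substitute_cost))
        previous = current
    return previous[-1]


edit_distance = levenshtein_sketch("kitten", "sitting")
```

For "kitten" and "sitting" the three edits are two substitutions (k to s, e to i) and one insertion (g).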

aiecs.application.knowledge_graph.search.normalized_levenshtein_similarity(s1, s2)[source]

Calculate normalized Levenshtein similarity (0.0 to 1.0)

Parameters:
  • s1 (str) – First string

  • s2 (str) – Second string

Returns:

Normalized similarity score (1.0 = identical, 0.0 = completely different)

Return type:

float

aiecs.application.knowledge_graph.search.fuzzy_match(query, candidates, threshold=0.6, method='jaccard')[source]

Find fuzzy matches for a query string in a list of candidates

Parameters:
  • query (str) – Query string to match

  • candidates (List[str]) – List of candidate strings

  • threshold (float) – Minimum similarity threshold (0.0 to 1.0)

  • method (str) – Similarity method (“jaccard”, “cosine”, “levenshtein”, “ratio”)

Returns:

List of (candidate, similarity_score) tuples above threshold, sorted by score descending

Return type:

List[Tuple[str, float]]
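
The filter-and-sort behavior described above can be sketched with a pluggable similarity function. The names below are hypothetical, and the standard library's `difflib.SequenceMatcher` ratio is used only as a stand-in; it is not confirmed to be what the "ratio" method computes in this module.

```python
from difflib import SequenceMatcher
from typing import Callable, List, Tuple


def sequence_ratio(a: str, b: str) -> float:
    """Similarity in [0.0, 1.0] from the standard library's SequenceMatcher."""
    return SequenceMatcher(None, a, b).ratio()


def fuzzy_match_sketch(query: str, candidates: List[str], threshold: float = 0.6,
                       similarity: Callable[[str, str], float] = sequence_ratio
                       ) -> List[Tuple[str, float]]:
    """Keep candidates at or above the threshold, best match first."""
    scored = [(candidate, similarity(query, candidate)) for candidate in candidates]
    matches = [(candidate, score) for candidate, score in scored if score >= threshold]
    matches.sort(key=lambda pair: pair[1], reverse=True)
    return matches


fuzzy_demo = fuzzy_match_sketch(
    "python", ["python3", "pyton", "java", "pythn"], threshold=0.7
)
```

With this stand-in metric, the near-misses "python3", "pyton", and "pythn" pass the 0.7 threshold while "java" does not.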

class aiecs.application.knowledge_graph.search.RerankerStrategy[source]

Bases: ABC

Abstract base class for reranking strategies

Each strategy computes a relevance score for entities given a query. Strategies can be combined using different combination methods.

Example:

class TextSimilarityReranker(RerankerStrategy):
    async def score(
        self,
        query: str,
        entities: List[Entity]
    ) -> List[float]:
        # Compute BM25 scores
        return scores
abstract property name: str

Strategy name for identification

abstract async score(query, entities, **kwargs)[source]

Compute relevance scores for entities

Parameters:
  • query (str) – Query text or context

  • entities (List[Entity]) – List of entities to score

  • **kwargs – Strategy-specific parameters

Returns:

List of scores (one per entity), in the same order as entities. Scores should be in the range [0.0, 1.0] for best results.

Return type:

List[float]

class aiecs.application.knowledge_graph.search.ResultReranker[source]

Bases: object

Result Reranker orchestrator

Combines multiple reranking strategies to improve search result relevance.

Example:

# Create strategies
text_reranker = TextSimilarityReranker()
semantic_reranker = SemanticReranker()

# Create reranker
reranker = ResultReranker(
    strategies=[text_reranker, semantic_reranker],
    combination_method=ScoreCombinationMethod.WEIGHTED_AVERAGE,
    weights={"text": 0.6, "semantic": 0.4}
)

# Rerank results
reranked = await reranker.rerank(
    query="machine learning",
    entities=search_results
)
__init__(strategies, combination_method=ScoreCombinationMethod.WEIGHTED_AVERAGE, weights=None, normalize_scores=True, normalization_method='min_max')[source]

Initialize ResultReranker

Parameters:
  • strategies (List[RerankerStrategy]) – List of reranking strategies

  • combination_method (ScoreCombinationMethod) – Method for combining scores

  • weights (Dict[str, float] | None) – Optional weights for strategies (for weighted_average)

  • normalize_scores (bool) – Whether to normalize scores before combining

  • normalization_method (str) – Normalization method (“min_max”, “z_score”, “softmax”)

async rerank(query, entities, top_k=None, **kwargs)[source]

Rerank entities using all strategies

Parameters:
  • query (str) – Query text or context

  • entities (List[Entity]) – List of entities to rerank

  • top_k (int | None) – Optional limit on number of results

  • **kwargs – Additional parameters passed to strategies

Returns:

List of (entity, combined_score) tuples, sorted by score descending

Return type:

List[Tuple[Entity, float]]

class aiecs.application.knowledge_graph.search.ScoreCombinationMethod[source]

Bases: str, Enum

Methods for combining scores from multiple reranking strategies

WEIGHTED_AVERAGE = 'weighted_average'
RRF = 'rrf'
MAX = 'max'
MIN = 'min'
__new__(value)
aiecs.application.knowledge_graph.search.normalize_scores(scores, method='min_max')[source]

Normalize scores to [0.0, 1.0] range

Parameters:
  • scores (List[float]) – Raw scores to normalize

  • method (str) – Normalization method (“min_max”, “z_score”, “softmax”)

Returns:

Normalized scores in [0.0, 1.0] range

Return type:

List[float]
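
Two of the listed methods can be sketched in a few lines. The function names are hypothetical, mapping a constant input to 1.0 under min-max is an assumption, and "z_score" is left out because the documentation does not say how standardized values are mapped into the [0.0, 1.0] range.

```python
import math
from typing import List


def min_max_sketch(scores: List[float]) -> List[float]:
    """Rescale linearly so the smallest score is 0.0 and the largest is 1.0."""
    if not scores:
        return []
    low, high = min(scores), max(scores)
    if high == low:
        return [1.0] * len(scores)  # assumption: constant input maps to 1.0
    return [(s - low) / (high - low) for s in scores]


def softmax_sketch(scores: List[float]) -> List[float]:
    """Exponentiate and normalize so the outputs sum to 1.0."""
    if not scores:
        return []
    peak = max(scores)  # subtracting the max avoids overflow in exp
    exps = [math.exp(s - peak) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


min_max_demo = min_max_sketch([2.0, 4.0, 6.0])
softmax_demo = softmax_sketch([1.0, 2.0, 3.0])
```

Min-max preserves relative spacing between scores, whereas softmax emphasizes the largest score and produces values that sum to 1.0.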

aiecs.application.knowledge_graph.search.combine_scores(score_dicts, method=ScoreCombinationMethod.WEIGHTED_AVERAGE, weights=None)[source]

Combine scores from multiple strategies

Parameters:
  • score_dicts (List[Dict[str, float]]) – List of {entity_id: score} dictionaries from each strategy

  • method (ScoreCombinationMethod) – Combination method

  • weights (Dict[str, float] | None) – Optional weights for each strategy (for weighted_average)

Returns:

Combined scores as {entity_id: combined_score}

Return type:

Dict[str, float]
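
Assuming the RRF method denotes reciprocal rank fusion, it can be sketched as below. The function name is hypothetical, and the constant `k=60` is the value conventionally used for this technique, not one confirmed for this library.

```python
from typing import Dict, List


def rrf_sketch(score_dicts: List[Dict[str, float]], k: int = 60) -> Dict[str, float]:
    """Sum 1 / (k + rank) over strategies, where rank 1 is the best score."""
    combined: Dict[str, float] = {}
    for scores in score_dicts:
        # Only the ordering within each strategy matters, not the raw values.
        ranked = sorted(scores, key=scores.get, reverse=True)
        for rank, entity_id in enumerate(ranked, start=1):
            combined[entity_id] = combined.get(entity_id, 0.0) + 1.0 / (k + rank)
    return combined


rrf_demo = rrf_sketch([
    {"e1": 0.9, "e2": 0.4, "e3": 0.1},  # e.g. text strategy
    {"e2": 0.8, "e3": 0.7, "e1": 0.2},  # e.g. semantic strategy
])
rrf_order = sorted(rrf_demo, key=rrf_demo.get, reverse=True)
```

Because only ranks are used, this method needs no score normalization, which makes it a reasonable choice when strategies produce scores on different scales.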

class aiecs.application.knowledge_graph.search.TextSimilarityReranker[source]

Bases: RerankerStrategy

Text similarity reranker using BM25 and Jaccard similarity

Combines BM25 (term-based relevance) and Jaccard (set overlap) scores to rerank entities based on text similarity to query.

Example:

reranker = TextSimilarityReranker(
    bm25_weight=0.7,
    jaccard_weight=0.3
)
scores = await reranker.score("machine learning", entities)
__init__(bm25_weight=0.7, jaccard_weight=0.3, property_keys=None)[source]

Initialize TextSimilarityReranker

Parameters:
  • bm25_weight (float) – Weight for BM25 scores (0.0-1.0)

  • jaccard_weight (float) – Weight for Jaccard scores (0.0-1.0)

  • property_keys (List[str] | None) – Optional list of property keys to search (default: all string properties)

property name: str

Strategy name for identification

async score(query, entities, **kwargs)[source]

Compute text similarity scores

Parameters:
  • query (str) – Query text

  • entities (List[Entity]) – Entities to score

  • **kwargs – Additional parameters (ignored)

Returns:

List of scores (0.0-1.0)

Return type:

List[float]

class aiecs.application.knowledge_graph.search.SemanticReranker[source]

Bases: RerankerStrategy

Semantic reranker using vector cosine similarity

Uses entity embeddings to compute semantic similarity to query embedding.

Example:

reranker = SemanticReranker()
scores = await reranker.score(
    query="machine learning",
    entities=entities,
    query_embedding=[0.1, 0.2, ...]
)
__init__()[source]

Initialize SemanticReranker

property name: str

Strategy name for identification

async score(query, entities, query_embedding=None, **kwargs)[source]

Compute semantic similarity scores

Parameters:
  • query (str) – Query text (used for fallback if no embedding)

  • entities (List[Entity]) – Entities to score

  • query_embedding (List[float] | None) – Optional query embedding vector

  • **kwargs – Additional parameters

Returns:

List of scores (0.0-1.0)

Return type:

List[float]

class aiecs.application.knowledge_graph.search.StructuralReranker[source]

Bases: RerankerStrategy

Structural reranker using graph centrality and PageRank

Scores entities based on their structural importance in the graph. Uses PageRank scores and degree centrality.

Example:

reranker = StructuralReranker(graph_store)
scores = await reranker.score("query", entities)
__init__(graph_store, pagerank_weight=0.7, degree_weight=0.3, use_cached_scores=True)[source]

Initialize StructuralReranker

Parameters:
  • graph_store (GraphStore) – Graph storage backend

  • pagerank_weight (float) – Weight for PageRank scores (0.0-1.0)

  • degree_weight (float) – Weight for degree centrality (0.0-1.0)

  • use_cached_scores (bool) – Whether to cache PageRank scores

property name: str

Strategy name for identification

async score(query, entities, **kwargs)[source]

Compute structural importance scores

Parameters:
  • query (str) – Query text (not used, but required by interface)

  • entities (List[Entity]) – Entities to score

  • **kwargs – Additional parameters

Returns:

List of scores (0.0-1.0)

Return type:

List[float]

class aiecs.application.knowledge_graph.search.HybridReranker[source]

Bases: RerankerStrategy

Hybrid reranker combining multiple signals

Combines text similarity, semantic similarity, and structural importance into a single score.

Example:

reranker = HybridReranker(
    graph_store=store,
    text_weight=0.4,
    semantic_weight=0.4,
    structural_weight=0.2
)
scores = await reranker.score(
    query="machine learning",
    entities=entities,
    query_embedding=[0.1, 0.2, ...]
)
__init__(graph_store, text_weight=0.4, semantic_weight=0.4, structural_weight=0.2)[source]

Initialize HybridReranker

Parameters:
  • graph_store (GraphStore) – Graph storage backend

  • text_weight (float) – Weight for text similarity (0.0-1.0)

  • semantic_weight (float) – Weight for semantic similarity (0.0-1.0)

  • structural_weight (float) – Weight for structural importance (0.0-1.0)

property name: str

Strategy name for identification

async score(query, entities, query_embedding=None, **kwargs)[source]

Compute hybrid scores combining all signals

Parameters:
  • query (str) – Query text

  • entities (List[Entity]) – Entities to score

  • query_embedding (List[float] | None) – Optional query embedding vector

  • **kwargs – Additional parameters

Returns:

List of scores (0.0-1.0)

Return type:

List[float]

class aiecs.application.knowledge_graph.search.CrossEncoderReranker[source]

Bases: RerankerStrategy

Cross-encoder reranker using transformer models (optional)

Uses a cross-encoder model to compute semantic relevance between the query and entity text. More accurate but slower than a bi-encoder.

Note: This is a placeholder implementation. For production use, integrate with a cross-encoder model library (e.g., sentence-transformers).

Example:

reranker = CrossEncoderReranker(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")
scores = await reranker.score("machine learning", entities)
__init__(model_name=None, use_gpu=False)[source]

Initialize CrossEncoderReranker

Parameters:
  • model_name (str | None) – Optional model name (default: None, uses placeholder)

  • use_gpu (bool) – Whether to use GPU (if available)

property name: str

Strategy name for identification

async score(query, entities, **kwargs)[source]

Compute cross-encoder scores

Parameters:
  • query (str) – Query text

  • entities (List[Entity]) – Entities to score

  • **kwargs – Additional parameters

Returns:

List of scores (0.0-1.0)

Return type:

List[float]