Knowledge Graph API Reference
Overview
This document provides a comprehensive API reference for all AIECS knowledge graph features, including domain models, storage interfaces, application services, and tools.
Table of Contents
Domain Models
Entity
Represents a node in the knowledge graph.
Module: aiecs.domain.knowledge_graph.models.entity
from aiecs.domain.knowledge_graph.models.entity import Entity
entity = Entity(
id="person_1",
entity_type="Person",
properties={
"name": "Alice Smith",
"age": 30,
"email": "alice@example.com"
},
metadata={
"source": "document_1",
"confidence": 0.95,
"created_at": "2025-11-15T10:00:00Z"
}
)
Fields:
id(str): Unique identifier for the entityentity_type(str): Type of entity (e.g., “Person”, “Company”, “Product”)properties(Dict[str, Any]): Entity attributes and valuesmetadata(Optional[Dict[str, Any]]): Additional metadata (source, confidence, timestamps)
Methods:
get_property(key: str, default: Any = None) -> Any: Get property value with defaultset_property(key: str, value: Any) -> None: Set property valuehas_property(key: str) -> bool: Check if property exists
Relation
Represents an edge connecting two entities in the knowledge graph.
Module: aiecs.domain.knowledge_graph.models.relation
from aiecs.domain.knowledge_graph.models.relation import Relation
relation = Relation(
id="rel_1",
relation_type="WORKS_FOR",
source_id="person_1",
target_id="company_1",
properties={
"role": "Engineer",
"start_date": "2020-01-01",
"department": "Engineering"
},
metadata={
"source": "document_1",
"confidence": 0.90
}
)
Fields:
id(str): Unique identifier for the relationrelation_type(str): Type of relation (e.g., “WORKS_FOR”, “KNOWS”, “LOCATED_IN”)source_id(str): ID of the source entitytarget_id(str): ID of the target entityproperties(Dict[str, Any]): Relation attributesmetadata(Optional[Dict[str, Any]]): Additional metadata
Methods:
reverse() -> Relation: Create a reversed relation (swap source and target)get_property(key: str, default: Any = None) -> Any: Get property value
Path
Represents a sequence of entities and relations forming a path in the graph.
Module: aiecs.domain.knowledge_graph.models.path
from aiecs.domain.knowledge_graph.models.path import Path
path = Path(
entities=[entity1, entity2, entity3],
relations=[relation1, relation2],
score=0.85,
metadata={"reasoning": "multi-hop inference"}
)
Fields:
entities(List[Entity]): Ordered list of entities in the pathrelations(List[Relation]): Ordered list of relations connecting entitiesscore(Optional[float]): Path relevance or confidence scoremetadata(Optional[Dict[str, Any]]): Additional path metadata
Methods:
length() -> int: Get path length (number of hops)get_entity_ids() -> List[str]: Get list of entity IDs in pathcontains_entity(entity_id: str) -> bool: Check if entity is in pathreverse() -> Path: Create reversed path
GraphQuery and GraphResult
Query specification and result container for graph operations.
Module: aiecs.domain.knowledge_graph.models.query
from aiecs.domain.knowledge_graph.models.query import (
GraphQuery,
GraphResult,
QueryType
)
# Create query
query = GraphQuery(
query_type=QueryType.HYBRID,
parameters={
"query_text": "machine learning experts",
"entity_types": ["Person"],
"top_k": 10
},
filters={
"properties.experience": {"$gte": 5}
}
)
# Result
result = GraphResult(
entities=[entity1, entity2],
relations=[relation1],
paths=[path1, path2],
metadata={
"query_time_ms": 45,
"total_results": 2
}
)
GraphQuery Fields:
query_type(QueryType): Type of query (VECTOR, TRAVERSE, HYBRID, SUBGRAPH)parameters(Dict[str, Any]): Query-specific parametersfilters(Optional[Dict[str, Any]]): Filter conditions
GraphResult Fields:
entities(List[Entity]): Entities matching the queryrelations(List[Relation]): Relations in the resultpaths(List[Path]): Paths found (for traversal queries)metadata(Optional[Dict[str, Any]]): Query execution metadata
Evidence and ReasoningResult
Evidence-based reasoning support.
Module: aiecs.domain.knowledge_graph.models.evidence
from aiecs.domain.knowledge_graph.models.evidence import (
Evidence,
EvidenceType,
ReasoningResult
)
evidence = Evidence(
evidence_type=EvidenceType.PATH,
content=path,
confidence=0.85,
source="graph_traversal",
metadata={"hops": 2}
)
result = ReasoningResult(
answer="Alice works for Tech Corp",
evidence=[evidence1, evidence2],
confidence=0.90,
reasoning_steps=["Step 1: ...", "Step 2: ..."]
)
Evidence Fields:
evidence_type(EvidenceType): Type of evidence (PATH, ENTITY, RELATION, INFERENCE)content(Any): Evidence content (Path, Entity, Relation, etc.)confidence(float): Confidence score (0.0-1.0)source(str): Source of evidencemetadata(Optional[Dict[str, Any]]): Additional metadata
ReasoningResult Fields:
answer(str): Final answer or conclusionevidence(List[Evidence]): Supporting evidenceconfidence(float): Overall confidence scorereasoning_steps(List[str]): Step-by-step reasoning trace
InferenceRule
Logical inference rules for knowledge graph reasoning.
Module: aiecs.domain.knowledge_graph.models.inference_rule
from aiecs.domain.knowledge_graph.models.inference_rule import (
InferenceRule,
RuleType
)
rule = InferenceRule(
rule_id="transitivity_works_for",
rule_type=RuleType.TRANSITIVE,
name="Company Hierarchy",
description="If A works for B and B is subsidiary of C, then A works for C",
conditions=[
{"relation_type": "WORKS_FOR"},
{"relation_type": "SUBSIDIARY_OF"}
],
conclusion={
"relation_type": "WORKS_FOR",
"properties": {"inferred": True}
},
confidence=0.80
)
Fields:
rule_id(str): Unique rule identifierrule_type(RuleType): Type of rule (TRANSITIVE, SYMMETRIC, INVERSE, CUSTOM)name(str): Human-readable rule namedescription(str): Rule descriptionconditions(List[Dict[str, Any]]): Conditions that must be metconclusion(Dict[str, Any]): Inferred conclusionconfidence(float): Rule confidence score
Schema Management
SchemaManager
Manages entity types, relation types, and property schemas with validation.
Module: aiecs.domain.knowledge_graph.schema.schema_manager
from aiecs.domain.knowledge_graph.schema import (
SchemaManager,
EntityType,
RelationType,
PropertySchema,
PropertyType
)
manager = SchemaManager()
# Define entity type
person_type = EntityType(
name="Person",
description="A person entity",
properties={
"name": PropertySchema(
name="name",
property_type=PropertyType.STRING,
required=True
),
"age": PropertySchema(
name="age",
property_type=PropertyType.INTEGER,
min_value=0,
max_value=150
)
}
)
manager.create_entity_type(person_type)
# Validate entity
is_valid = manager.validate_entity("Person", {"name": "Alice", "age": 30})
Methods:
create_entity_type(entity_type: EntityType) -> None: Register entity typeget_entity_type(name: str) -> Optional[EntityType]: Get entity type by namecreate_relation_type(relation_type: RelationType) -> None: Register relation typeget_relation_type(name: str) -> Optional[RelationType]: Get relation type by namevalidate_entity(entity_type: str, properties: Dict) -> bool: Validate entity propertiesvalidate_relation(relation_type: str, source_type: str, target_type: str) -> bool: Validate relationlist_entity_types() -> List[str]: List all entity type nameslist_relation_types() -> List[str]: List all relation type names
EntityType and RelationType
Schema definitions for entities and relations.
Module: aiecs.domain.knowledge_graph.schema.types
from aiecs.domain.knowledge_graph.schema import EntityType, RelationType
# Entity type
entity_type = EntityType(
name="Company",
description="A business organization",
properties={...},
metadata={"version": "1.0"}
)
# Relation type
relation_type = RelationType(
name="WORKS_FOR",
description="Employment relationship",
source_entity_types=["Person"],
target_entity_types=["Company"],
properties={...}
)
PropertySchema
Property definition with validation rules.
Module: aiecs.domain.knowledge_graph.schema.property_schema
from aiecs.domain.knowledge_graph.schema import PropertySchema, PropertyType
property_schema = PropertySchema(
name="salary",
property_type=PropertyType.FLOAT,
required=False,
min_value=0.0,
max_value=1000000.0,
description="Annual salary in USD"
)
PropertyType Enum:
STRING: Text valuesINTEGER: Whole numbersFLOAT: Decimal numbersBOOLEAN: True/False valuesDATE: Date valuesDATETIME: Date and time valuesLIST: List of valuesDICT: Dictionary/object values
Storage Interfaces
GraphStore (Two-Tier Interface)
Abstract base class for all graph storage backends.
Module: aiecs.infrastructure.graph_storage.base
from aiecs.infrastructure.graph_storage.base import GraphStore
class CustomGraphStore(GraphStore):
# Implement Tier 1 methods (required)
async def initialize(self) -> None: ...
async def close(self) -> None: ...
async def add_entity(self, entity: Entity) -> None: ...
async def get_entity(self, entity_id: str) -> Optional[Entity]: ...
async def add_relation(self, relation: Relation) -> None: ...
async def get_relation(self, relation_id: str) -> Optional[Relation]: ...
async def get_neighbors(self, entity_id: str, ...) -> List[Entity]: ...
# Tier 2 methods work automatically with defaults!
# Optionally override for performance optimization
Tier 1 Methods (Required):
initialize() -> None: Initialize storage backendclose() -> None: Close connections and cleanupadd_entity(entity: Entity) -> None: Add entity to graphget_entity(entity_id: str) -> Optional[Entity]: Retrieve entity by IDadd_relation(relation: Relation) -> None: Add relation to graphget_relation(relation_id: str) -> Optional[Relation]: Retrieve relation by IDget_neighbors(entity_id: str, direction: str, relation_types: Optional[List[str]]) -> List[Entity]: Get neighboring entities
Tier 2 Methods (Has Defaults, Can Optimize):
traverse(start_entity_id: str, ...) -> List[Path]: Multi-hop graph traversalfind_paths(start_id: str, end_id: str, ...) -> List[Path]: Find paths between entitiessubgraph_query(center_id: str, radius: int, ...) -> GraphResult: Extract subgraphvector_search(embedding: List[float], top_k: int, ...) -> List[Entity]: Semantic searchexecute_query(query: GraphQuery) -> GraphResult: Execute generic query
InMemoryGraphStore
Fast, networkx-based in-memory storage for development and testing.
Module: aiecs.infrastructure.graph_storage.in_memory
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
store = InMemoryGraphStore()
await store.initialize()
# Use all GraphStore methods
await store.add_entity(entity)
neighbors = await store.get_neighbors("entity_1")
paths = await store.traverse("entity_1", max_depth=3)
await store.close()
Use Cases:
Development and testing
Small graphs (< 100K nodes)
Prototyping
Temporary knowledge
Performance: Very fast for small graphs, all operations in-memory
SQLiteGraphStore
File-based persistent storage for single-user applications.
Module: aiecs.infrastructure.graph_storage.sqlite
from aiecs.infrastructure.graph_storage.sqlite import SQLiteGraphStore
store = SQLiteGraphStore(db_path="knowledge.db")
await store.initialize()
# Optimized Tier 2 methods using SQL
paths = await store.traverse("entity_1", max_depth=5) # Uses recursive CTE
await store.close()
Use Cases:
Production applications
Persistent storage
Medium-sized graphs (< 1M nodes)
Single-process applications
Performance: Optimized Tier 2 methods using SQL recursive CTEs
PostgreSQLGraphStore
Production-grade storage with pgvector support for large-scale applications.
Module: aiecs.infrastructure.graph_storage.postgresql
from aiecs.infrastructure.graph_storage.postgresql import PostgreSQLGraphStore
store = PostgreSQLGraphStore(
connection_string="postgresql://user:pass@localhost/db"
)
await store.initialize()
# Optimized for scale
results = await store.vector_search(embedding, top_k=100) # Uses pgvector
paths = await store.traverse("entity_1", max_depth=10) # Optimized CTE
await store.close()
Use Cases:
Production deployments
Large graphs (10M+ nodes)
Multi-user applications
Concurrent access
Performance:
pgvector for fast semantic search
Optimized recursive CTEs for traversal
Connection pooling
Concurrent query support
Runnable Pattern
GraphRunnable
Base class for composable graph operations.
from aiecs.application.knowledge_graph.runnable import GraphRunnable
class MyOperation(GraphRunnable):
async def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
# Your operation logic
return result
Methods:
invoke(input_data)- Execute the operationpipe(other)- Chain operationsbatch(inputs)- Process multiple inputsstream(input_data)- Stream results
Example:
# Chain operations
pipeline = operation1.pipe(operation2).pipe(operation3)
result = await pipeline.invoke(data)
# Batch processing
results = await operation.batch([data1, data2, data3])
Knowledge Fusion
KnowledgeFusion
Merge duplicate entities across documents.
from aiecs.application.knowledge_graph.fusion import KnowledgeFusion
fusion = KnowledgeFusion(
graph_store: GraphStore,
similarity_threshold: float = 0.85,
conflict_resolution_strategy: str = "most_complete"
)
Parameters:
graph_store- Graph storage backendsimilarity_threshold- Similarity threshold for merging (0.0-1.0)conflict_resolution_strategy- Strategy for resolving conflicts:"most_complete"- Prefer non-empty, longer values (default)"most_recent"- Prefer most recent timestamp"most_confident"- Prefer most confident source"longest"- Prefer longest string value"keep_all"- Keep all conflicting values
Methods:
fuse_cross_document_entities(entity_types=None)
Merge duplicate entities across documents.
Parameters:
entity_types(Optional[List[str]]) - Filter by entity types
Returns:
{
"success": bool,
"entities_analyzed": int,
"entities_merged": int,
"merge_groups": int,
"conflicts_resolved": int
}
Example:
fusion = KnowledgeFusion(store, similarity_threshold=0.85)
stats = await fusion.fuse_cross_document_entities(entity_types=["Person"])
print(f"Merged {stats['entities_merged']} entities")
track_entity_provenance(entity_id)
Get provenance information for an entity.
Returns: List of source document IDs
Result Reranking
ResultReranker
Rerank search results for improved relevance.
from aiecs.application.knowledge_graph.search.reranker import ResultReranker
from aiecs.application.knowledge_graph.search.reranker_strategies import (
TextSimilarityReranker,
SemanticReranker,
StructuralReranker,
HybridReranker
)
reranker = ResultReranker(strategies=[
TextSimilarityReranker(),
SemanticReranker(),
StructuralReranker()
])
Methods:
rerank(query, entities, top_k)
Rerank entities by relevance.
Parameters:
query(str) - Search queryentities(List[Entity]) - Entities to reranktop_k(int) - Number of results to return
Returns: List[Entity] - Reranked entities
Example:
reranker = ResultReranker(strategies=[HybridReranker()])
results = await reranker.rerank(
query="machine learning",
entities=search_results,
top_k=20
)
Reranking Strategies
TextSimilarityReranker
BM25-based text similarity reranking.
strategy = TextSimilarityReranker()
Performance: 50-100ms latency
SemanticReranker
Deep semantic similarity using embeddings.
strategy = SemanticReranker()
Performance: 100-200ms latency
StructuralReranker
Graph structure importance (centrality, PageRank).
strategy = StructuralReranker()
Performance: 80-150ms latency
HybridReranker
Combines all signals for best results.
strategy = HybridReranker(
text_weight=0.4,
semantic_weight=0.4,
structural_weight=0.2
)
Performance: 150-300ms latency
Logical Query Parsing
LogicFormParser
Convert natural language to structured logical queries.
from aiecs.application.knowledge_graph.reasoning.logic_form_parser import LogicFormParser
parser = LogicFormParser()
logical_query = parser.parse("Find all people who work for companies in San Francisco")
Returns:
LogicalQuery(
query_type=QueryType.FIND,
variables=[Variable(name="?person"), Variable(name="?company")],
predicates=[
Predicate(name="WORKS_FOR", arguments=["?person", "?company"]),
Predicate(name="LOCATED_IN", arguments=["?company", "San Francisco"])
],
constraints=[...]
)
Methods:
to_dict()- Convert to dictionary representation
Schema Caching
SchemaCache
High-performance caching for schema operations.
from aiecs.domain.knowledge_graph.schema.schema_cache import SchemaCache
cache = SchemaCache(
ttl_seconds=3600, # 1 hour
max_size=1000
)
Methods:
get(key)- Get cached valueset(key, value)- Set cached valueinvalidate(key)- Invalidate cache entryclear()- Clear all cacheget_metrics()- Get cache metrics
Metrics:
{
"hits": int,
"misses": int,
"total_requests": int,
"hit_rate": float,
"size": int
}
Performance: 3-5x speedup, 70-95% hit rate
Query Optimization
QueryOptimizer
Cost-based query optimization.
from aiecs.application.knowledge_graph.reasoning.query_optimizer import QueryOptimizer
optimizer = QueryOptimizer(strategy="balanced")
optimized_plan = optimizer.optimize(query_plan)
Strategies:
"cost"- Minimize computational cost"latency"- Minimize query latency"balanced"- Balance cost and latency (default)
Performance: 40-70% faster query execution
Structured Data Pipeline
StructuredDataPipeline
Import CSV, JSON, SPSS, and Excel data into knowledge graphs with automatic schema inference, data reshaping, statistical aggregation, and quality validation.
from aiecs.application.knowledge_graph.builder.structured_pipeline import (
StructuredDataPipeline,
ImportResult,
PerformanceMetrics
)
from aiecs.application.knowledge_graph.builder.schema_mapping import (
SchemaMapping,
EntityMapping,
RelationMapping
)
pipeline = StructuredDataPipeline(
mapping=schema_mapping,
graph_store=store,
batch_size=100,
skip_errors=True,
enable_parallel=False, # Enable parallel processing
validation_config=None, # Data quality validation
auto_tune_batch_size=False # Auto-tune batch size
)
Parameters:
mapping(SchemaMapping): Schema mapping configuration (optional if using inference)graph_store(GraphStore): Graph storage backendbatch_size(int): Number of rows per batch (default: 100)skip_errors(bool): Continue on errors (default: True)enable_parallel(bool): Enable parallel batch processing (default: False)max_workers(int): Number of worker processes (default: CPU count - 1)validation_config(ValidationConfig): Data quality validation rulesauto_tune_batch_size(bool): Auto-tune batch size (default: False)streaming(bool): Enable streaming mode for large files (default: False)
Methods:
import_from_csv(file_path, encoding="utf-8", delimiter=",", header=True) -> ImportResult
Import data from CSV file.
Example:
result = await pipeline.import_from_csv("data.csv")
print(f"Added {result.entities_added} entities")
import_from_json(file_path, encoding="utf-8", array_key=None) -> ImportResult
Import data from JSON file.
Example:
result = await pipeline.import_from_json("data.json")
import_from_spss(file_path) -> ImportResult
Import data from SPSS (.sav, .por) file.
Example:
result = await pipeline.import_from_spss("survey_data.sav")
# SPSS metadata (variable labels, value labels) are automatically preserved
import_from_excel(file_path, sheet_name=None) -> ImportResult
Import data from Excel (.xlsx, .xls) file.
Example:
# Import from specific sheet
result = await pipeline.import_from_excel("workbook.xlsx", sheet_name="Sheet1")
# Import from all sheets
result = await pipeline.import_from_excel("workbook.xlsx", sheet_name=None)
import_from_file(file_path, **kwargs) -> ImportResult
Auto-detect file format and import.
Example:
# Automatically detects format from extension
result = await pipeline.import_from_file("data.sav") # SPSS
result = await pipeline.import_from_file("data.xlsx") # Excel
result = await pipeline.import_from_file("data.csv") # CSV
import_from_dataframe(df) -> ImportResult
Import data from pandas DataFrame.
Example:
import pandas as pd
df = pd.read_csv("data.csv")
result = await pipeline.import_from_dataframe(df)
create_with_auto_inference(file_path, graph_store, entity_type_hint=None, **kwargs) -> StructuredDataPipeline
Create pipeline with automatic schema inference.
Example:
pipeline = await StructuredDataPipeline.create_with_auto_inference(
file_path="data.csv",
graph_store=store,
entity_type_hint="Employee"
)
result = await pipeline.import_from_file("data.csv")
create_with_auto_reshape(file_path, graph_store, entity_type_hint=None, reshape_threshold=50, **kwargs) -> StructuredDataPipeline
Create pipeline with automatic wide format detection and reshaping suggestion.
Example:
pipeline = await StructuredDataPipeline.create_with_auto_reshape(
file_path="wide_data.csv",
graph_store=store,
reshape_threshold=50
)
reshape_and_import(file_path, id_vars, value_vars, entity_type, variable_type, relation_type, **kwargs) -> ImportResult
Reshape wide format data and import with normalized structure.
Example:
result = await pipeline.reshape_and_import(
file_path="wide_data.csv",
id_vars=["sample_id"],
value_vars=[f"option_{i}" for i in range(1, 201)],
entity_type="Sample",
variable_type="Option",
relation_type="HAS_VALUE"
)
Returns: ImportResult with:
success(bool): Whether import completed successfullyentities_added(int): Number of entities addedrelations_added(int): Number of relations addedrows_processed(int): Number of rows processedrows_failed(int): Number of rows that failederrors(List[str]): List of errorswarnings(List[str]): List of warningsquality_report(QualityReport): Data quality validation report (if enabled)performance_metrics(PerformanceMetrics): Performance metrics (if enabled)duration_seconds(float): Total duration
SchemaInference
Automatically infer schema mappings from data structure.
Module: aiecs.application.knowledge_graph.builder.schema_inference
from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference
inference = SchemaInference(sample_size=1000)
Methods:
infer_from_csv(file_path) -> InferredSchema
Infer schema from CSV file.
Example:
inferred = inference.infer_from_csv("data.csv")
mapping = inferred.to_schema_mapping()
infer_from_spss(file_path) -> InferredSchema
Infer schema from SPSS file (uses variable labels and value labels).
Example:
inferred = inference.infer_from_spss("survey_data.sav")
# Uses SPSS variable labels as property names
# Uses SPSS value labels for categorical data
infer_from_excel(file_path, sheet_name=None) -> InferredSchema
Infer schema from Excel file.
Example:
inferred = inference.infer_from_excel("workbook.xlsx", sheet_name="Sheet1")
infer_from_dataframe(df, entity_type_hint=None, metadata=None) -> InferredSchema
Infer schema from pandas DataFrame.
Example:
import pandas as pd
df = pd.read_csv("data.csv")
inferred = inference.infer_from_dataframe(df, entity_type_hint="Employee")
merge_with_user_mapping(inferred, user_mapping) -> SchemaMapping
Merge inferred schema with user-provided mappings.
Example:
user_mapping = SchemaMapping(entity_mappings=[...])
merged = inference.merge_with_user_mapping(inferred, user_mapping)
Returns: InferredSchema with:
entity_mappings(List[EntityMapping]): Inferred entity mappingsrelation_mappings(List[RelationMapping]): Inferred relation mappingsconfidence_scores(Dict[str, float]): Confidence scores (0-1)warnings(List[str]): Warnings about inferenceto_schema_mapping(): Convert to SchemaMapping
DataReshaping
Reshape data between wide and long formats for normalized graph structures.
Module: aiecs.application.knowledge_graph.builder.data_reshaping
from aiecs.application.knowledge_graph.builder.data_reshaping import DataReshaping
reshaping = DataReshaping()
Methods:
melt_wide_to_long(df, id_vars, value_vars, var_name="variable", value_name="value") -> ReshapeResult
Convert wide format to long format.
Example:
result = reshaping.melt_wide_to_long(
df=df_wide,
id_vars=["sample_id"],
value_vars=[f"option_{i}" for i in range(1, 201)],
var_name="option_id",
value_name="value"
)
pivot_long_to_wide(df, index, columns, values) -> ReshapeResult
Convert long format to wide format.
Example:
result = reshaping.pivot_long_to_wide(
df=df_long,
index="sample_id",
columns="option_id",
values="value"
)
detect_wide_format(df, threshold_columns=50) -> bool
Detect if DataFrame is in wide format.
Example:
is_wide = reshaping.detect_wide_format(df, threshold_columns=50)
generate_normalized_mapping(id_column, entity_type, variable_type, relation_type) -> SchemaMapping
Generate schema mapping for normalized structure.
Example:
mapping = reshaping.generate_normalized_mapping(
id_column="sample_id",
entity_type="Sample",
variable_type="Option",
relation_type="HAS_VALUE"
)
Returns: ReshapeResult with:
data(DataFrame): Reshaped DataFrameoriginal_shape(tuple): Original (rows, cols) shapenew_shape(tuple): New (rows, cols) shapeid_columns(List[str]): ID columns usedvariable_column(str): Variable column name (for melt)value_column(str): Value column name (for melt)warnings(List[str]): Warnings
DataQualityValidator
Validate data quality during import.
Module: aiecs.application.knowledge_graph.builder.data_quality
from aiecs.application.knowledge_graph.builder.data_quality import (
DataQualityValidator,
ValidationConfig,
RangeRule,
OutlierRule,
QualityReport
)
ValidationConfig:
validation_config = ValidationConfig(
rules={
"EntityType": {
"property_name": RangeRule(min=0.0, max=1.0)
}
},
outlier_detection={
"EntityType": {
"property_name": OutlierRule(method="zscore", threshold=3.0)
}
},
required_properties={
"EntityType": ["property1", "property2"]
},
fail_on_violations=False
)
Methods:
validate_row(row, row_idx) -> bool
Validate a single row.
Example:
validator = DataQualityValidator(validation_config)
is_valid = validator.validate_row(row, row_idx=0)
validate_dataframe(df) -> QualityReport
Validate entire DataFrame.
Example:
report = validator.validate_dataframe(df)
print(f"Violations: {len(report.range_violations)}")
print(f"Completeness: {report.completeness}")
Returns: QualityReport with:
range_violations(List[QualityViolation]): Range validation violationsoutliers(List[Outlier]): Detected outlierscompleteness(Dict[str, float]): Completeness percentage per propertysummary(Dict[str, Any]): Summary statistics
PerformanceMetrics
Track import performance metrics.
Module: aiecs.application.knowledge_graph.builder.import_optimizer
from aiecs.application.knowledge_graph.builder.import_optimizer import PerformanceMetrics
metrics = result.performance_metrics
Attributes:
total_time(float): Total import time in secondsread_time(float): File reading timetransform_time(float): Data transformation timewrite_time(float): Graph store write timerows_per_second(float): Import throughputpeak_memory_mb(float): Peak memory usage in MBoptimal_batch_size(int): Optimal batch size used
Example:
if result.performance_metrics:
metrics = result.performance_metrics
print(f"Throughput: {metrics.rows_per_second:.0f} rows/sec")
print(f"Peak memory: {metrics.peak_memory_mb:.1f} MB")
Methods:
import_from_csv(file_path)
Import data from CSV file.
Returns:
ImportResult(
success=bool,
entities_added=int,
relations_added=int,
rows_processed=int,
rows_failed=int,
duration_seconds=float,
errors=List[str],
warnings=List[str]
)
Performance: 100-300 rows/second
import_from_json(file_path)
Import data from JSON file.
Performance: 100-250 records/second
Tools
KnowledgeGraphBuilderTool
Build knowledge graphs from various sources.
from aiecs.tools.knowledge_graph import KnowledgeGraphBuilderTool
builder = KnowledgeGraphBuilderTool()
await builder._initialize()
Actions:
build_from_text- Extract from textbuild_from_document- Process documentsbuild_from_structured_data- Import CSV/JSONget_stats- Get graph statistics
See Graph Builder Tool for details.
GraphSearchTool
Search knowledge graphs with reranking.
from aiecs.tools.knowledge_graph import GraphSearchTool
tool = GraphSearchTool()
await tool._initialize()
Modes:
vector- Vector similarity searchgraph- Graph traversal searchhybrid- Combined search
Reranking:
enable_reranking- Enable/disable rerankingrerank_strategy- text, semantic, structural, hybrid
See Graph Search Tool for details.
GraphReasoningTool
Logical reasoning and query parsing.
from aiecs.tools.knowledge_graph import GraphReasoningTool
tool = GraphReasoningTool()
await tool._initialize()
Modes:
query_plan- Plan query executionmulti_hop- Multi-hop reasoninginference- Logical inferencelogical_query- Parse to logical formfull_reasoning- Complete pipeline
See Graph Reasoning Tool for details.