StructuredDataPipeline Usage Guide
This guide explains how to use the StructuredDataPipeline to import structured data (CSV, JSON, SPSS, Excel) into knowledge graphs.
Quick Start
from aiecs.application.knowledge_graph.builder.schema_mapping import (
SchemaMapping,
EntityMapping
)
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
# 1. Create schema mapping
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["id", "name"],
entity_type="Person",
property_mapping={"id": "id", "name": "name"},
id_column="id"
)
]
)
# 2. Initialize graph store
store = InMemoryGraphStore()
await store.initialize()
# 3. Create pipeline
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
# 4. Import CSV
result = await pipeline.import_from_csv("data.csv")
print(f"Added {result.entities_added} entities, {result.relations_added} relations")
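To try the Quick Start without an existing dataset, a small input file can be generated first. The sketch below uses only the standard library. The helper name `write_sample_csv` and the two rows are made up for illustration; the `id` and `name` columns match the mapping above.

```python
import csv
from pathlib import Path


def write_sample_csv(path: str = "data.csv") -> Path:
    """Write a tiny CSV whose header matches the Quick Start mapping."""
    # Illustrative sample rows, not real data.
    rows = [
        {"id": "1", "name": "Alice"},
        {"id": "2", "name": "Bob"},
    ]
    target = Path(path)
    # newline="" lets the csv module control line endings itself.
    with target.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=["id", "name"])
        writer.writeheader()
        writer.writerows(rows)
    return target
```

Calling `write_sample_csv()` before step 4 creates a `data.csv` in the working directory that the pipeline can read.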
Basic Usage
Step 1: Define Schema Mapping
First, define how your data maps to the knowledge graph:
from aiecs.application.knowledge_graph.builder.schema_mapping import (
SchemaMapping,
EntityMapping,
RelationMapping
)
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["id", "name", "email"],
entity_type="Person",
property_mapping={"id": "id", "name": "name", "email": "email"},
id_column="id"
)
],
relation_mappings=[
RelationMapping(
source_columns=["person_id", "company_id"],
relation_type="WORKS_FOR",
source_entity_column="person_id",
target_entity_column="company_id"
)
]
)
Step 2: Initialize Graph Store
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
store = InMemoryGraphStore()
await store.initialize()
Step 3: Create Pipeline
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
batch_size=100, # Optional: process in batches
skip_errors=True # Optional: continue on errors
)
Step 4: Import Data
# Import CSV
result = await pipeline.import_from_csv("data.csv")
# Or import JSON
result = await pipeline.import_from_json("data.json")
Step 5: Check Results
print(f"Entities added: {result.entities_added}")
print(f"Relations added: {result.relations_added}")
print(f"Rows processed: {result.rows_processed}")
print(f"Errors: {len(result.errors)}")
print(f"Warnings: {len(result.warnings)}")
# Check for errors
if result.errors:
    for error in result.errors:
        print(f"Error: {error}")
CSV Import
Basic CSV Import
result = await pipeline.import_from_csv(
file_path="employees.csv",
delimiter=",", # Optional: default is ","
encoding="utf-8" # Optional: default is "utf-8"
)
CSV with Custom Delimiter
# Tab-separated values
result = await pipeline.import_from_csv(
file_path="data.tsv",
delimiter="\t"
)
# Semicolon-separated values
result = await pipeline.import_from_csv(
file_path="data.csv",
delimiter=";"
)
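When the delimiter of an incoming file is not known in advance, it can be guessed from a sample before calling `import_from_csv`. This is a minimal sketch built on the standard library's `csv.Sniffer`; the helper name `guess_delimiter` and the comma fallback are assumptions for illustration, not part of the pipeline.

```python
import csv


def guess_delimiter(sample: str, candidates: str = ",;\t") -> str:
    """Guess the delimiter from a text sample, falling back to a comma."""
    try:
        # Restrict the search to the candidate characters only.
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        # The sniffer could not decide; use the documented default.
        return ","
```

The result can then be passed as the `delimiter` argument shown above. Sniffing is heuristic, so an explicit delimiter remains the safer choice when the format is known.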
CSV with Headers
The pipeline automatically detects headers. If your CSV doesn’t have headers, specify column names:
# CSV without headers
result = await pipeline.import_from_csv(
file_path="data_no_headers.csv",
has_headers=False,
column_names=["id", "name", "email"] # Specify column names
)
CSV with Different Encoding
# Handle non-UTF-8 files
result = await pipeline.import_from_csv(
file_path="data_latin1.csv",
encoding="latin-1"
)
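If the encoding of a file is uncertain, one option is to try a short list of candidates and pass the first one that decodes cleanly as `encoding`. The sketch below assumes a hypothetical helper `pick_encoding`; it is not part of the pipeline. Because `latin-1` accepts any byte sequence, it should stay last in the list.

```python
from pathlib import Path


def pick_encoding(path: str, candidates=("utf-8", "latin-1")) -> str:
    """Return the first candidate encoding that decodes the whole file."""
    raw = Path(path).read_bytes()
    for encoding in candidates:
        try:
            raw.decode(encoding)
            return encoding
        except UnicodeDecodeError:
            continue  # try the next candidate
    raise ValueError(f"None of {candidates} can decode {path}")
```

This reads the whole file into memory, so for very large files it may be preferable to check only a leading chunk.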
SPSS Import
Basic SPSS Import
Import data directly from SPSS (.sav) files:
result = await pipeline.import_from_spss("survey_data.sav")
print(f"Added {result.entities_added} entities")
SPSS Import with Schema Mapping
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["respondent_id", "age", "gender"],
entity_type="Respondent",
property_mapping={
"respondent_id": "id",
"age": "age",
"gender": "gender"
},
id_column="respondent_id"
)
]
)
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_spss("survey_data.sav")
SPSS Metadata Preservation
SPSS files contain variable labels and value labels that are automatically preserved:
result = await pipeline.import_from_spss("survey_data.sav")
# Variable labels are stored in entity properties as "_spss_variable_labels"
# Value labels are stored as "_spss_value_labels"
entity = await store.get_entity("respondent_001")
print(entity.properties.get("_spss_variable_labels", {}))
Auto-detect File Format
Use import_from_file() to automatically detect and import any supported format:
# Automatically detects format from extension
result = await pipeline.import_from_file("data.sav") # SPSS
result = await pipeline.import_from_file("data.xlsx") # Excel
result = await pipeline.import_from_file("data.csv") # CSV
result = await pipeline.import_from_file("data.json") # JSON
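Extension-based detection can be pictured as a lookup from file suffix to import method. The sketch below is only an illustration of that idea: the table `IMPORTERS` and the helper `importer_for` are hypothetical, the method names are the ones used in this guide, and how `import_from_file()` is actually implemented is not covered here.

```python
from pathlib import Path

# Extension -> pipeline method name, limited to the formats shown in this guide.
IMPORTERS = {
    ".csv": "import_from_csv",
    ".json": "import_from_json",
    ".sav": "import_from_spss",
    ".xlsx": "import_from_excel",
}


def importer_for(file_path: str) -> str:
    """Return the name of the import method for a file, based on its suffix."""
    suffix = Path(file_path).suffix.lower()
    try:
        return IMPORTERS[suffix]
    except KeyError:
        raise ValueError(f"Unsupported file extension: {suffix!r}") from None
```

A check like this can be useful for rejecting unsupported files early, before a pipeline is created.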
Excel Import
Basic Excel Import
Import data from Excel files:
result = await pipeline.import_from_excel("employees.xlsx")
print(f"Added {result.entities_added} entities")
Excel Import with Specific Sheet
# Import from specific sheet
result = await pipeline.import_from_excel(
"workbook.xlsx",
sheet_name="Sheet1"
)
Excel Import with Multiple Sheets
# Import from all sheets
result = await pipeline.import_from_excel(
"workbook.xlsx",
sheet_name=None # None = all sheets
)
# Or import sheets sequentially
for sheet in ["Sheet1", "Sheet2", "Sheet3"]:
    result = await pipeline.import_from_excel(
        "workbook.xlsx",
        sheet_name=sheet
    )
    print(f"{sheet}: {result.entities_added} entities")
Excel Data Types
Excel data types (dates, numbers, text) are automatically handled:
# Dates are converted to ISO format strings
# Numbers are preserved as numeric types
# Text is preserved as strings
result = await pipeline.import_from_excel("data.xlsx")
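The conversion rules in the comments above can be illustrated with a small cell normalizer. This is a sketch of the described behavior, not the pipeline's own code; the function name `normalize_cell` and the handling of empty cells are assumptions.

```python
from datetime import date, datetime
from numbers import Number


def normalize_cell(value):
    """Dates become ISO 8601 strings, numbers stay numeric, the rest becomes str."""
    if value is None:
        return None  # assumption: empty cells stay empty
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Number):
        return value
    return str(value)
```

For example, `normalize_cell(date(2024, 1, 15))` yields the string `2024-01-15`, while `normalize_cell(3.5)` returns the float unchanged.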
Automatic Schema Inference
Infer Schema from Data
Automatically generate schema mappings from data structure:
from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference
# Infer schema from CSV
inference = SchemaInference()
inferred = inference.infer_from_csv("data.csv")
# Review inferred schema
print(f"Inferred {len(inferred.entity_mappings)} entity types")
print(f"Inferred {len(inferred.relation_mappings)} relation types")
print(f"Confidence scores: {inferred.confidence_scores}")
# Use inferred schema
mapping = inferred.to_schema_mapping()
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_csv("data.csv")
Infer Schema from SPSS
SPSS files provide rich metadata for better inference:
inference = SchemaInference()
inferred = inference.infer_from_spss("survey_data.sav")
# SPSS variable labels are used as property names
# SPSS value labels are preserved for categorical data
mapping = inferred.to_schema_mapping()
Partial Schema Inference
Provide some mappings and let the system infer the rest:
# User provides partial mapping
user_mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["id", "name"],
entity_type="Person",
property_mapping={"id": "id", "name": "name"},
id_column="id"
)
]
)
# Infer remaining mappings
inference = SchemaInference()
inferred = inference.infer_from_csv("data.csv")
# Merge user-provided and inferred mappings
merged = inference.merge_with_user_mapping(inferred, user_mapping)
Import with Auto-Inference
Use create_with_auto_inference() for one-step import:
pipeline = await StructuredDataPipeline.create_with_auto_inference(
file_path="data.csv",
graph_store=store,
entity_type_hint="Employee" # Optional hint
)
result = await pipeline.import_from_file("data.csv")
Data Reshaping
Wide-to-Long Conversion for Normalized Structures
Convert wide format data (many columns) to normalized graph structure:
from aiecs.application.knowledge_graph.builder.data_reshaping import DataReshaping
# Wide format: 1000 rows × 200 columns
# Convert to normalized: Sample entities + Option entities + HAS_VALUE relations
reshaping = DataReshaping()
reshape_result = reshaping.melt_wide_to_long(
df=df_wide,
id_vars=["sample_id"],
value_vars=[f"option_{i:03d}" for i in range(1, 201)],
var_name="option_id",
value_name="value"
)
# Generate normalized schema mapping
mapping = reshaping.generate_normalized_mapping(
id_column="sample_id",
entity_type="Sample",
variable_type="Option",
relation_type="HAS_VALUE"
)
# Import normalized data
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_dataframe(reshape_result.data)
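To make the wide-to-long idea concrete, here is a dependency-free sketch of the transformation on plain dictionaries. The helper `melt_rows` is hypothetical and only mirrors the parameter names used above; a DataFrame-based implementation may order the output rows differently.

```python
def melt_rows(rows, id_vars, value_vars, var_name="variable", value_name="value"):
    """Turn each wide row into one long row per value column."""
    long_rows = []
    for row in rows:
        # Identifier columns are repeated on every long row.
        ids = {key: row[key] for key in id_vars}
        for column in value_vars:
            long_rows.append({**ids, var_name: column, value_name: row[column]})
    return long_rows


# Two wide rows with two option columns each (made-up values).
wide = [
    {"sample_id": "s1", "option_001": 0.2, "option_002": 0.9},
    {"sample_id": "s2", "option_001": 0.5, "option_002": 0.1},
]
long = melt_rows(
    wide,
    id_vars=["sample_id"],
    value_vars=["option_001", "option_002"],
    var_name="option_id",
    value_name="value",
)
```

Each long row corresponds to one `HAS_VALUE` relation between a Sample and an Option in the normalized graph, which is why 2 rows with 2 option columns produce 4 long rows.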
Automatic Reshaping During Import
Detect and reshape wide format data automatically:
# Pipeline automatically detects wide format and suggests normalization
pipeline = await StructuredDataPipeline.create_with_auto_reshape(
file_path="wide_data.csv",
graph_store=store,
reshape_threshold=50 # Threshold for wide format detection
)
result = await pipeline.import_from_file("wide_data.csv")
Reshape and Import in One Step
# Reshape wide format and import with normalized structure
result = await pipeline.reshape_and_import(
file_path="wide_data.csv",
id_vars=["sample_id"],
value_vars=option_columns,
entity_type="Sample",
variable_type="Option",
relation_type="HAS_VALUE"
)
Statistical Aggregation
Compute Statistics During Import
Compute mean, standard deviation, and other statistics during import:
from aiecs.application.knowledge_graph.builder.schema_mapping import AggregationConfig
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["sample_id"] + [f"option_{i}" for i in range(1, 201)],
entity_type="Sample",
property_mapping={"sample_id": "id"},
id_column="sample_id"
)
],
aggregations={
"Sample": {
"option_values": {
"mean": "avg_value",
"std": "std_value",
"min": "min_value",
"max": "max_value"
}
}
}
)
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_csv("data.csv")
# Aggregated values are stored as entity properties
sample = await store.get_entity("sample_001")
print(f"Average: {sample.properties['avg_value']}")
print(f"Std Dev: {sample.properties['std_value']}")
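As a rough picture of what the aggregated properties contain, the sketch below computes the same four statistics for one row's option values. The helper `summarize` is hypothetical, and it uses the sample standard deviation; whether the pipeline uses the sample or the population formula is an assumption to verify.

```python
import statistics


def summarize(values):
    """Compute summary statistics over the non-missing numeric values."""
    present = [v for v in values if v is not None]
    return {
        "avg_value": statistics.fmean(present),
        # Sample standard deviation needs at least two data points.
        "std_value": statistics.stdev(present) if len(present) > 1 else 0.0,
        "min_value": min(present),
        "max_value": max(present),
    }
```

The sketch raises `statistics.StatisticsError` when every value is missing, so callers need to decide how to treat fully empty rows.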
Grouped Aggregation
Compute statistics per group:
mapping = SchemaMapping(
aggregations={
"Employee": {
"salary": {
"mean": "avg_salary",
"std": "std_salary"
}
}
},
group_by="department" # Group by department
)
# Aggregated values are stored on group entities
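Grouping means the statistic is computed once per distinct value of the `group_by` column rather than once per row. A minimal sketch of that idea, using a hypothetical helper `mean_by_group` on plain dictionaries:

```python
from collections import defaultdict
import statistics


def mean_by_group(rows, group_by, value_column):
    """Return the mean of value_column for each distinct group_by value."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_by]].append(row[value_column])
    return {group: statistics.fmean(values) for group, values in groups.items()}
```

With `group_by="department"` and `value_column="salary"`, the result holds one average per department, analogous to the `avg_salary` property in the mapping above.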
Data Quality Validation
Range Validation
Validate numeric values are within specified ranges:
from aiecs.application.knowledge_graph.builder.data_quality import (
DataQualityValidator,
ValidationConfig,
RangeRule
)
validation_config = ValidationConfig(
rules={
"Sample": {
"option_1": RangeRule(min=0.0, max=1.0),
"option_2": RangeRule(min=-10.0, max=10.0)
}
},
fail_on_violations=False # Continue import, log violations
)
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
validation_config=validation_config
)
result = await pipeline.import_from_csv("data.csv")
# Check quality report
if result.quality_report:
    print(f"Range violations: {result.quality_report.range_violations}")
    print(f"Completeness: {result.quality_report.completeness}")
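The kind of check a range rule expresses can be sketched in a few lines. The helper `find_range_violations` is hypothetical, and treating both bounds as inclusive is an assumption; the shape of the real `range_violations` report may differ.

```python
def find_range_violations(rows, bounds):
    """Return (row_index, column, value) for every value outside its bounds."""
    violations = []
    for index, row in enumerate(rows):
        for column, (low, high) in bounds.items():
            value = row.get(column)
            # Missing values are a completeness concern, not a range violation.
            if value is not None and not (low <= value <= high):
                violations.append((index, column, value))
    return violations
```

Running a check like this on a sample of rows before the import can help pick sensible `min` and `max` values for each rule.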
Outlier Detection
Detect and flag outliers:
validation_config = ValidationConfig(
outlier_detection={
"Sample": {
"option_1": {"method": "zscore", "threshold": 3.0}
}
},
exclude_outliers=False # Flag but don't exclude
)
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
validation_config=validation_config
)
result = await pipeline.import_from_csv("data.csv")
# Check outliers
if result.quality_report:
    print(f"Outliers detected: {len(result.quality_report.outliers)}")
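For reference, the z-score method flags a value when its distance from the mean, measured in standard deviations, exceeds the threshold. The sketch below is a generic version of that technique with a hypothetical helper `zscore_outliers`; it uses the population standard deviation, which may not match the pipeline's choice.

```python
import statistics


def zscore_outliers(values, threshold=3.0):
    """Return indices whose absolute z-score exceeds the threshold."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    if std == 0:
        return []  # all values identical: nothing can be an outlier
    return [i for i, v in enumerate(values) if abs(v - mean) / std > threshold]
```

Note that z-scores are bounded in small samples, so a threshold of 3.0 only becomes meaningful once a column has more than a handful of values.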
Completeness Checks
Check for missing values:
validation_config = ValidationConfig(
required_properties={
"Sample": ["sample_id", "option_1", "option_2"]
}
)
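pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    validation_config=validation_config
)
result = await pipeline.import_from_csv("data.csv")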
# Check completeness
if result.quality_report:
    completeness = result.quality_report.completeness
    for prop, pct in completeness.items():
        print(f"{prop}: {pct:.1f}% complete")
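The percentages printed above can be read as "share of rows in which the property has a value". A minimal sketch of that calculation, with a hypothetical helper `completeness` that treats `None` and empty strings as missing (an assumption):

```python
def completeness(rows, required):
    """Percentage of rows in which each required property is present."""
    if not rows:
        return {prop: 0.0 for prop in required}
    report = {}
    for prop in required:
        present = sum(1 for row in rows if row.get(prop) not in (None, ""))
        report[prop] = 100.0 * present / len(rows)
    return report
```

A value of `0` still counts as present in this sketch, since zero is a legitimate measurement rather than a gap.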
JSON Import
JSON Array Import
Import from a JSON array:
[
{"id": "1", "name": "Alice", "email": "alice@example.com"},
{"id": "2", "name": "Bob", "email": "bob@example.com"}
]
result = await pipeline.import_from_json("data.json")
JSON Lines (NDJSON) Import
Import from newline-delimited JSON:
{"id": "1", "name": "Alice"}
{"id": "2", "name": "Bob"}
result = await pipeline.import_from_json(
file_path="data.jsonl",
json_format="jsonl" # Specify format
)
JSON Object with Array Property
Import from a JSON object containing an array:
{
"employees": [
{"id": "1", "name": "Alice"},
{"id": "2", "name": "Bob"}
]
}
result = await pipeline.import_from_json(
file_path="data.json",
json_format="object_array",
array_key="employees" # Key containing the array
)
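The three JSON layouts above differ only in how the list of records is located. The sketch below shows that difference with the standard `json` module. The helper `load_records` is hypothetical; `"jsonl"` and `"object_array"` are the format names used in this guide, while `"array"` is just a label chosen here for the plain-array case.

```python
import json


def load_records(text, json_format="array", array_key=None):
    """Parse JSON text into a list of row dictionaries."""
    if json_format == "jsonl":
        # One JSON document per non-empty line.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    data = json.loads(text)
    if json_format == "object_array":
        # The records live under a key of the top-level object.
        return data[array_key]
    return data
```

All three layouts end up as the same list of dictionaries, so one schema mapping can serve each of them.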
JSON from String
Import directly from a JSON string:
json_string = '[{"id": "1", "name": "Alice"}]'
result = await pipeline.import_from_json(
file_path=None,
json_data=json_string
)
Batch Processing
For large files, process in batches to manage memory:
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
batch_size=1000 # Process 1000 rows at a time
)
result = await pipeline.import_from_csv("large_file.csv")
Benefits:
Lower memory usage
Progress tracking
Better error recovery
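Batching itself is a simple idea: rows are consumed in fixed-size groups so that only one group is held in memory at a time. A generic sketch, using a hypothetical helper `batched` that is independent of the pipeline's internals:

```python
from itertools import islice


def batched(rows, batch_size):
    """Yield lists of at most batch_size rows from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Because the helper works on any iterable, it combines naturally with a file reader that yields rows lazily.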
Error Handling
Skip Errors (Default)
By default, the pipeline skips rows with errors and continues:
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
skip_errors=True # Default
)
result = await pipeline.import_from_csv("data.csv")
# Check errors
if result.errors:
    print(f"Skipped {len(result.errors)} rows with errors")
    for error in result.errors[:5]:  # Show first 5
        print(f"  - {error}")
Fail on First Error
Stop processing on first error:
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
skip_errors=False # Stop on first error
)
try:
    result = await pipeline.import_from_csv("data.csv")
except Exception as e:
    print(f"Import failed: {e}")
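The difference between the two modes comes down to what happens inside the row loop when a row fails. The sketch below illustrates that pattern in isolation; `process_rows` and `handle_row` are hypothetical names, and the error message format is an assumption rather than the pipeline's actual output.

```python
def process_rows(rows, handle_row, skip_errors=True):
    """Apply handle_row to each row; collect failures or re-raise the first one."""
    processed, errors = 0, []
    for index, row in enumerate(rows):
        try:
            handle_row(row)
            processed += 1
        except Exception as exc:
            if not skip_errors:
                raise  # fail fast: the caller sees the original exception
            errors.append(f"Row {index}: {exc}")
    return processed, errors
```

With `skip_errors=True` the caller gets a count plus a list of messages to inspect afterwards; with `skip_errors=False` the first bad row aborts the loop.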
Validate Before Import
Validate mapping and data before importing:
# Validate mapping
mapping_errors = mapping.validate()
if mapping_errors:
    raise ValueError(f"Mapping errors: {mapping_errors}")
# Validate CSV columns
import pandas as pd
df = pd.read_csv("data.csv", nrows=1) # Read first row only
required_columns = set()
for entity_mapping in mapping.entity_mappings:
    required_columns.update(entity_mapping.source_columns)
missing = required_columns - set(df.columns)
if missing:
    raise ValueError(f"Missing columns: {missing}")
# Safe to import
result = await pipeline.import_from_csv("data.csv")
Advanced Features
Progress Callback
Track import progress:
async def progress_callback(current: int, total: int):
    # Guard against an empty input, where total is 0
    percentage = (current / total) * 100 if total else 0.0
    print(f"Progress: {current}/{total} ({percentage:.1f}%)")
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
progress_callback=progress_callback
)
result = await pipeline.import_from_csv("large_file.csv")
Custom Metadata
Add metadata to imported entities and relations:
# Metadata is automatically added:
# {
# "source": "structured_data_import",
# "imported_at": "2024-01-15T10:30:00"
# }
# You can add custom metadata by modifying the pipeline
# (requires extending StructuredDataPipeline)
Multiple Imports
Import multiple files:
files = ["employees.csv", "departments.csv", "projects.csv"]
for file_path in files:
    result = await pipeline.import_from_csv(file_path)
    print(f"{file_path}: {result.entities_added} entities, {result.relations_added} relations")
Incremental Import
Add new data without duplicates:
# First import
result1 = await pipeline.import_from_csv("initial_data.csv")
print(f"Initial: {result1.entities_added} entities")
# Later, import updates (duplicates are handled by GraphStore)
result2 = await pipeline.import_from_csv("updates.csv")
print(f"Updates: {result2.entities_added} entities")
Performance Optimization
Parallel Processing
Enable parallel batch processing for faster imports:
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
enable_parallel=True, # Enable parallel processing
max_workers=4 # Number of worker processes (default: CPU count - 1)
)
result = await pipeline.import_from_csv("large_file.csv")
print(f"Throughput: {result.performance_metrics.rows_per_second:.0f} rows/sec")
When to use:
Large datasets (>10,000 rows)
Multi-core systems
CPU-bound transformations
Performance: 2-3x speedup on multi-core systems
Bulk Write Operations
Use bulk writes for faster storage:
# Bulk writes are automatically used when available
# No code changes needed - works transparently
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
batch_size=1000 # Larger batches = better bulk write performance
)
result = await pipeline.import_from_csv("large_file.csv")
Performance: 80%+ overhead reduction for bulk operations
Streaming Import for Large Files
Import files larger than available memory:
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
streaming=True # Enable streaming mode
)
# Works with files >1GB without loading entire file into memory
result = await pipeline.import_from_csv("very_large_file.csv")
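Streaming means rows are read and handed on one at a time instead of loading the file up front. As an illustration of the idea only (how `streaming=True` is implemented is not covered in this guide), a row generator with the standard `csv` module might look like this; `stream_rows` is a hypothetical helper.

```python
import csv
from typing import Dict, Iterator, TextIO


def stream_rows(handle: TextIO) -> Iterator[Dict[str, str]]:
    """Yield one dict per CSV row without materializing the whole file."""
    # DictReader pulls lines from the handle lazily, one row per iteration.
    yield from csv.DictReader(handle)
```

Opening the file with `open(path, newline="", encoding="utf-8")` and iterating over `stream_rows(handle)` keeps memory use roughly constant regardless of file size.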
Batch Size Auto-Tuning
Let the system automatically optimize batch size:
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
auto_tune_batch_size=True # Auto-tune based on memory and schema
)
result = await pipeline.import_from_csv("data.csv")
print(f"Used batch size: {result.performance_metrics.optimal_batch_size}")
Performance Metrics
Track import performance:
result = await pipeline.import_from_csv("data.csv")
if result.performance_metrics:
    metrics = result.performance_metrics
    print(f"Total time: {metrics.total_time:.2f}s")
    print(f"Read time: {metrics.read_time:.2f}s")
    print(f"Transform time: {metrics.transform_time:.2f}s")
    print(f"Write time: {metrics.write_time:.2f}s")
    print(f"Throughput: {metrics.rows_per_second:.0f} rows/sec")
    print(f"Peak memory: {metrics.peak_memory_mb:.1f} MB")
Performance Tips
1. Use Batch Processing
# ✅ Good: Process in batches
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
batch_size=1000
)
# ❌ Avoid: Process entire file at once (for large files)
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
batch_size=None # Processes entire file
)
2. Optimize Schema Mapping
# ✅ Good: Only include needed columns
EntityMapping(
source_columns=["id", "name"], # Only needed columns
entity_type="Person",
...
)
# ❌ Avoid: Include unnecessary columns
EntityMapping(
source_columns=["id", "name", "unused1", "unused2"], # Extra columns
entity_type="Person",
...
)
3. Use Appropriate Storage Backend
# ✅ For development/testing: InMemoryGraphStore
store = InMemoryGraphStore()
# ✅ For production: SQLiteGraphStore or PostgreSQLGraphStore
from aiecs.infrastructure.graph_storage.sqlite import SQLiteGraphStore
store = SQLiteGraphStore(db_path="kg.db")
await store.initialize()
4. Pre-process Large Files
For very large files, consider preprocessing:
# Split large CSV into smaller files
import pandas as pd
chunk_size = 10000
# Read the file lazily in chunks; loading it whole first would defeat the purpose
for i, chunk in enumerate(pd.read_csv("large_file.csv", chunksize=chunk_size)):
    chunk_file = f"chunk_{i}.csv"
    chunk.to_csv(chunk_file, index=False)
    result = await pipeline.import_from_csv(chunk_file)
    print(f"Chunk {i}: {result.entities_added} entities")
5. Validate Before Import
Validate data structure before importing:
# Quick validation
import pandas as pd
df = pd.read_csv("data.csv", nrows=10) # Sample first 10 rows
print(f"Columns: {df.columns.tolist()}")
print(f"Sample data:\n{df.head()}")
# Check for required columns
required = ["id", "name"]
missing = set(required) - set(df.columns)
if missing:
    raise ValueError(f"Missing columns: {missing}")
# Now import full file
result = await pipeline.import_from_csv("data.csv")
Complete Example
import asyncio
from aiecs.application.knowledge_graph.builder.schema_mapping import (
SchemaMapping,
EntityMapping,
RelationMapping,
PropertyTransformation,
TransformationType
)
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
from aiecs.domain.knowledge_graph.schema.property_schema import PropertyType
async def main():
    # 1. Define mapping
    mapping = SchemaMapping(
        entity_mappings=[
            EntityMapping(
                source_columns=["emp_id", "name", "email", "salary"],
                entity_type="Employee",
                property_mapping={"emp_id": "id", "name": "name", "email": "email"},
                transformations=[
                    PropertyTransformation(
                        transformation_type=TransformationType.TYPE_CAST,
                        source_column="salary",
                        target_property="salary",
                        target_type=PropertyType.INTEGER
                    )
                ],
                id_column="emp_id"
            ),
            EntityMapping(
                source_columns=["dept_id", "dept_name"],
                entity_type="Department",
                property_mapping={"dept_id": "id", "dept_name": "name"},
                id_column="dept_id"
            )
        ],
        relation_mappings=[
            RelationMapping(
                source_columns=["emp_id", "dept_id"],
                relation_type="WORKS_IN",
                source_entity_column="emp_id",
                target_entity_column="dept_id"
            )
        ]
    )
    # 2. Initialize store
    store = InMemoryGraphStore()
    await store.initialize()
    # 3. Create pipeline
    pipeline = StructuredDataPipeline(
        mapping=mapping,
        graph_store=store,
        batch_size=100
    )
    # 4. Import CSV
    result = await pipeline.import_from_csv("employees.csv")
    # 5. Check results
    print("✅ Import complete!")
    print(f"   Entities added: {result.entities_added}")
    print(f"   Relations added: {result.relations_added}")
    print(f"   Rows processed: {result.rows_processed}")
    if result.errors:
        print(f"⚠️ Errors: {len(result.errors)}")
        for error in result.errors[:5]:
            print(f"   - {error}")
    if result.warnings:
        print(f"⚠️ Warnings: {len(result.warnings)}")
        for warning in result.warnings[:5]:
            print(f"   - {warning}")
    # 6. Query graph
    employees = await store.get_entities_by_type("Employee")
    print(f"\n📊 Found {len(employees)} employees in graph")
    await store.close()


if __name__ == "__main__":
    asyncio.run(main())
Next Steps
See Schema Mapping Guide for detailed mapping configuration
See CSV-to-Graph Tutorial for complete CSV example
See JSON-to-Graph Tutorial for complete JSON example
See Example Scripts for complete working examples:
18_spss_import_with_inference.py - SPSS import with automatic schema inference
19_wide_format_normalization.py - Reshape wide format to normalized structure
20_statistical_aggregation.py - Statistical aggregation during import
21_data_quality_validation.py - Data quality validation