StructuredDataPipeline Usage Guide

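This guide explains how to use the StructuredDataPipeline to import structured data (CSV, JSON, SPSS, Excel) into knowledge graphs. Most snippets use await at the top level for brevity; run them inside an async function, as shown in the Complete Example at the end of this guide.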

Table of Contents

  1. Quick Start

  2. Basic Usage

  3. CSV Import

  4. SPSS Import

  5. Excel Import

  6. Automatic Schema Inference

  7. Data Reshaping

  8. Statistical Aggregation

  9. Data Quality Validation

  10. JSON Import

  11. Batch Processing

  12. Error Handling

  13. Advanced Features

  14. Performance Optimization

  15. Performance Tips

  16. Complete Example

  17. Next Steps

Quick Start

from aiecs.application.knowledge_graph.builder.schema_mapping import (
    SchemaMapping,
    EntityMapping
)
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore

# 1. Create schema mapping
mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["id", "name"],
            entity_type="Person",
            property_mapping={"id": "id", "name": "name"},
            id_column="id"
        )
    ]
)

# 2. Initialize graph store
store = InMemoryGraphStore()
await store.initialize()

# 3. Create pipeline
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)

# 4. Import CSV
result = await pipeline.import_from_csv("data.csv")
print(f"Added {result.entities_added} entities, {result.relations_added} relations")

Basic Usage

Step 1: Define Schema Mapping

First, define how your data maps to the knowledge graph:

from aiecs.application.knowledge_graph.builder.schema_mapping import (
    SchemaMapping,
    EntityMapping,
    RelationMapping
)

mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["id", "name", "email"],
            entity_type="Person",
            property_mapping={"id": "id", "name": "name", "email": "email"},
            id_column="id"
        )
    ],
    relation_mappings=[
        RelationMapping(
            source_columns=["person_id", "company_id"],
            relation_type="WORKS_FOR",
            source_entity_column="person_id",
            target_entity_column="company_id"
        )
    ]
)

Step 2: Initialize Graph Store

from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore

store = InMemoryGraphStore()
await store.initialize()

Step 3: Create Pipeline

from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    batch_size=100,  # Optional: process in batches
    skip_errors=True  # Optional: continue on errors
)

Step 4: Import Data

# Import CSV
result = await pipeline.import_from_csv("data.csv")

# Or import JSON
result = await pipeline.import_from_json("data.json")

Step 5: Check Results

print(f"Entities added: {result.entities_added}")
print(f"Relations added: {result.relations_added}")
print(f"Rows processed: {result.rows_processed}")
print(f"Errors: {len(result.errors)}")
print(f"Warnings: {len(result.warnings)}")

# Check for errors
if result.errors:
    for error in result.errors:
        print(f"Error: {error}")

CSV Import

Basic CSV Import

result = await pipeline.import_from_csv(
    file_path="employees.csv",
    delimiter=",",  # Optional: default is ","
    encoding="utf-8"  # Optional: default is "utf-8"
)

CSV with Custom Delimiter

# Tab-separated values
result = await pipeline.import_from_csv(
    file_path="data.tsv",
    delimiter="\t"
)

# Semicolon-separated values
result = await pipeline.import_from_csv(
    file_path="data.csv",
    delimiter=";"
)
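
When the delimiter of an incoming file is not known in advance, it can be guessed before calling import_from_csv. The following is a minimal sketch using only the Python standard library; detect_delimiter is a hypothetical helper and not part of StructuredDataPipeline.

```python
import csv


def detect_delimiter(sample: str, candidates: str = ",;\t") -> str:
    """Guess the field delimiter from a text sample (hypothetical helper)."""
    # csv.Sniffer inspects the sample and picks the most consistent candidate.
    dialect = csv.Sniffer().sniff(sample, delimiters=candidates)
    return dialect.delimiter


# In practice the sample would be the first few kilobytes of the file.
sample = "id;name;email\n1;Alice;alice@example.com\n2;Bob;bob@example.com\n"
delimiter = detect_delimiter(sample)
print(repr(delimiter))
```

The detected value can then be passed as the delimiter argument shown above. Sniffing is a heuristic, so an explicit delimiter is safer whenever the format is known.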

CSV with Headers

The pipeline automatically detects headers. If your CSV doesn’t have headers, specify column names:

# CSV without headers
result = await pipeline.import_from_csv(
    file_path="data_no_headers.csv",
    has_headers=False,
    column_names=["id", "name", "email"]  # Specify column names
)

CSV with Different Encoding

# Handle non-UTF-8 files
result = await pipeline.import_from_csv(
    file_path="data_latin1.csv",
    encoding="latin-1"
)

SPSS Import

Basic SPSS Import

Import data directly from SPSS (.sav) files:

result = await pipeline.import_from_spss("survey_data.sav")
print(f"Added {result.entities_added} entities")

SPSS Import with Schema Mapping

mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["respondent_id", "age", "gender"],
            entity_type="Respondent",
            property_mapping={
                "respondent_id": "id",
                "age": "age",
                "gender": "gender"
            },
            id_column="respondent_id"
        )
    ]
)

pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_spss("survey_data.sav")

SPSS Metadata Preservation

SPSS files contain variable labels and value labels that are automatically preserved:

result = await pipeline.import_from_spss("survey_data.sav")

# Variable labels are stored in entity properties as "_spss_variable_labels"
# Value labels are stored as "_spss_value_labels"
entity = await store.get_entity("respondent_001")
print(entity.properties.get("_spss_variable_labels", {}))

Auto-detect File Format

Use import_from_file() to automatically detect and import any supported format:

# Automatically detects format from extension
result = await pipeline.import_from_file("data.sav")  # SPSS
result = await pipeline.import_from_file("data.xlsx")  # Excel
result = await pipeline.import_from_file("data.csv")  # CSV
result = await pipeline.import_from_file("data.json")  # JSON
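
Extension-based detection can be pictured as a lookup from file suffix to import method. The sketch below is an illustration under that assumption, not the library's actual dispatch code; the IMPORTERS table and pick_importer are hypothetical names, and only the four extensions listed above are included.

```python
from pathlib import Path

# Assumed mapping from file extension to the import method named in this guide.
IMPORTERS = {
    ".csv": "import_from_csv",
    ".json": "import_from_json",
    ".sav": "import_from_spss",
    ".xlsx": "import_from_excel",
}


def pick_importer(file_path: str) -> str:
    """Return the name of the import method for a file, based on its suffix."""
    suffix = Path(file_path).suffix.lower()  # case-insensitive match
    try:
        return IMPORTERS[suffix]
    except KeyError:
        raise ValueError(f"Unsupported file extension: {suffix!r}") from None


print(pick_importer("survey_data.sav"))
```

A lookup like this fails fast on unknown extensions, which is usually preferable to guessing a format from file contents.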

Excel Import

Basic Excel Import

Import data from Excel files:

result = await pipeline.import_from_excel("employees.xlsx")
print(f"Added {result.entities_added} entities")

Excel Import with Specific Sheet

# Import from specific sheet
result = await pipeline.import_from_excel(
    "workbook.xlsx",
    sheet_name="Sheet1"
)

Excel Import with Multiple Sheets

# Import from all sheets
result = await pipeline.import_from_excel(
    "workbook.xlsx",
    sheet_name=None  # None = all sheets
)

# Or import sheets sequentially
for sheet in ["Sheet1", "Sheet2", "Sheet3"]:
    result = await pipeline.import_from_excel(
        "workbook.xlsx",
        sheet_name=sheet
    )
    print(f"{sheet}: {result.entities_added} entities")

Excel Data Types

Excel data types (dates, numbers, text) are automatically handled:

# Dates are converted to ISO format strings
# Numbers are preserved as numeric types
# Text is preserved as strings
result = await pipeline.import_from_excel("data.xlsx")

Automatic Schema Inference

Infer Schema from Data

Automatically generate schema mappings from data structure:

from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference

# Infer schema from CSV
inference = SchemaInference()
inferred = inference.infer_from_csv("data.csv")

# Review inferred schema
print(f"Inferred {len(inferred.entity_mappings)} entity types")
print(f"Inferred {len(inferred.relation_mappings)} relation types")
print(f"Confidence scores: {inferred.confidence_scores}")

# Use inferred schema
mapping = inferred.to_schema_mapping()
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_csv("data.csv")
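
To give a feel for what inference from data structure involves, here is a deliberately naive sketch that guesses a property type per CSV column. It is not how SchemaInference works internally; infer_type and infer_column_types are hypothetical helpers built on the standard library.

```python
import csv
import io


def infer_type(values):
    """Return "integer", "float", or "string" for a list of raw CSV strings."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"
    # Try the strictest type first and fall back to looser ones.
    for name, caster in (("integer", int), ("float", float)):
        try:
            for value in non_empty:
                caster(value)
            return name
        except ValueError:
            continue
    return "string"


def infer_column_types(csv_text):
    """Map each column name in the CSV text to an inferred type name."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    columns = list(rows[0].keys()) if rows else []
    return {col: infer_type([row[col] for row in rows]) for col in columns}


sample = "id,name,salary\n1,Alice,5200.5\n2,Bob,4800\n"
print(infer_column_types(sample))
```

Real inference also has to pick ID columns and detect relations, which is why reviewing the inferred mapping and its confidence scores before importing is worthwhile.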

Infer Schema from SPSS

SPSS files provide rich metadata for better inference:

inference = SchemaInference()
inferred = inference.infer_from_spss("survey_data.sav")

# SPSS variable labels are used as property names
# SPSS value labels are preserved for categorical data
mapping = inferred.to_schema_mapping()

Partial Schema Inference

Provide some mappings and let the system infer the rest:

# User provides partial mapping
user_mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["id", "name"],
            entity_type="Person",
            property_mapping={"id": "id", "name": "name"},
            id_column="id"
        )
    ]
)

# Infer remaining mappings
inference = SchemaInference()
inferred = inference.infer_from_csv("data.csv")

# Merge user-provided and inferred mappings
merged = inference.merge_with_user_mapping(inferred, user_mapping)

Import with Auto-Inference

Use create_with_auto_inference() for one-step import:

pipeline = await StructuredDataPipeline.create_with_auto_inference(
    file_path="data.csv",
    graph_store=store,
    entity_type_hint="Employee"  # Optional hint
)

result = await pipeline.import_from_file("data.csv")

Data Reshaping

Wide-to-Long Conversion for Normalized Structures

Convert wide-format data (many columns) into a normalized graph structure:

import pandas as pd

from aiecs.application.knowledge_graph.builder.data_reshaping import DataReshaping

# Wide format: 1000 rows × 200 columns (one row per sample, one column per option)
# Convert to normalized: Sample entities + Option entities + HAS_VALUE relations
df_wide = pd.read_csv("wide_data.csv")

reshaping = DataReshaping()
reshape_result = reshaping.melt_wide_to_long(
    df=df_wide,
    id_vars=["sample_id"],
    value_vars=[f"option_{i:03d}" for i in range(1, 201)],
    var_name="option_id",
    value_name="value"
)

# Generate normalized schema mapping
mapping = reshaping.generate_normalized_mapping(
    id_column="sample_id",
    entity_type="Sample",
    variable_type="Option",
    relation_type="HAS_VALUE"
)

# Import normalized data
pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_dataframe(reshape_result.data)
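
To make the target shape concrete, here is a minimal pure-Python sketch of a wide-to-long melt. It is not DataReshaping's implementation; melt_rows and the sample rows are illustrative assumptions.

```python
def melt_rows(rows, id_var, value_vars, var_name="option_id", value_name="value"):
    """Turn one wide row per sample into one long row per (sample, option) pair."""
    long_rows = []
    for row in rows:
        for column in value_vars:
            long_rows.append(
                {id_var: row[id_var], var_name: column, value_name: row[column]}
            )
    return long_rows


wide = [
    {"sample_id": "s1", "option_001": 0.1, "option_002": 0.7},
    {"sample_id": "s2", "option_001": 0.4, "option_002": 0.9},
]
long_rows = melt_rows(wide, "sample_id", ["option_001", "option_002"])
for row in long_rows:
    print(row)
```

Each long row corresponds to one HAS_VALUE relation between a Sample entity and an Option entity, so a table with N rows and M value columns yields N × M relations.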

Automatic Reshaping During Import

Detect and reshape wide format data automatically:

# Pipeline automatically detects wide format and suggests normalization
pipeline = await StructuredDataPipeline.create_with_auto_reshape(
    file_path="wide_data.csv",
    graph_store=store,
    reshape_threshold=50  # Threshold for wide format detection
)

result = await pipeline.import_from_file("wide_data.csv")

Reshape and Import in One Step

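# Columns holding the per-option values
option_columns = [f"option_{i:03d}" for i in range(1, 201)]

# Reshape wide format and import with normalized structure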
result = await pipeline.reshape_and_import(
    file_path="wide_data.csv",
    id_vars=["sample_id"],
    value_vars=option_columns,
    entity_type="Sample",
    variable_type="Option",
    relation_type="HAS_VALUE"
)

Statistical Aggregation

Compute Statistics During Import

Compute mean, standard deviation, and other statistics during import:

from aiecs.application.knowledge_graph.builder.schema_mapping import AggregationConfig

mapping = SchemaMapping(
    entity_mappings=[
        EntityMapping(
            source_columns=["sample_id"] + [f"option_{i}" for i in range(1, 201)],
            entity_type="Sample",
            property_mapping={"sample_id": "id"},
            id_column="sample_id"
        )
    ],
    aggregations={
        "Sample": {
            "option_values": {
                "mean": "avg_value",
                "std": "std_value",
                "min": "min_value",
                "max": "max_value"
            }
        }
    }
)

pipeline = StructuredDataPipeline(mapping=mapping, graph_store=store)
result = await pipeline.import_from_csv("data.csv")

# Aggregated values are stored as entity properties
sample = await store.get_entity("sample_001")
print(f"Average: {sample.properties['avg_value']}")
print(f"Std Dev: {sample.properties['std_value']}")
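
The stored properties can be pictured as row-wise statistics over the value columns. The sketch below computes them with the standard library; aggregate_row is a hypothetical helper, and the use of the sample standard deviation (statistics.stdev) is an assumption, since this guide does not say whether the pipeline uses the sample or population formula.

```python
import statistics


def aggregate_row(row, value_columns):
    """Compute summary statistics across the given columns of one row."""
    values = [float(row[column]) for column in value_columns]
    return {
        "avg_value": statistics.mean(values),
        "std_value": statistics.stdev(values),  # assumption: sample std dev
        "min_value": min(values),
        "max_value": max(values),
    }


row = {"sample_id": "sample_001", "option_1": 1.0, "option_2": 2.0, "option_3": 3.0}
stats = aggregate_row(row, ["option_1", "option_2", "option_3"])
print(stats)
```

Comparing such a hand-computed result against the imported entity's properties is a quick way to confirm which standard deviation formula is in effect.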

Grouped Aggregation

Compute statistics per group:

mapping = SchemaMapping(
    aggregations={
        "Employee": {
            "salary": {
                "mean": "avg_salary",
                "std": "std_salary"
            }
        }
    },
    group_by="department"  # Group by department
)

# Aggregated values are stored on group entities

Data Quality Validation

Range Validation

Validate that numeric values fall within specified ranges:

from aiecs.application.knowledge_graph.builder.data_quality import (
    DataQualityValidator,
    ValidationConfig,
    RangeRule
)

validation_config = ValidationConfig(
    rules={
        "Sample": {
            "option_1": RangeRule(min=0.0, max=1.0),
            "option_2": RangeRule(min=-10.0, max=10.0)
        }
    },
    fail_on_violations=False  # Continue import, log violations
)

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    validation_config=validation_config
)

result = await pipeline.import_from_csv("data.csv")

# Check quality report
if result.quality_report:
    print(f"Range violations: {result.quality_report.range_violations}")
    print(f"Completeness: {result.quality_report.completeness}")
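
Conceptually, a range rule is an inclusive bounds check applied to every row. The following sketch illustrates that idea in plain Python; find_range_violations and the tuple-based rule format are assumptions for illustration, not the RangeRule implementation.

```python
def find_range_violations(rows, rules):
    """Return (row_index, column, value) for every value outside its range.

    rules maps a column name to an inclusive (minimum, maximum) pair.
    """
    violations = []
    for index, row in enumerate(rows):
        for column, (low, high) in rules.items():
            value = row.get(column)
            if value is None:
                continue  # missing values are a completeness issue, not a range issue
            if not (low <= value <= high):
                violations.append((index, column, value))
    return violations


rules = {"option_1": (0.0, 1.0), "option_2": (-10.0, 10.0)}
rows = [
    {"option_1": 0.5, "option_2": 3.0},
    {"option_1": 1.2, "option_2": -11.0},
    {"option_1": None, "option_2": 0.0},
]
violations = find_range_violations(rows, rules)
print(violations)
```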

Outlier Detection

Detect and flag outliers:

validation_config = ValidationConfig(
    outlier_detection={
        "Sample": {
            "option_1": {"method": "zscore", "threshold": 3.0}
        }
    },
    exclude_outliers=False  # Flag but don't exclude
)

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    validation_config=validation_config
)

result = await pipeline.import_from_csv("data.csv")

# Check outliers
if result.quality_report:
    print(f"Outliers detected: {len(result.quality_report.outliers)}")
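
The "zscore" method with a threshold of 3.0 flags values more than three standard deviations from the mean. A minimal sketch of that calculation follows; zscore_outliers is a hypothetical helper, and the use of the population standard deviation is an assumption.

```python
import statistics


def zscore_outliers(values, threshold=3.0):
    """Return the indices of values whose absolute z-score exceeds the threshold."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)  # assumption: population standard deviation
    if std == 0:
        return []  # all values identical, so nothing can be an outlier
    return [i for i, v in enumerate(values) if abs(v - mean) / std > threshold]


values = [10.0] * 19 + [100.0]
print(zscore_outliers(values))
```

Note that with very small samples no value can reach a z-score of 3.0, so z-score detection is only meaningful on columns with enough rows.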

Completeness Checks

Check for missing values:

validation_config = ValidationConfig(
    required_properties={
        "Sample": ["sample_id", "option_1", "option_2"]
    }
)

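# Pass the config to the pipeline so the checks run during import
pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    validation_config=validation_config
)

result = await pipeline.import_from_csv("data.csv")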

# Check completeness
if result.quality_report:
    completeness = result.quality_report.completeness
    for prop, pct in completeness.items():
        print(f"{prop}: {pct:.1f}% complete")
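
The per-property percentages in the loop above can be read as the share of rows with a non-missing value. The sketch below shows one way to compute such a figure; the completeness function and its treatment of None and empty strings as missing are assumptions, not the library's definition.

```python
def completeness(rows, columns):
    """Return the percentage of rows with a non-missing value for each column."""
    total = len(rows)
    report = {}
    for column in columns:
        # Assumption: None and empty strings count as missing.
        present = sum(1 for row in rows if row.get(column) not in (None, ""))
        report[column] = 100.0 * present / total if total else 0.0
    return report


rows = [
    {"sample_id": "s1", "option_1": 0.2, "option_2": 1.0},
    {"sample_id": "s2", "option_1": None, "option_2": 2.0},
    {"sample_id": "s3", "option_1": 0.4, "option_2": ""},
    {"sample_id": "s4", "option_1": 0.9, "option_2": 4.0},
]
report = completeness(rows, ["sample_id", "option_1", "option_2"])
for prop, pct in report.items():
    print(f"{prop}: {pct:.1f}% complete")
```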

JSON Import

JSON Array Import

Import from a JSON array:

[
  {"id": "1", "name": "Alice", "email": "alice@example.com"},
  {"id": "2", "name": "Bob", "email": "bob@example.com"}
]

result = await pipeline.import_from_json("data.json")

JSON Lines (NDJSON) Import

Import from newline-delimited JSON:

{"id": "1", "name": "Alice"}
{"id": "2", "name": "Bob"}

result = await pipeline.import_from_json(
    file_path="data.jsonl",
    json_format="jsonl"  # Specify format
)
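
In the JSON Lines format every non-empty line is a complete JSON document. The sketch below parses such text with the standard json module, which is handy for checking a file before import; parse_json_lines is a hypothetical helper and is independent of the pipeline.

```python
import json


def parse_json_lines(text):
    """Parse newline-delimited JSON into a list of records."""
    records = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON on line {line_number}: {exc.msg}") from exc
    return records


text = '{"id": "1", "name": "Alice"}\n{"id": "2", "name": "Bob"}\n'
records = parse_json_lines(text)
print(records)
```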

JSON Object with Array Property

Import from a JSON object containing an array:

{
  "employees": [
    {"id": "1", "name": "Alice"},
    {"id": "2", "name": "Bob"}
  ]
}

result = await pipeline.import_from_json(
    file_path="data.json",
    json_format="object_array",
    array_key="employees"  # Key containing the array
)

JSON from String

Import directly from a JSON string:

json_string = '[{"id": "1", "name": "Alice"}]'
result = await pipeline.import_from_json(
    file_path=None,
    json_data=json_string
)

Batch Processing

For large files, process in batches to manage memory:

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    batch_size=1000  # Process 1000 rows at a time
)

result = await pipeline.import_from_csv("large_file.csv")

Benefits:

  • Lower memory usage

  • Progress tracking

  • Better error recovery
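
The memory benefit comes from holding only one batch of rows at a time. The following sketch shows the general batching technique with a generator; iter_batches is a hypothetical helper and does not reflect how the pipeline implements batch_size internally.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield successive lists of at most batch_size items from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


for number, batch in enumerate(iter_batches(range(5), 2), start=1):
    print(f"Batch {number}: {batch}")
```

Because the source is consumed lazily, the same loop works for a file reader or database cursor without materializing all rows first.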

Error Handling

Skip Errors (Default)

By default, the pipeline skips rows with errors and continues:

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    skip_errors=True  # Default
)

result = await pipeline.import_from_csv("data.csv")

# Check errors
if result.errors:
    print(f"Skipped {len(result.errors)} rows with errors")
    for error in result.errors[:5]:  # Show first 5
        print(f"  - {error}")

Fail on First Error

Stop processing on first error:

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    skip_errors=False  # Stop on first error
)

try:
    result = await pipeline.import_from_csv("data.csv")
except Exception as e:
    print(f"Import failed: {e}")
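
The difference between the two modes can be summarized with a small loop: in skip mode a failing row is recorded and processing continues, while in fail-fast mode the exception propagates immediately. This is an illustrative sketch only; process_rows is a hypothetical function, not the pipeline's code.

```python
def process_rows(rows, transform, skip_errors=True):
    """Apply transform to each row, collecting or raising errors."""
    processed, errors = [], []
    for index, row in enumerate(rows):
        try:
            processed.append(transform(row))
        except Exception as exc:
            if not skip_errors:
                raise  # fail-fast: stop at the first bad row
            errors.append(f"Row {index}: {exc}")
    return processed, errors


processed, errors = process_rows(["1", "x", "3"], int, skip_errors=True)
print(processed)
print(errors)
```

Skip mode suits exploratory imports where a few bad rows are acceptable; fail-fast suits production loads where a partial import would be worse than none.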

Validate Before Import

Validate mapping and data before importing:

import pandas as pd

async def validate_and_import(pipeline, mapping, file_path="data.csv"):
    # Validate mapping
    mapping_errors = mapping.validate()
    if mapping_errors:
        print(f"Mapping errors: {mapping_errors}")
        return None

    # Validate CSV columns (reads the header and first data row only)
    df = pd.read_csv(file_path, nrows=1)
    required_columns = set()
    for entity_mapping in mapping.entity_mappings:
        required_columns.update(entity_mapping.source_columns)

    missing = required_columns - set(df.columns)
    if missing:
        print(f"Missing columns: {missing}")
        return None

    # Safe to import
    return await pipeline.import_from_csv(file_path)

Advanced Features

Progress Callback

Track import progress:

async def progress_callback(current: int, total: int):
    percentage = (current / total) * 100
    print(f"Progress: {current}/{total} ({percentage:.1f}%)")

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    progress_callback=progress_callback
)

result = await pipeline.import_from_csv("large_file.csv")

Custom Metadata

The pipeline attaches import metadata to every entity and relation it creates:

# Metadata is automatically added:
# {
#     "source": "structured_data_import",
#     "imported_at": "2024-01-15T10:30:00"
# }

# Adding custom metadata requires extending (subclassing) StructuredDataPipeline

Multiple Imports

Import multiple files:

files = ["employees.csv", "departments.csv", "projects.csv"]

for file_path in files:
    result = await pipeline.import_from_csv(file_path)
    print(f"{file_path}: {result.entities_added} entities, {result.relations_added} relations")

Incremental Import

Add new data without duplicates:

# First import
result1 = await pipeline.import_from_csv("initial_data.csv")
print(f"Initial: {result1.entities_added} entities")

# Later, import updates (duplicates are handled by GraphStore)
result2 = await pipeline.import_from_csv("updates.csv")
print(f"Updates: {result2.entities_added} entities")

Performance Optimization

Parallel Processing

Enable parallel batch processing for faster imports:

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    enable_parallel=True,  # Enable parallel processing
    max_workers=4  # Number of worker processes (default: CPU count - 1)
)

result = await pipeline.import_from_csv("large_file.csv")
print(f"Throughput: {result.performance_metrics.rows_per_second:.0f} rows/sec")

When to use:

  • Large datasets (>10,000 rows)

  • Multi-core systems

  • CPU-bound transformations

Expected performance: roughly 2-3x faster than sequential processing on multi-core systems; actual gains depend on the workload.

Bulk Write Operations

Use bulk writes for faster storage:

# Bulk writes are automatically used when available
# No code changes needed - works transparently

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    batch_size=1000  # Larger batches = better bulk write performance
)

result = await pipeline.import_from_csv("large_file.csv")

Expected performance: bulk operations reduce per-write overhead by 80% or more.

Streaming Import for Large Files

Import files larger than available memory:

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    streaming=True  # Enable streaming mode
)

# Works with files >1GB without loading entire file into memory
result = await pipeline.import_from_csv("very_large_file.csv")

Batch Size Auto-Tuning

Let the system automatically optimize batch size:

pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    auto_tune_batch_size=True  # Auto-tune based on memory and schema
)

result = await pipeline.import_from_csv("data.csv")
print(f"Used batch size: {result.performance_metrics.optimal_batch_size}")

Performance Metrics

Track import performance:

result = await pipeline.import_from_csv("data.csv")

if result.performance_metrics:
    metrics = result.performance_metrics
    print(f"Total time: {metrics.total_time:.2f}s")
    print(f"Read time: {metrics.read_time:.2f}s")
    print(f"Transform time: {metrics.transform_time:.2f}s")
    print(f"Write time: {metrics.write_time:.2f}s")
    print(f"Throughput: {metrics.rows_per_second:.0f} rows/sec")
    print(f"Peak memory: {metrics.peak_memory_mb:.1f} MB")

Performance Tips

1. Use Batch Processing

# ✅ Good: Process in batches
pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    batch_size=1000
)

# ❌ Avoid: Process entire file at once (for large files)
pipeline = StructuredDataPipeline(
    mapping=mapping,
    graph_store=store,
    batch_size=None  # Processes entire file
)

2. Optimize Schema Mapping

# ✅ Good: Only include needed columns
EntityMapping(
    source_columns=["id", "name"],  # Only needed columns
    entity_type="Person",
    ...
)

# ❌ Avoid: Include unnecessary columns
EntityMapping(
    source_columns=["id", "name", "unused1", "unused2"],  # Extra columns
    entity_type="Person",
    ...
)

3. Use Appropriate Storage Backend

# ✅ For development/testing: InMemoryGraphStore
store = InMemoryGraphStore()

# ✅ For production: SQLiteGraphStore or PostgreSQLGraphStore
from aiecs.infrastructure.graph_storage.sqlite import SQLiteGraphStore
store = SQLiteGraphStore(db_path="kg.db")
await store.initialize()

4. Pre-process Large Files

For very large files, consider preprocessing:

# Split large CSV into smaller files
import pandas as pd

# Read with chunksize so the file is streamed instead of loaded whole
chunk_size = 10000

for i, chunk in enumerate(pd.read_csv("large_file.csv", chunksize=chunk_size)):
    chunk_file = f"chunk_{i}.csv"
    chunk.to_csv(chunk_file, index=False)
    
    result = await pipeline.import_from_csv(chunk_file)
    print(f"Chunk {i}: {result.entities_added} entities")

5. Validate Before Import

Validate data structure before importing:

# Quick validation
import pandas as pd

df = pd.read_csv("data.csv", nrows=10)  # Sample first 10 rows
print(f"Columns: {df.columns.tolist()}")
print(f"Sample data:\n{df.head()}")

# Check for required columns
required = ["id", "name"]
missing = set(required) - set(df.columns)
if missing:
    raise ValueError(f"Missing columns: {missing}")

# Now import full file
result = await pipeline.import_from_csv("data.csv")

Complete Example

import asyncio
from aiecs.application.knowledge_graph.builder.schema_mapping import (
    SchemaMapping,
    EntityMapping,
    RelationMapping,
    PropertyTransformation,
    TransformationType
)
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
from aiecs.domain.knowledge_graph.schema.property_schema import PropertyType

async def main():
    # 1. Define mapping
    mapping = SchemaMapping(
        entity_mappings=[
            EntityMapping(
                source_columns=["emp_id", "name", "email", "salary"],
                entity_type="Employee",
                property_mapping={"emp_id": "id", "name": "name", "email": "email"},
                transformations=[
                    PropertyTransformation(
                        transformation_type=TransformationType.TYPE_CAST,
                        source_column="salary",
                        target_property="salary",
                        target_type=PropertyType.INTEGER
                    )
                ],
                id_column="emp_id"
            ),
            EntityMapping(
                source_columns=["dept_id", "dept_name"],
                entity_type="Department",
                property_mapping={"dept_id": "id", "dept_name": "name"},
                id_column="dept_id"
            )
        ],
        relation_mappings=[
            RelationMapping(
                source_columns=["emp_id", "dept_id"],
                relation_type="WORKS_IN",
                source_entity_column="emp_id",
                target_entity_column="dept_id"
            )
        ]
    )
    
    # 2. Initialize store
    store = InMemoryGraphStore()
    await store.initialize()
    
    # 3. Create pipeline
    pipeline = StructuredDataPipeline(
        mapping=mapping,
        graph_store=store,
        batch_size=100
    )
    
    # 4. Import CSV
    result = await pipeline.import_from_csv("employees.csv")
    
    # 5. Check results
    print("✅ Import complete!")
    print(f"   Entities added: {result.entities_added}")
    print(f"   Relations added: {result.relations_added}")
    print(f"   Rows processed: {result.rows_processed}")
    
    if result.errors:
        print(f"⚠️  Errors: {len(result.errors)}")
        for error in result.errors[:5]:
            print(f"   - {error}")
    
    if result.warnings:
        print(f"⚠️  Warnings: {len(result.warnings)}")
        for warning in result.warnings[:5]:
            print(f"   - {warning}")
    
    # 6. Query graph
    employees = await store.get_entities_by_type("Employee")
    print(f"\n📊 Found {len(employees)} employees in graph")
    
    await store.close()

if __name__ == "__main__":
    asyncio.run(main())

Next Steps

  • See Schema Mapping Guide for detailed mapping configuration

  • See CSV-to-Graph Tutorial for complete CSV example

  • See JSON-to-Graph Tutorial for complete JSON example

  • See Example Scripts for complete working examples:

    • 18_spss_import_with_inference.py - SPSS import with automatic schema inference

    • 19_wide_format_normalization.py - Reshape wide format to normalized structure

    • 20_statistical_aggregation.py - Statistical aggregation during import

    • 21_data_quality_validation.py - Data quality validation