JSON-to-Graph Tutorial
This tutorial walks you through importing JSON data into a knowledge graph step by step.
Scenario
You have product catalog data in JSON format and want to import it into a knowledge graph to enable graph-based queries and recommendations.
Sample Data Formats
Format 1: JSON Array
products.json:
[
{
"product_id": "P001",
"name": "Laptop",
"category": "Electronics",
"price": "999.99",
"in_stock": "true",
"tags": ["computers", "portable"]
},
{
"product_id": "P002",
"name": "Mouse",
"category": "Electronics",
"price": "29.99",
"in_stock": "true",
"tags": ["accessories", "input"]
}
]
Format 2: JSON Lines (NDJSON)
products.jsonl:
{"product_id": "P001", "name": "Laptop", "category": "Electronics", "price": "999.99"}
{"product_id": "P002", "name": "Mouse", "category": "Electronics", "price": "29.99"}
Format 3: Object with Array Property
products_wrapped.json:
{
"products": [
{"product_id": "P001", "name": "Laptop", "category": "Electronics"},
{"product_id": "P002", "name": "Mouse", "category": "Electronics"}
]
}
Step 1: Create Schema Mapping
Define how JSON fields map to knowledge graph entities:
from aiecs.application.knowledge_graph.builder.schema_mapping import (
SchemaMapping,
EntityMapping,
PropertyTransformation,
TransformationType
)
from aiecs.domain.knowledge_graph.schema.property_schema import PropertyType
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["product_id", "name", "category", "price", "in_stock", "tags"],
entity_type="Product",
property_mapping={
"product_id": "id",
"name": "name",
"category": "category",
"tags": "tags"
},
transformations=[
# Cast price to float
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="price",
target_property="price",
target_type=PropertyType.FLOAT
),
# Cast in_stock to boolean
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="in_stock",
target_property="available",
target_type=PropertyType.BOOLEAN
),
# Parse tags JSON string to list
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="tags",
target_property="tags",
target_type=PropertyType.LIST
)
],
id_column="product_id"
)
]
)
Step 2: Initialize Graph Store
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
store = InMemoryGraphStore()
await store.initialize()
Step 3: Create Pipeline
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store
)
Step 4: Import JSON
Import JSON Array
result = await pipeline.import_from_json("products.json")
print(f"Added {result.entities_added} products")
Import JSON Lines
result = await pipeline.import_from_json(
file_path="products.jsonl",
json_format="jsonl"
)
Import Object with Array Property
result = await pipeline.import_from_json(
file_path="products_wrapped.json",
json_format="object_array",
array_key="products"
)
Import from String
json_string = '[{"product_id": "P001", "name": "Laptop"}]'
result = await pipeline.import_from_json(
file_path=None,
json_data=json_string
)
Complete Example: JSON Array
import asyncio
from aiecs.application.knowledge_graph.builder.schema_mapping import (
SchemaMapping,
EntityMapping,
PropertyTransformation,
TransformationType
)
from aiecs.application.knowledge_graph.builder.structured_pipeline import StructuredDataPipeline
from aiecs.infrastructure.graph_storage.in_memory import InMemoryGraphStore
from aiecs.domain.knowledge_graph.schema.property_schema import PropertyType
async def main():
# Step 1: Define mapping
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["product_id", "name", "category", "price", "in_stock"],
entity_type="Product",
property_mapping={
"product_id": "id",
"name": "name",
"category": "category"
},
transformations=[
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="price",
target_property="price",
target_type=PropertyType.FLOAT
),
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="in_stock",
target_property="available",
target_type=PropertyType.BOOLEAN
)
],
id_column="product_id"
)
]
)
# Step 2: Initialize store
store = InMemoryGraphStore()
await store.initialize()
# Step 3: Create pipeline
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store
)
# Step 4: Import JSON
print("📥 Importing JSON...")
result = await pipeline.import_from_json("products.json")
print(f"\n✅ Import complete!")
print(f" Products added: {result.entities_added}")
print(f" Items processed: {result.rows_processed}")
# Step 5: Query graph
print("\n📊 Querying graph...")
products = await store.get_entities_by_type("Product")
print(f"Total products: {len(products)}")
for product in products[:3]: # Show first 3
print(f" - {product.properties['name']} (${product.properties['price']})")
await store.close()
if __name__ == "__main__":
asyncio.run(main())
Advanced: Nested JSON Structures
For nested JSON, flatten first or use multiple mappings:
products_nested.json:
[
{
"product": {
"id": "P001",
"name": "Laptop"
},
"category": {
"id": "C001",
"name": "Electronics"
}
}
]
Option 1: Flatten in Preprocessing
import json
# Load and flatten
with open("products_nested.json") as f:
data = json.load(f)
flattened = []
for item in data:
flattened.append({
"product_id": item["product"]["id"],
"product_name": item["product"]["name"],
"category_id": item["category"]["id"],
"category_name": item["category"]["name"]
})
# Save flattened JSON
with open("products_flat.json", "w") as f:
json.dump(flattened, f)
# Now import flattened JSON
result = await pipeline.import_from_json("products_flat.json")
Option 2: Multiple Mappings
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["product_id", "product_name"],
entity_type="Product",
property_mapping={"product_id": "id", "product_name": "name"},
id_column="product_id"
),
EntityMapping(
source_columns=["category_id", "category_name"],
entity_type="Category",
property_mapping={"category_id": "id", "category_name": "name"},
id_column="category_id"
)
],
relation_mappings=[
RelationMapping(
source_columns=["product_id", "category_id"],
relation_type="BELONGS_TO",
source_entity_column="product_id",
target_entity_column="category_id"
)
]
)
Advanced: Relations from JSON
products_with_relations.json:
[
{
"product_id": "P001",
"name": "Laptop",
"category_id": "C001",
"related_products": ["P002", "P003"]
}
]
Mapping with Relations:
from aiecs.application.knowledge_graph.builder.schema_mapping import RelationMapping
mapping = SchemaMapping(
entity_mappings=[
EntityMapping(
source_columns=["product_id", "name", "category_id"],
entity_type="Product",
property_mapping={"product_id": "id", "name": "name"},
id_column="product_id"
),
EntityMapping(
source_columns=["category_id"],
entity_type="Category",
property_mapping={"category_id": "id"},
id_column="category_id"
)
],
relation_mappings=[
# Product to Category
RelationMapping(
source_columns=["product_id", "category_id"],
relation_type="BELONGS_TO",
source_entity_column="product_id",
target_entity_column="category_id"
)
]
)
# Note: For related_products array, you'd need to:
# 1. Expand array into multiple rows, OR
# 2. Process relations separately after import
Handling Complex JSON Types
Arrays
If a field contains a JSON array string:
{
"product_id": "P001",
"tags": "[\"electronics\", \"computers\"]"
}
Use PropertyType.LIST:
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="tags",
target_property="tags",
target_type=PropertyType.LIST
)
Objects
If a field contains a JSON object string:
{
"product_id": "P001",
"metadata": "{\"source\": \"api\", \"version\": 1}"
}
Use PropertyType.DICT:
PropertyTransformation(
transformation_type=TransformationType.TYPE_CAST,
source_column="metadata",
target_property="metadata",
target_type=PropertyType.DICT
)
Troubleshooting
Issue: “JSON decode error”
Solution: Validate JSON format:
import json
# Validate JSON
with open("products.json") as f:
try:
data = json.load(f)
print(f"✅ Valid JSON with {len(data)} items")
except json.JSONDecodeError as e:
print(f"❌ Invalid JSON: {e}")
Issue: “Array key not found”
Solution: Check array key matches:
# Check structure
with open("products_wrapped.json") as f:
data = json.load(f)
print(f"Top-level keys: {data.keys()}")
# Should contain the array_key you specified
Issue: “Field not found”
Solution: Check field names match:
# Check first item structure
with open("products.json") as f:
data = json.load(f)
if data:
print(f"Fields in first item: {data[0].keys()}")
Performance Tips
1. Use JSON Lines for Large Files
JSON Lines (NDJSON) is more memory-efficient for large datasets:
# ✅ Good: JSON Lines
result = await pipeline.import_from_json(
file_path="large_file.jsonl",
json_format="jsonl"
)
# ⚠️ Less efficient: JSON Array (loads entire file)
result = await pipeline.import_from_json("large_file.json")
2. Batch Processing
pipeline = StructuredDataPipeline(
mapping=mapping,
graph_store=store,
batch_size=1000 # Process in batches
)
3. Validate Before Import
import json
# Quick validation
with open("products.json") as f:
data = json.load(f)
print(f"Items: {len(data)}")
if data:
print(f"Sample fields: {list(data[0].keys())}")
Next Steps
See Schema Mapping Guide for detailed mapping options
See StructuredDataPipeline Guide for advanced usage
See CSV-to-Graph Tutorial for CSV import