Source code for aiecs.application.knowledge_graph.builder.structured_pipeline

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
"""
Structured Data Pipeline

Import structured data (CSV, JSON, SPSS, Excel) into knowledge graphs using schema mappings.
"""

import json
import logging
import math
from pathlib import Path
from typing import List, Optional, Dict, Any, Callable, Union
from dataclasses import dataclass, field
from datetime import datetime

try:
    import pandas as pd

    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False

from aiecs.infrastructure.graph_storage.base import GraphStore
from aiecs.domain.knowledge_graph.models.entity import Entity
from aiecs.domain.knowledge_graph.models.relation import Relation
from aiecs.application.knowledge_graph.builder.schema_mapping import (
    SchemaMapping,
)
from aiecs.application.knowledge_graph.builder.data_quality import (
    DataQualityValidator,
    ValidationConfig,
    QualityReport,
    RangeRule,
)
from aiecs.application.knowledge_graph.builder.import_optimizer import (
    PerformanceMetrics,
    BatchSizeOptimizer,
    MemoryTracker,
    StreamingCSVReader,
)

# Import InferredSchema for type hints (avoid circular import)
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from aiecs.application.knowledge_graph.builder.schema_inference import InferredSchema


logger = logging.getLogger(__name__)


@dataclass
class ImportResult:
    """
    Outcome of a structured-data import operation.

    Attributes:
        success: Whether the import completed successfully
        entities_added: How many entities were written to the graph
        relations_added: How many relations were written to the graph
        rows_processed: How many source rows were processed
        rows_failed: How many source rows failed to process
        errors: Error messages collected during the import
        warnings: Warning messages collected during the import
        quality_report: Data quality validation report (only when validation ran)
        start_time: Moment the import started
        end_time: Moment the import ended
        duration_seconds: Total elapsed time in seconds
        performance_metrics: Detailed performance metrics (only when tracking is enabled)
    """

    # Overall outcome
    success: bool = True

    # Counters
    entities_added: int = 0
    relations_added: int = 0
    rows_processed: int = 0
    rows_failed: int = 0

    # Collected messages (fresh list per instance)
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    # Optional validation output
    quality_report: Optional[QualityReport] = None

    # Timing
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    duration_seconds: float = 0.0

    # Optional performance tracking output
    performance_metrics: Optional[PerformanceMetrics] = None
class AggregationAccumulator:
    """
    Accumulator for incremental statistical aggregation.

    Computes statistics incrementally as data is processed in batches.

    Values are skipped when they:
    - are ``None``;
    - cannot be converted with ``float()``;
    - overflow a float;
    - are NaN (pandas uses NaN for missing cells).

    Variance and standard deviation use Welford's online algorithm. It avoids
    the catastrophic cancellation of the ``sum(x^2) - n*mean^2`` formula when
    values are large relative to their spread.
    """

    def __init__(self):
        self.count = 0
        self.sum = 0.0
        self.sum_sq = 0.0  # Sum of squares (kept for callers that read it)
        self.min_val = float("inf")
        self.max_val = float("-inf")
        self.values = []  # Every accepted value, needed for the median
        # Welford running state: current mean and sum of squared deviations
        self._mean = 0.0
        self._m2 = 0.0

    def add(self, value: Any) -> None:
        """Add a value to the accumulator; non-numeric, None and NaN values are ignored."""
        if value is None:
            return
        try:
            num_val = float(value)
        except (ValueError, TypeError, OverflowError):
            return
        if math.isnan(num_val):
            # Counting NaN would poison sum/mean and make min/max/median
            # depend on insertion order.
            return

        self.count += 1
        self.sum += num_val
        self.sum_sq += num_val * num_val
        self.min_val = min(self.min_val, num_val)
        self.max_val = max(self.max_val, num_val)
        self.values.append(num_val)

        # Welford update: numerically stable running variance
        delta = num_val - self._mean
        self._mean += delta / self.count
        self._m2 += delta * (num_val - self._mean)

    def get_mean(self) -> Optional[float]:
        """Get mean value, or None when no values were accepted."""
        if self.count == 0:
            return None
        return self.sum / self.count

    def get_std(self) -> Optional[float]:
        """Get sample standard deviation (Bessel's correction); None for fewer than 2 values."""
        variance = self.get_variance()
        if variance is None:
            return None
        return math.sqrt(variance)

    def get_variance(self) -> Optional[float]:
        """Get sample variance (Bessel's correction); None for fewer than 2 values."""
        if self.count < 2:
            return None
        # Clamp guards against any residual negative rounding error.
        return max(self._m2 / (self.count - 1), 0.0)

    def get_min(self) -> Optional[float]:
        """Get minimum value, or None when no values were accepted."""
        if self.count == 0:
            return None
        return self.min_val

    def get_max(self) -> Optional[float]:
        """Get maximum value, or None when no values were accepted."""
        if self.count == 0:
            return None
        return self.max_val

    def get_sum(self) -> Optional[float]:
        """Get sum, or None when no values were accepted."""
        if self.count == 0:
            return None
        return self.sum

    def get_count(self) -> int:
        """Get the number of accepted values."""
        return self.count

    def get_median(self) -> Optional[float]:
        """Get median value, or None when no values were accepted."""
        if self.count == 0:
            return None
        sorted_vals = sorted(self.values)
        mid = self.count // 2
        if self.count % 2 == 0:
            return float((sorted_vals[mid - 1] + sorted_vals[mid]) / 2)
        return float(sorted_vals[mid])
[docs] class StructuredDataPipeline: """ Pipeline for importing structured data (CSV, JSON, SPSS, Excel) into knowledge graphs Uses SchemaMapping to map source data columns to entity and relation types. Supports batch processing, progress tracking, and error handling. Example: ```python # Define schema mapping mapping = SchemaMapping( entity_mappings=[ EntityMapping( source_columns=["id", "name", "age"], entity_type="Person", property_mapping={"id": "id", "name": "name", "age": "age"} ) ], relation_mappings=[ RelationMapping( source_columns=["person_id", "company_id"], relation_type="WORKS_FOR", source_entity_column="person_id", target_entity_column="company_id" ) ] ) # Create pipeline pipeline = StructuredDataPipeline( mapping=mapping, graph_store=store ) # Import CSV result = await pipeline.import_from_csv("employees.csv") print(f"Added {result.entities_added} entities, {result.relations_added} relations") ``` """
def __init__(
    self,
    mapping: SchemaMapping,
    graph_store: GraphStore,
    batch_size: int = 100,
    progress_callback: Optional[Callable[[str, float], None]] = None,
    skip_errors: bool = True,
    enable_parallel: bool = False,
    max_workers: Optional[int] = None,
    auto_tune_batch_size: bool = False,
    enable_streaming: bool = False,
    use_bulk_writes: bool = True,
    track_performance: bool = True,
):
    """
    Initialize structured data pipeline

    Args:
        mapping: Schema mapping configuration; validated immediately via
            ``mapping.validate_mapping()``
        graph_store: Graph storage to save entities/relations
        batch_size: Number of rows to process in each batch; must be >= 1
            (ignored when auto_tune_batch_size=True)
        progress_callback: Optional callback for progress updates (message, progress_pct)
        skip_errors: Whether to skip rows with errors and continue processing
        enable_parallel: Enable parallel batch processing (stored on the instance)
        max_workers: Maximum number of parallel workers (stored on the instance)
        auto_tune_batch_size: Automatically tune batch size via BatchSizeOptimizer
        enable_streaming: Enable streaming mode for large files (stored on the instance)
        use_bulk_writes: Use bulk write operations when the store provides them
        track_performance: Track detailed performance metrics (enables MemoryTracker)

    Raises:
        ValueError: If the mapping fails validation or batch_size is less than 1
    """
    # Validate mapping
    validation_errors = mapping.validate_mapping()
    if validation_errors:
        raise ValueError(f"Invalid schema mapping: {validation_errors}")

    # range(0, n, batch_size) raises for 0 and yields no batches for a
    # negative step, which would silently import nothing; reject early.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    self.mapping = mapping
    self.graph_store = graph_store
    self.batch_size = batch_size
    self.progress_callback = progress_callback
    self.skip_errors = skip_errors

    # Performance optimization settings
    self.enable_parallel = enable_parallel
    self.max_workers = max_workers
    self.auto_tune_batch_size = auto_tune_batch_size
    self.enable_streaming = enable_streaming
    self.use_bulk_writes = use_bulk_writes
    self.track_performance = track_performance

    # Initialize optimizers (only when the corresponding feature is enabled)
    self._batch_optimizer = BatchSizeOptimizer() if auto_tune_batch_size else None
    self._memory_tracker = MemoryTracker() if track_performance else None

    # Initialize aggregation tracking
    self._aggregation_accumulators: Dict[str, Dict[str, Any]] = {}  # entity_type -> {property -> accumulator}

    # Initialize data quality validator if validation config is provided
    self.validator: Optional[DataQualityValidator] = None
    if mapping.validation_config:
        self.validator = self._create_validator_from_config(mapping.validation_config)

    if not PANDAS_AVAILABLE:
        logger.warning("pandas not available. CSV import will use basic CSV reader. " "Install pandas for better performance: pip install pandas")
@staticmethod
def infer_schema_from_csv(
    file_path: Union[str, Path],
    encoding: str = "utf-8",
    sample_size: int = 1000,
) -> "InferredSchema":
    """
    Build an inferred schema mapping by analysing a CSV file.

    The file's structure and content are inspected by SchemaInference to
    propose entity and relation mappings automatically.

    Args:
        file_path: Path to the CSV file
        encoding: File encoding (default: utf-8)
        sample_size: Number of rows to sample for inference (default: 1000)

    Returns:
        InferredSchema with entity and relation mappings

    Example:
        ```python
        # Infer schema from CSV
        inferred = StructuredDataPipeline.infer_schema_from_csv("data.csv")

        # Review and modify if needed
        print(f"Inferred entity types: {[em.entity_type for em in inferred.entity_mappings]}")
        print(f"Warnings: {inferred.warnings}")

        # Use inferred schema
        mapping = inferred.to_schema_mapping()
        pipeline = StructuredDataPipeline(mapping, graph_store)
        ```
    """
    # Imported at call time rather than at module load.
    from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference

    engine = SchemaInference(sample_size=sample_size)
    inferred = engine.infer_from_csv(file_path, encoding=encoding)
    return inferred
@staticmethod
def infer_schema_from_spss(
    file_path: Union[str, Path],
    encoding: str = "utf-8",
    sample_size: int = 1000,
) -> "InferredSchema":
    """
    Build an inferred schema mapping from an SPSS file.

    Delegates to SchemaInference, which draws on the SPSS variable labels
    and value labels when proposing mappings.

    Args:
        file_path: Path to the SPSS file
        encoding: File encoding (default: utf-8)
        sample_size: Number of rows to sample for inference (default: 1000)

    Returns:
        InferredSchema with entity and relation mappings
    """
    # Imported at call time rather than at module load.
    from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference

    engine = SchemaInference(sample_size=sample_size)
    inferred = engine.infer_from_spss(file_path, encoding=encoding)
    return inferred
@staticmethod
def infer_schema_from_dataframe(
    df: "pd.DataFrame",
    entity_type_hint: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    sample_size: int = 1000,
) -> "InferredSchema":
    """
    Build an inferred schema mapping from an in-memory pandas DataFrame.

    Args:
        df: DataFrame to analyze
        entity_type_hint: Optional suggestion for the entity type name
        metadata: Optional metadata forwarded to inference (e.g. SPSS variable labels)
        sample_size: Number of rows to sample for inference (default: 1000)

    Returns:
        InferredSchema with entity and relation mappings
    """
    # Imported at call time rather than at module load.
    from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference

    engine = SchemaInference(sample_size=sample_size)
    return engine.infer_from_dataframe(
        df,
        entity_type_hint=entity_type_hint,
        metadata=metadata,
    )
@staticmethod
def create_with_auto_reshape(
    file_path: Union[str, Path],
    graph_store: GraphStore,
    entity_type_hint: Optional[str] = None,
    reshape_threshold: int = 50,
    **kwargs,
) -> "StructuredDataPipeline":
    """
    Create pipeline with automatic wide-format detection

    Reads a sample (first 1000 rows) of the file and checks it for wide
    format. It then infers a schema mapping from that sample and builds a
    pipeline with it. Wide-format data is not reshaped here: the detection
    is logged together with a suggested melt configuration, and
    ``reshape_and_import_csv()`` is the method that actually reshapes.

    Args:
        file_path: Path to data file (.csv, .sav, .por, .xlsx, .xls; the
            extension is matched case-insensitively)
        graph_store: Graph storage to save entities/relations
        entity_type_hint: Optional hint for entity type name
        reshape_threshold: Minimum columns to treat data as wide format (default: 50)
        **kwargs: Additional arguments for StructuredDataPipeline

    Returns:
        StructuredDataPipeline configured with the inferred mapping

    Raises:
        ImportError: If pandas is not installed
        ValueError: If the file extension is not supported

    Example:
        ```python
        pipeline = StructuredDataPipeline.create_with_auto_reshape(
            "wide_data.csv",
            graph_store,
            entity_type_hint="Sample"
        )
        result = await pipeline.import_from_csv("wide_data.csv")
        ```
    """
    from aiecs.application.knowledge_graph.builder.data_reshaping import DataReshaping
    from aiecs.application.knowledge_graph.builder.schema_inference import SchemaInference

    if not PANDAS_AVAILABLE:
        raise ImportError("pandas is required for automatic reshaping")

    # Only a sample is needed to detect the layout and infer the schema.
    sample_rows = 1000
    file_path_str = str(file_path)
    lowered = file_path_str.lower()
    if lowered.endswith(".csv"):
        df = pd.read_csv(file_path, nrows=sample_rows)
    elif lowered.endswith(".sav"):
        import pyreadstat

        df, _ = pyreadstat.read_sav(file_path_str, row_limit=sample_rows)
    elif lowered.endswith(".por"):
        import pyreadstat

        # SPSS portable files are a different format; read_sav cannot parse them.
        df, _ = pyreadstat.read_por(file_path_str, row_limit=sample_rows)
    elif lowered.endswith((".xlsx", ".xls")):
        df = pd.read_excel(file_path, nrows=sample_rows)
    else:
        raise ValueError(f"Unsupported file format: {file_path}")

    is_wide = DataReshaping.detect_wide_format(df, threshold_columns=reshape_threshold)
    if is_wide:
        logger.info(f"Detected wide format data ({df.shape[1]} columns). Suggesting normalized structure.")
        melt_config = DataReshaping.suggest_melt_config(df)
        logger.info(f"Suggested melt config: id_vars={melt_config['id_vars']}, " f"{len(melt_config['value_vars'])} value columns")

    # Same inference for both layouts; the wide case only adds a warning.
    inference = SchemaInference()
    inferred = inference.infer_from_dataframe(df, entity_type_hint=entity_type_hint)

    if is_wide:
        wide_warning = f"Wide format detected ({df.shape[1]} columns). " f"Consider using reshape_and_import() for normalized structure."
        inferred.warnings.append(wide_warning)
        # `inferred` is not returned, so also log the warning where callers can see it.
        logger.warning(wide_warning)

    mapping = inferred.to_schema_mapping()
    return StructuredDataPipeline(mapping=mapping, graph_store=graph_store, **kwargs)
async def import_from_csv(
    self,
    file_path: Union[str, Path],
    encoding: str = "utf-8",
    delimiter: str = ",",
    header: bool = True,
) -> ImportResult:
    """
    Import data from CSV file

    Uses pandas when available. In that case, data quality validation also
    runs if a validator is configured. Without pandas, it falls back to the
    stdlib ``csv`` module, which does not run validation.

    Args:
        file_path: Path to CSV file
        encoding: File encoding (default: utf-8)
        delimiter: CSV delimiter (default: comma); honoured by both readers
        header: Whether file has header row (default: True). Without a
            header, pandas yields integer column keys while the fallback
            reader yields string indices ("0", "1", ...).

    Returns:
        ImportResult with statistics. Failures are recorded in ``errors``
        with ``success=False`` rather than raised.
    """
    result = ImportResult(start_time=datetime.now())

    try:
        if PANDAS_AVAILABLE:
            df = pd.read_csv(
                file_path,
                encoding=encoding,
                sep=delimiter,
                header=0 if header else None,
            )

            # Run data quality validation if validator is configured
            if self.validator:
                # Validate against the first entity mapping that declares an ID column
                id_column = next(
                    (em.id_column for em in self.mapping.entity_mappings if em.id_column),
                    None,
                )
                quality_report = self.validator.validate_dataframe(df, id_column=id_column)
                result.quality_report = quality_report

                # Log quality issues (first 5 in detail)
                if quality_report.violations:
                    violation_count = len(quality_report.violations)
                    logger.warning(f"Data quality validation found {violation_count} violations")
                    for violation in quality_report.violations[:5]:
                        logger.warning(f" {violation.message}")
                    if violation_count > 5:
                        logger.warning(f" ... and {violation_count - 5} more violations")

                # Abort the import when the quality report did not pass
                if not quality_report.passed:
                    result.success = False
                    result.errors.append(f"Data quality validation failed: {len(quality_report.violations)} violations")
                    return result

            rows = df.to_dict("records")
        else:
            # Fallback to basic CSV reader
            import csv

            # newline="" is required by the csv module so quoted fields with
            # embedded newlines are parsed correctly.
            with open(file_path, "r", encoding=encoding, newline="") as f:
                if header:
                    rows = list(csv.DictReader(f, delimiter=delimiter))
                else:
                    # No header - use column indices as keys
                    rows = [
                        {str(i): val for i, val in enumerate(record)}
                        for record in csv.reader(f, delimiter=delimiter)
                    ]

        # Process rows
        result = await self._process_rows(rows, result)

    except Exception as e:
        error_msg = f"Failed to import CSV file {file_path}: {e}"
        logger.error(error_msg, exc_info=True)
        result.success = False
        result.errors.append(error_msg)

    finally:
        result.end_time = datetime.now()
        if result.start_time:
            result.duration_seconds = (result.end_time - result.start_time).total_seconds()

    return result
async def import_from_json(
    self,
    file_path: Union[str, Path],
    encoding: str = "utf-8",
    array_key: Optional[str] = None,
) -> ImportResult:
    """
    Import data from JSON file

    Supports:
    - Array of objects: [{"id": 1, "name": "Alice"}, ...]
    - Object with array: {"items": [{"id": 1, ...}, ...]}
    - Single object: {"id": 1, "name": "Alice"}

    Args:
        file_path: Path to JSON file
        encoding: File encoding (default: utf-8)
        array_key: If JSON is object with array, key containing the array.
            The key must be present and must hold a list; it is ignored when
            the top-level JSON value is already an array.

    Returns:
        ImportResult with statistics. Failures (unreadable file, missing
        array_key, wrong shape) are recorded in ``errors`` with
        ``success=False`` rather than raised.
    """
    result = ImportResult(start_time=datetime.now())

    try:
        # Read JSON file
        with open(file_path, "r", encoding=encoding) as f:
            data = json.load(f)

        # Extract rows
        if isinstance(data, list):
            rows = data
        elif isinstance(data, dict):
            if array_key:
                # A misspelled key used to yield zero rows with success=True;
                # report it as an error instead.
                if array_key not in data:
                    raise ValueError(f"Key '{array_key}' not found in JSON object")
                rows = data[array_key]
                if not isinstance(rows, list):
                    raise ValueError(f"Key '{array_key}' does not contain an array")
            else:
                # Single object - wrap in list
                rows = [data]
        else:
            raise ValueError(f"JSON file must contain array or object, got {type(data)}")

        # Process rows
        result = await self._process_rows(rows, result)

    except Exception as e:
        error_msg = f"Failed to import JSON file {file_path}: {e}"
        logger.error(error_msg, exc_info=True)
        result.success = False
        result.errors.append(error_msg)

    finally:
        result.end_time = datetime.now()
        if result.start_time:
            result.duration_seconds = (result.end_time - result.start_time).total_seconds()

    return result
async def import_from_csv_streaming(
    self,
    file_path: Union[str, Path],
    encoding: str = "utf-8",
    delimiter: str = ",",
    chunk_size: int = 10000,
) -> ImportResult:
    """
    Import data from CSV file using streaming mode.

    Memory-efficient import for large files (>1GB). Reads file in chunks
    without loading entire file into memory. Each row is converted and
    written individually. Unlike import_from_csv, this method does not run
    data quality validation.

    Args:
        file_path: Path to CSV file
        encoding: File encoding (default: utf-8)
        delimiter: CSV delimiter (default: comma)
        chunk_size: Number of rows per chunk (default: 10000)

    Returns:
        ImportResult with statistics and performance metrics. Metrics are
        attached only when the whole file was processed without a fatal error.
    """
    import time

    result = ImportResult(start_time=datetime.now())

    # Initialize performance metrics
    metrics = PerformanceMetrics() if self.track_performance else None
    if metrics:
        metrics.start_time = time.time()
        if self._memory_tracker:
            self._memory_tracker.start_tracking()
            metrics.initial_memory_mb = self._memory_tracker.initial_memory_mb

    try:
        if not PANDAS_AVAILABLE:
            raise ImportError("pandas is required for streaming CSV import")

        # Count total rows for progress tracking
        streaming_reader = StreamingCSVReader(
            str(file_path),
            chunk_size=chunk_size,
            encoding=encoding,
            delimiter=delimiter,
        )
        total_rows = streaming_reader.count_rows()
        if metrics:
            metrics.total_rows = total_rows

        processed_rows = 0
        batch_count = 0

        # The chunk is actually read inside the async iterator, so the read
        # timer starts before the iterator is advanced. It is reset at the end
        # of every chunk rather than just before to_dict().
        read_start = time.time()
        async for chunk_df in streaming_reader.read_chunks():
            rows = chunk_df.to_dict("records")
            if metrics:
                metrics.read_time_seconds += time.time() - read_start

            # Update progress (rows completed before this chunk)
            if self.progress_callback:
                progress_pct = (processed_rows / total_rows) * 100 if total_rows > 0 else 0
                self.progress_callback(
                    f"Streaming chunk {batch_count + 1}: {processed_rows}/{total_rows} rows",
                    progress_pct,
                )

            # Process chunk
            transform_start = time.time()
            for row in rows:
                try:
                    row_entities = await self._row_to_entities(row)
                    row_relations = await self._row_to_relations(row)

                    # Add entities
                    if self.use_bulk_writes and hasattr(self.graph_store, "add_entities_bulk"):
                        added = await self.graph_store.add_entities_bulk(row_entities)
                        result.entities_added += added
                    else:
                        for entity in row_entities:
                            try:
                                await self.graph_store.add_entity(entity)
                                result.entities_added += 1
                            except ValueError:
                                # Rejected by the store; skip and keep going
                                pass

                    # Add relations
                    if self.use_bulk_writes and hasattr(self.graph_store, "add_relations_bulk"):
                        added = await self.graph_store.add_relations_bulk(row_relations)
                        result.relations_added += added
                    else:
                        for relation in row_relations:
                            try:
                                await self.graph_store.add_relation(relation)
                                result.relations_added += 1
                            except ValueError:
                                # Rejected by the store; skip and keep going
                                pass

                    result.rows_processed += 1
                except Exception as e:
                    result.rows_failed += 1
                    if not self.skip_errors:
                        raise
                    result.warnings.append(f"Row error: {e}")

            if metrics:
                metrics.transform_time_seconds += time.time() - transform_start

            processed_rows += len(rows)
            batch_count += 1

            # Update memory tracking
            if self._memory_tracker:
                self._memory_tracker.update()

            # Start timing the next chunk's read
            read_start = time.time()

        # Per-chunk updates report progress *before* each chunk, so emit a
        # final update once every chunk has been handled.
        if self.progress_callback:
            self.progress_callback(
                f"Streaming complete: {processed_rows}/{total_rows} rows",
                100.0,
            )

        # Finalize metrics
        if metrics:
            metrics.end_time = time.time()
            metrics.batch_count = batch_count
            if self._memory_tracker:
                metrics.peak_memory_mb = self._memory_tracker.peak_memory_mb
            metrics.calculate_throughput()
            result.performance_metrics = metrics

    except Exception as e:
        error_msg = f"Failed to import CSV file (streaming): {e}"
        logger.error(error_msg, exc_info=True)
        result.success = False
        result.errors.append(error_msg)

    finally:
        result.end_time = datetime.now()
        if result.start_time:
            result.duration_seconds = (result.end_time - result.start_time).total_seconds()

    return result
async def import_from_spss(
    self,
    file_path: Union[str, Path],
    encoding: str = "utf-8",
    preserve_metadata: bool = True,
) -> ImportResult:
    """
    Import data from SPSS file (.sav, .por)

    Uses pyreadstat to read SPSS files. Files ending in ``.por`` (matched
    case-insensitively) are read with ``pyreadstat.read_por``; all others are
    read with ``pyreadstat.read_sav``. When ``preserve_metadata`` is True, the
    column names, variable labels and value labels are attached to every row
    under the ``_spss_metadata`` key (the same dict object is shared by all
    rows).

    Args:
        file_path: Path to SPSS file (.sav or .por)
        encoding: File encoding (default: utf-8); passed to read_sav only
        preserve_metadata: Whether to preserve SPSS metadata (variable labels, value labels)

    Returns:
        ImportResult with statistics. Failures (missing pyreadstat/pandas,
        unreadable file) are recorded in ``errors`` with ``success=False``.
    """
    result = ImportResult(start_time=datetime.now())

    try:
        try:
            import pyreadstat
        except ImportError as exc:
            raise ImportError("pyreadstat is required for SPSS import. Install with: pip install pyreadstat") from exc

        if not PANDAS_AVAILABLE:
            raise ImportError("pandas is required for SPSS import. Install with: pip install pandas")

        # Portable (.por) files are a different format; read_sav cannot parse them.
        path_str = str(file_path)
        if path_str.lower().endswith(".por"):
            df, meta = pyreadstat.read_por(path_str)
        else:
            df, meta = pyreadstat.read_sav(path_str, encoding=encoding)

        # Convert DataFrame to list of dictionaries
        rows = df.to_dict("records")

        # If preserve_metadata is True, add SPSS metadata to each row
        if preserve_metadata and meta:
            spss_metadata = {
                "column_names": getattr(meta, "column_names", []),
                "column_labels": getattr(meta, "column_labels", []),
                "variable_value_labels": getattr(meta, "variable_value_labels", {}),
            }

            # Store metadata in result for reference
            if spss_metadata.get("column_labels"):
                result.warnings.append(f"SPSS metadata preserved: {len(spss_metadata['column_labels'])} variable labels")

            # Add metadata to each row's properties
            for row in rows:
                row["_spss_metadata"] = spss_metadata

        # Process rows
        result = await self._process_rows(rows, result)

    except Exception as e:
        error_msg = f"Failed to import SPSS file {file_path}: {e}"
        logger.error(error_msg, exc_info=True)
        result.success = False
        result.errors.append(error_msg)

    finally:
        result.end_time = datetime.now()
        if result.start_time:
            result.duration_seconds = (result.end_time - result.start_time).total_seconds()

    return result
async def import_from_excel(
    self,
    file_path: Union[str, Path],
    sheet_name: Union[str, int, None] = 0,
    encoding: str = "utf-8",
    header: bool = True,
) -> ImportResult:
    """
    Load an Excel workbook (.xlsx, .xls) into the graph.

    A single sheet (by name or index) can be imported, or every sheet at
    once. When every sheet is read, each row is tagged with the name of the
    sheet it came from under the ``_excel_sheet`` key.

    Args:
        file_path: Path to the Excel file
        sheet_name: Sheet name (str), sheet index (int), or None for all
            sheets (default: 0 = first sheet)
        encoding: Accepted for signature symmetry with the other importers;
            not used by this method
        header: Whether the first row is a header row (default: True)

    Returns:
        ImportResult with statistics; failures are recorded in ``errors``
        with ``success=False``.
    """
    result = ImportResult(start_time=datetime.now())
    header_row = 0 if header else None

    try:
        if not PANDAS_AVAILABLE:
            raise ImportError("pandas is required for Excel import. Install with: pip install pandas openpyxl")

        if sheet_name is not None:
            # One specific sheet
            frame = pd.read_excel(file_path, sheet_name=sheet_name, header=header_row)
            rows = frame.to_dict("records")
        else:
            # sheet_name=None makes pandas return {sheet name: DataFrame}
            sheets = pd.read_excel(file_path, sheet_name=None, header=header_row)
            rows = []
            for sheet_key, frame in sheets.items():
                records = frame.to_dict("records")
                for record in records:
                    record["_excel_sheet"] = sheet_key
                rows.extend(records)
            result.warnings.append(f"Imported {len(sheets)} sheets from Excel file")

        result = await self._process_rows(rows, result)

    except Exception as e:
        error_msg = f"Failed to import Excel file {file_path}: {e}"
        logger.error(error_msg, exc_info=True)
        result.success = False
        result.errors.append(error_msg)

    finally:
        result.end_time = datetime.now()
        if result.start_time:
            result.duration_seconds = (result.end_time - result.start_time).total_seconds()

    return result
async def reshape_and_import_csv(
    self,
    file_path: Union[str, Path],
    id_vars: Optional[List[str]] = None,
    value_vars: Optional[List[str]] = None,
    var_name: str = "variable",
    value_name: str = "value",
    entity_type_hint: Optional[str] = None,
    encoding: str = "utf-8",
) -> ImportResult:
    """
    Melt a wide-format CSV into long (normalized) form, then import it.

    When ``id_vars`` is not given, both ``id_vars`` and (if also missing)
    ``value_vars`` come from ``DataReshaping.suggest_melt_config``. Rows
    whose melted value is missing are dropped (``dropna=True``).

    Args:
        file_path: Path to CSV file
        id_vars: Columns to use as identifiers (auto-detected if None)
        value_vars: Columns to unpivot (auto-detected if None and id_vars is None)
        var_name: Name for variable column (default: 'variable')
        value_name: Name for value column (default: 'value')
        entity_type_hint: Accepted but not used by this method's body
        encoding: File encoding (default: utf-8)

    Returns:
        ImportResult with statistics; failures are recorded in ``errors``
        with ``success=False``.

    Example:
        ```python
        # Wide format: sample_id, option1, option2, ..., option200
        # Will be reshaped to: sample_id, variable, value
        result = await pipeline.reshape_and_import_csv(
            "wide_data.csv",
            id_vars=['sample_id'],
            var_name='option_name',
            value_name='option_value'
        )
        ```
    """
    from aiecs.application.knowledge_graph.builder.data_reshaping import DataReshaping

    result = ImportResult(start_time=datetime.now())

    try:
        if not PANDAS_AVAILABLE:
            raise ImportError("pandas is required for reshaping")

        wide_df = pd.read_csv(file_path, encoding=encoding)

        # Fill in the melt configuration from DataReshaping's suggestion
        if id_vars is None:
            suggestion = DataReshaping.suggest_melt_config(wide_df)
            id_vars = suggestion["id_vars"]
            if value_vars is None:
                value_vars = suggestion["value_vars"]
            result.warnings.append(f"Auto-detected id_vars: {id_vars}")

        melted = DataReshaping.melt(
            wide_df,
            id_vars=id_vars,
            value_vars=value_vars,
            var_name=var_name,
            value_name=value_name,
            dropna=True,
        )
        result.warnings.extend(melted.warnings or [])
        result.warnings.append(f"Reshaped from {melted.original_shape} to {melted.new_shape}")

        long_rows = melted.data.to_dict("records")
        result = await self._process_rows(long_rows, result)

    except Exception as e:
        error_msg = f"Failed to reshape and import CSV {file_path}: {e}"
        logger.error(error_msg, exc_info=True)
        result.success = False
        result.errors.append(error_msg)

    finally:
        result.end_time = datetime.now()
        if result.start_time:
            result.duration_seconds = (result.end_time - result.start_time).total_seconds()

    return result
async def _process_rows(self, rows: List[Dict[str, Any]], result: ImportResult) -> ImportResult:
    """
    Process rows in batches and convert them to entities/relations.

    Rows are consumed in consecutive, non-overlapping slices. When a batch
    optimizer is configured, the batch size may change after every batch.
    The next slice always starts where the previous one ended, so each row
    is handed to ``_process_batch`` exactly once. After all batches, any
    configured aggregations are computed and stored as
    ``<EntityType>Summary`` entities.

    Args:
        rows: List of row dictionaries
        result: ImportResult to update

    Returns:
        The same ImportResult, updated with counts, errors, warnings and
        (when performance tracking is enabled) performance metrics.
    """
    import time

    total_rows = len(rows)
    if total_rows == 0:
        result.warnings.append("No rows to process")
        return result

    # Initialize performance metrics if tracking enabled
    metrics = None
    if self.track_performance:
        metrics = PerformanceMetrics()
        metrics.start_time = time.time()
        metrics.total_rows = total_rows
        if self._memory_tracker:
            self._memory_tracker.start_tracking()
            metrics.initial_memory_mb = self._memory_tracker.initial_memory_mb

    # Determine the initial batch size (auto-tuned from the column count if enabled)
    batch_size = self.batch_size
    if self._batch_optimizer is not None:
        batch_size = self._batch_optimizer.estimate_batch_size(len(rows[0]))
        logger.debug(f"Auto-tuned batch size: {batch_size}")

    batch_count = 0
    batch_start = 0
    # A while-loop is required instead of range(0, total_rows, batch_size).
    # range() fixes its step when the loop begins, so re-tuning batch_size
    # mid-loop would make slices overlap (size grew) or skip rows (size shrank).
    while batch_start < total_rows:
        # Never step by <= 0, which would never advance.
        batch_size = max(1, int(batch_size))
        batch_end = min(batch_start + batch_size, total_rows)
        batch_rows = rows[batch_start:batch_end]
        # Always time the batch: the optimizer needs a real duration even when
        # performance metrics are disabled.
        batch_clock = time.perf_counter()

        if self.progress_callback:
            progress_pct = (batch_end / total_rows) * 100
            self.progress_callback(
                f"Processing rows {batch_start+1}-{batch_end} of {total_rows}",
                progress_pct,
            )

        # Pass the offset so per-row error messages report file-level row numbers.
        batch_result = await self._process_batch(batch_rows, row_offset=batch_start)
        batch_count += 1

        result.entities_added += batch_result.entities_added
        result.relations_added += batch_result.relations_added
        result.rows_processed += batch_result.rows_processed
        result.rows_failed += batch_result.rows_failed
        result.errors.extend(batch_result.errors)
        result.warnings.extend(batch_result.warnings)

        # Feed the measured batch time back for adaptive tuning of the next batch
        if self._batch_optimizer is not None:
            self._batch_optimizer.record_batch_time(time.perf_counter() - batch_clock, len(batch_rows))
            batch_size = self._batch_optimizer.adjust_batch_size()

        if self._memory_tracker:
            self._memory_tracker.update()

        batch_start = batch_end

    # Finalize performance metrics
    if metrics:
        metrics.end_time = time.time()
        metrics.batch_count = batch_count
        if self._memory_tracker:
            metrics.peak_memory_mb = self._memory_tracker.peak_memory_mb
        metrics.calculate_throughput()
        result.performance_metrics = metrics

    # Apply aggregations after all batches processed
    if self.mapping.aggregations:
        aggregation_results = await self._apply_aggregations()

        # Store aggregated values as one summary entity per entity type
        for entity_type, properties in aggregation_results.items():
            try:
                summary_entity = Entity(
                    id=f"{entity_type}_summary",
                    entity_type=f"{entity_type}Summary",
                    properties=properties,
                )

                try:
                    await self.graph_store.add_entity(summary_entity)
                    result.entities_added += 1
                except ValueError:
                    # The add raised ValueError; it is treated as "entity already exists"
                    # (e.g. from a previous import).
                    if not hasattr(self.graph_store, "update_entity"):
                        # Report the skip explicitly instead of silently claiming success.
                        result.warnings.append(
                            f"Summary entity {summary_entity.id} already exists and the graph store "
                            f"does not support update_entity; aggregations for {entity_type} were not stored"
                        )
                        continue
                    await self.graph_store.update_entity(summary_entity)

                result.warnings.append(f"Applied aggregations to {entity_type}: {list(properties.keys())}")
            except Exception as e:
                result.warnings.append(f"Failed to apply aggregations for {entity_type}: {e}")

    return result


async def _process_batch(self, rows: List[Dict[str, Any]], row_offset: int = 0) -> ImportResult:
    """
    Process a batch of rows

    Args:
        rows: List of row dictionaries
        row_offset: Index of the first row of this batch within the whole
            import; used only so error messages report overall row numbers
            (default 0 keeps batch-relative numbering).

    Returns:
        ImportResult for this batch

    Raises:
        Any exception from row mapping or graph writes when ``skip_errors``
        is False.
    """
    batch_result = ImportResult()
    # Counts every row in the batch, including ones that later fail.
    batch_result.rows_processed = len(rows)

    entities_to_add: List[Entity] = []
    relations_to_add: List[Relation] = []

    for row_number, row in enumerate(rows, start=row_offset + 1):
        try:
            entities_to_add.extend(await self._row_to_entities(row))
            relations_to_add.extend(await self._row_to_relations(row))
        except Exception as e:
            error_msg = f"Failed to process row {row_number}: {e}"
            logger.warning(error_msg, exc_info=True)
            batch_result.rows_failed += 1
            if self.skip_errors:
                batch_result.warnings.append(error_msg)
            else:
                batch_result.errors.append(error_msg)
                raise

    # Update aggregation accumulators (includes rows that failed to map)
    if self.mapping.aggregations:
        self._update_aggregations(rows)

    # Add entities to graph store (use bulk writes if enabled and supported)
    if self.use_bulk_writes and hasattr(self.graph_store, "add_entities_bulk"):
        try:
            batch_result.entities_added = await self.graph_store.add_entities_bulk(entities_to_add)
        except Exception as e:
            error_msg = f"Bulk entity add failed: {e}"
            logger.warning(error_msg)
            batch_result.warnings.append(error_msg)
            if not self.skip_errors:
                raise
    else:
        for entity in entities_to_add:
            try:
                await self.graph_store.add_entity(entity)
                batch_result.entities_added += 1
            except Exception as e:
                error_msg = f"Failed to add entity {entity.id}: {e}"
                logger.warning(error_msg)
                batch_result.warnings.append(error_msg)
                if not self.skip_errors:
                    raise

    # Add relations to graph store (use bulk writes if enabled and supported)
    if self.use_bulk_writes and hasattr(self.graph_store, "add_relations_bulk"):
        try:
            batch_result.relations_added = await self.graph_store.add_relations_bulk(relations_to_add)
        except Exception as e:
            error_msg = f"Bulk relation add failed: {e}"
            logger.warning(error_msg)
            batch_result.warnings.append(error_msg)
            if not self.skip_errors:
                raise
    else:
        for relation in relations_to_add:
            try:
                await self.graph_store.add_relation(relation)
                batch_result.relations_added += 1
            except Exception as e:
                error_msg = f"Failed to add relation {relation.id}: {e}"
                logger.warning(error_msg)
                batch_result.warnings.append(error_msg)
                if not self.skip_errors:
                    raise

    return batch_result


async def _row_to_entities(self, row: Dict[str, Any]) -> List[Entity]:
    """
    Convert a row to entities based on entity mappings

    Args:
        row: Dictionary of column name -> value

    Returns:
        List of Entity objects (one per mapping that succeeded)

    Raises:
        ValueError: If a mapping fails and ``skip_errors`` is False.
    """
    entities = []

    for entity_mapping in self.mapping.entity_mappings:
        try:
            entity_data = entity_mapping.map_row_to_entity(row)

            # Entity has no metadata field, so provenance is stored under "_metadata"
            properties = entity_data["properties"].copy()
            properties["_metadata"] = {
                "source": "structured_data_import",
                "imported_at": datetime.now().isoformat(),
            }

            entities.append(
                Entity(
                    id=entity_data["id"],
                    entity_type=entity_data["type"],
                    properties=properties,
                )
            )
        except Exception as e:
            error_msg = f"Failed to map row to entity type '{entity_mapping.entity_type}': {e}"
            logger.warning(error_msg)
            if not self.skip_errors:
                raise ValueError(error_msg) from e

    return entities


async def _row_to_relations(self, row: Dict[str, Any]) -> List[Relation]:
    """
    Convert a row to relations based on relation mappings

    The relation id is derived as ``<source_id>_<type>_<target_id>``.

    Args:
        row: Dictionary of column name -> value

    Returns:
        List of Relation objects (one per mapping that succeeded)

    Raises:
        ValueError: If a mapping fails and ``skip_errors`` is False.
    """
    relations = []

    for relation_mapping in self.mapping.relation_mappings:
        try:
            relation_data = relation_mapping.map_row_to_relation(row)

            # Relation has no metadata field, so provenance is stored under "_metadata"
            rel_properties = relation_data["properties"].copy()
            rel_properties["_metadata"] = {
                "source": "structured_data_import",
                "imported_at": datetime.now().isoformat(),
            }

            relations.append(
                Relation(
                    id=f"{relation_data['source_id']}_{relation_data['type']}_{relation_data['target_id']}",
                    relation_type=relation_data["type"],
                    source_id=relation_data["source_id"],
                    target_id=relation_data["target_id"],
                    properties=rel_properties,
                )
            )
        except Exception as e:
            error_msg = f"Failed to map row to relation type '{relation_mapping.relation_type}': {e}"
            logger.warning(error_msg)
            if not self.skip_errors:
                raise ValueError(error_msg) from e

    return relations


def _update_aggregations(self, rows: List[Dict[str, Any]]) -> None:
    """
    Update aggregation accumulators with batch data

    For each configured aggregation, every non-None value of its
    ``source_property`` in ``rows`` is added to the accumulator stored at
    ``self._aggregation_accumulators[entity_type][target_property]``.

    NOTE(review): accumulators live on ``self`` and are not reset in this
    method; confirm they are reset elsewhere if a pipeline instance is reused
    for several imports.

    Args:
        rows: List of row dictionaries
    """
    for entity_agg in self.mapping.aggregations:
        type_accumulators = self._aggregation_accumulators.setdefault(entity_agg.entity_type, {})

        for agg_config in entity_agg.aggregations:
            accumulator = type_accumulators.get(agg_config.target_property)
            if accumulator is None:
                accumulator = AggregationAccumulator()
                type_accumulators[agg_config.target_property] = accumulator

            for row in rows:
                value = row.get(agg_config.source_property)
                if value is not None:
                    accumulator.add(value)


async def _apply_aggregations(self) -> Dict[str, Dict[str, Any]]:
    """
    Apply aggregations and return computed statistics

    Only entity types/properties that have an accumulator are reported.
    Aggregated values that are None (or come from an unknown function) are
    omitted.

    Returns:
        Dictionary of entity_type -> {property -> value}
    """
    from aiecs.application.knowledge_graph.builder.schema_mapping import AggregationFunction

    # Ordered (function, accumulator getter) pairs; matched with == so that
    # plain-string function values compare the same way as the enum members.
    getters = (
        (AggregationFunction.MEAN, "get_mean"),
        (AggregationFunction.STD, "get_std"),
        (AggregationFunction.MIN, "get_min"),
        (AggregationFunction.MAX, "get_max"),
        (AggregationFunction.SUM, "get_sum"),
        (AggregationFunction.COUNT, "get_count"),
        (AggregationFunction.MEDIAN, "get_median"),
        (AggregationFunction.VARIANCE, "get_variance"),
    )

    results: Dict[str, Dict[str, Any]] = {}

    for entity_agg in self.mapping.aggregations:
        entity_type = entity_agg.entity_type
        type_accumulators = self._aggregation_accumulators.get(entity_type)
        if type_accumulators is None:
            continue

        entity_results = results.setdefault(entity_type, {})

        for agg_config in entity_agg.aggregations:
            target_prop = agg_config.target_property
            accumulator = type_accumulators.get(target_prop)
            if accumulator is None:
                continue

            getter_name = next(
                (name for function, name in getters if agg_config.function == function),
                None,
            )
            value = getattr(accumulator, getter_name)() if getter_name is not None else None

            if value is not None:
                entity_results[target_prop] = value

    return results


def _create_validator_from_config(self, config: Dict[str, Any]) -> DataQualityValidator:
    """
    Create DataQualityValidator from configuration dictionary

    Recognized keys: ``range_rules`` ({property: {"min": .., "max": ..}}),
    ``required_properties`` (iterable of names), ``detect_outliers``
    (default False), ``fail_on_violations`` (default False) and
    ``max_violation_rate`` (default 0.1). Missing or None values for
    ``range_rules`` / ``required_properties`` are treated as empty.

    Args:
        config: Validation configuration dictionary

    Returns:
        Configured DataQualityValidator
    """
    range_rules = {
        prop: RangeRule(min_value=rule_dict.get("min"), max_value=rule_dict.get("max"))
        for prop, rule_dict in (config.get("range_rules") or {}).items()
    }

    required_properties = set(config.get("required_properties") or [])

    validation_config = ValidationConfig(
        range_rules=range_rules,
        required_properties=required_properties,
        detect_outliers=config.get("detect_outliers", False),
        fail_on_violations=config.get("fail_on_violations", False),
        max_violation_rate=config.get("max_violation_rate", 0.1),
    )

    return DataQualityValidator(validation_config)