Source code for aiecs.application.knowledge_graph.search.text_similarity

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
"""
Text Similarity Utilities

Provides various text similarity and matching functions for knowledge graph operations.
Includes BM25, Jaccard, cosine similarity, Levenshtein distance, and fuzzy matching.
"""

import re
import math
from typing import List, Optional, Tuple, Callable, Any
from collections import Counter
from difflib import SequenceMatcher


[docs] class BM25Scorer: """ BM25 (Best Matching 25) scorer for text similarity BM25 is a ranking function used to estimate the relevance of documents to a given search query. It's an improvement over TF-IDF. Example:: scorer = BM25Scorer(corpus=[ "The quick brown fox jumps over the lazy dog", "A quick brown dog jumps over a lazy fox", "The lazy dog sleeps all day" ]) scores = scorer.score("quick brown fox") # Returns scores for each document in corpus """
[docs] def __init__( self, corpus: List[str], k1: float = 1.5, b: float = 0.75, tokenizer: Optional[Callable[[str], List[str]]] = None, ): """ Initialize BM25 scorer Args: corpus: List of documents to score against k1: Term frequency saturation parameter (default: 1.5) b: Length normalization parameter (default: 0.75) tokenizer: Optional tokenizer function (default: simple word split) """ self.k1 = k1 self.b = b self.tokenizer = tokenizer or self._default_tokenizer # Tokenize corpus self.documents = [self.tokenizer(doc) for doc in corpus] self.doc_count = len(self.documents) # Calculate document lengths self.doc_lengths = [len(doc) for doc in self.documents] self.avg_doc_length = sum(self.doc_lengths) / self.doc_count if self.doc_count > 0 else 0 # Build term frequency dictionary self.term_freqs = [] self.doc_freqs: Counter[str] = Counter() for doc in self.documents: tf = Counter(doc) self.term_freqs.append(tf) for term in set(doc): self.doc_freqs[term] += 1 # Calculate IDF (Inverse Document Frequency) self.idf = {} for term, df in self.doc_freqs.items(): self.idf[term] = math.log((self.doc_count - df + 0.5) / (df + 0.5) + 1.0)
def _default_tokenizer(self, text: str) -> List[str]: """Default tokenizer: lowercase and split on whitespace""" return re.findall(r"\w+", text.lower())
[docs] def score(self, query: str) -> List[float]: """ Score documents against query Args: query: Query string Returns: List of BM25 scores for each document """ query_terms = self.tokenizer(query) scores = [] for i, doc in enumerate(self.documents): score = 0.0 doc_length = self.doc_lengths[i] term_freq = self.term_freqs[i] for term in query_terms: if term in term_freq: tf = term_freq[term] idf = self.idf.get(term, 0.0) # BM25 formula numerator = idf * tf * (self.k1 + 1) denominator = tf + self.k1 * (1 - self.b + self.b * (doc_length / self.avg_doc_length)) score += numerator / denominator scores.append(score) return scores
[docs] def get_top_n(self, query: str, n: int = 10) -> List[Tuple[int, float]]: """ Get top N documents by BM25 score Args: query: Query string n: Number of top results to return Returns: List of (document_index, score) tuples, sorted by score descending """ scores = self.score(query) indexed_scores = [(i, score) for i, score in enumerate(scores)] indexed_scores.sort(key=lambda x: x[1], reverse=True) return indexed_scores[:n]
[docs] def jaccard_similarity(set1: set, set2: set) -> float: """ Calculate Jaccard similarity between two sets Jaccard similarity = (size of intersection) / (size of union) Args: set1: First set set2: Second set Returns: Jaccard similarity score (0.0 to 1.0) """ if not set1 and not set2: return 1.0 intersection = len(set1 & set2) union = len(set1 | set2) if union == 0: return 0.0 return intersection / union
[docs] def jaccard_similarity_text(text1: str, text2: str, tokenizer: Optional[Callable[[str], Any]] = None) -> float: """ Calculate Jaccard similarity between two text strings Args: text1: First text string text2: Second text string tokenizer: Optional tokenizer function (default: word split) Returns: Jaccard similarity score (0.0 to 1.0) """ if tokenizer is None: def tokenizer(t): return set(re.findall(r"\w+", t.lower())) else: # Wrap tokenizer to ensure it returns a set original_tokenizer = tokenizer def tokenizer(t): return set(original_tokenizer(t)) set1 = tokenizer(text1) set2 = tokenizer(text2) return jaccard_similarity(set1, set2)
[docs] def cosine_similarity_text(text1: str, text2: str, tokenizer: Optional[Callable[[str], List[str]]] = None) -> float: """ Calculate cosine similarity between two text strings Cosine similarity measures the cosine of the angle between two vectors in a multi-dimensional space. For text, vectors are TF-IDF representations. Args: text1: First text string text2: Second text string tokenizer: Optional tokenizer function (default: word split) Returns: Cosine similarity score (0.0 to 1.0) """ if tokenizer is None: def tokenizer(t): return re.findall(r"\w+", t.lower()) tokens1 = tokenizer(text1) tokens2 = tokenizer(text2) # Build vocabulary vocab = set(tokens1) | set(tokens2) if not vocab: return 1.0 if not text1 and not text2 else 0.0 # Create term frequency vectors tf1 = Counter(tokens1) tf2 = Counter(tokens2) # Calculate dot product and magnitudes dot_product = sum(tf1.get(term, 0) * tf2.get(term, 0) for term in vocab) magnitude1 = math.sqrt(sum(tf1.get(term, 0) ** 2 for term in vocab)) magnitude2 = math.sqrt(sum(tf2.get(term, 0) ** 2 for term in vocab)) if magnitude1 == 0 or magnitude2 == 0: return 0.0 similarity = dot_product / (magnitude1 * magnitude2) # Handle floating point precision issues return min(1.0, max(0.0, similarity))
[docs] def levenshtein_distance(s1: str, s2: str) -> int: """ Calculate Levenshtein distance (edit distance) between two strings Levenshtein distance is the minimum number of single-character edits (insertions, deletions, or substitutions) required to change one string into another. Args: s1: First string s2: Second string Returns: Levenshtein distance (0 = identical, higher = more different) """ if len(s1) < len(s2): return levenshtein_distance(s2, s1) if len(s2) == 0: return len(s1) # Use dynamic programming previous_row = list(range(len(s2) + 1)) for i, c1 in enumerate(s1): current_row = [i + 1] for j, c2 in enumerate(s2): insertions = previous_row[j + 1] + 1 deletions = current_row[j] + 1 substitutions = previous_row[j] + (c1 != c2) current_row.append(min(insertions, deletions, substitutions)) previous_row = current_row return previous_row[-1]
[docs] def normalized_levenshtein_similarity(s1: str, s2: str) -> float: """ Calculate normalized Levenshtein similarity (0.0 to 1.0) Args: s1: First string s2: Second string Returns: Normalized similarity score (1.0 = identical, 0.0 = completely different) """ max_len = max(len(s1), len(s2)) if max_len == 0: return 1.0 distance = levenshtein_distance(s1, s2) return 1.0 - (distance / max_len)
[docs] def fuzzy_match( query: str, candidates: List[str], threshold: float = 0.6, method: str = "jaccard", ) -> List[Tuple[str, float]]: """ Find fuzzy matches for a query string in a list of candidates Args: query: Query string to match candidates: List of candidate strings threshold: Minimum similarity threshold (0.0 to 1.0) method: Similarity method ("jaccard", "cosine", "levenshtein", "ratio") Returns: List of (candidate, similarity_score) tuples above threshold, sorted by score descending """ results = [] for candidate in candidates: if method == "jaccard": score = jaccard_similarity_text(query, candidate) elif method == "cosine": score = cosine_similarity_text(query, candidate) elif method == "levenshtein": score = normalized_levenshtein_similarity(query, candidate) elif method == "ratio": # Use SequenceMatcher ratio (built-in fuzzy matching) score = SequenceMatcher(None, query.lower(), candidate.lower()).ratio() else: raise ValueError(f"Unknown method: {method}. Use 'jaccard', 'cosine', 'levenshtein', or 'ratio'") if score >= threshold: results.append((candidate, score)) # Sort by score descending results.sort(key=lambda x: x[1], reverse=True) return results
[docs] class TextSimilarity: """ Convenience class for text similarity operations Provides a unified interface for various text similarity methods. Example:: similarity = TextSimilarity() # Jaccard similarity score = similarity.jaccard("hello world", "world hello") # Cosine similarity score = similarity.cosine("machine learning", "deep learning") # Levenshtein distance distance = similarity.levenshtein("kitten", "sitting") # Fuzzy matching matches = similarity.fuzzy_match( "python", ["python3", "pyton", "java", "pythn"], threshold=0.7 ) """
[docs] def __init__(self, tokenizer: Optional[Callable[[str], List[str]]] = None): """ Initialize TextSimilarity Args: tokenizer: Optional tokenizer function for text processing """ self.tokenizer = tokenizer
[docs] def jaccard(self, text1: str, text2: str) -> float: """Calculate Jaccard similarity between two texts""" return jaccard_similarity_text(text1, text2, self.tokenizer)
[docs] def cosine(self, text1: str, text2: str) -> float: """Calculate cosine similarity between two texts""" return cosine_similarity_text(text1, text2, self.tokenizer)
[docs] def levenshtein(self, text1: str, text2: str) -> int: """Calculate Levenshtein distance between two texts""" return levenshtein_distance(text1, text2)
[docs] def levenshtein_similarity(self, text1: str, text2: str) -> float: """Calculate normalized Levenshtein similarity""" return normalized_levenshtein_similarity(text1, text2)
[docs] def fuzzy_match( self, query: str, candidates: List[str], threshold: float = 0.6, method: str = "jaccard", ) -> List[Tuple[str, float]]: """Find fuzzy matches for a query""" return fuzzy_match(query, candidates, threshold, method)
[docs] def bm25(self, corpus: List[str], k1: float = 1.5, b: float = 0.75) -> BM25Scorer: """Create a BM25 scorer for a corpus""" return BM25Scorer(corpus, k1=k1, b=b, tokenizer=self.tokenizer)