Source code for neer_match_utilities.feature_selection

"""
Feature selection utilities for similarity-based entity matching.

This module provides tools for selecting the most informative similarity features
from a potentially large set of candidates. It implements a two-stage feature
selection process:

1. **Correlation-based filtering**: Removes highly correlated features, keeping
   the one most correlated with the target variable.
2. **Elastic net regularization**: Uses penalized logistic regression to identify
   features that contribute unique predictive information.

The feature selector is designed to handle extreme class imbalance, which is
common in entity matching tasks where true matches are rare.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Any, Iterable

import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import get_scorer
from joblib import Parallel, delayed

import warnings
from sklearn.exceptions import ConvergenceWarning

# Suppress sklearn warnings about deprecated parameters and convergence issues
warnings.filterwarnings(
    "ignore",
    category=FutureWarning,
    message=".*'penalty' was deprecated.*"
)
warnings.filterwarnings("ignore", category=ConvergenceWarning)

try:
    from tqdm.auto import tqdm
except ImportError:
    tqdm = None

from neer_match_utilities.similarity_features import SimilarityFeatures
from neer_match.similarity_map import SimilarityMap


SimilarityMapDict = Dict[str, List[str]]

from contextlib import contextmanager
import joblib

[docs] @contextmanager def tqdm_joblib(tqdm_object): """ Context manager to integrate tqdm progress bars with joblib parallel execution. Parameters ---------- tqdm_object : tqdm.tqdm or None Progress bar object to update during parallel execution. """ if tqdm_object is None: yield return class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack): def __call__(self, *args, **kwargs): tqdm_object.update(n=self.batch_size) return super().__call__(*args, **kwargs) old_callback = joblib.parallel.BatchCompletionCallBack joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback try: yield finally: joblib.parallel.BatchCompletionCallBack = old_callback tqdm_object.close()
[docs] @dataclass class FeatureSelectionResult: """ Result object returned by FeatureSelector.execute(). Attributes ---------- updated_similarity_map : Dict[str, List[str]] Reduced similarity map containing only selected features. Keys are variable names, values are lists of similarity concept names. selected_feature_columns : List[str] Names of selected feature columns in the format 'col_{var}_{var}_{similarity}'. selected_pairs : List[Tuple[str, str]] List of (variable, similarity_concept) tuples that were selected. coef_by_feature : pd.Series Coefficients from the final elastic net model, sorted by absolute value. Useful for understanding feature importance. meta : Dict[str, Any] Metadata about the selection process, including: - method: Feature selection method used - scoring: Scoring metric used for cross-validation - cv: Number of cross-validation folds - n_features_in: Number of input features - n_features_selected: Number of features selected - did_fallback: Whether fallback to original map occurred """ updated_similarity_map: SimilarityMapDict selected_feature_columns: List[str] selected_pairs: List[Tuple[str, str]] coef_by_feature: pd.Series meta: Dict[str, Any]
[docs] class FeatureSelector: """ Supervised feature selector for similarity-based entity matching. This class implements a two-stage feature selection process optimized for entity matching tasks with extreme class imbalance: **Stage 1: Correlation-based filtering** (optional) Removes redundant features by identifying groups of highly correlated features and keeping only the one most correlated with the target. **Stage 2: Elastic net regularization** Uses penalized logistic regression with L1/L2 penalties to identify features that contribute unique predictive information. Cross-validation is used to select optimal regularization parameters. The selector returns an updated similarity map containing only the features that passed both selection stages. Parameters ---------- similarity_map : Dict[str, List[str]] or SimilarityMap Mapping from variable names to lists of similarity concepts. Example: {'name': ['jaro_winkler', 'levenshtein'], 'address': ['cosine']} training_data : Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame] Three-tuple of (left_df, right_df, matches_df) used for feature selection. id_left_col, id_right_col : str, default='id' Column names containing entity IDs in left and right dataframes. matches_id_left, matches_id_right : str, default='left', 'right' Column names in matches_df identifying left/right entity IDs. match_col : str, default='match' Name of the binary match indicator column (1=match, 0=non-match). matches_are_indices : bool, default=True If True, treat match IDs as integer row indices. If False, treat as entity IDs. method : str, default='elastic_net' Feature selection method. Currently only 'elastic_net' is supported. scoring : str, default='average_precision' Scoring metric for cross-validation. Options: 'f1', 'roc_auc', 'average_precision', 'neg_log_loss'. For imbalanced data, 'average_precision' is recommended. cv : int, default=5 Number of cross-validation folds. l1_ratios : tuple, default=(0.8, 0.9, 1.0) L1 penalty ratios to try. 1.0 = pure Lasso, 0.0 = pure Ridge. Cs : int or Iterable[float], default=20 Regularization strengths to try. If int, generates `Cs` values logarithmically spaced from 0.01 to 1000. Higher C = less regularization. class_weight : str or None, default='balanced' Class weighting strategy. 'balanced' adjusts weights inversely proportional to class frequencies, recommended for imbalanced data. max_iter : int, default=5000 Maximum iterations for elastic net solver. random_state : int, default=42 Random seed for reproducibility. n_jobs : int, default=-1 Number of parallel jobs. -1 uses all available processors. min_coef_threshold : float, default=0.0 Minimum absolute coefficient value for feature retention. Features with abs(coef) < threshold are dropped. Set to 0.0 to keep all non-zero features. max_correlation : float or None, default=None Correlation threshold for Stage 1 filtering. Features with pairwise correlation > threshold are candidates for removal. Example: 0.95. If None, Stage 1 is skipped. always_keep : Dict[str, List[str]] or None, default=None Features to always retain regardless of selection results. Example: {'surname': ['jaro_winkler']} always keeps surname jaro_winkler. preferred_separators : tuple, default=('__', '|', ':', '-', '_') Separators to try when parsing feature names (internal use). Attributes ---------- similarity_map : Dict[str, List[str]] The input similarity map. left_train, right_train, matches_train : pd.DataFrame Training datasets for feature selection. Examples -------- >>> from neer_match_utilities import FeatureSelector >>> selector = FeatureSelector( ... similarity_map={'name': ['jaro_winkler', 'levenshtein', 'cosine'], ... 'address': ['jaro_winkler', 'levenshtein']}, ... training_data=(left_df, right_df, matches_df), ... max_correlation=0.95, ... min_coef_threshold=0.01 ... ) >>> result = selector.execute() >>> print(result.updated_similarity_map) {'name': ['jaro_winkler'], 'address': ['levenshtein']} """
[docs] def __init__( self, similarity_map: SimilarityMapDict, training_data: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame], *, # these must match how you build df_train in your pipeline id_left_col: str = "id", id_right_col: str = "id", matches_id_left: str = "left", matches_id_right: str = "right", match_col: str = "match", matches_are_indices: bool = True, # selection configuration method: str = "elastic_net", # currently only elastic_net scoring: str = "average_precision", # "f1" | "roc_auc" | "average_precision" | "neg_log_loss" cv: int = 5, l1_ratios = (0.8, 0.9, 1.0), Cs: int | Iterable[float] = 20, class_weight: Optional[str] = "balanced", # consider "balanced" if match class is rare max_iter: int = 5000, random_state: int = 42, n_jobs: int = -1, # thresholding for feature selection min_coef_threshold: float = 0.0, # drop features with abs(coef) < threshold # correlation-based pre-filtering max_correlation: Optional[float] = None, # if set (e.g., 0.95), drop correlated features first # if you want to always keep certain similarities always_keep: Optional[Dict[str, List[str]]] = None, # if feature name parsing fails, you can tell the parser a separator preference preferred_separators: Tuple[str, ...] = ("__", "|", ":", "-", "_"), ): self.similarity_map: SimilarityMapDict = dict(similarity_map) self.left_train, self.right_train, self.matches_train = training_data self.id_left_col = id_left_col self.id_right_col = id_right_col self.matches_id_left = matches_id_left self.matches_id_right = matches_id_right self.match_col = match_col self.matches_are_indices = matches_are_indices self.method = method self.scoring = scoring self.cv = cv self.l1_ratios = list(l1_ratios) self.Cs = Cs self.class_weight = class_weight self.max_iter = max_iter self.random_state = random_state self.n_jobs = n_jobs self.min_coef_threshold = min_coef_threshold self.max_correlation = max_correlation self.always_keep = always_keep or {} self.preferred_separators = preferred_separators
def _make_Cs_grid(self) -> np.ndarray: """ Generate grid of regularization parameter values for cross-validation. Returns ------- np.ndarray Array of C values to try during cross-validation. If Cs was specified as an int, returns logarithmically spaced values from 0.01 to 1000. Otherwise returns the provided values. Notes ----- Higher C values mean less regularization (more features kept). Lower C values mean stronger regularization (more aggressive dropping). """ if isinstance(self.Cs, int): return np.logspace(-2, 3, self.Cs) return np.asarray(list(self.Cs), dtype=float) def _drop_correlated_features(self, X: pd.DataFrame, y: np.ndarray) -> pd.DataFrame: """ Remove redundant features based on pairwise correlations (Stage 1). For each group of features with pairwise correlation exceeding `self.max_correlation`, this method keeps only the feature most correlated with the target variable and drops the rest. Parameters ---------- X : pd.DataFrame Feature matrix (rows=samples, columns=features). y : np.ndarray Target variable (binary: 0=non-match, 1=match). Returns ------- pd.DataFrame Feature matrix with correlated features removed. Notes ----- If `self.max_correlation` is None, no filtering is performed and X is returned unchanged. """ if self.max_correlation is None: return X print(f"[FeatureSelector] Checking for correlations > {self.max_correlation}") # Compute pairwise feature correlations corr_matrix = X.corr().abs() # Compute correlation of each feature with target variable target_corr = X.corrwith(pd.Series(y, index=X.index)).abs() # Extract upper triangle to avoid duplicate pairs upper_tri = corr_matrix.where( np.triu(np.ones(corr_matrix.shape), k=1).astype(bool) ) to_drop = set() dropped_details = [] for col1 in upper_tri.columns: if col1 in to_drop: continue # Identify features highly correlated with col1 correlated_features = upper_tri.index[upper_tri[col1] > self.max_correlation].tolist() if correlated_features: # From the correlated group, select the feature most predictive of target candidates = [col1] + [f for f in correlated_features if f not in to_drop] if len(candidates) > 1: # Keep feature with highest target correlation best_feature = max(candidates, key=lambda f: target_corr.get(f, 0.0)) # Mark others for removal for candidate in candidates: if candidate != best_feature: to_drop.add(candidate) dropped_details.append( f"{candidate} (target_corr={target_corr.get(candidate, 0.0):.3f}) " f"-> kept {best_feature} (target_corr={target_corr.get(best_feature, 0.0):.3f})" ) if to_drop: print(f"[FeatureSelector] Stage 1 - Dropping {len(to_drop)} correlated features: {sorted(to_drop)}") print(f"[FeatureSelector] Stage 1 - Details (kept most predictive):") for detail in dropped_details[:5]: print(f" - {detail}") if len(dropped_details) > 5: print(f" ... and {len(dropped_details) - 5} more") X = X.drop(columns=list(to_drop)) else: print(f"[FeatureSelector] Stage 1 - No highly correlated features found") return X def _print_correlation_summary(self, X: pd.DataFrame, top_n: int = 10): """ Print diagnostic information about feature correlations. Parameters ---------- X : pd.DataFrame Feature matrix. top_n : int, default=10 Number of top correlations to display. """ corr_matrix = X.corr().abs() # Extract upper triangle to avoid duplicate pairs upper_tri = corr_matrix.where( np.triu(np.ones(corr_matrix.shape), k=1).astype(bool) ) # Collect all pairwise correlations correlations = [] for col in upper_tri.columns: for idx in upper_tri.index: val = upper_tri.loc[idx, col] if pd.notna(val): correlations.append((val, idx, col)) if correlations: correlations.sort(reverse=True) print(f"\n[FeatureSelector] Top {top_n} feature correlations:") for i, (corr_val, feat1, feat2) in enumerate(correlations[:top_n], 1): print(f" {i}. {feat1} <-> {feat2}: {corr_val:.3f}") print() def _selected_cols_to_similarity_map(self, selected_cols: list[str]) -> dict[str, list[str]]: """ Convert selected feature column names back to similarity map format. Parameters ---------- selected_cols : list[str] List of selected feature column names (format: 'col_{var}_{var}_{similarity}'). Returns ------- dict[str, list[str]] Updated similarity map containing only selected features. Variables with no selected features are excluded. """ selected_set = set(selected_cols) selected: dict[str, set[str]] = {} # Parse feature names to extract (variable, similarity) pairs for lcol, rcol, sim in SimilarityMap(self.similarity_map): col_name = f"col_{lcol}_{rcol}_{sim}" if col_name in selected_set: selected.setdefault(lcol, set()).add(sim) # Merge in features marked as always_keep for var, sims in (self.always_keep or {}).items(): selected.setdefault(var, set()).update(sims) # Preserve original ordering of similarities within each variable updated: dict[str, list[str]] = {} for var, sims in self.similarity_map.items(): keep = [s for s in sims if s in selected.get(var, set())] if keep: updated[var] = keep return updated
[docs] def execute(self) -> FeatureSelectionResult: """ Execute the two-stage feature selection process. This method performs: 1. Builds pairwise similarity features from training data 2. Cleans data (removes constant columns, handles missing values) 3. **Stage 1** (optional): Correlation-based filtering 4. Scales features for regularization 5. **Stage 2**: Elastic net cross-validation and feature selection 6. Applies coefficient thresholding (if configured) 7. Converts selected features back to similarity map format Returns ------- FeatureSelectionResult Object containing the updated similarity map, selected features, coefficients, and metadata about the selection process. Raises ------ ValueError If the method is not 'elastic_net', or if no usable features remain after cleaning or correlation filtering, or if there are too few positive examples for cross-validation. Notes ----- The method prints detailed diagnostic information during execution: - Top feature correlations - Features dropped in Stage 1 (correlation filtering) - Cross-validation progress and best parameters - Features dropped in Stage 2 (elastic net) - Top coefficients by absolute value - Final summary statistics Examples -------- >>> result = selector.execute() >>> print(f"Selected {len(result.selected_feature_columns)} features") >>> print(f"Updated similarity map: {result.updated_similarity_map}") """ if self.method != "elastic_net": raise ValueError(f"Unsupported method={self.method}. Only 'elastic_net' is implemented.") # Step 1: Build similarity feature dataframe from training data smap_obj = SimilarityMap(self.similarity_map) feats = SimilarityFeatures(similarity_map=smap_obj) df_train = feats.pairwise_similarity_dataframe( left=self.left_train, right=self.right_train, matches=self.matches_train, left_id_col=self.id_left_col, right_id_col=self.id_right_col, match_col=self.match_col, matches_id_left=self.matches_id_left, matches_id_right=self.matches_id_right, matches_are_indices=self.matches_are_indices, ) if self.match_col not in df_train.columns: raise ValueError( f"Expected match label column '{self.match_col}' in df_train, " f"but columns are: {list(df_train.columns)[:30]}..." ) # Step 2: Extract target variable and feature matrix y = df_train[self.match_col].astype(int).to_numpy() # Verify sufficient positive examples for cross-validation pos = int(y.sum()) if pos < self.cv: raise ValueError(f"Not enough positives for cv={self.cv}: pos={pos}. Reduce cv or use more labeled matches.") X = df_train.drop(columns=[self.match_col]) # Filter to similarity feature columns only X = X[[c for c in X.columns if c.startswith("col_")]] # Ensure all features are numeric X = X.select_dtypes(include=[np.number]).copy() # Handle infinity and missing values X = X.replace([np.inf, -np.inf], np.nan).fillna(0.0) # Remove constant columns (no information) nunique = X.nunique(dropna=False) X = X.loc[:, nunique > 1] if X.shape[1] == 0: raise ValueError("After cleaning, no usable similarity feature columns remain.") # Diagnostic: print top feature correlations self._print_correlation_summary(X) # Step 3 (Stage 1): Correlation-based pre-filtering (optional) X = self._drop_correlated_features(X, y) if X.shape[1] == 0: raise ValueError("After correlation filtering, no usable similarity feature columns remain.") # Step 4: Standardize features (required for elastic net) scaler = StandardScaler() X_scaled = scaler.fit_transform(X) # Step 5 (Stage 2): Elastic net cross-validation # Build parameter grid for exhaustive search Cs_grid = self._make_Cs_grid() l1_grid = list(self.l1_ratios) skf = StratifiedKFold(n_splits=self.cv, shuffle=True, random_state=self.random_state) scorer = get_scorer(self.scoring) # Pre-compute CV folds for reproducibility folds = list(skf.split(X_scaled, y)) def fit_and_score(l1_ratio: float, C: float, fold_idx: int): """Train and evaluate one configuration on one CV fold.""" train_idx, val_idx = folds[fold_idx] X_tr, X_va = X_scaled[train_idx], X_scaled[val_idx] y_tr, y_va = y[train_idx], y[val_idx] clf = LogisticRegression( solver="saga", penalty="elasticnet", l1_ratio=l1_ratio, C=C, max_iter=self.max_iter, tol=1e-2, class_weight=self.class_weight, random_state=self.random_state, ) clf.fit(X_tr, y_tr) score = scorer(clf, X_va, y_va) return (l1_ratio, C, fold_idx, float(score)) # Generate all parameter × fold combinations tasks = [(l1, C, k) for l1 in l1_grid for C in Cs_grid for k in range(len(folds))] total_fits = len(tasks) # Execute parallel CV with progress bar pbar = tqdm(total=total_fits, desc="[FeatureSelector] CV fits", unit="fit") if tqdm is not None else None with tqdm_joblib(pbar): results = Parallel(n_jobs=self.n_jobs, prefer="processes")( delayed(fit_and_score)(l1, C, k) for (l1, C, k) in tasks ) # Aggregate cross-validation scores scores = {} counts = {} for l1, C, _, sc in results: key = (l1, C) scores[key] = scores.get(key, 0.0) + sc counts[key] = counts.get(key, 0) + 1 mean_scores = {k: scores[k] / counts[k] for k in scores} best_params = max(mean_scores.items(), key=lambda kv: kv[1])[0] best_l1, best_C = best_params best_score = mean_scores[best_params] print(f"[FeatureSelector] Stage 2 - Best params: l1_ratio={best_l1}, C={best_C}, score={best_score:.6f}") # Step 6: Refit final model on full training set with best parameters final_model = LogisticRegression( solver="saga", penalty="elasticnet", l1_ratio=best_l1, C=best_C, max_iter=20000, tol=1e-3, class_weight=self.class_weight, random_state=self.random_state, ) final_model.fit(X_scaled, y) coef = pd.Series(final_model.coef_.ravel(), index=X.columns) # Step 7: Apply coefficient threshold to select final features if self.min_coef_threshold > 0: mask = coef.abs() >= self.min_coef_threshold selected_feature_columns = coef.index[mask].tolist() dropped_by_threshold = coef.index[~mask].tolist() print(f"[FeatureSelector] Stage 2 - Applied min_coef_threshold={self.min_coef_threshold}") if dropped_by_threshold: print(f"[FeatureSelector] Stage 2 - Dropping {len(dropped_by_threshold)} features with weak coefficients: {dropped_by_threshold}") else: selected_feature_columns = coef.index[coef != 0].tolist() dropped_by_elasticnet = coef.index[coef == 0].tolist() if dropped_by_elasticnet: print(f"[FeatureSelector] Stage 2 - Elastic net zeroed {len(dropped_by_elasticnet)} features: {dropped_by_elasticnet}") print(f"\n[FeatureSelector] Summary: features_in={X.shape[1]} selected_cols={len(selected_feature_columns)}") # Diagnostic: print top coefficients top_coefs = coef.abs().sort_values(ascending=False).head(15) print(f"\n[FeatureSelector] Top 15 coefficients by absolute value:") for feat in top_coefs.index: marker = "✓" if feat in selected_feature_columns else "✗" print(f" {marker} {feat}: {coef[feat]:+.4f}") print() print(f"[FeatureSelector] pos={int(y.sum())} neg={int((1-y).sum())} " f"features_in={X.shape[1]} selected_cols={len(selected_feature_columns)}") # Step 8: Build metadata dictionary meta = { "method": self.method, "scoring": self.scoring, "cv": self.cv, "l1_ratios": self.l1_ratios, "Cs": self.Cs, "class_weight": self.class_weight, "n_features_in": int(X.shape[1]), "n_features_selected": int(len(selected_feature_columns)), } meta["did_fallback"] = False # Step 9: Convert selected features back to similarity map format if len(selected_feature_columns) == 0: # Fallback: no features selected, keep original map updated = dict(self.similarity_map) meta["did_fallback"] = True else: updated = self._selected_cols_to_similarity_map(selected_feature_columns) if not updated: # Fallback: conversion produced empty map updated = dict(self.similarity_map) meta["did_fallback"] = True selected_pairs = [(var, sim) for var, sims in updated.items() for sim in sims] print(f"[FeatureSelector] did_fallback={meta['did_fallback']}") return FeatureSelectionResult( updated_similarity_map=updated, selected_feature_columns=selected_feature_columns, selected_pairs=selected_pairs, coef_by_feature=coef.sort_values(key=lambda s: s.abs(), ascending=False), meta=meta, )