from __future__ import annotations
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
from neer_match import similarity_map as _sim
from neer_match.similarity_map import SimilarityMap
from neer_match_utilities.custom_similarities import CustomSimilarities
CustomSimilarities() # monkey-patch once, globally
from neer_match_utilities.similarity_features import SimilarityFeatures
import statsmodels.api as sm
from statsmodels.discrete.discrete_model import BinaryResultsWrapper
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import (
confusion_matrix,
accuracy_score,
precision_score,
recall_score,
f1_score,
matthews_corrcoef,
)
# Suppress certain warnings globally in this module
import warnings
warnings.filterwarnings("ignore", category=ConvergenceWarning)
np.seterr(over='ignore', divide='ignore', invalid='ignore')
warnings.filterwarnings("ignore", category=RuntimeWarning)
class SuggestMixin:
"""
Adds a NeerMatch-like .suggest(left, right, count, verbose) API to baseline models.
Requires:
- self.predict_proba(df_pairs) implemented
- self.similarity_map set to a SimilarityMap (or dict) describing features to compute
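Example (a minimal sketch; assumes a fitted baseline whose similarity_map
was attached by the loader, e.g. ModelBaseline.load(...); the path below
is hypothetical):
    model = ModelBaseline.load("baselines/logit")  # hypothetical path
    suggestions = model.suggest(left_df, right_df, count=5)
    # -> columns: left (row index), right (row index), prediction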
"""
similarity_map: SimilarityMap | dict | None = None # dynamically attached by loader
def suggest(
self,
left: pd.DataFrame,
right: pd.DataFrame,
*,
count: int = 10,
verbose: int = 0,
left_id_col: str | None = None,
right_id_col: str | None = None,
) -> pd.DataFrame:
"""
Return top-k candidate matches per left record (like neer_match DL models).
Output columns:
- left: integer row index into `left` (0..len(left)-1)
- right: integer row index into `right` (0..len(right)-1)
- prediction: match probability
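Example (illustrative output only; values depend on the data and model):
    >>> model.suggest(left_df, right_df, count=2)  # doctest: +SKIP
       left  right  prediction
    0     0      3        0.97
    1     0      8        0.41
    2     1      5        0.88
    3     1      0        0.12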
"""
if self.similarity_map is None:
raise ValueError(
"Baseline model has no similarity_map attached. "
"Load it via ModelBaseline.load(...) or pass similarity_map when saving."
)
# normalize similarity map to SimilarityMap object
smap = self.similarity_map
if isinstance(smap, dict):
smap = SimilarityMap(smap)
elif not isinstance(smap, SimilarityMap):
raise TypeError("similarity_map must be a dict or SimilarityMap")
# We want neer_match-like indices: 0..n-1
left_tmp = left.reset_index(drop=True).copy()
right_tmp = right.reset_index(drop=True).copy()
# pick ID cols (internal)
# If user provides real IDs, keep them; otherwise use row index ids.
if left_id_col is None:
left_tmp["_row_id"] = np.arange(len(left_tmp), dtype=int)
left_id_col = "_row_id"
if right_id_col is None:
right_tmp["_row_id"] = np.arange(len(right_tmp), dtype=int)
right_id_col = "_row_id"
feats = SimilarityFeatures(similarity_map=smap)
empty_matches = pd.DataFrame({"left": [], "right": []})
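# Score every left x right pair: the empty `matches` frame only satisfies the
# signature here; the resulting `match` labels are ignored downstream.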
df_pairs = feats.pairwise_similarity_dataframe(
left=left_tmp,
right=right_tmp,
matches=empty_matches,
left_id_col=left_id_col,
right_id_col=right_id_col,
match_col="match",
matches_id_left="left",
matches_id_right="right",
matches_are_indices=False,
)
proba = self.predict_proba(df_pairs)
df_pairs["prediction"] = proba
# Identify the output ID columns produced by SimilarityFeatures
# It may suffix _left/_right if names collide.
if left_id_col == right_id_col:
out_left = f"{left_id_col}_left"
out_right = f"{right_id_col}_right"
else:
out_left = left_id_col
out_right = right_id_col
out = df_pairs[[out_left, out_right, "prediction"]].rename(
columns={out_left: "left", out_right: "right"}
)
# Top-k per left
out = out.sort_values(["left", "prediction"], ascending=[True, False])
out = out.groupby("left", as_index=False).head(count).reset_index(drop=True)
if verbose:
print(f"[baseline.suggest] left={len(left_tmp)} right={len(right_tmp)} pairs={len(df_pairs)}")
return out
@dataclass
class LogitMatchingModel(SuggestMixin):
"""
Logistic regression baseline on similarity features using statsmodels.
This class is designed as an alternative to the DL/NS models in `neer_match`,
using statsmodels' Logit on top of the similarity features produced by
`AlternativeModels`.
It supports:
- evaluation with TP, FP, TN, FN, Accuracy, Recall, Precision, F1, MCC,
- full inference via `summary()`.
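Example (a minimal end-to-end sketch; df_train / df_test stand for
pairwise similarity frames with col_* features and a binary "match" column):
    model = LogitMatchingModel().fit(df_train)
    metrics = model.evaluate(df_test, threshold=0.5)
    print(model.summary())  # full statsmodels inference table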
"""
result: BinaryResultsWrapper | None = field(
default=None, init=False
)
feature_cols: list[str] = field(default_factory=list, init=False)
def fit(
self,
df: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
) -> "LogitMatchingModel":
"""
Fit logistic regression on a pairwise similarity DataFrame.
Parameters
----------
df : pd.DataFrame
(Possibly subsampled) DataFrame produced by AlternativeModels.
match_col : str, default "match"
Name of the binary target column.
feature_prefix : str, default "col\_"
Prefix of feature columns (similarity features).
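Examples
--------
A minimal call, relying on the default feature prefix (df_train is an
assumed pairwise similarity frame):
    model = LogitMatchingModel().fit(df_train, match_col="match")
    proba = model.predict_proba(df_test)  # P(match = 1) per pair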
"""
if match_col not in df.columns:
raise KeyError(f"Match column '{match_col}' not found in df.columns")
# 1. Select feature columns
feature_cols = sorted([c for c in df.columns if c.startswith(feature_prefix)])
if not feature_cols:
raise ValueError(f"No feature columns starting with {feature_prefix!r} found.")
X = df[feature_cols]
y = df[match_col].to_numpy().astype(int)
# 2. Drop constant (zero-variance) columns – they cause singularities and add no info
std = X.std(ddof=0)
nonconstant_cols = std[std > 0].index.tolist()
if len(nonconstant_cols) < len(feature_cols):
# Optional: you could log / print which columns were dropped
# dropped = sorted(set(feature_cols) - set(nonconstant_cols))
# print(f"Dropping constant features: {dropped}")
X = X[nonconstant_cols]
feature_cols = nonconstant_cols
self.feature_cols = feature_cols
# 3. Add constant for intercept term
X_sm = sm.add_constant(X, has_constant="add")
# 4. Fit classical MLE logit, with regularized fallback
model = sm.Logit(y, X_sm)
try:
# Try unpenalized MLE first
self.result = model.fit(disp=0)
except np.linalg.LinAlgError:
# If Hessian is singular (separation / collinearity), try progressively stronger regularization
for alpha in [1e-3, 1e-2, 1e-1, 1.0, 10.0]:
try:
self.result = model.fit_regularized(
alpha=alpha,
L1_wt=0.0, # 0 → pure L2 (ridge)
maxiter=1000,
disp=0
)
print(f"[LogitMatchingModel] Used regularization alpha={alpha} due to singular matrix")
break
except (np.linalg.LinAlgError, ValueError):
continue
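# for/else: this branch runs only if no alpha yielded a fit (the loop never hit `break`)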
else:
raise RuntimeError(
"Could not fit logit model even with strong regularization. Possible causes:\n"
" - Perfect separation: One or more features perfectly predict matches\n"
" - Multicollinearity: Features are linear combinations of each other\n"
"Try: (1) More aggressive feature selection, (2) Remove problematic features, "
"(3) Increase training data size"
)
return self
def _check_fitted(self):
if self.result is None:
raise RuntimeError("LogitMatchingModel is not fitted yet. Call `fit()` first.")
def predict_proba(
self,
df: pd.DataFrame,
feature_prefix: str = "col_",
) -> np.ndarray:
self._check_fitted()
if self.feature_cols:
feature_cols = self.feature_cols
else:
feature_cols = sorted([c for c in df.columns if c.startswith(feature_prefix)])
X = df[feature_cols]
X_sm = sm.add_constant(X, has_constant="add")
proba = self.result.predict(X_sm)
return np.asarray(proba)
def evaluate(
self,
df: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
threshold: float = 0.5,
) -> dict:
if match_col not in df.columns:
raise KeyError(f"Match column '{match_col}' not found in DataFrame.")
self._check_fitted()
y_true = df[match_col].to_numpy().astype(int)
proba = self.predict_proba(df, feature_prefix=feature_prefix)
y_hat = (proba >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_hat, labels=[0, 1]).ravel()
acc = accuracy_score(y_true, y_hat)
prec = precision_score(y_true, y_hat, zero_division=0)
rec = recall_score(y_true, y_hat, zero_division=0)
f1 = f1_score(y_true, y_hat, zero_division=0)
mcc = matthews_corrcoef(y_true, y_hat) if tp + fp + tn + fn > 0 else 0.0
return {
"TP": int(tp),
"FP": int(fp),
"TN": int(tn),
"FN": int(fn),
"Accuracy": float(acc),
"Recall": float(rec),
"Precision": float(prec),
"F1": float(f1),
"MCC": float(mcc),
}
def summary(self):
self._check_fitted()
return self.result.summary()
@dataclass
class ProbitMatchingModel(SuggestMixin):
"""
Probit regression baseline on similarity features using statsmodels.
Same interface as LogitMatchingModel, but using a normal CDF link.
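Example (a minimal sketch; the link is the standard normal CDF, i.e.
P(match = 1 | x) = Phi(x'beta) instead of the logistic CDF):
    model = ProbitMatchingModel().fit(df_train)
    metrics = model.evaluate(df_test)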
"""
result: BinaryResultsWrapper | None = field(
default=None, init=False
)
feature_cols: list[str] = field(default_factory=list, init=False)
def fit(
self,
df: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
) -> "ProbitMatchingModel":
"""
Fit probit regression on a pairwise similarity DataFrame.
Parameters
----------
df : pd.DataFrame
(Possibly subsampled) DataFrame produced by AlternativeModels.
match_col : str, default "match"
Name of the binary target column.
feature_prefix : str, default "col\_"
Prefix of feature columns (similarity features).
"""
if match_col not in df.columns:
raise KeyError(f"Match column '{match_col}' not found in df.columns")
# 1. Select feature columns
feature_cols = sorted([c for c in df.columns if c.startswith(feature_prefix)])
if not feature_cols:
raise ValueError(f"No feature columns starting with {feature_prefix!r} found.")
X = df[feature_cols]
y = df[match_col].to_numpy().astype(int)
# 2. Drop constant (zero-variance) columns
std = X.std(ddof=0)
nonconstant_cols = std[std > 0].index.tolist()
if len(nonconstant_cols) < len(feature_cols):
X = X[nonconstant_cols]
feature_cols = nonconstant_cols
self.feature_cols = feature_cols
# 3. Add constant for intercept term
X_sm = sm.add_constant(X, has_constant="add")
# 4. Probit with ridge regularization (helps with separation)
model = sm.Probit(y, X_sm)
# Try progressively stronger regularization until fitting succeeds
for alpha in [1e-4, 1e-3, 1e-2, 1e-1, 1.0, 10.0]:
try:
self.result = model.fit_regularized(
alpha=alpha,
L1_wt=0.0, # 0 → pure L2 (ridge)
maxiter=1000,
disp=0
)
if alpha > 1e-4:  # only print if stronger-than-default regularization was needed
print(f"[ProbitMatchingModel] Used regularization alpha={alpha} due to singular matrix")
break
except (np.linalg.LinAlgError, ValueError):
continue
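# for/else: this branch runs only if no alpha yielded a fit (the loop never hit `break`)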
else:
raise RuntimeError(
"Could not fit probit model even with strong regularization. Possible causes:\n"
" - Perfect separation: One or more features perfectly predict matches\n"
" - Multicollinearity: Features are linear combinations of each other\n"
"Try: (1) More aggressive feature selection, (2) Remove problematic features, "
"(3) Increase training data size"
)
return self
def _check_fitted(self):
if self.result is None:
raise RuntimeError("ProbitMatchingModel is not fitted yet. Call `fit()` first.")
def predict_proba(
self,
df: pd.DataFrame,
feature_prefix: str = "col_",
) -> np.ndarray:
self._check_fitted()
if self.feature_cols:
feature_cols = self.feature_cols
else:
feature_cols = sorted([c for c in df.columns if c.startswith(feature_prefix)])
X = df[feature_cols]
X_sm = sm.add_constant(X, has_constant="add")
proba = self.result.predict(X_sm)
return np.asarray(proba)
def evaluate(
self,
df: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
threshold: float = 0.5,
) -> dict:
if match_col not in df.columns:
raise KeyError(f"Match column '{match_col}' not found in DataFrame.")
self._check_fitted()
y_true = df[match_col].to_numpy().astype(int)
proba = self.predict_proba(df, feature_prefix=feature_prefix)
y_hat = (proba >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_hat, labels=[0, 1]).ravel()
acc = accuracy_score(y_true, y_hat)
prec = precision_score(y_true, y_hat, zero_division=0)
rec = recall_score(y_true, y_hat, zero_division=0)
f1 = f1_score(y_true, y_hat, zero_division=0)
mcc = matthews_corrcoef(y_true, y_hat) if tp + fp + tn + fn > 0 else 0.0
return {
"TP": int(tp),
"FP": int(fp),
"TN": int(tn),
"FN": int(fn),
"Accuracy": float(acc),
"Recall": float(rec),
"Precision": float(prec),
"F1": float(f1),
"MCC": float(mcc),
}
def summary(self):
self._check_fitted()
return self.result.summary()
@dataclass
class GradientBoostingModel(SuggestMixin):
"""
Gradient boosting baseline on similarity features using scikit-learn.
Designed as an alternative to the DL/NS models in `neer_match`, using a
tree-based GradientBoostingClassifier on top of similarity features
produced by `AlternativeModels`.
It supports:
- evaluation with TP, FP, TN, FN, Accuracy, Recall, Precision, F1, MCC,
- a simple `summary()` reporting feature importances.
Notes
-----
- Unlike Logit/Probit, this model has no statistical inference (SE/p-values).
- Works well with nonlinearities and interactions in similarity features.
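Example (a minimal sketch; the threshold is tuned on a held-out split):
    model = GradientBoostingModel().fit(df_train)
    t, stats = model.best_threshold(df_val, metric="mcc")
    metrics = model.evaluate(df_test, threshold=t)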
"""
model: GradientBoostingClassifier = field(
default_factory=lambda: GradientBoostingClassifier(
n_estimators=300,
learning_rate=0.05,
max_depth=3,
subsample=1.0,
random_state=42,
)
)
feature_cols: list[str] = field(default_factory=list, init=False)
best_threshold_: float | None = field(default=None, init=False)
def fit(
self,
df: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
use_class_weight: bool = False,
) -> "GradientBoostingModel":
"""
Fit gradient boosting on a pairwise similarity DataFrame.
Parameters
----------
df : pd.DataFrame
(Possibly subsampled) DataFrame produced by AlternativeModels.
match_col : str, default "match"
Name of the binary target column.
feature_prefix : str, default "col\_"
Prefix of feature columns (similarity features).
use_class_weight : bool, default False
If True, uses inverse-frequency sample weights to upweight matches.
Useful if you fit on a very imbalanced dataset.
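For example, with 90 non-matches and 10 matches, each match gets weight
90/100 = 0.9 and each non-match 10/100 = 0.1, so both classes contribute
equally to the loss in aggregate.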
"""
if match_col not in df.columns:
raise KeyError(f"Match column '{match_col}' not found in df.columns")
# 1. Select feature columns
feature_cols = sorted([c for c in df.columns if c.startswith(feature_prefix)])
if not feature_cols:
raise ValueError(f"No feature columns starting with {feature_prefix!r} found.")
X = df[feature_cols]
y = df[match_col].to_numpy().astype(int)
# 2. Drop constant (zero-variance) columns
std = X.std(ddof=0)
nonconstant_cols = std[std > 0].index.tolist()
if len(nonconstant_cols) < len(feature_cols):
X = X[nonconstant_cols]
feature_cols = nonconstant_cols
self.feature_cols = feature_cols
# 3. Optional class weighting via sample weights
sample_weight = None
if use_class_weight:
# inverse frequency weights (balanced)
n_pos = int((y == 1).sum())
n_neg = int((y == 0).sum())
if n_pos == 0 or n_neg == 0:
sample_weight = None
else:
w_pos = n_neg / (n_pos + n_neg)
w_neg = n_pos / (n_pos + n_neg)
sample_weight = np.where(y == 1, w_pos, w_neg)
# 4. Fit
self.model.fit(X, y, sample_weight=sample_weight)
return self
def _check_fitted(self):
if not hasattr(self.model, "estimators_"):
raise RuntimeError("GradientBoostingModel is not fitted yet. Call `fit()` first.")
def predict_proba(
self,
df: pd.DataFrame,
feature_prefix: str = "col_",
) -> np.ndarray:
"""
Predict match probabilities for a pairwise similarity DataFrame.
Returns the probability for the positive class (match = 1).
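Example (illustrative):
    proba = model.predict_proba(df_pairs)  # shape (len(df_pairs),), values in [0, 1]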
"""
self._check_fitted()
feature_cols = self.feature_cols or sorted([c for c in df.columns if c.startswith(feature_prefix)])
X = df[feature_cols]
proba = self.model.predict_proba(X)[:, 1]
return np.asarray(proba)
def evaluate(
self,
df: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
threshold: float = 0.5,
) -> dict:
"""
Evaluate the model on a pairwise similarity DataFrame.
Returns a dict:
- TP, FP, TN, FN (integers)
- Accuracy, Recall, Precision, F1, MCC (floats)
"""
if match_col not in df.columns:
raise KeyError(f"Match column '{match_col}' not found in DataFrame.")
self._check_fitted()
y_true = df[match_col].to_numpy().astype(int)
proba = self.predict_proba(df, feature_prefix=feature_prefix)
y_hat = (proba >= threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_hat, labels=[0, 1]).ravel()
acc = accuracy_score(y_true, y_hat)
prec = precision_score(y_true, y_hat, zero_division=0)
rec = recall_score(y_true, y_hat, zero_division=0)
f1 = f1_score(y_true, y_hat, zero_division=0)
mcc = matthews_corrcoef(y_true, y_hat) if (tp + fp + tn + fn) > 0 else 0.0
return {
"TP": int(tp),
"FP": int(fp),
"TN": int(tn),
"FN": int(fn),
"Accuracy": float(acc),
"Recall": float(rec),
"Precision": float(prec),
"F1": float(f1),
"MCC": float(mcc),
}
def summary(self, top_k: int = 20) -> pd.DataFrame:
"""
Return a simple "summary" as a DataFrame of feature importances.
Parameters
----------
top_k : int, default 20
Number of most important features to return.
Returns
-------
pd.DataFrame
Columns: feature, importance
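Example (illustrative):
    model.summary(top_k=5)  # five most important similarity features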
"""
self._check_fitted()
importances = getattr(self.model, "feature_importances_", None)
if importances is None:
raise RuntimeError("Model does not expose feature_importances_.")
df_imp = pd.DataFrame(
{"feature": self.feature_cols, "importance": importances}
).sort_values("importance", ascending=False)
return df_imp.head(top_k).reset_index(drop=True)
def best_threshold(
self,
df_val: pd.DataFrame,
match_col: str = "match",
feature_prefix: str = "col_",
metric: str = "mcc",
thresholds: np.ndarray | None = None,
store_threshold: bool = True,
) -> tuple[float, dict]:
"""
Find the classification threshold that maximizes a metric on validation data.
Parameters
----------
df_val : pd.DataFrame
Validation DataFrame produced by AlternativeModels.pairwise_similarity_dataframe().
match_col : str, default "match"
Target column.
feature_prefix : str, default "col\_"
Feature column prefix.
metric : {"mcc","f1"}, default "mcc"
Metric to maximize.
thresholds : np.ndarray or None
Threshold grid. If None, uses np.linspace(0.01, 0.99, 99).
store_threshold : bool, default True
If True, stores the selected threshold on `best_threshold_`.
Returns
-------
best_t : float
Threshold achieving the best metric on df_val.
best_stats : dict
Evaluation dict (TP/FP/TN/FN/Accuracy/Recall/Precision/F1/MCC) at best_t.
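Example (illustrative; applies the tuned threshold afterwards):
    t, stats = model.best_threshold(df_val, metric="f1")
    y_hat = (model.predict_proba(df_test) >= t).astype(int)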
"""
self._check_fitted()
if thresholds is None:
thresholds = np.linspace(0.01, 0.99, 99)
y_true = df_val[match_col].to_numpy().astype(int)
proba = self.predict_proba(df_val, feature_prefix=feature_prefix)
best_t = 0.5
best_score = -np.inf
best_stats = None
for t in thresholds:
y_hat = (proba >= t).astype(int)
# choose metric
if metric.lower() == "f1":
score = f1_score(y_true, y_hat, zero_division=0)
elif metric.lower() == "mcc":
score = matthews_corrcoef(y_true, y_hat)
else:
raise ValueError("metric must be 'mcc' or 'f1'")
if score > best_score:
best_score = score
best_t = float(t)
best_stats = self.evaluate(
df_val,
match_col=match_col,
feature_prefix=feature_prefix,
threshold=float(t),
)
if store_threshold:
self.best_threshold_ = best_t
return best_t, best_stats