Source code for synthyverse.evaluation.fidelity

import inspect
import json
import os

import numpy as np
import optuna
import pandas as pd
import xgboost as xgb
from sdmetrics.reports.single_table import QualityReport
from sklearn.compose import ColumnTransformer
from sklearn.metrics import roc_auc_score
from sklearn.neighbors import NearestNeighbors
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
    KBinsDiscretizer,
    MinMaxScaler,
    OneHotEncoder,
    StandardScaler,
)
from sklearn.utils import all_estimators
from .hyperparameters import sklearn_hyperparams, xgboost_hyperparams
from scipy.stats import spearmanr, pearsonr, chi2_contingency, wasserstein_distance
from scipy.spatial.distance import jensenshannon
from scipy.stats import ks_2samp


[docs] class ClassifierTest: """AUC score of a classifier that distinguishes synthetic from real data. Lower scores indicate better quality synthetic data (harder to distinguish from real). Args: X_val (pd.DataFrame, optional): Validation data for hyperparameter tuning. Default: None. discrete_features (list): List of discrete/categorical feature names. Default: []. random_state (int): Random seed for reproducibility. Default: 0. model_name (str): Classifier name. Use "xgboost" for native XGBoost, or any sklearn classifier class name discoverable via sklearn.utils.discovery.all_estimators. clf_params (dict): Classifier parameters passed to the selected estimator. tune (bool): Whether to tune hyperparameters. Default: False. tuning_trials (int): Number of Optuna trials for hyperparameter tuning. Default: 32. Example: >>> import pandas as pd >>> from synthyverse.evaluation import ClassifierTest >>> >>> # Prepare data >>> X_train = pd.DataFrame(...) >>> X_test = pd.DataFrame(...) >>> X_syn = pd.DataFrame(...) >>> X_val = pd.DataFrame(...) >>> discrete_features = ["category_col"] >>> >>> # Create metric >>> metric = ClassifierTest( ... X_val=X_val, ... discrete_features=discrete_features, ... tune=True, ... random_state=42 ... ) >>> >>> # Evaluate >>> results = metric.evaluate(X_train, X_test, X_syn) """ name = "classifier_test" data_requirement = "train_and_test" needs_discrete_features = True needs_random_state = True needs_val_set = True def __init__( self, X_val: pd.DataFrame = None, discrete_features: list = None, random_state: int = 0, model_name: str = "xgboost", clf_params: dict = None, tune: bool = False, tuning_trials: int = 32, ): super().__init__() self.random_state = random_state self.discrete_features = ( discrete_features if discrete_features is not None else [] ) self.X_val = X_val self.tune = tune self.tuning_trials = tuning_trials self.model_name = model_name self.model_name_lc = model_name.lower() self.uses_xgboost = self.model_name_lc in { "xgboost", "xgb", "xgbclassifier", } self.clf_params = clf_params if clf_params is not None else {}
[docs] def evaluate( self, train: pd.DataFrame, test: pd.DataFrame, sd: pd.DataFrame, ): """Evaluate synthetic data using classifier test. Args: train: Training data as a pandas DataFrame. test: Test data as a pandas DataFrame. sd: Synthetic data as a pandas DataFrame. Returns: dict: Dictionary with "classifiertest.auc" key and AUC score value. """ if self.tune: assert self.X_val is not None, "X_val must be provided when tune=True." # try to load params from file - if it doesn't exist, we tune and save the params model_slug = self.model_name_lc.replace(" ", "_") param_file = ( f"synthyverse_hyperparams_tuned/classifier_test_{model_slug}.json" ) if os.path.exists(param_file): with open(param_file, "r") as f: params = json.load(f) else: params = self._tune(train, sd) os.makedirs(os.path.dirname(param_file), exist_ok=True) with open(param_file, "w") as f: json.dump(params, f) return self._evaluate(train, test, sd, params) else: return self._evaluate(train, test, sd, self.clf_params)
def _evaluate( self, train: pd.DataFrame, test: pd.DataFrame, sd: pd.DataFrame, clf_params: dict = None, ): categorical_features = [ col for col in train.columns if col in self.discrete_features ] numerical_features = [ col for col in train.columns if col not in self.discrete_features ] # training y = np.concatenate((np.zeros(len(train)), np.ones(len(train)))) x = pd.concat((train, sd[: len(train)])).reset_index(drop=True) if self.uses_xgboost or "histgradientboosting" in self.model_name_lc: x = self._prepare_native_categorical_input(x, categorical_features) x_train_model = x model = self._build_classifier( clf_params=clf_params, categorical_features=categorical_features, numerical_features=numerical_features, ) model.fit(x, y) # evaluation y = np.concatenate((np.zeros(len(test)), np.ones(len(test)))) x = pd.concat((test, sd[-len(test) :])).reset_index(drop=True) if self.uses_xgboost or "histgradientboosting" in self.model_name_lc: x = self._prepare_native_categorical_input( x, categorical_features, categories_ref=x_train_model ) preds = self._predict_binary_scores(model, x) score = roc_auc_score(y, preds) return {f"classifiertest.auc": float(score)} def _tune(self, train: pd.DataFrame, sd: pd.DataFrame): def objective(trial: optuna.Trial): if self.uses_xgboost: params = xgboost_hyperparams(trial) params.update( {"objective": "binary:logistic", "random_state": self.random_state} ) else: params = sklearn_hyperparams( trial, self.model_name, estimator_type="classifier" ) return self._evaluate(train, self.X_val, sd, params)[f"classifiertest.auc"] study = optuna.create_study( sampler=optuna.samplers.TPESampler(seed=self.random_state), direction="maximize", ) study.optimize( objective, n_trials=self.tuning_trials, show_progress_bar=True, ) return study.best_params.copy() def _supports_random_state(self, estimator_cls) -> bool: try: sig = inspect.signature(estimator_cls) except (TypeError, ValueError): return False return "random_state" in sig.parameters def _filter_supported_params(self, estimator_cls, params: dict) -> dict: params = {} if params is None else params.copy() try: sig = inspect.signature(estimator_cls) except (TypeError, ValueError): return params if any( p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values() ): return params allowed = set(sig.parameters.keys()) return {k: v for k, v in params.items() if k in allowed} def _resolve_classifier(self): estimators = { name.lower(): cls for name, cls in all_estimators(type_filter="classifier") } cls = estimators.get(self.model_name_lc) if cls is None: raise ValueError( f"Unknown classifier model_name '{self.model_name}'. " f"Please use a valid sklearn classifier name from all_estimators " f"or 'xgboost'." ) return cls def _needs_numeric_scaling(self, model_cls) -> bool: name = model_cls.__name__.lower() no_scale_tokens = [ "tree", "forest", "boosting", "bagging", "randomforest", "extratrees", "histgradientboosting", "isolationforest", ] return not any(token in name for token in no_scale_tokens) def _uses_native_histgb(self, model_cls) -> bool: return "histgradientboosting" in model_cls.__name__.lower() def _build_preprocessor(self, model_cls, categorical_features, numerical_features): numeric_transformer = ( StandardScaler() if self._needs_numeric_scaling(model_cls) else "passthrough" ) return ColumnTransformer( transformers=[ ( "categorical", OneHotEncoder(handle_unknown="ignore", sparse_output=False), categorical_features, ), ("numerical", numeric_transformer, numerical_features), ], remainder="drop", ) def _build_classifier( self, clf_params: dict, categorical_features: list, numerical_features: list, ): clf_params = {} if clf_params is None else clf_params.copy() if self.uses_xgboost: clf_params.setdefault("random_state", self.random_state) clf_params.setdefault("objective", "binary:logistic") clf_params.setdefault("tree_method", "hist") return xgb.XGBClassifier( **clf_params, enable_categorical=True, ) model_cls = self._resolve_classifier() if self._supports_random_state(model_cls): clf_params.setdefault("random_state", self.random_state) clf_params = self._filter_supported_params(model_cls, clf_params) if self._uses_native_histgb(model_cls): clf_params.setdefault("categorical_features", categorical_features) return model_cls(**clf_params) estimator = model_cls(**clf_params) preprocessor = self._build_preprocessor( model_cls, categorical_features, numerical_features ) return Pipeline([("preprocessor", preprocessor), ("model", estimator)]) def _prepare_native_categorical_input( self, x: pd.DataFrame, categorical_features: list, categories_ref: pd.DataFrame = None, ): x = x.copy() for col in categorical_features: if categories_ref is None: x[col] = x[col].astype("category") else: x[col] = pd.Categorical( x[col], categories=categories_ref[col].cat.categories ) return x def _predict_binary_scores(self, model, x): if hasattr(model, "predict_proba"): return model.predict_proba(x)[:, 1] if hasattr(model, "decision_function"): scores = model.decision_function(x) if np.ndim(scores) > 1: scores = scores[:, 0] return 1.0 / (1.0 + np.exp(-scores)) return model.predict(x).astype(float)
[docs] class AlphaPrecisionBetaRecallAuthenticity: """Alpha-Precision, Beta-Recall, Authenticity score. Paper: "How faithful is your synthetic data? sample-level metrics for evaluating and auditing generative models" by Alaa et al. (2022). Args: discrete_features (list): List of discrete/categorical feature names. Default: []. Example: >>> import pandas as pd >>> from synthyverse.evaluation import AlphaPrecisionBetaRecallAuthenticity >>> >>> # Prepare data >>> X_real = pd.DataFrame(...) >>> X_syn = pd.DataFrame(...) >>> discrete_features = ["category_col"] >>> >>> # Create metric >>> metric = AlphaPrecisionBetaRecallAuthenticity( ... discrete_features=discrete_features ... ) >>> >>> # Evaluate >>> results = metric.evaluate(X_real, X_syn) """ name = "prauth" data_requirement = "train" needs_discrete_features = True def __init__(self, discrete_features: list = []): super().__init__() self.discrete_features = discrete_features
[docs] def evaluate( self, rd: pd.DataFrame, sd: pd.DataFrame, ): """Evaluate synthetic data using alpha-precision, beta-recall, and authenticity. Args: rd: Real data as a pandas DataFrame. sd: Synthetic data as a pandas DataFrame. Returns: dict: Dictionary with keys: - "alphaprecision.naive.score": Alpha-precision score - "betacoverage.naive.score": Beta-coverage score - "authenticity.naive.score": Authenticity score """ numerical_features = [ col for col in rd.columns if col not in self.discrete_features ] # one hot and standard scale onehot_encoder = OneHotEncoder(sparse_output=False) onehot_encoder.fit( pd.concat([rd[self.discrete_features], sd[self.discrete_features]]) ) scaler = MinMaxScaler() scaler.fit(rd[numerical_features]) data = {} for df, name in zip([rd, sd], ["rd", "sd"]): cat = onehot_encoder.transform(df[self.discrete_features]) cat = cat / 2 # scaling for Gower distance num = scaler.transform(df[numerical_features]) data[name] = np.concatenate((cat, num), axis=1) x_rd = data["rd"] x_sd = data["sd"] emb_center = np.mean(x_rd, axis=0) n_steps = 30 alphas = np.linspace(0, 1, n_steps) # Radii = np.quantile(np.sqrt(np.sum((x_rd - emb_center) ** 2, axis=1)), alphas) # use L1 distance to get Gower-type distance Radii = np.quantile(np.sum(np.abs(x_rd - emb_center), axis=1), alphas) synth_center = np.mean(x_sd, axis=0) alpha_precision_curve = [] beta_coverage_curve = [] # synth_to_center = np.sqrt(np.sum((x_sd - emb_center) ** 2, axis=1)) # use L1 distance to get Gower-type distance synth_to_center = np.sum(np.abs(x_sd - emb_center), axis=1) # use L1 distance to get Gower-type distance nbrs_real = NearestNeighbors(n_neighbors=2, n_jobs=-1, p=1).fit(x_rd) real_to_real, _ = nbrs_real.kneighbors(x_rd) nbrs_synth = NearestNeighbors(n_neighbors=1, n_jobs=-1, p=1).fit(x_sd) real_to_synth, real_to_synth_args = nbrs_synth.kneighbors(x_rd) real_to_real = real_to_real[:, 1].squeeze() real_to_synth = real_to_synth.squeeze() real_to_synth_args = real_to_synth_args.squeeze() real_synth_closest = x_sd[real_to_synth_args] # real_synth_closest_d = np.sqrt( # np.sum((real_synth_closest - synth_center) ** 2, axis=1) # ) # use L1 distance to get Gower-type distance real_synth_closest_d = np.sum(np.abs(real_synth_closest - synth_center), axis=1) closest_synth_Radii = np.quantile(real_synth_closest_d, alphas) for k in range(len(Radii)): precision_audit_mask = synth_to_center <= Radii[k] alpha_precision = np.mean(precision_audit_mask) beta_coverage = np.mean( ( (real_to_synth <= real_to_real) * (real_synth_closest_d <= closest_synth_Radii[k]) ) ) alpha_precision_curve.append(alpha_precision) beta_coverage_curve.append(beta_coverage) authen = real_to_real[real_to_synth_args] < real_to_synth authenticity = np.mean(authen) Delta_precision_alpha = 1 - np.sum( np.abs(np.array(alphas) - np.array(alpha_precision_curve)) ) / np.sum(alphas) Delta_coverage_beta = 1 - np.sum( np.abs(np.array(alphas) - np.array(beta_coverage_curve)) ) / np.sum(alphas) return { "alphaprecision.naive.score": float(Delta_precision_alpha), "betacoverage.naive.score": float(Delta_coverage_beta), "authenticity.naive.score": float(authenticity), }
[docs] class ShapeTrend: """Column Shapes and Column Pair Trends from the SDMetrics library (https://docs.sdv.dev/sdmetrics/) Indicates quality of marginal distributions and correlations in synthetic data, respectively. Args: discrete_features (list): List of discrete/categorical feature names. Default: []. Example: >>> import pandas as pd >>> from synthyverse.evaluation import ShapeTrend >>> >>> # Prepare data >>> X_real = pd.DataFrame(...) >>> X_syn = pd.DataFrame(...) >>> discrete_features = ["category_col"] >>> >>> # Create metric >>> metric = ShapeTrend(discrete_features=discrete_features) >>> >>> # Evaluate >>> results = metric.evaluate(X_real, X_syn) """ name = "shapetrend" data_requirement = "train" needs_discrete_features = True def __init__( self, discrete_features: list = [], ): super().__init__() self.discrete_features = discrete_features
[docs] def evaluate( self, rd: pd.DataFrame, sd: pd.DataFrame, ): """Evaluate synthetic data using SDMetrics shape and trend scores. Args: rd: Real data as a pandas DataFrame. sd: Synthetic data as a pandas DataFrame. Returns: dict: Dictionary with keys: - "shapetrend.shape": Column shapes score - "shapetrend.trend": Column pair trends score """ dtypes = [ "categorical" if x in self.discrete_features else "numerical" for x in rd.columns ] metadata = {k: {"sdtype": v} for k, v in zip(rd.columns, dtypes)} metadata = {"columns": metadata} metadata["primary_key"] = "index" report = QualityReport() report.generate(rd, sd, metadata, verbose=False) scores = report.get_properties() shape = scores.loc[scores["Property"] == "Column Shapes", "Score"] trend = scores.loc[scores["Property"] == "Column Pair Trends", "Score"] return { "shapetrend.shape": float(shape.iloc[0]), "shapetrend.trend": float(trend.iloc[0]), }
[docs] class Marginals: """Per-column distributional distance between real and synthetic marginals. Computes a distance metric for each column independently and returns the average distance over numerical and categorical features separately. Supported distance functions: Wasserstein (wsd), Jensen-Shannon divergence (jsd), Kolmogorov-Smirnov statistic (ks), and Total Variation distance (tvd). For histogram-based metrics (jsd, tvd) on numerical features, values are discretized into equal-width bins before comparison. Lower scores indicate better fidelity to the real marginals. Args: discrete_features (list): List of discrete/categorical feature names. Default: []. numerical_distance (str): Distance metric for numerical features. One of "wsd", "jsd", "ks", or "tvd". Default: "wsd". categorical_distance (str): Distance metric for categorical features. One of "jsd", "tvd", "wsd", or "ks". Default: "jsd". n_bins_numerical (int): Number of equal-width bins used when discretizing numerical features for jsd/tvd. Must be >= 2. Default: 30. Example: >>> import pandas as pd >>> from synthyverse.evaluation import Marginals >>> >>> # Prepare data >>> X_real = pd.DataFrame(...) >>> X_syn = pd.DataFrame(...) >>> discrete_features = ["category_col"] >>> >>> # Create metric >>> metric = Marginals( ... discrete_features=discrete_features, ... numerical_distance="wsd", ... categorical_distance="jsd", ... ) >>> >>> # Evaluate >>> results = metric.evaluate(X_real, X_syn) """ name = "marginals" data_requirement = "train" needs_discrete_features = True def __init__( self, discrete_features: list = [], numerical_distance: str = "wsd", categorical_distance: str = "jsd", n_bins_numerical: int = 30, ): super().__init__() self.discrete_features = discrete_features self.numerical_distance = self._check_distance(numerical_distance) self.categorical_distance = self._check_distance(categorical_distance) self.n_bins_numerical = int(n_bins_numerical) if self.n_bins_numerical < 2: raise ValueError("n_bins_numerical must be >= 2")
[docs] def evaluate(self, rd: pd.DataFrame, sd: pd.DataFrame): """Evaluate synthetic data by comparing marginal distributions. Args: rd: Real data as a pandas DataFrame. sd: Synthetic data as a pandas DataFrame. Returns: dict: Dictionary with keys: - "marginals.<numerical_distance>": Mean distance over numerical features - "marginals.<categorical_distance>": Mean distance over categorical features """ rd = rd.copy() sd = sd.copy() numerical_features = [c for c in rd.columns if c not in self.discrete_features] dist_func = { "wsd": self._wsd, "jsd": self._jsd, "ks": self._ks, "tvd": self._tvd, } # For numerical JSD/TVD we need to discretize to compare histograms if self.numerical_distance in ["jsd", "tvd"]: if len(numerical_features) > 0: n_bins = min(len(rd), self.n_bins_numerical) # KBinsDiscretizer expects 2D; it will bin per-feature discretizer = KBinsDiscretizer( n_bins=n_bins, encode="ordinal", strategy="uniform" ) discretizer.fit(rd[numerical_features]) rd[numerical_features] = discretizer.transform(rd[numerical_features]) sd[numerical_features] = discretizer.transform(sd[numerical_features]) num = [] for col in numerical_features: num.append(dist_func[self.numerical_distance](rd[col], sd[col])) cat = [] for col in self.discrete_features: cat.append(dist_func[self.categorical_distance](rd[col], sd[col])) return { f"marginals.{self.numerical_distance}": ( float(np.mean(num)) if len(num) else np.nan ), f"marginals.{self.categorical_distance}": ( float(np.mean(cat)) if len(cat) else np.nan ), }
def _jsd(self, s1: pd.Series, s2: pd.Series) -> float: p = s1.value_counts(normalize=True, dropna=False) q = s2.value_counts(normalize=True, dropna=False) support = p.index.union(q.index) p = p.reindex(support, fill_value=0.0).to_numpy(dtype=float) q = q.reindex(support, fill_value=0.0).to_numpy(dtype=float) return float(jensenshannon(p, q, base=2)) def _tvd(self, s1: pd.Series, s2: pd.Series) -> float: p = s1.value_counts(normalize=True, dropna=False) q = s2.value_counts(normalize=True, dropna=False) support = p.index.union(q.index) p = p.reindex(support, fill_value=0.0).to_numpy(dtype=float) q = q.reindex(support, fill_value=0.0).to_numpy(dtype=float) return float(0.5 * np.abs(p - q).sum()) def _ks(self, s1: pd.Series, s2: pd.Series) -> float: x = s1.to_numpy() y = s2.to_numpy() # ks_2samp handles ties; returns statistic in [0, 1] return float(ks_2samp(x, y, alternative="two-sided", mode="auto").statistic) def _wsd(self, s1: pd.Series, s2: pd.Series) -> float: mu = float(np.mean(s1)) sigma = float(np.std(s1)) if sigma == 0.0 or not np.isfinite(sigma): return float(wasserstein_distance(s1, s2)) s1z = (s1 - mu) / sigma s2z = (s2 - mu) / sigma return float(wasserstein_distance(s1z, s2z)) def _check_distance(self, distance: str) -> str: distance = distance.lower() if distance in ["wsd", "wasserstein", "ws"]: return "wsd" elif distance in ["jsd", "jensenshannon", "js"]: return "jsd" elif distance in [ "ks", "kstest", "kolmogorov", "kolmogorovsmirnov", "kolmogorov-smirnov", ]: return "ks" elif distance in [ "tv", "tvd", "totalvariation", "total_variation", "total-variation", ]: return "tvd" else: raise ValueError(f"Invalid distance: {distance}")
[docs] class Correlations: """Pairwise correlation matrix difference between real and synthetic data. Builds a full correlation matrix for both real and synthetic data and returns the L2 norm of their absolute difference. Correlation type is chosen automatically per feature pair: Spearman/Pearson for numerical-numerical, Cramer's V for categorical-categorical, and the correlation ratio (eta-squared) for mixed pairs. Lower scores indicate better preservation of feature dependencies. Args: discrete_features (list): List of discrete/categorical feature names. Default: []. numerical_correlation (str): Correlation method for numerical-numerical pairs. One of "spearman" or "pearson". Default: "pearson". Example: >>> import pandas as pd >>> from synthyverse.evaluation import Correlations >>> >>> # Prepare data >>> X_real = pd.DataFrame(...) >>> X_syn = pd.DataFrame(...) >>> discrete_features = ["category_col"] >>> >>> # Create metric >>> metric = Correlations( ... discrete_features=discrete_features, ... numerical_correlation="spearman", ... ) >>> >>> # Evaluate >>> results = metric.evaluate(X_real, X_syn) """ name = "correlations" data_requirement = "train" needs_discrete_features = True def __init__( self, discrete_features: list = [], numerical_correlation: str = "pearson", ): super().__init__() self.discrete_features = discrete_features self.numerical_correlation = numerical_correlation
[docs] def evaluate(self, rd: pd.DataFrame, sd: pd.DataFrame): """Evaluate synthetic data by comparing pairwise correlation matrices. Args: rd: Real data as a pandas DataFrame. sd: Synthetic data as a pandas DataFrame. Returns: dict: Dictionary with key: - "correlations.l2": L2 norm of the absolute difference between the real and synthetic correlation matrices """ rd = rd.copy() sd = sd.copy() cols = rd.columns.tolist() n = len(cols) C_rd = np.zeros((n, n)) C_sd = np.zeros((n, n)) for i, ci in enumerate(cols): for j, cj in enumerate(cols): if j < i: C_rd[i, j] = C_rd[j, i] C_sd[i, j] = C_sd[j, i] continue corr_type = self._get_corr_type(ci, cj) C_rd[i, j] = self._get_corr(rd[ci], rd[cj], corr_type) C_sd[i, j] = self._get_corr(sd[ci], sd[cj], corr_type) diff = np.abs(C_rd - C_sd) l2 = np.linalg.norm(diff) return { "correlations.l2": float(l2), }
# ---------- helpers ---------- def _get_corr_type(self, c1: str, c2: str) -> str: d1 = c1 in self.discrete_features d2 = c2 in self.discrete_features if not d1 and not d2: return "numerical" if d1 and d2: return "categorical" return "mixed" def _get_corr(self, s1: pd.Series, s2: pd.Series, corr_type: str) -> float: if corr_type == "numerical": return self._num_corr(s1, s2) elif corr_type == "categorical": return self._cat_corr(s1, s2) elif corr_type == "mixed": # correlation ratio: categorical explains numerical if s1.name in self.discrete_features: return self._mixed_corr(s1, s2) else: return self._mixed_corr(s2, s1) else: raise ValueError(f"Invalid corr_type: {corr_type}") def _num_corr(self, s1: pd.Series, s2: pd.Series) -> float: if self.numerical_correlation == "spearman": return float(spearmanr(s1, s2).statistic) elif self.numerical_correlation == "pearson": return float(pearsonr(s1, s2).statistic) else: raise ValueError( f"Invalid numerical correlation: {self.numerical_correlation}" ) def _cat_corr(self, s1: pd.Series, s2: pd.Series) -> float: ct = pd.crosstab(s1, s2) if ct.size == 0: return 0.0 chi2 = chi2_contingency(ct, correction=False)[0] n = ct.values.sum() r, k = ct.shape denom = n * (min(r - 1, k - 1)) if denom == 0: return 0.0 return float(np.sqrt(chi2 / denom)) def _mixed_corr(self, cat: pd.Series, num: pd.Series) -> float: """ η^2 = Var(E[X|Y]) / Var(X) """ x = num.to_numpy() y = cat.to_numpy() if np.var(x) == 0: return 0.0 df = pd.DataFrame({"x": x, "y": y}) means = df.groupby("y")["x"].mean() counts = df.groupby("y")["x"].count() grand_mean = x.mean() num = np.sum(counts * (means - grand_mean) ** 2) den = np.sum((x - grand_mean) ** 2) if den == 0: return 0.0 return float(num / den)