Source code for synthyverse.benchmark.synthesis

from sklearn.model_selection import train_test_split
import pandas as pd
from time import time
import os
import shutil
import copy
from uuid import uuid4
from typing import Any, Dict, List, Tuple, Union

from .utils import format_results
from ..evaluation.eval import TabularMetricEvaluator
from ..generators import get_generator
from ..generators.base import TabularBaseGenerator
from ..utils.utils import free_up_memory
from ..utils.reproducibility import set_seed

DEFAULT_BENCHMARK_METRICS = ("classifier_test", "mle", "dcr")


[docs] class TabularSynthesisBenchmark: """Benchmark for evaluating tabular synthetic data generators. Args: generator (Union[str, TabularBaseGenerator]): Generator identifier. Can be a synthyverse generator name or a custom generator instance. generator_params (dict): Dictionary of generator-specific parameters. Default: None (empty dict). n_random_splits (int): Number of random train/test splits to evaluate. Default: 1. n_inits (int): Number of generator training initializations per split. Default: 1. test_size (float): Proportion of data to use for testing (0.0 to 1.0). Default: 0.2. val_size (float): Proportion of data to use for validation (0.0 to 1.0). Set to 0.0 to disable the validation split. Note that val_size+test_size must be < 1.0. Default: 0.1. missing_imputation_method (str): Method for handling missing values. "drop" removes missing rows, other options perform imputation: "random", "mean", "median", "most_frequent", "missforest". Default: "drop". retain_missingness (bool): Whether to retain missing values in generated datasets. Default: False. constraints (Union[str, list]): List of constraint strings which should hold in the generated data. Note that the constraints should already hold in the training datasets. Default: None (empty list). workspace (str): Directory for storing intermediate files. Default: "workspace". Example: >>> import pandas as pd >>> from synthyverse.benchmark import TabularSynthesisBenchmark >>> >>> # Load your data >>> X = pd.read_csv("data.csv") >>> discrete_columns = ["category_col"] >>> target_column = "target" >>> >>> # Create benchmark >>> benchmark = TabularSynthesisBenchmark( ... generator="arf", ... generator_params={"num_trees": 50}, ... n_random_splits=3, ... n_inits=3 ... ) >>> >>> # Train and evaluate models >>> trained_models = benchmark.train(X, target_column, discrete_columns) >>> results = benchmark.eval( ... X, ... trained_models, ... metrics=["classifier_test", "mle", "dcr"], ... n_generated_datasets=1, ... ) >>> # Or, train and evaluate models in one step: >>> results, trained_models = benchmark.train_and_eval(X, target_column, discrete_columns) >>> results """ def __init__( self, generator: Union[str, TabularBaseGenerator] = "arf", generator_params: dict = None, n_random_splits: int = 1, n_inits: int = 1, test_size: float = 0.2, val_size: float = 0.1, missing_imputation_method: str = "drop", retain_missingness: bool = False, constraints: Union[str, list] = None, workspace: str = "workspace", random_state: int = 0, ): if generator_params is None: generator_params = {} if constraints is None: constraints = [] self._custom_generator_template = None if isinstance(generator, str): self.generator = generator elif isinstance(generator, TabularBaseGenerator): self._custom_generator_template = generator custom_name = getattr(generator, "name", generator.__class__.__name__) self.generator = str(custom_name).replace(" ", "_") else: raise TypeError( "generator must be either a generator name (str) or an instance of TabularBaseGenerator." ) self.generator_params = dict(generator_params) self.generator_params.pop( "target_column", None ) # target column already provided if required self.generator_params.pop( "workspace", None ) # workspace already provided if needed self.generator_params.pop("random_state", None) # use loop-based random_states self.generator_params.update( { "missing_imputation_method": missing_imputation_method, "retain_missingness": retain_missingness, "constraints": constraints, } ) self.n_random_splits = n_random_splits self.n_inits = n_inits self.test_size = test_size self.val_size = val_size self.missing_imputation_method = missing_imputation_method self.retain_missingness = retain_missingness self.constraints = constraints self._trained_target_column = None self._trained_discrete_columns = None self.workspace = workspace self.random_state = random_state self._benchmark_instance_id = uuid4().hex def _get_generator_setup(self, target_column: str): if self._custom_generator_template is not None: generator_ = self._custom_generator_template.__class__ return generator_, {} generator_ = get_generator(self.generator) generator_params = dict(self.generator_params) # add workspace if needed if getattr(generator_, "needs_workspace", False): generator_params["workspace"] = self.workspace # add target column if needed if getattr(generator_, "needs_target_column", False): generator_params["target_column"] = target_column # do not impute missing values if the generator natively handles missingness if ( getattr(generator_, "handles_missingness", False) and generator_params["missing_imputation_method"] != "drop" ): generator_params["missing_imputation_method"] = "keep" return generator_, generator_params def _create_generator_instance( self, generator_: type, generator_params: dict, init_i: int, target_column: str, workspace: Union[str, None] = None, ): if self._custom_generator_template is None: return generator_(random_state=init_i, **generator_params) try: generator = copy.deepcopy(self._custom_generator_template) except Exception as exc: raise TypeError( "Custom generator instance must be deepcopy-able for repeated benchmark training." ) from exc # Harmonize benchmark-level preprocessing and seed settings on the copied instance. if hasattr(generator, "random_state"): generator.random_state = init_i if hasattr(generator, "missing_imputation_method"): generator.missing_imputation_method = self.missing_imputation_method if hasattr(generator, "retain_missingness"): generator.retain_missingness = self.retain_missingness if hasattr(generator, "constraints"): generator.constraints = copy.deepcopy(self.constraints) if hasattr(generator_, "needs_workspace") and getattr( generator_, "needs_workspace", False ): if hasattr(generator, "workspace"): generator.workspace = self.workspace if workspace is None else workspace if hasattr(generator_, "needs_target_column") and getattr( generator_, "needs_target_column", False ): if hasattr(generator, "target_column"): generator.target_column = target_column return generator def _split_data( self, X: pd.DataFrame, target_column: str, discrete_columns: list, split_i: int, test_size: Union[float, None] = None, val_size: Union[float, None] = None, ): if test_size is None: test_size = self.test_size if val_size is None: val_size = self.val_size # split data according to current seed stratify = None if target_column in discrete_columns: stratify = X[target_column] X_train, X_test = train_test_split( X, stratify=stratify, test_size=test_size, random_state=split_i ) X_train, X_test = X_train.reset_index(drop=True), X_test.reset_index(drop=True) if val_size > 0: stratify = None if target_column in discrete_columns: stratify = X_train[target_column] X_train, X_val = train_test_split( X_train, stratify=stratify, test_size=val_size / (1 - test_size), # val_size is a proportion of the training set random_state=split_i, ) X_train, X_val = X_train.reset_index(drop=True), X_val.reset_index( drop=True ) else: X_val = None return X_train, X_test, X_val def _get_model_workspace_path( self, split_i: int, init_i: int, generator: Union[str, None] = None ): if generator is None: generator = self.generator return os.path.join( self.workspace, f"{generator}_split_{split_i}_init_{init_i}_workspace", ) def _attach_training_context_to_model( self, model: TabularBaseGenerator, split_i: int, init_i: int, target_column: str, discrete_columns: list, training_time: float, ) -> None: model._benchmark_split_random_state = split_i model._benchmark_init_random_state = init_i model._benchmark_target_column = target_column model._benchmark_discrete_columns = list(discrete_columns) model._benchmark_test_size = self.test_size model._benchmark_val_size = self.val_size model._benchmark_base_random_state = self.random_state model._benchmark_training_time = training_time model._benchmark_instance_id = self._benchmark_instance_id @staticmethod def _extract_seed_from_key(prefix: str, key: Any) -> Union[int, None]: if not isinstance(key, str): return None expected_prefix = f"{prefix}_" if not key.startswith(expected_prefix): return None seed_str = key[len(expected_prefix) :] if seed_str.startswith("-"): digit_seed = seed_str[1:] else: digit_seed = seed_str if not digit_seed.isdigit(): return None return int(seed_str) def _collect_model_entries( self, trained_models: Union[TabularBaseGenerator, Dict[str, Any]], split_seed: Union[int, None] = None, init_seed: Union[int, None] = None, ) -> List[Dict[str, Any]]: entries = [] if isinstance(trained_models, TabularBaseGenerator): entries.append( { "model": trained_models, "split_seed": split_seed, "init_seed": init_seed, } ) return entries if isinstance(trained_models, dict): for key, value in trained_models.items(): key_split_seed = self._extract_seed_from_key("split", key) key_init_seed = self._extract_seed_from_key("init", key) next_split_seed = ( split_seed if key_split_seed is None else key_split_seed ) next_init_seed = init_seed if key_init_seed is None else key_init_seed entries.extend( self._collect_model_entries( trained_models=value, split_seed=next_split_seed, init_seed=next_init_seed, ) ) return entries raise TypeError( "trained_models must be a trained model or a nested dict of trained models returned by this benchmark's train()." ) def _validate_model_provenance(self, model: TabularBaseGenerator) -> None: required_attributes = [ "_benchmark_split_random_state", "_benchmark_init_random_state", "_benchmark_target_column", "_benchmark_discrete_columns", "_benchmark_test_size", "_benchmark_val_size", "_benchmark_base_random_state", "_benchmark_instance_id", ] missing_attributes = [ attribute_name for attribute_name in required_attributes if getattr(model, attribute_name, None) is None ] if missing_attributes: missing = ", ".join(missing_attributes) raise ValueError( f"Provided model is missing benchmark training metadata ({missing}). " "Pass models returned by this benchmark's train()." ) model_instance_id = str(getattr(model, "_benchmark_instance_id")) if model_instance_id != self._benchmark_instance_id: raise ValueError( "Provided model was not trained by this benchmark instance. " "Pass models returned by this benchmark's train()." ) def _normalize_trained_models( self, trained_models: Union[TabularBaseGenerator, Dict[str, Any]] ) -> Dict[int, Dict[int, TabularBaseGenerator]]: model_entries = self._collect_model_entries(trained_models=trained_models) if len(model_entries) == 0: raise ValueError("No trained models were provided to eval().") normalized_models: Dict[int, Dict[int, TabularBaseGenerator]] = {} for model_entry in model_entries: model = model_entry["model"] self._validate_model_provenance(model) split_from_dict = model_entry["split_seed"] init_from_dict = model_entry["init_seed"] split_from_model = getattr(model, "_benchmark_split_random_state") init_from_model = getattr(model, "_benchmark_init_random_state") if ( split_from_model is not None and split_from_dict is not None and int(split_from_model) != int(split_from_dict) ): raise ValueError( "Inconsistent split seeds detected between model metadata and dict keys." ) if ( init_from_model is not None and init_from_dict is not None and int(init_from_model) != int(init_from_dict) ): raise ValueError( "Inconsistent init seeds detected between model metadata and dict keys." ) split_i = split_from_model init_i = init_from_model split_i = int(split_i) init_i = int(init_i) if split_i not in normalized_models: normalized_models[split_i] = {} if init_i in normalized_models[split_i]: raise ValueError( f"Duplicate model detected for split={split_i}, init={init_i}." ) normalized_models[split_i][init_i] = model return normalized_models @staticmethod def _resolve_uniform_model_attribute( models: list, attribute_name: str, default_value: Any = None ) -> Any: resolved_value = default_value has_resolved_from_model = False for model in models: model_value = getattr(model, attribute_name, None) if model_value is None: continue if not has_resolved_from_model: resolved_value = model_value has_resolved_from_model = True continue if model_value != resolved_value: raise ValueError( f"Inconsistent '{attribute_name}' found across provided models." ) return resolved_value
[docs] def train( self, X: pd.DataFrame, target_column: str, discrete_columns: list, ): """Train the configured generator and return trained model objects. Args: X: Full dataset as a pandas DataFrame. target_column: Name of the target column. discrete_columns: List of discrete/categorical column names. Returns: TabularBaseGenerator or dict: Trained model, or nested `split/init` dict of trained models. """ os.makedirs(self.workspace, exist_ok=True) self._trained_target_column = target_column self._trained_discrete_columns = list(discrete_columns) trained_models = {} generator_, generator_params = self._get_generator_setup(target_column) needs_workspace = getattr(generator_, "needs_workspace", False) for split_i in range( self.random_state, self.random_state + self.n_random_splits ): # remove any previously tuned hyperparameters; they need to be re-tuned for different training splits shutil.rmtree("synthyverse_hyperparams_tuned", ignore_errors=True) split_key = f"split_{split_i}" trained_models[split_key] = {} X_train, _X_test, X_val = self._split_data( X, target_column, discrete_columns, split_i ) for init_i in range(self.random_state, self.random_state + self.n_inits): set_seed(init_i) iteration_workspace = self.workspace iteration_generator_params = dict(generator_params) if needs_workspace: iteration_workspace = self._get_model_workspace_path( split_i, init_i ) shutil.rmtree(iteration_workspace, ignore_errors=True) os.makedirs(iteration_workspace, exist_ok=True) iteration_generator_params["workspace"] = iteration_workspace else: self.clean_directory(self.workspace, remove_self=False) generator = self._create_generator_instance( generator_=generator_, generator_params=iteration_generator_params, init_i=init_i, target_column=target_column, workspace=iteration_workspace, ) start_time = time() generator.fit( X=X_train, discrete_features=discrete_columns, X_val=X_val ) training_time = time() - start_time self._attach_training_context_to_model( model=generator, split_i=split_i, init_i=init_i, target_column=target_column, discrete_columns=discrete_columns, training_time=training_time, ) trained_models[split_key][f"init_{init_i}"] = generator # release memory for next iteration free_up_memory() if not needs_workspace: self.clean_directory(self.workspace, remove_self=True) shutil.rmtree("synthyverse_hyperparams_tuned", ignore_errors=True) if len(trained_models) == 1: split_models = next(iter(trained_models.values())) if len(split_models) == 1: return next(iter(split_models.values())) return trained_models
[docs] def eval( self, X: pd.DataFrame, trained_models: Union[TabularBaseGenerator, dict], metrics: Union[list, dict, None] = None, n_generated_datasets: int = 1, max_eval_size: int = int(1e9), result_format: str = "frame", # "frame" or "dict" ): """Evaluate trained model objects. Args: X: Full dataset as a pandas DataFrame. trained_models: A single trained model or nested `split/init` dict returned by this benchmark's `train()`. metrics: List or dictionary of metrics to evaluate. Defaults to ["classifier_test", "mle", "dcr"] when None. n_generated_datasets: Number of synthetic datasets to generate per initialization. max_eval_size: Maximum size of sampled train/test/validation subsets used for evaluation. result_format: Format of results ("frame" for DataFrame, "dict" for nested dict). Returns: pd.DataFrame or dict: Benchmark results in the specified format. """ if metrics is None: metrics = list(DEFAULT_BENCHMARK_METRICS) os.makedirs(self.workspace, exist_ok=True) normalized_models = self._normalize_trained_models( trained_models=trained_models ) all_models = [ model for split_models in normalized_models.values() for model in split_models.values() ] model_target_column = self._resolve_uniform_model_attribute( all_models, "_benchmark_target_column", default_value=None ) model_discrete_columns = self._resolve_uniform_model_attribute( all_models, "_benchmark_discrete_columns", default_value=None ) if model_discrete_columns is not None: model_discrete_columns = list(model_discrete_columns) resolved_target_column = model_target_column resolved_discrete_columns = model_discrete_columns if resolved_target_column is None or resolved_discrete_columns is None: raise ValueError( "Could not resolve target/discrete columns from model metadata. Pass models returned by this benchmark's train()." ) resolved_discrete_columns = list(resolved_discrete_columns) resolved_test_size = self._resolve_uniform_model_attribute( all_models, "_benchmark_test_size", default_value=None ) resolved_val_size = self._resolve_uniform_model_attribute( all_models, "_benchmark_val_size", default_value=None ) resolved_random_state = self._resolve_uniform_model_attribute( all_models, "_benchmark_base_random_state", default_value=None ) if ( resolved_test_size is None or resolved_val_size is None or resolved_random_state is None ): raise ValueError( "Could not resolve test/validation split configuration from model metadata. " "Pass models returned by this benchmark's train()." ) resolved_random_state = int(resolved_random_state) results = {} for split_i in sorted(normalized_models.keys()): split_key = f"split_{split_i}" results[split_key] = {} X_train, X_test, X_val = self._split_data( X, resolved_target_column, resolved_discrete_columns, split_i, test_size=resolved_test_size, val_size=resolved_val_size, ) for init_i in sorted(normalized_models[split_i].keys()): init_key = f"init_{init_i}" results[split_key][init_key] = {} generator = normalized_models[split_i][init_i] # potentially generate multiple datasets for generated_dataset_i in range( resolved_random_state, resolved_random_state + n_generated_datasets, ): set_seed(generated_dataset_i) generated_dataset_key = f"generated_dataset_{generated_dataset_i}" results[split_key][init_key][generated_dataset_key] = {} # sample synthetic dataset and perform evaluation n_train = min(max_eval_size, len(X_train)) n_test = min(max_eval_size, len(X_test)) n = n_train + n_test start_time = time() X_syn = generator.generate(n) results[split_key][init_key][generated_dataset_key][ "inference_time" ] = (time() - start_time) evaluator = TabularMetricEvaluator( metrics=metrics, discrete_features=resolved_discrete_columns, target_column=resolved_target_column, missing_imputation_method=self.missing_imputation_method, random_state=generated_dataset_i, ) X_val_sample = None if X_val is not None: n_val = min(max_eval_size, len(X_val)) X_val_sample = X_val.sample( n_val, replace=False, random_state=generated_dataset_i ) start_time = time() metric_results = evaluator.evaluate( X_train.sample( n_train, replace=False, random_state=generated_dataset_i ), X_test.sample( n_test, replace=False, random_state=generated_dataset_i ), X_syn, X_val_sample, ) results[split_key][init_key][generated_dataset_key][ "evaluation_time" ] = (time() - start_time) results[split_key][init_key][generated_dataset_key].update( metric_results ) # release memory for next iteration free_up_memory() if result_format == "frame": results = format_results(results) return results
[docs] def train_and_eval( self, X: pd.DataFrame, target_column: str, discrete_columns: list, metrics: Union[list, dict, None] = None, n_generated_datasets: int = 1, max_eval_size: int = int(1e9), result_format: str = "frame", # "frame" or "dict" ) -> Tuple[Union[pd.DataFrame, dict], Union[TabularBaseGenerator, dict]]: """Train and evaluate the generator. This is a convenience wrapper for users who only need benchmark results and do not need to call `train()` and `eval()` separately. Args: X: Full dataset as a pandas DataFrame. target_column: Name of the target column. discrete_columns: List of discrete/categorical column names. metrics: List or dictionary of metrics to evaluate. Defaults to ["classifier_test", "mle", "dcr"] when None. n_generated_datasets: Number of synthetic datasets to generate per initialization. max_eval_size: Maximum size of sampled train/test/validation subsets used for evaluation. result_format: Format of results ("frame" for DataFrame, "dict" for nested dict). Returns: tuple: `(results, trained_models)`, where `results` is in the requested `result_format`, and `trained_models` is the output from `train()`. """ trained_models = self.train( X=X, target_column=target_column, discrete_columns=discrete_columns, ) results = self.eval( X=X, trained_models=trained_models, metrics=metrics, n_generated_datasets=n_generated_datasets, max_eval_size=max_eval_size, result_format=result_format, ) return results, trained_models
def clean_directory(self, path: str, remove_self: bool = False) -> None: if not os.path.exists(path): raise FileNotFoundError(f"Directory '{path}' does not exist.") if remove_self: shutil.rmtree(path) else: for entry in os.listdir(path): entry_path = os.path.join(path, entry) if os.path.isfile(entry_path) or os.path.islink(entry_path): os.remove(entry_path) elif os.path.isdir(entry_path): shutil.rmtree(entry_path)