import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.utils import check_random_state
from joblib import Parallel, delayed
from typing import Union, Tuple
from tqdm import tqdm
ArrayLike = Union[np.ndarray, list, Tuple[float, ...]]
from .xgenboost import XGenBoost
from .utils import sample_from_posterior
from .eqf import EmpiricalInterpolatedQuantile
[docs]
class XGB_AR_Generator(XGenBoost):
"""XGenBoost autoregressive generator.
Trains a hierarchical autoregressive model where conditionals are learned by XGBoost classifiers.
Args:
target_column (str): Name of the target column.
conditioning (str): Conditioning mode. Options: "generation", "inference". Default: "inference".
xgboost_params (dict): Parameters passed to each underlying XGBoost model.
Default: {"n_estimators": 30, "max_depth": 3, "max_bin": 256, "early_stopping_rounds": 20, "device": "cpu"}.
use_early_stopping (bool): Whether to use validation-based early stopping when validation data is provided. Default: False.
temperature (float): Sampling temperature for posterior sampling. Default: 1.0.
discretization (str): Numerical discretization strategy. Default: "quantile".
per_bin_sampling (str): Sampling method within numerical bins. Default: "eqf".
cat_merge_type (str): Strategy for merging infrequent categories. Default: "clustering".
cat_merge_n_infrequent (int): Number of infrequent category clusters to merge into. Default: 5.
visit_order_method (str): Feature visit-order method. Default: "naive".
visit_order_mode (str): Visit-order direction. Options: "ascending", "descending". Default: "ascending".
random_state (int): Random seed for reproducibility. Default: 0.
n_jobs_xgb (int): Number of threads used per XGBoost model. Default: 1.
n_jobs (int): Number of parallel jobs used to train/sample across tasks. Default: -1.
H (int): Meta-tree height for numerical features. The number of bins is ``2**H``. Default: 5.
route_method (str): Numerical routing method. Options: "propagate", "routing". Default: "routing".
start_method (str): Initialization method for the first feature. Options: "bootstrap", "eqf". Default: "bootstrap".
**kwargs: Additional arguments passed to `TabularBaseGenerator`.
Example:
>>> import pandas as pd
>>> from synthyverse.generators import XGB_AR_Generator
>>>
>>> # Load data
>>> X = pd.read_csv("data.csv")
>>> discrete_features = ["target", "category_col"]
>>>
>>> # Create generator (requires target column)
>>> generator = XGB_AR_Generator(
... target_column="target",
... H=5,
... random_state=42
... )
>>>
>>> # Fit and generate
>>> generator.fit(X, discrete_features)
>>> X_syn = generator.generate(1000)
"""
name = "xgenboost_ar"
needs_target_column = True
def __init__(
self,
target_column: str,
conditioning: str = "inference", # "generation", "inference"
xgboost_params: dict = {
"n_estimators": 30,
"max_depth": 3,
"max_bin": 256,
"early_stopping_rounds": 20,
"device": "cpu",
},
use_early_stopping: bool = False,
temperature: float = 1.0,
discretization: str = "quantile",
per_bin_sampling: str = "eqf",
cat_merge_type: str = "clustering",
cat_merge_n_infrequent: int = 5,
visit_order_method: str = "naive",
visit_order_mode: str = "ascending",
random_state: int = 0,
n_jobs_xgb: int = 1,
n_jobs: int = -1,
H: int = 5, # meta-tree height; n_bins = 2^H for continuous discretizers
route_method: str = "routing", # "propagate" or "routing"
start_method: str = "bootstrap", # "bootstrap" or "eqf"
**kwargs,
) -> None:
super().__init__(
target_column=target_column,
conditioning=conditioning,
use_early_stopping=use_early_stopping,
discretization=discretization,
n_bins=2**H,
per_bin_sampling=per_bin_sampling,
cat_merge_type=cat_merge_type,
cat_merge_n_infrequent=cat_merge_n_infrequent,
random_state=random_state,
**kwargs,
)
assert route_method in [
"propagate",
"routing",
], "route_methods must be either 'propagate' or 'routing'"
self.__dict__.update(locals())
assert start_method in [
"bootstrap",
"eqf",
], "start_method must be either 'bootstrap' or 'eqf'"
device = self.xgboost_params.get("device", "cpu")
self.xgboost_params.update(
{
"random_state": self.random_state,
"nthread": self.n_jobs_xgb,
"tree_method": "hist" if device == "cpu" else "gpu_hist",
}
)
self.rng = check_random_state(self.random_state)
self.models_ut = {}
self.models_cat = {}
self.feature_names = None
self.feature_types = None
# --------------------------
# Training
# --------------------------
def _train_model(self, X, X_enc, val_X, val_X_enc):
self.feature_names = X.columns.tolist()
self.feature_types = [
"c" if c in self.discrete_columns else "q" for c in self.feature_names
]
x = X.to_numpy()
x_enc = X_enc.to_numpy()
if val_X is not None:
val_x = val_X.to_numpy()
val_x_enc = val_X_enc.to_numpy()
else:
val_x = None
val_x_enc = None
cols = self.feature_names
# create a flat list of all tasks for maximum parallelism
tasks = []
for i in range(1, len(cols)):
col = cols[i]
if col in self.discrete_columns:
tasks.append(("cat", i, col, None, None))
else:
for d in range(self.H):
for node in range(2**d):
tasks.append(("ut", i, col, d, node))
# run all XGB classifiers in parallel
results = Parallel(n_jobs=self.n_jobs, prefer="threads")(
delayed(self._train_one_task)(
kind=kind,
i=i,
col=col,
d=d,
node=node,
x=x,
x_enc=x_enc,
feature_types=self.feature_types,
H=self.H,
val_x=val_x,
val_x_enc=val_x_enc,
)
for (kind, i, col, d, node) in tqdm(tasks, desc="Training", leave=False)
)
# init output
self.models_ut = {
col: {d: {} for d in range(self.H)}
for col in cols[1:]
if col not in self.discrete_columns
}
self.models_cat = {}
# save results
for kind, col, d, node, clf in results:
if kind == "cat":
self.models_cat[col] = clf
else:
self.models_ut[col][d][node] = clf
def _train_one_task(
self,
kind: str,
i: int,
col: str,
d,
node,
x,
x_enc,
feature_types,
H,
val_x=None,
val_x_enc=None,
):
params = self.xgboost_params.copy()
if kind == "cat":
params.update({"num_class": int(len(self.label_encoders[col].classes_))})
if kind == "cat":
clf = self._train_multiclass(
i=i,
x=x,
x_enc=x_enc,
feature_types=feature_types,
xgboost_params=params,
val_x=val_x,
val_x_enc=val_x_enc,
)
return ("cat", col, None, None, clf)
clf = self._train_ut_node(
i=i,
d=d,
node=node,
x=x,
x_enc=x_enc,
feature_types=feature_types,
xgboost_params=params,
H=H,
val_x=val_x,
val_x_enc=val_x_enc,
)
return ("ut", col, d, node, clf)
def _train_ut_node(
self,
i,
d,
node,
x,
x_enc,
feature_types,
xgboost_params,
H,
val_x=None,
val_x_enc=None,
):
y = x_enc[:, i]
x_input = x[:, :i]
span = 2 ** (H - d)
half = span // 2
start = node * span
mid = start + half
end = start + span
idx = (y >= start) & (y < end)
y_node = (y[idx] >= mid).astype(np.int32)
x_node = x_input[idx]
f_types = feature_types[:i]
params = xgboost_params.copy()
params.update({"feature_types": f_types})
clf = xgb.XGBClassifier(**params)
if val_x is not None and self.use_early_stopping:
val_y = val_x_enc[:, i]
val_x_input = val_x[:, :i]
val_idx = (val_y >= start) & (val_y < end)
val_y_node = (val_y[val_idx] >= mid).astype(np.int32)
val_x_node = val_x_input[val_idx]
clf.fit(x_node, y_node, eval_set=[(val_x_node, val_y_node)])
else:
clf.fit(x_node, y_node)
return clf
def _train_multiclass(
self,
i,
x,
x_enc,
feature_types,
xgboost_params,
val_x=None,
val_x_enc=None,
):
y = x_enc[:, i].astype(np.int32)
x_input = x[:, :i]
f_types = feature_types[:i]
params = xgboost_params.copy()
params.update(
{"feature_types": f_types, "objective": "multi:softprob"}
) # "feature_names": f_names,
clf = xgb.XGBClassifier(**params)
if val_x is not None and self.use_early_stopping:
y_val = val_x_enc[:, i].astype(np.int32)
x_val = val_x[:, :i]
clf.fit(
x_input,
y,
eval_set=[(x_val, y_val)],
)
else:
clf.fit(x_input, y)
return clf
def _sample_data(self, n: int):
syn = pd.DataFrame(index=range(n), columns=self.feature_names)
if self.start_method == "bootstrap":
syn[self.feature_names[0]] = (
self.X[self.feature_names[0]]
.sample(n=n, replace=True, random_state=self.rng)
.to_numpy()
)
elif self.start_method == "eqf":
if self.feature_names[0] in self.discrete_columns:
syn[self.feature_names[0]] = (
self.X[self.feature_names[0]]
.sample(n=n, replace=True, random_state=self.rng)
.to_numpy()
)
else:
eqf = EmpiricalInterpolatedQuantile(
n_knots=-1, # use all training samples as knots
use_spline=False, # whether to use monotonic cubic spline interpolation
)
eqf.fit(self.X[self.feature_names[0]].to_numpy())
syn[self.feature_names[0]] = eqf.rvs(size=n, rng=self.rng)
else:
raise ValueError(f"Invalid start method: {self.start_method}")
for i, col in enumerate(
tqdm(self.feature_names[1:], desc="Sampling", leave=False), start=1
):
x_input = syn[self.feature_names[:i]].to_numpy(copy=False)
if col in self.discrete_columns:
clf = self.models_cat[col]
probs = clf.predict_proba(x_input)
else:
if self.route_method == "propagate":
probs = self._meta_tree_leaf_probs(col=col, x_input=x_input)
else: # "routing"
probs = self._meta_tree_leaf_probs_routing(col=col, x_input=x_input)
# clip to the column’s label space and renormalize
k = len(self.label_encoders[col].classes_)
probs = probs[:, :k]
row_sums = probs.sum(axis=1, keepdims=True)
probs = np.divide(probs, np.maximum(row_sums, 1e-12))
syn[col] = sample_from_posterior(
probs,
col,
n,
self.temperature,
self.discrete_columns,
self.rng,
self.per_bin_sampling,
self.label_encoders,
self.discretizers,
self.repo,
)
return syn
def _meta_tree_leaf_probs(self, col: str, x_input: np.ndarray) -> np.ndarray:
"""
Compute p(leaf_bin | x_input) by multiplying node decisions along paths.
Equivalent to computing the full leaf distribution.
"""
n = x_input.shape[0]
mass = np.ones((n, 1), dtype=np.float64)
for d in range(self.H):
n_nodes = 2**d
next_mass = np.zeros((n, 2 ** (d + 1)), dtype=np.float64)
for node in range(n_nodes):
clf = self.models_ut[col][d][node]
p_right = clf.predict_proba(x_input)[:, 1].astype(np.float64)
m = mass[:, node]
next_mass[:, 2 * node] = m * (1.0 - p_right)
next_mass[:, 2 * node + 1] = m * p_right
mass = next_mass
return mass
def _meta_tree_leaf_probs_routing(
self, col: str, x_input: np.ndarray
) -> np.ndarray:
"""
Stochastically route the meta-tree to obtain a bin.
Retrieves the same leaf distribution in expectation.
"""
n = x_input.shape[0]
# current node index per sample
node_idx = np.zeros(n, dtype=np.int32)
for d in range(self.H):
next_node_idx = np.empty_like(node_idx)
for node in range(2**d):
mask = node_idx == node
if not np.any(mask):
continue
clf = self.models_ut[col][d][node]
p_right = clf.predict_proba(x_input[mask])[:, 1]
# sample Bernoulli routing decisions
go_right = self.rng.uniform(size=p_right.shape[0]) < p_right
next_node_idx[mask] = 2 * node + go_right.astype(np.int32)
node_idx = next_node_idx
# convert routed leaves to one-hot mass
mass = np.zeros((n, 2**self.H), dtype=np.float64)
mass[np.arange(n), node_idx] = 1.0
return mass