import numpy as np
import pandas as pd
import xgboost as xgb
from typing import Any, Dict, List, Optional, Tuple
from joblib import Parallel, delayed
from sklearn.preprocessing import (
QuantileTransformer,
StandardScaler,
OrdinalEncoder,
LabelEncoder,
)
from sklearn.utils import check_random_state
from tqdm import tqdm
from ..base import TabularBaseGenerator
def _vp_sched(
T: int,
beta_min: float,
beta_max: float,
eps: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
ts = np.linspace(eps, 1.0, T)
dt = (1.0 - eps) / T
betas = dt * (beta_min + ts * (beta_max - beta_min))
betas = np.clip(betas, 1e-8, (1.0 - 1e-8))
alphas = 1.0 - betas
a_bars = np.cumprod(alphas, axis=0)
return betas, alphas, a_bars
[docs]
class XGB_Diffusion_Generator(TabularBaseGenerator):
"""XGenBoost diffusion generator.
Denoising Diffusion Probabilistic Model (DDPM) using XGBoost as score estimator.
Args:
target_column (str): Name of the target column.
timesteps (int): Number of diffusion timesteps. Default: 50.
noise_samples_per_row (int): Number of noise levels per row. Default: 100.
n_jobs (int): Number of parallel jobs used across timesteps/features. Default: -1.
n_jobs_xgb (int): Number of threads used per XGBoost model. Default: 1.
beta_min (float): Minimum beta value for the variance-preserving schedule. Default: 0.1.
beta_max (float): Maximum beta value for the variance-preserving schedule. Default: 8.0.
eps (float): Lower bound for the diffusion time grid. Default: 0.0.
xgboost_params (Optional[Dict[str, Any]]): Base parameters for XGBoost regressors/classifiers.
Default: {"n_estimators": 100, "max_depth": 7, "reg_lambda": 0.0}.
random_state (int): Random seed for reproducibility. Default: 0.
clip_extremes (bool): Whether to clip synthesized numerical values to observed training min/max. Default: True.
sampler (str): Numerical reverse sampler. Options: "ddpm", "ddim". Default: "ddpm".
objective (str): Numerical prediction objective. Options: "x", "v". Default: "x".
dropout (float): Feature dropout probability applied to numerical inputs during training. Default: 0.1.
dropout_token (str): Token used when dropping numerical inputs. Options: "mean", "missing", "random". Default: "mean".
**kwargs: Additional arguments passed to `TabularBaseGenerator`.
Example:
>>> import pandas as pd
>>> from synthyverse.generators import XGB_Diffusion_Generator
>>>
>>> # Load data
>>> X = pd.read_csv("data.csv")
>>> discrete_features = ["target", "category_col"]
>>>
>>> # Create generator (requires target column)
>>> generator = XGB_Diffusion_Generator(
... target_column="target",
... timesteps=50,
... sampler="ddpm",
... random_state=42
... )
>>>
>>> # Fit and generate
>>> generator.fit(X, discrete_features)
>>> X_syn = generator.generate(1000)
"""
name = "xgenboost_diffusion"
needs_target_column = True
def __init__(
self,
target_column: str,
timesteps: int = 50,
noise_samples_per_row: int = 100,
n_jobs: int = -1,
n_jobs_xgb: int = 1,
beta_min: float = 0.1,
beta_max: float = 8.0,
eps: float = 0.0,
xgboost_params: Optional[Dict[str, Any]] = {
"n_estimators": 100,
"max_depth": 7,
"reg_lambda": 0.0,
},
random_state: int = 0,
clip_extremes: bool = True,
sampler: str = "ddpm", # ddpm or ddim
objective: str = "x", # x or v
dropout: float = 0.1, # dropout rate for numerical features
dropout_token: str = "mean", # mean or missing
**kwargs,
):
super().__init__(random_state=random_state, **kwargs)
self.objective = str(objective).lower()
assert self.objective in ["x", "v"], "objective must be either x or v"
self.dropout_token = str(dropout_token).lower()
assert self.dropout_token in [
"mean",
"missing",
"random",
], "dropout_token must be either mean, missing, or random"
self.sampler = str(sampler).lower()
assert self.sampler in ["ddpm", "ddim"], "sampler must be either ddpm or ddim"
self.target_column = target_column
self.timesteps = timesteps
self.noise_samples_per_row = noise_samples_per_row
self.n_jobs = n_jobs
self.n_jobs_xgb = n_jobs_xgb
self.beta_min = beta_min
self.beta_max = beta_max
self.eps = eps
self.random_state = random_state
self.clip_extremes = clip_extremes
self.xgboost_params = (
xgboost_params.copy() if xgboost_params is not None else {}
)
self.xgboost_params.update(
{
"tree_method": "hist",
"random_state": random_state,
"n_jobs": n_jobs_xgb,
}
)
num_samplers = {"ddpm": self._ddpm_update, "ddim": self._ddim_update}
self.num_sampler = num_samplers[self.sampler]
self.rng = check_random_state(self.random_state)
self.dropout = dropout
def _fit_model(
self, X: pd.DataFrame, discrete_features: List[str], X_val: pd.DataFrame = None
):
# store original training column order for exact reconstruction
self.ori_cols = X.columns.tolist()
self.discrete_features = [x for x in X.columns if x in discrete_features]
self.numerical_features = [c for c in X.columns if c not in discrete_features]
# in none of the conditioning modes we retain discrete target in X
self.disc_features_x = [
col for col in self.discrete_features if col != self.target_column
]
self.num_features_x = self.numerical_features
self.is_conditional = self.target_column in self.discrete_features
X_tr = X.copy()
# ensure contiguous encoding of categoricals
if len(self.discrete_features) > 0:
self.ord_enc = OrdinalEncoder()
X_tr[self.discrete_features] = self.ord_enc.fit_transform(
X_tr[self.discrete_features]
).astype(int)
self.n_cls = (
X_tr[self.discrete_features].max(axis=0) + 1
if len(self.discrete_features) > 0
else None
)
# store numeric min/max in ORIGINAL space for clipping during generation
self.clip_extremes = len(self.numerical_features) > 0 and self.clip_extremes
if self.clip_extremes:
self.num_min_ = X[self.numerical_features].min(axis=0).to_numpy()
self.num_max_ = X[self.numerical_features].max(axis=0).to_numpy()
X_tr = self._scale(X_tr)
# build corrupted dataset (shape N,D,T)
self.betas_, self.alphas_, self.alpha_bars_ = _vp_sched(
self.timesteps, self.beta_min, self.beta_max, self.eps
)
x_corrupted = self._build_corrupted_dataset(X_tr)
self.labels = X_tr[self.target_column]
cols = self.num_features_x + self.disc_features_x
cond = [None]
if self.is_conditional:
cond = X_tr[self.target_column].unique()
else:
cols = self.numerical_features + self.discrete_features
tasks = (
delayed(self._fit_one)(X_tr, x_corrupted, t, lv, col, cols)
for t in tqdm(range(self.timesteps), desc="Fitting models")
for lv in cond
for col in cols
)
res = Parallel(n_jobs=self.n_jobs, prefer="threads")(tasks)
self.models = {}
self.label_encoders = {}
for model, label_enc, t, lv, col in res:
self.models.setdefault(t, {})
self.models[t].setdefault(lv, {})
self.models[t][lv].setdefault(col, model)
if label_enc is not None:
self.label_encoders.setdefault(t, {})
self.label_encoders[t].setdefault(lv, {})
self.label_encoders[t][lv].setdefault(col, label_enc)
def _generate_data(self, n: int) -> pd.DataFrame:
if self.is_conditional:
lv_samples = self.labels.sample(n, replace=True, random_state=self.rng)
else:
lv_samples = np.zeros(n, dtype=np.int64) # dummy for consistent logic
cond = np.unique(lv_samples)
syn = np.empty((n, len(self.num_features_x) + len(self.disc_features_x)))
for lv in cond:
idx = np.where(lv_samples == lv)[0]
if len(idx) == 0:
continue
# initialize pure noise array
x_t = self._init_noise(
len(idx), len(self.num_features_x), len(self.disc_features_x)
)
for t in reversed(range(self.timesteps)):
# parallelized inference over cols
tasks = (
delayed(self._inference_one_col)(x_t, j, col, t, lv)
for j, col in enumerate(self.num_features_x + self.disc_features_x)
)
res = Parallel(n_jobs=self.n_jobs, prefer="threads")(tasks)
# build x0_hat
x0_hat = np.zeros_like(x_t)
for j, out in res:
x0_hat[:, j] = out
if t == 0:
break
# numerical update (ddpm or ddim)
if len(self.num_features_x) > 0:
x_t = self.num_sampler(x_t, x0_hat, t)
# categorical update
if len(self.disc_features_x) > 0:
x_t = self._cat_update(x_t, x0_hat, t)
# synthesized data per label
syn[idx, :] = x0_hat
idx = list(range(n))
syn = pd.DataFrame(
syn, index=idx, columns=self.num_features_x + self.disc_features_x
)
if self.is_conditional:
y = pd.Series(lv_samples.to_numpy(), index=idx, name=self.target_column)
syn = pd.concat([syn, y], axis=1)
# reinstate original column order
ori_cols = [x for x in self.ori_cols if x in syn.columns]
syn = syn[ori_cols]
# inverse scale
syn = self._inverse_scale(syn)
if len(self.discrete_features) > 0:
syn[self.discrete_features] = self.ord_enc.inverse_transform(
syn[self.discrete_features].astype(float)
)
# clip to original training ranges
if self.clip_extremes:
syn[self.numerical_features] = np.clip(
syn[self.numerical_features],
self.num_min_[None, :],
self.num_max_[None, :],
)
return syn
def _fit_one(
self,
X: pd.DataFrame,
x_corrupted: np.ndarray,
t: int,
lv: Optional[int],
col: str,
cols: List[str],
):
# get corrupted data at current timestep
x_in = x_corrupted[lv][:, :, t] if lv is not None else x_corrupted[:, :, t]
x_in = x_in.copy()
disc_features = [x for x in self.discrete_features if x in cols]
feat_types = ["c" if x in disc_features else "q" for x in cols]
# apply dropout on numerical features
x_in_v = x_in.copy()
if self.dropout > 0.0 and len(self.num_features_x) > 0:
mask = (
self.rng.random(size=(len(x_in), len(self.num_features_x)))
< self.dropout
)
if self.dropout_token == "random":
for j in range(len(self.num_features_x)):
m = mask[:, j]
if not np.any(m):
continue
# randomly resample rows
s_idx = self.rng.randint(0, len(x_in_v), size=m.sum())
x_in[m, j] = x_in_v[s_idx, j]
else:
token = 0.0 if self.dropout_token == "mean" else np.nan
x_in[:, : len(self.num_features_x)] = np.where(
mask, token, x_in[:, : len(self.num_features_x)]
)
# get target values, i.e., the original data values
y_in = (
X[col][X[self.target_column] == lv].values
if lv is not None
else X[col].values
)
# repeat to match input data shape
y_in = np.repeat(y_in, self.noise_samples_per_row)
# align target to v-prediction
if (col in self.numerical_features) and (self.objective == "v"):
j = cols.index(col)
alpha = np.sqrt(self.alpha_bars_[t])
sigma = np.sqrt(
max(1.0 - self.alpha_bars_[t], 1e-12)
) # avoid divide-by-zero
eps = (x_in_v[:, j] - alpha * y_in) / sigma
y_in = alpha * eps - sigma * y_in
params = self.xgboost_params.copy()
params["feature_types"] = feat_types
label_enc = None
if col in self.numerical_features:
model = xgb.XGBRegressor(**params)
else:
# for some timesteps and/or subsets there may not be an ordinal encoding so label encode
label_enc = LabelEncoder()
y_in = label_enc.fit_transform(y_in)
model = xgb.XGBClassifier(**params)
model.fit(x_in, y_in)
if lv is None:
lv = 0
return model, label_enc, t, lv, col
def _build_corrupted_dataset(
self,
X: pd.DataFrame,
) -> np.ndarray:
"""
Build a corrupted dataset (combining Gaussian and multinomial diffusion) of shape N,D,T
"""
self.n_cls_ = (
self.n_cls.loc[self.disc_features_x].to_numpy(dtype=np.int64)
if self.n_cls is not None
else None
)
x_tr = X[[col for col in X.columns if col != self.target_column]]
if self.is_conditional:
# per-label corruption on X
x_corrupted = {}
for lv in X[self.target_column].unique():
x_corrupted[lv] = self._corrupt_num_cat(
x_tr[X[self.target_column] == lv],
self.disc_features_x,
self.num_features_x,
)
else:
# full corruption on X,y
x_corrupted = self._corrupt_num_cat(
X,
self.disc_features_x,
self.num_features_x,
)
return x_corrupted
def _corrupt_num_cat(
self,
X: pd.DataFrame,
discrete_features: List[str],
numerical_features: List[str],
) -> np.ndarray:
"""
Combine corrupted datasets for numerical and categorical features
"""
x_num_corrupted = None
x_cat_corrupted = None
if len(numerical_features) > 0:
x_num_corrupted = self._corrupt_num_full(
X[numerical_features].to_numpy(copy=False)
)
if len(discrete_features) > 0:
x_cat_corrupted = self._corrupt_cat_full(
X[discrete_features].to_numpy(copy=False)
)
x = (
np.concatenate([x_num_corrupted, x_cat_corrupted], axis=1)
if (x_num_corrupted is not None and x_cat_corrupted is not None)
else (x_num_corrupted if x_num_corrupted is not None else x_cat_corrupted)
)
return x
def _corrupt_cat_full(self, x0: np.ndarray) -> np.ndarray:
"""
Create a fully corrupted dataset (multinomial diffusion) of shape (N*K, D, T)
"""
# extend dataset K times
x0k = np.repeat(x0[:, :, None], self.noise_samples_per_row, axis=0)
a_bar = self.alpha_bars_[None, None, :]
keep = (
self.rng.random(size=(x0k.shape[0], x0k.shape[1], self.timesteps)) < a_bar
)
rep = (
self.rng.random(size=(x0k.shape[0], x0k.shape[1], self.timesteps))
* self.n_cls_[None, :, None]
).astype(np.int64)
x_corrupted = np.where(keep, x0k, rep)
return x_corrupted.astype(np.int64)
def _corrupt_num_full(self, x0: np.ndarray) -> np.ndarray:
"""
Create a fully corrupted dataset (Gaussian diffusion) of shape (N*K, D, T)
"""
x0k = np.repeat(x0[:, :, None], self.noise_samples_per_row, axis=0)
a_bar = self.alpha_bars_[None, None, :]
z = self.rng.normal(size=(x0k.shape[0], x0k.shape[1], self.timesteps))
x_corrupted = np.sqrt(a_bar) * x0k + np.sqrt(1.0 - a_bar) * z
return x_corrupted
def _init_noise(self, n: int, d_num: int, d_disc: int):
if d_num > 0:
x_t_num = self.rng.normal(size=(n, d_num))
if d_disc > 0:
x_t_cat = np.vstack(
[
self.rng.randint(0, int(self.n_cls_[j]), size=n)
for j in range(d_disc)
]
).T.astype(np.int64)
x_t = (
np.concatenate([x_t_num, x_t_cat], axis=1)
if (d_num > 0 and d_disc > 0)
else x_t_num if d_num > 0 else x_t_cat
)
return x_t
def _ddpm_update(
self,
x_t,
x0_hat,
t,
):
coef1 = (np.sqrt(self.alpha_bars_[t - 1]) * self.betas_[t]) / (
1.0 - self.alpha_bars_[t]
)
coef2 = (np.sqrt(self.alphas_[t]) * (1.0 - self.alpha_bars_[t - 1])) / (
1.0 - self.alpha_bars_[t]
)
dnum = len(self.num_features_x)
_x0_hat = x0_hat[:, :dnum]
_x_t = x_t[:, :dnum]
shape = (len(x_t), dnum)
mu = coef1 * _x0_hat + coef2 * _x_t
var = (
(1.0 - self.alpha_bars_[t - 1]) / (1.0 - self.alpha_bars_[t])
) * self.betas_[t]
var = max(var, 1e-20)
z = self.rng.normal(size=shape)
_x_t = mu + np.sqrt(var) * z
x_t[:, :dnum] = _x_t
return x_t
def _ddim_update(
self,
x_t,
x0_hat,
t,
):
dnum = len(self.num_features_x)
_x_t = x_t[:, :dnum]
_x0_hat = x0_hat[:, :dnum]
eps_hat = (_x_t - np.sqrt(self.alpha_bars_[t]) * _x0_hat) / np.sqrt(
1.0 - self.alpha_bars_[t]
)
_x_t = (
np.sqrt(self.alpha_bars_[t - 1]) * _x0_hat
+ np.sqrt(1.0 - self.alpha_bars_[t - 1]) * eps_hat
)
x_t[:, :dnum] = _x_t
return x_t
def _cat_update(self, x_t, x0_hat, t):
for j in range(len(self.disc_features_x)):
jj = len(self.num_features_x) + j
K = int(self.n_cls_[j])
x_t[:, jj] = self._sample_disc_posterior(
x_t[:, jj].astype(np.int64),
x0_hat[:, jj].astype(np.int64),
K=K,
t=t,
)
return x_t
def _sample_disc_posterior(
self,
xt: np.ndarray, # (N,) int in [0..K-1]
x0_hat: np.ndarray, # (N,) int in [0..K-1]
K: int,
t: int, # current timestep index (1..T-1)
) -> np.ndarray:
"""
Sample x_{t-1} for the categorical forward kernel:
q(x_t | x_{t-1}) = alpha_t * I + (1-alpha_t) * Uniform(K)
and prior:
q(x_{t-1} | x0) = a_bar_{t-1} * onehot(x0) + (1-a_bar_{t-1}) * Uniform(K)
Posterior:
q(x_{t-1}=k | x_t, x0) ∝ q(x_t | x_{t-1}=k) * q(x_{t-1}=k | x0)
"""
base_prior = (1.0 - self.alpha_bars_[t - 1]) / K
base_lik = (1.0 - self.alphas_[t]) / K
N = xt.shape[0]
probs = np.full((N, K), base_prior * base_lik, dtype=np.float64)
probs[np.arange(N), xt] += base_prior * self.alphas_[t]
probs[np.arange(N), x0_hat] += base_lik * self.alpha_bars_[t - 1]
same = xt == x0_hat
if np.any(same):
idx = np.where(same)[0]
probs[idx, xt[idx]] += self.alpha_bars_[t - 1] * self.alphas_[t]
probs_sum = probs.sum(axis=1, keepdims=True)
probs = probs / np.clip(probs_sum, 1e-12, None)
u = self.rng.random(size=N)
cdf = np.cumsum(probs, axis=1)
x_prev = (cdf < u[:, None]).sum(axis=1).astype(np.int64)
return x_prev
def _inference_one_col(self, x_t, j, col, t, lv):
# grab model
model = self.models[t][lv][col]
if col in self.numerical_features:
pred = model.predict(x_t)
if self.objective == "x":
out = pred
elif self.objective == "v":
alpha = np.sqrt(self.alpha_bars_[t])
sigma = np.sqrt(max(1.0 - self.alpha_bars_[t], 1e-12))
out = alpha * x_t[:, j] - sigma * pred
else:
le = self.label_encoders[t][lv][col]
cls_ids = le.classes_.astype(np.int64)
# If only one class was seen in training for this (t, lv, col), output it deterministically.
if cls_ids.shape[0] == 1:
out = cls_ids[0]
else:
proba = model.predict_proba(x_t)
proba = proba[:, : cls_ids.shape[0]]
proba = proba / np.clip(proba.sum(axis=1, keepdims=True), 1e-12, None)
u = self.rng.random(size=len(x_t))
cdf = np.cumsum(proba, axis=1)
cdf[:, -1] = 1.0 # ensure never fall off the edge
pick = (cdf < u[:, None]).sum(axis=1)
out = cls_ids[pick]
return j, out
def _scale(self, X: pd.DataFrame):
if len(self.num_features_x) > 0:
self.qt = QuantileTransformer(
output_distribution="normal",
n_quantiles=min(2000, len(X)),
subsample=int(1e9),
random_state=self.random_state,
)
X[self.num_features_x] = self.qt.fit_transform(X[self.num_features_x])
self.st_scaler = StandardScaler()
X[self.num_features_x] = self.st_scaler.fit_transform(
X[self.num_features_x]
)
else:
self.qt, self.st_scaler = None, None
return X
def _inverse_scale(self, X: pd.DataFrame):
if self.st_scaler is not None:
X[self.num_features_x] = self.st_scaler.inverse_transform(
X[self.num_features_x]
)
if self.qt is not None:
X[self.num_features_x] = self.qt.inverse_transform(X[self.num_features_x])
return X