# -*- coding: utf-8 -*-
'''
diPLSlib model classes
- DIPLS base class
- GCTPLS class
- EDPLS class
- KDAPLS class
'''
# Modules
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_array, check_X_y
from sklearn.utils import check_random_state
from sklearn.exceptions import NotFittedError
from scipy.sparse import issparse, sparray
import numpy as np
import matplotlib.pyplot as plt
from diPLSlib import functions as algo
from diPLSlib.utils import misc as helpers
import scipy.stats
from sklearn.metrics.pairwise import rbf_kernel, linear_kernel
# Create KDAPLS class
[docs]
class KDAPLS(RegressorMixin, BaseEstimator):
"""
Kernel Domain Adaptive Partial Least Squares (KDAPLS) algorithm for domain adaptation.
This class implements KDAPLS by calling the `kdapls` function from `functions.py`.
KDAPLS projects both source and target data into a reproducing kernel Hilbert space (RKHS) and aligns domains in that space while fitting the regression model on labeled data.
Parameters
----------
A : int, default=2
Number of latent variables to use in the model.
l : float or tuple, default=0
Regularization parameter. If a single value is provided, the same regularization is applied to all latent variables.
kernel_params : dict, optional
Dictionary specifying the kernel type and parameters. Accepted keys:
- "type" : str, default="rbf"
Kernel type, can be "rbf", "linear", or "primal".
- "gamma" : float, default=0.0001
Kernel coefficient for RBF kernels.
target_domain : int, default=0
Specifies which domain's coefficient vector is used for predictions.
Attributes
----------
n_ : int
Number of samples in `X`.
n_features_in_ : int
Number of features in `X`.
ns_ : int
Number of samples in `xs`.
nt_ : int or list
Number of samples in `xt`. If multiple target domains are provided, this is a list of sample counts for each domain.
coef_ : ndarray of shape (n_features, 1)
Regression coefficient vector used for predictions.
X_ : ndarray of shape (n_, n_features_in_)
Training data used for fitting the model.
xs_ : ndarray of shape (ns_, n_features_in_)
(Unlabeled) source domain data used for fitting the model.
xt_ : ndarray of shape (nt_, n_features_in_)
(Unlabeled) target domain data used for fitting the model.
y_mean_ : float
Mean of the training response variable.
centering_ : dict
Dictionary of stored centering information for kernel operations.
is_fitted_ : bool
Whether the model has been fitted to data.
Examples
--------
>>> import numpy as np
>>> from diPLSlib.models import KDAPLS
>>> x = np.random.rand(100, 10)
>>> y = np.random.rand(100, 1)
>>> xs = np.random.rand(80, 10)
>>> xt = np.random.rand(50, 10)
>>> model = KDAPLS(A=2, l=0.5, kernel_params={"type": "rbf", "gamma": 0.001})
>>> model.fit(x, y, xs, xt)
KDAPLS(kernel_params={'gamma': 0.001, 'type': 'rbf'}, l=0.5)
>>> xtest = np.random.rand(5, 10)
>>> yhat = model.predict(xtest)
References
----------
1. Huang, G., Chen, X., Li, L., Chen, X., Yuan, L., & Shi, W. (2020). Domain adaptive partial least squares regression.
Chemometrics and Intelligent Laboratory Systems, 201, 103986.
2. B. Schölkopf, A. Smola, and K. Müller. Nonlinear component analysis as a kernel eigenvalue problem.
Neural computation, 10(5):1299-1319, 1998.
"""
def __init__(self, A=2, l=0, kernel_params=None, target_domain=0):
self.A = A
self.l = l
self.kernel_params = kernel_params
self.target_domain = target_domain
[docs]
def fit(self, X, y, xs=None, xt=None, **kwargs):
"""
Fit the KDAPLS Model.
Parameters
----------
X : np.ndarray
Labeled source domain data (usually the same as xs).
y : np.ndarray
Corresponding labels for X.
xs : np.ndarray
Source domain data.
xt : np.ndarray
Target domain data.
**kwargs : dict, optional
Additional keyword arguments to pass to the model (e.g., for model selection purposes).
Returns
-------
self : object
Fitted estimator.
"""
# Set kernel parameters
if self.kernel_params is None:
kernel_params = {"type": "primal"}
else:
kernel_params = self.kernel_params.copy()
# Check for sparse input
if issparse(X):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input arrays
X, y = check_X_y(X, y, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
# Check if source and target data are provided
if xs is None:
xs = X
if xt is None:
xt = X
# Validate source and target arrays
xs = check_array(xs, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
xs = np.atleast_2d(xs) if xs is not None else X
if isinstance(xt, list):
xt = [check_array(x, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True) for x in xt]
else:
xt = check_array(xt, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
xt = [np.atleast_2d(x) for x in xt] if isinstance(xt, list) else np.atleast_2d(xt) if xt is not None else X
# Check if at least two samples and features are provided
if X.shape[0] < 2:
raise ValueError("At least two samples are required to fit the model (got n_samples = {}).".format(X.shape[0]))
if X.shape[1] < 2:
raise ValueError("KDAPLS requires at least 2 features to fit the model (got n_features = {}).".format(X.shape[1]))
# Ensure y is 2D
if y.ndim == 1:
y = y.reshape(-1, 1)
# Check for complex data
if np.iscomplexobj(X) or np.iscomplexobj(y) or np.iscomplexobj(xs) or np.iscomplexobj(xt):
raise ValueError("Complex data not supported")
# Preliminaries
self.n_, self.n_features_in_ = X.shape
self.ns_, _ = xs.shape
if isinstance(xt, list):
self.nt_ = [x.shape[0] for x in xt]
else:
self.nt_, _ = xt.shape
self.y_ = y
self.xs_ = xs
self.xt_ = xt
b, bst, T, Tst, W, P, Pst, E, Est, Ey, C, centering = algo.kdapls(
X, y, xs, xt,
A=self.A,
l=self.l,
kernel_params=kernel_params
)
# Select coefficient vector based on target_domain
if self.target_domain == 0:
self.coef_ = b
else:
self.coef_ = bst
self.centering_ = centering[self.target_domain]
self.X_ = X
self.y_mean_ = centering[0]["y_mean_"] if 0 in centering else 0.0
self.is_fitted_ = True
return self
[docs]
def predict(self, X):
"""
Predict with KDAPLS model.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Test data matrix to perform the prediction on.
Returns
-------
yhat : ndarray of shape (n_samples_test,)
Predicted response values for the test data.
"""
# Check if the model is fitted
check_is_fitted = getattr(self, "is_fitted_", False)
if not check_is_fitted:
raise NotFittedError("KDAPLS object is not fitted yet.")
# Check for sparse input
if issparse(X):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input array
X = check_array(X, ensure_2d=True, allow_nd=False, force_all_finite=True)
# Assert feature match
if X.shape[1] != self.X_.shape[1]:
raise ValueError(
f"Number of features in the test data ({X.shape[1]}) does not match "
f"the number of features in the training data ({self.X_.shape[1]})."
)
Kt_c = self._x_centering(X)
yhat = Kt_c @ self.coef_ + self.centering_["y_mean_"]
# Ensure the shape of yhat matches the shape of y
yhat = np.ravel(yhat)
return yhat
def _x_centering(self, X):
"""
Center new data X using stored centering_.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Test data matrix to perform the prediction on.
Returns
-------
Kt : ndarray
Centered test data matrix. The shape of Kt depends on the kernel type:
- For 'rbf' and 'linear', Kt is the kernel matrix between X and X_.
- For 'primal', Kt is the centered test data matrix.
"""
n = self.X_.shape[0]
Kt = None
# Check if X has same number of features as X_
if X.shape[1] != self.X_.shape[1]:
raise ValueError(
f"Number of features in the test data ({X.shape[1]}) does not match "
f"the number of features in the training data ({self.X_.shape[1]})."
)
if self.kernel_params is not None:
if self.kernel_params["type"] == "rbf":
gamma_ = self.kernel_params["gamma"]
Kt = rbf_kernel(X, self.X_, gamma=gamma_)
elif self.kernel_params["type"] == "linear":
Kt = linear_kernel(X, self.X_)
elif self.kernel_params["type"] == "primal":
Kt = X.copy()
else:
raise ValueError("Invalid kernel type. Supported types are 'rbf', 'linear', and 'primal'.")
if self.kernel_params["type"] == "primal":
return Kt - self.centering_["K"].mean(axis=0)
else:
J = (1 / n) * np.ones((n, n))
Jt = (1 / self.centering_["n"]) * (np.ones((X.shape[0], 1)) @ np.ones((1, self.centering_["n"])))
return Kt - Kt @ J - Jt @ self.centering_["K"] + Jt @ self.centering_["K"] @ J
else: # Use primal da-PLS
Kt = X.copy()
mean_vec = self.centering_["K"].mean(axis=0)
return Kt - mean_vec
[docs]
class DIPLS(RegressorMixin, BaseEstimator):
"""
Domain-Invariant Partial Least Squares (DIPLS) algorithm for domain adaptation.
This class implements the DIPLS algorithm, which is designed to align feature distributions
across different domains while predicting the target variable `y`. It supports multiple
source and target domains through domain-specific feature transformations.
Parameters
----------
A : int, default=2
Number of latent variables to use in the model.
l : float or tuple of length A, default=0
Regularization parameter. If a single value is provided, the same regularization is applied to all latent variables.
centering : bool, default=True
If True, source and target domain data are mean-centered.
heuristic : bool, default=False
If True, the regularization parameter is set to a heuristic value that
balances fitting the output variable y and minimizing domain discrepancy.
target_domain : int, default=0
If multiple target domains are passed, target_domain specifies
for which of the target domains the model should apply.
If target_domain=0, the model applies to the source domain,
if target_domain=1, it applies to the first target domain, and so on.
rescale : str or ndarray, default='Target'
Determines rescaling of the test data. If 'Target' or 'Source', the test data will be
rescaled to the mean of xt or xs, respectively. If an ndarray is provided, the test data
will be rescaled to the mean of the provided array.
Attributes
----------
n_ : int
Number of samples in `X`.
ns_ : int
Number of samples in `xs`.
nt_ : int
Number of samples in `xt`.
n_features_in_ : int
Number of features in `X`.
mu_ : ndarray of shape (n_features,)
Mean of columns in `X`.
mu_s_ : ndarray of shape (n_features,)
Mean of columns in `xs`.
mu_t_ : ndarray of shape (n_features,) or list of ndarray
Mean of columns in `xt`, averaged per target domain if multiple domains exist.
b_ : ndarray of shape (n_features, 1)
Regression coefficient vector.
b0_ : float
Intercept of the regression model.
T_ : ndarray of shape (n_samples, A)
Training data projections (scores).
Ts_ : ndarray of shape (n_source_samples, A)
Source domain projections (scores).
Tt_ : ndarray of shape (n_target_samples, A) or list of ndarray
Target domain projections (scores).
W_ : ndarray of shape (n_features, A)
Weight matrix.
P_ : ndarray of shape (n_features, A)
Loadings matrix corresponding to X.
Ps_ : ndarray of shape (n_features, A)
Loadings matrix corresponding to xs.
Pt_ : ndarray of shape (n_features, A) or list of ndarray
Loadings matrix corresponding to xt.
E_ : ndarray
Residuals of training data.
Es_ : ndarray
Source domain residual matrix.
Et_ : ndarray or list of ndarray
Target domain residual matrix.
Ey_ : ndarray
Residuals of response variable in the source domain.
C_ : ndarray of shape (A, 1)
Regression vector relating source projections to the response variable.
opt_l_ : ndarray of shape (A,)
Heuristically determined regularization parameter for each latent variable.
discrepancy_ : ndarray of shape (A,)
The variance discrepancy between source and target domain projections.
is_fitted_ : bool
Whether the model has been fitted to data.
References
----------
1. Ramin Nikzad-Langerodi et al., "Domain-Invariant Partial Least Squares Regression", Analytical Chemistry, 2018.
2. Ramin Nikzad-Langerodi et al., "Domain-Invariant Regression under Beer-Lambert's Law", Proc. ICMLA, 2019.
3. Ramin Nikzad-Langerodi et al., "Domain adaptation for regression under Beer–Lambert’s law", Knowledge-Based Systems, 2020.
4. B. Mikulasek et al., "Partial least squares regression with multiple domains", Journal of Chemometrics, 2023.
Examples
--------
>>> import numpy as np
>>> from diPLSlib.models import DIPLS
>>> x = np.random.rand(100, 10)
>>> y = np.random.rand(100, 1)
>>> xs = np.random.rand(100, 10)
>>> xt = np.random.rand(50, 10)
>>> model = DIPLS(A=5, l=10)
>>> model.fit(x, y, xs, xt)
DIPLS(A=5, l=10)
>>> xtest = np.array([5, 7, 4, 3, 2, 1, 6, 8, 9, 10]).reshape(1, -1)
>>> yhat = model.predict(xtest)
"""
def __init__(self, A=2, l=0, centering=True, heuristic=False, target_domain=0, rescale='Target'):
# Model parameters
self.A = A
self.l = l
self.centering = centering
self.heuristic = heuristic
self.target_domain = target_domain
self.rescale = rescale
[docs]
def fit(self, X, y, xs=None, xt=None, **kwargs):
"""
Fit the DIPLS model.
This method fits the domain-invariant partial least squares (di-PLS) model
using the provided source and target domain data. It can handle both single
and multiple target domains.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Labeled input data from the source domain.
y : ndarray of shape (n_samples, 1)
Response variable corresponding to the input data `x`.
xs : ndarray of shape (n_samples_source, n_features)
Source domain X-data. If not provided, defaults to `X`.
xt : Union[ndarray of shape (n_samples_target, n_features), List[ndarray]]
Target domain X-data. Can be a single target domain or a list of arrays
representing multiple target domains. If not provided, defaults to `X`.
**kwargs : dict, optional
Additional keyword arguments to pass to the model (e.g.,
for model selection purposes).
Returns
-------
self : object
Fitted model instance.
"""
# Check for sparse input
if issparse(X):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input arrays
X, y = check_X_y(X, y, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
# Check if source and target data are provided
if xs is None:
xs = X
if xt is None:
xt = X
# Validate source and target arrays
xs = check_array(xs, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
xs = np.atleast_2d(xs) if xs is not None else X
if isinstance(xt, list):
xt = [check_array(x, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True) for x in xt]
else:
xt = check_array(xt, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
xt = [np.atleast_2d(x) for x in xt] if isinstance(xt, list) else np.atleast_2d(xt) if xt is not None else X
# Flatten y to 1D array
y = np.ravel(y)
# Check for complex data
if np.iscomplexobj(X) or np.iscomplexobj(y) or np.iscomplexobj(xs) or np.iscomplexobj(xt):
raise ValueError("Complex data not supported")
# Check if source and target data are provided
if xs is None:
xs = X
if xt is None:
xt = X
# Preliminaries
self.n_, self.n_features_in_ = X.shape
self.ns_, _ = xs.shape
self.x_ = X
self.y_ = y
self.xs_ = xs
self.xt_ = xt
self.b0_ = np.mean(self.y_)
# Mean centering
if self.centering:
self.mu_ = np.mean(self.x_, axis=0)
self.mu_s_ = np.mean(self.xs_, axis=0)
self.x_ = self.x_ - self.mu_
self.xs_ = self.xs_ - self.mu_s_
y = self.y_ - self.b0_
# Mutliple target domains
if isinstance(self.xt_, list):
self.nt_, _ = xt[0].shape
self.mu_t_ = [np.mean(x, axis=0) for x in self.xt_]
self.xt_ = [x - mu for x, mu in zip(self.xt_, self.mu_t_)]
else:
self.nt_, _ = xt.shape
self.mu_t_ = np.mean(self.xt_, axis=0)
self.xt_ = self.xt_ - self.mu_t_
else:
y = self.y_
x = self.x_
xs = self.xs_
xt = self.xt_
# Fit model
results = algo.dipals(x, y.reshape(-1,1), xs, xt, self.A, self.l, heuristic=self.heuristic, target_domain=self.target_domain)
self.b_, self.T_, self.Ts_, self.Tt_, self.W_, self.P_, self.Ps_, self.Pt_, self.E_, self.Es_, self.Et_, self.Ey_, self.C_, self.opt_l_, self.discrepancy_ = results
self.is_fitted_ = True
return self
[docs]
def predict(self, X):
"""
Predict y using the fitted DIPLS model.
This method predicts the response variable for the provided test data using
the fitted domain-invariant partial least squares (di-PLS) model.
Parameters
----------
X : ndarray of shape (n_samples, n_features)
Test data matrix to perform the prediction on.
Returns
-------
yhat : ndarray of shape (n_samples_test,)
Predicted response values for the test data.
"""
if not hasattr(self, 'is_fitted_') or not self.is_fitted_:
raise NotFittedError("This DIPLS instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.")
# Check for sparse input
if issparse(X):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input array
X = check_array(X, ensure_2d=True, allow_nd=False, force_all_finite=True)
# Rescale Test data
if(type(self.rescale) is str):
if(self.rescale == 'Target'):
if(type(self.xt_) is list):
if(self.target_domain==0):
Xtest = X[...,:] - self.mu_s_
else:
Xtest = X[...,:] - self.mu_t_[self.target_domain-1]
else:
Xtest = X[...,:] - self.mu_t_
elif(self.rescale == 'Source'):
Xtest = X[...,:] - self.mu_
elif(self.rescale == 'none'):
Xtest = X
elif(type(self.rescale) is np.ndarray):
Xtest = X[...,:] - np.mean(self.rescale,0)
else:
raise Exception('rescale must either be Source, Target or a Dataset')
yhat = Xtest@self.b_ + self.b0_
# Ensure the shape of yhat matches the shape of y
yhat = np.ravel(yhat)
return yhat
# Create a separate class for GCT-PLS model inheriting from class model
[docs]
class GCTPLS(DIPLS):
"""
Graph-based Calibration Transfer Partial Least Squares (GCT-PLS).
This method minimizes the distance between source (xs) and target (xt) domain data pairs in the latent variable space
while fitting the response.
Parameters
----------
A : int, default=2
Number of latent variables to use in the model.
l : float or tuple of length A, default=0
Regularization parameter. If a single value is provided, the same regularization is applied to all latent variables.
centering : bool, default=True
If True, source and target domain data are mean-centered before fitting.
heuristic : bool, default=False
If True, the regularization parameter is set to a heuristic value aimed
at balancing model fitting quality for the response variable y while minimizing
discrepancies between domain representations.
rescale : str or ndarray, default='Target'
Determines rescaling of the test data. If 'Target' or 'Source', the test data will be rescaled to the mean of xt or xs, respectively.
If an ndarray is provided, the test data will be rescaled to the mean of the provided array.
Attributes
----------
n_ : int
Number of samples in `X`.
ns_ : int
Number of samples in `xs`.
nt_ : int
Number of samples in `xt`.
n_features_in_ : int
Number of features in `X`.
mu_ : ndarray of shape (n_features,)
Mean of columns in `X`.
mu_s_ : ndarray of shape (n_features,)
Mean of columns in `xs`.
mu_t_ : ndarray of shape (n_features,)
Mean of columns in `xt`.
b_ : ndarray of shape (n_features, 1)
Regression coefficient vector.
b0_ : float
Intercept of the regression model.
T_ : ndarray of shape (n_samples, A)
Training data projections (scores).
Ts_ : ndarray of shape (n_source_samples, A)
Source domain projections (scores).
Tt_ : ndarray of shape (n_target_samples, A)
Target domain projections (scores).
W_ : ndarray of shape (n_features, A)
Weight matrix.
P_ : ndarray of shape (n_features, A)
Loadings matrix corresponding to X.
Ps_ : ndarray of shape (n_features, A)
Loadings matrix corresponding to xs.
Pt_ : ndarray of shape (n_features, A)
Loadings matrix corresponding to xt.
E_ : ndarray of shape (n_source_samples, n_features)
Residuals of source domain data.
Es_ : ndarray of shape (n_source_samples, n_features)
Source domain residual matrix.
Et_ : ndarray of shape (n_target_samples, n_features)
Target domain residual matrix.
Ey_ : ndarray of shape (n_source_samples, 1)
Residuals of response variable in the source domain.
C_ : ndarray of shape (A, 1)
Regression vector relating source projections to the response variable.
opt_l_ : ndarray of shape (A,)
Heuristically determined regularization parameter for each latent variable.
discrepancy_ : ndarray
The variance discrepancy between source and target domain projections.
is_fitted_ : bool
Whether the model has been fitted to data.
References
----------
Nikzad‐Langerodi, R., & Sobieczky, F. (2021). Graph‐based calibration transfer.
Journal of Chemometrics, 35(4), e3319.
Examples
--------
>>> import numpy as np
>>> from diPLSlib.models import GCTPLS
>>> x = np.random.rand(100, 10)
>>> y = np.random.rand(100, 1)
>>> xs = np.random.rand(80, 10)
>>> xt = np.random.rand(80, 10)
>>> model = GCTPLS(A=3, l=(2, 5, 7))
>>> model.fit(x, y, xs, xt)
GCTPLS(A=3, l=(2, 5, 7))
>>> xtest = np.array([5, 7, 4, 3, 2, 1, 6, 8, 9, 10]).reshape(1, -1)
>>> yhat = model.predict(xtest)
"""
def __init__(self, A=2, l=0, centering=True, heuristic=False, rescale='Target'):
# Model parameters
self.A = A
self.l = l
self.centering = centering
self.heuristic = heuristic
self.rescale = rescale
[docs]
def fit(self, X, y, xs=None, xt=None, **kwargs):
"""
Fit the GCT-PLS model to data.
Parameters
----------
x : ndarray of shape (n_samples, n_features)
Labeled input data from the source domain.
y : ndarray of shape (n_samples, 1)
Response variable corresponding to the input data `x`.
xs : ndarray of shape (n_sample_pairs, n_features)
Source domain X-data. If not provided, defaults to `X`.
xt : ndarray of shape (n_sample_pairs, n_features)
Target domain X-data. If not provided, defaults to `X`.
**kwargs : dict, optional
Additional keyword arguments to pass to the model (e.g.,
for model selection purposes).
Returns
-------
self : object
Fitted model instance.
"""
# Check for sparse input
if issparse(X):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input arrays
X, y = check_X_y(X, y, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
# Check if source and target data are provided
if xs is None:
xs = X
if xt is None:
xt = X
# Validate source and target arrays
xs = check_array(xs, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
xs = np.atleast_2d(xs) if xs is not None else X
if isinstance(xt, list):
xt = [check_array(x, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True) for x in xt]
else:
xt = check_array(xt, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
xt = [np.atleast_2d(x) for x in xt] if isinstance(xt, list) else np.atleast_2d(xt) if xt is not None else X
# Flatten y to 1D array
y = np.ravel(y)
# Check for complex data
if np.iscomplexobj(X) or np.iscomplexobj(y) or np.iscomplexobj(xs) or np.iscomplexobj(xt):
raise ValueError("Complex data not supported")
# Check if source and target data are provided
if xs is None:
xs = X
if xt is None:
xt = X
# Preliminaries
self.n_, self.n_features_in_ = X.shape
self.ns_, _ = xs.shape
self.nt_, _ = xt.shape
if self.ns_ != self.nt_:
raise ValueError("The number of samples in the source domain (ns) must be equal to the number of samples in the target domain (nt).")
self.x_ = X
self.y_ = y
self.xs_ = xs
self.xt_ = xt
self.b0_ = np.mean(self.y_)
self.mu_ = np.mean(self.x_, axis=0)
self.mu_s_ = np.mean(self.xs_, axis=0)
self.mu_t_ = np.mean(self.xt_, axis=0)
# Mean Centering
if self.centering is True:
x = self.x_[...,:] - self.mu_
y = self.y_ - self.b0_
else:
x = self.x_
y = self.y_
xs = self.xs_
xt = self.xt_
# Fit model and store matrices
results = algo.dipals(x, y.reshape(-1,1), xs, xt, self.A, self.l, heuristic=self.heuristic, laplacian=True)
self.b_, self.T_, self.Ts_, self.Tt_, self.W_, self.P_, self.Ps_, self.Pt_, self.E_, self.Es_, self.Et_, self.Ey_, self.C_, self.opt_l_, self.discrepancy_ = results
self.is_fitted_ = True # Set the is_fitted attribute to True
return self
[docs]
class EDPLS(DIPLS):
r'''
(\epsilon, \delta)-Differentially Private Partial Least Squares Regression.
This class implements the (\epsilon, \delta)-Differentially Private Partial Least Squares (PLS) regression method by Nikzad-Langerodi et al. (2024, unpublished).
Parameters
----------
A : int, default=2
Number of latent variables.
epsilon : float, default=1.0
Privacy loss parameter.
delta : float, default=0.05
Failure probability.
centering : bool, default=True
If True, the data will be centered before fitting the model.
random_state : int, RandomState instance or None, default=None
Controls the randomness of the noise added for differential privacy.
Attributes
----------
n_ : int
Number of samples in the training data.
n_features_in_ : int
Number of features in the training data.
x_mean_ : ndarray of shape (n_features,)
Estimated mean of each feature.
coef_ : ndarray of shape (n_features, 1)
Estimated regression coefficients.
y_mean_ : float
Estimated intercept.
x_scores_ : ndarray of shape (n_samples, A)
X scores.
x_loadings_ : ndarray of shape (n_features, A)
X loadings.
x_weights_ : ndarray of shape (n_features, A)
X weights.
y_loadings_ : ndarray of shape (A, 1)
Y loadings.
x_residuals_ : ndarray of shape (n_samples, n_features)
X residuals.
y_residuals_ : ndarray of shape (n_samples, 1)
Y residuals.
is_fitted_ : bool
True if the model has been fitted.
References
----------
- R. Nikzad-Langerodi, et al. (2024). (epsilon,delta)-Differentially private partial least squares regression (unpublished).
- Balle, B., & Wang, Y. X. (2018, July). Improving the Gaussian mechanism for differential privacy: Analytical calibration and optimal denoising. In International Conference on Machine Learning (pp. 394-403). PMLR.
Examples
--------
>>> from diPLSlib.models import EDPLS
>>> import numpy as np
>>> x = np.random.rand(100, 10)
>>> y = np.random.rand(100, 1)
>>> model = EDPLS(A=5, epsilon=0.1, delta=0.01)
>>> model.fit(x, y)
EDPLS(A=5, delta=0.01, epsilon=0.1)
>>> xtest = np.array([5, 7, 4, 3, 2, 1, 6, 8, 9, 10]).reshape(1, -1)
>>> yhat = model.predict(xtest)
'''
def __init__(self, A:int=2, epsilon:float=1.0, delta:float=0.05, centering:bool=True, random_state=None):
# Model parameters
self.A = A
self.epsilon = epsilon
self.delta = delta
self.centering = centering
self.random_state = random_state
[docs]
def fit(self, X:np.ndarray, y:np.ndarray, **kwargs):
'''
Fit the EDPLS model.
Parameters
----------
X : array, shape (n_samples, n_features)
Training data.
y : array, shape (n_samples,)
Target values.
**kwargs : dict, optional
Additional keyword arguments to pass to the model (e.g.,
for model selection purposes).
Returns
-------
self : object
Fitted model instance.
'''
### Validate input data
# Check for sparse input
if issparse(X):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input arrays
X, y = check_X_y(X, y, ensure_2d=True, allow_nd=False, accept_large_sparse=False, accept_sparse=False, force_all_finite=True)
# Flatten y to 1D array
y = np.ravel(y)
# Check for complex entries
if np.iscomplexobj(X) or np.iscomplexobj(y):
raise ValueError("Complex data not supported")
### Preliminaries
self.n_, self.n_features_in_ = X.shape
self.x_ = X
self.y_ = y
self.y_mean_= np.mean(self.y_)
# Mean centering
if self.centering:
self.x_mean_ = np.mean(self.x_, axis=0)
self.x_ = self.x_ - self.x_mean_
y = self.y_ - self.y_mean_
else:
y = self.y_
x = self.x_
### Fit model
rng = check_random_state(self.random_state)
results = algo.edpls(x, y.reshape(-1,1), self.A, epsilon=self.epsilon, delta=self.delta, rng=rng)
self.coef_, self.x_weights_, self.x_loadings_, self.y_loadings_, self.x_scores_, self.x_residuals_, self.y_residuals_ = results
self.is_fitted_ = True
return self
[docs]
def predict(self, x:np.ndarray):
"""
Predict y using the fitted EDPLS model.
Parameters
----------
x: numpy array of shape (n_samples_test, n_features)
Test data matrix to perform the prediction on.
Returns
-------
yhat: numpy array of shape (n_samples_test, )
Predicted response values for the test data.
"""
# Check if the model has been fitted
if not hasattr(self, 'is_fitted_') or not self.is_fitted_:
raise NotFittedError("This DIPLS instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.")
# Check for sparse input
if issparse(x):
raise ValueError("Sparse input is not supported. Please convert your data to dense format.")
# Validate input array
x = check_array(x, ensure_2d=True, allow_nd=False, force_all_finite=True)
# Center and scale x
if self.centering is True:
x = x[...,:] - self.x_mean_
# Predict y
yhat = x@self.coef_ + self.y_mean_
# Ensure the shape of yhat matches the shape of y
yhat = np.ravel(yhat)
return yhat
def _more_tags(self):
'''
Return tags for the estimator.
'''
return {"poor_score": True}