Source code for cca_zoo.linear._mcca

from typing import Iterable, Union

import numpy as np
from scipy.linalg import block_diag, eigh
from sklearn.decomposition import PCA

from cca_zoo._base import _BaseModel
from cca_zoo._utils._checks import _process_parameter
from cca_zoo._utils._cross_correlation import cross_cov


class MCCA(_BaseModel):
    r"""
    A class used to fit a Regularised CCA (canonical ridge) model. This model
    adds a regularization term to the CCA objective function to avoid
    overfitting and improve stability. It uses PCA to perform the optimization
    efficiently for high-dimensional data. For more than two views, the
    objective generalises to maximising the sum of pairwise covariances
    subject to the corresponding constraint on each view.

    The objective function of regularised CCA is:

    .. math::

        w_{opt}=\underset{w}{\mathrm{argmax}}\{ w_1^TX_1^TX_2w_2 \}\\

        \text{subject to:}

        (1-c_1)w_1^TX_1^TX_1w_1+c_1w_1^Tw_1=n

        (1-c_2)w_2^TX_2^TX_2w_2+c_2w_2^Tw_2=n

    where :math:`c_i` are the regularization parameters for each view.

    Parameters
    ----------
    latent_dimensions : int, optional
        Number of latent dimensions to use, by default 1
    copy_data : bool, optional
        Whether to copy the data, by default True
    random_state : int, optional
        Random state, by default None
    c : Union[Iterable[float], float], optional
        Regularisation parameter, by default None
    accept_sparse : Union[bool, str], optional
        Whether to accept sparse data, by default None
    eps : float, optional
        Small value added to the diagonal for numerical stability, by default 1e-6
    pca : bool, optional
        Whether to apply PCA to each view before fitting, by default True

    Examples
    --------
    >>> import numpy as np
    >>> rng = np.random.RandomState(0)
    >>> X1 = rng.random((10, 5))
    >>> X2 = rng.random((10, 5))
    >>> model = MCCA()
    >>> model.fit((X1, X2)).score((X1, X2))

    References
    ----------
    Vinod, Hrishikesh D. "Canonical ridge and econometrics of joint
    production." Journal of Econometrics 4.2 (1976): 147-166.
    """

    def __init__(
        self,
        latent_dimensions: int = 1,
        copy_data=True,
        random_state=None,
        c: Union[Iterable[float], float] = None,
        accept_sparse=None,
        eps: float = 1e-6,
        pca: bool = True,
    ):
        # Set the default value for accept_sparse
        if accept_sparse is None:
            accept_sparse = ["csc", "csr"]
        # Call the parent class constructor
        super().__init__(
            latent_dimensions=latent_dimensions,
            copy_data=copy_data,
            accept_sparse=accept_sparse,
            random_state=random_state,
        )
        # Store the regularisation, stability, and PCA settings
        self.c = c
        self.eps = eps
        self.pca = pca

    def _check_params(self):
        # Process the c parameter for each view (defaults to 0, i.e. no regularisation)
        self.c = _process_parameter("c", self.c, 0, self.n_views_)
    def fit(self, views: Iterable[np.ndarray], y=None, **kwargs):
        # Validate the input data
        views = self._validate_data(views)
        # Check the parameters
        self._check_params()
        views = self._process_data(views, **kwargs)
        eigvals, eigvecs = self._solve_gevp(views, y=y, **kwargs)
        # Compute the weights_ for each view
        self._weights(eigvals, eigvecs, views, **kwargs)
        # Delete the fitted PCA models to save memory
        if self.pca:
            del self.pca_models
        return self
    def _process_data(self, views, **kwargs):
        if self.pca:
            views = self._apply_pca(views)
        return views

    def _solve_gevp(self, views: Iterable[np.ndarray], y=None, **kwargs):
        # Set up the generalised eigenvalue problem
        C = self._C(views, **kwargs)
        D = self._D(views, **kwargs)
        self.splits = np.cumsum([view.shape[1] for view in views])
        # Dimension of the stacked problem
        p = C.shape[0]
        # Solve the generalised eigenvalue problem Cx=lambda Dx using a subset
        # of eigenvalues and eigenvectors
        eigvals, eigvecs = eigh(
            C,
            D,
            subset_by_index=[p - self.latent_dimensions, p - 1],
        )
        # Sort the eigenvalues and eigenvectors in descending order
        idx = np.argsort(eigvals, axis=0)[::-1]
        if eigvals.shape[0] < self.latent_dimensions:
            # Fall back to the full decomposition if too few eigenpairs were returned
            eigvals, eigvecs = eigh(C, D)
            # Sort the eigenvalues and eigenvectors in descending order
            idx = np.argsort(eigvals, axis=0)[::-1][: self.latent_dimensions]
        eigvecs = eigvecs[:, idx].real
        eigvals = eigvals[idx].real
        return eigvals, eigvecs

    def _weights(self, eigvals, eigvecs, views, **kwargs):
        # Split the stacked eigenvectors into per-view weights_
        self.weights_ = np.split(eigvecs, self.splits[:-1], axis=0)
        if self.pca:
            # Map weights_ from PCA space back to the original feature space
            self.weights_ = [
                pca.components_.T @ self.weights_[i]
                for i, pca in enumerate(self.pca_models)
            ]

    def _apply_pca(self, views):
        """Fit a PCA model to each view and return the transformed data."""
        self.pca_models = [PCA() for _ in views]
        # Fit PCA on each view
        return [self.pca_models[i].fit_transform(view) for i, view in enumerate(views)]

    def _C(self, views, **kwargs):
        # Between-view covariance: total covariance minus the within-view blocks
        all_views = np.hstack(views)
        C = np.cov(all_views, rowvar=False)
        C -= block_diag(*[np.cov(view, rowvar=False) for view in views])
        return C / len(views)

    def _D(self, views, **kwargs):
        if self.pca:
            # In PCA space the within-view covariances are diagonal, so we can
            # regularise by adding to the diagonal
            D = block_diag(
                *[
                    np.diag((1 - self.c[i]) * pc.explained_variance_ + self.c[i])
                    for i, pc in enumerate(self.pca_models)
                ]
            )
        else:
            D = block_diag(
                *[
                    (1 - self.c[i]) * np.cov(view, rowvar=False)
                    + self.c[i] * np.eye(view.shape[1])
                    for i, view in enumerate(views)
                ]
            )
        # Shift the diagonal so that D is positive definite
        D_smallest_eig = min(0, np.linalg.eigvalsh(D).min()) - self.eps
        D = D - D_smallest_eig * np.eye(D.shape[0])
        return D / len(views)

    def _more_tags(self):
        # Indicate that this class supports multiview data
        return {"multiview": True}
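A minimal sketch of the generalised eigenvalue problem that ``_solve_gevp``
assembles, using only numpy and scipy (the data and variable names below are
illustrative, not part of the module): ``C`` holds the between-view
covariances, ``D`` the block-diagonal within-view covariances (here with
``c=0``), and the leading eigenvectors of :math:`Cw=\lambda Dw` give the
canonical weights. The module additionally scales both matrices by
``1/len(views)``, which leaves the eigenvectors unchanged.

>>> import numpy as np
>>> from scipy.linalg import block_diag, eigh
>>> rng = np.random.RandomState(0)
>>> views = [rng.random((50, 3)), rng.random((50, 4)), rng.random((50, 5))]
>>> C = np.cov(np.hstack(views), rowvar=False)
>>> C -= block_diag(*[np.cov(v, rowvar=False) for v in views])  # zero the within-view blocks
>>> D = block_diag(*[np.cov(v, rowvar=False) for v in views])   # within-view covariances
>>> eigvals, eigvecs = eigh(C, D)  # eigenvalues in ascending order
>>> w = eigvecs[:, -1]             # top canonical direction across all views
>>> print(np.allclose(C @ w, eigvals[-1] * (D @ w)))
True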
class rCCA(MCCA):
    r"""
    A class used to fit a Regularised CCA (canonical ridge) model. This model
    adds a regularization term to the CCA objective function to avoid
    overfitting and improve stability. It uses PCA to perform the optimization
    efficiently for high-dimensional data.

    The objective function of regularised CCA is:

    .. math::

        w_{opt}=\underset{w}{\mathrm{argmax}}\{ w_1^TX_1^TX_2w_2 \}\\

        \text{subject to:}

        (1-c_1)w_1^TX_1^TX_1w_1+c_1w_1^Tw_1=n

        (1-c_2)w_2^TX_2^TX_2w_2+c_2w_2^Tw_2=n

    where :math:`c_i` are the regularization parameters for each view.

    Examples
    --------
    >>> import numpy as np
    >>> rng = np.random.RandomState(0)
    >>> X1 = rng.random((10, 5))
    >>> X2 = rng.random((10, 5))
    >>> model = rCCA(c=0.1)
    >>> model.fit((X1, X2)).score((X1, X2))

    References
    ----------
    Vinod, Hrishikesh D. "Canonical ridge and econometrics of joint
    production." Journal of Econometrics 4.2 (1976): 147-166.
    """

    def _C(self, views, **kwargs):
        if len(views) != 2:
            raise ValueError(
                f"Model can only be used with two views, but {len(views)} were "
                "given. Use MCCA or GCCA for multiview CCA, or MPLS for PLS."
            )
        if self.pca:
            # Compute the regularised variance terms for each view
            B = [
                (1 - self.c[i]) * pc.explained_variance_ + self.c[i]
                for i, pc in enumerate(self.pca_models)
            ]
            C = cross_cov(
                views[0] / np.sqrt(B[0]), views[1] / np.sqrt(B[1]), rowvar=False
            )
            self.primary_view = 0
            return C @ C.T
        else:
            # Inverse Cholesky factors of the regularised covariance of each view
            self.L0 = np.linalg.inv(
                np.linalg.cholesky(
                    (1 - self.c[0]) * np.cov(views[0], rowvar=False)
                    + (self.c[0] + self.eps) * np.eye(views[0].shape[1])
                )
            )
            self.L1 = np.linalg.inv(
                np.linalg.cholesky(
                    (1 - self.c[1]) * np.cov(views[1], rowvar=False)
                    + (self.c[1] + self.eps) * np.eye(views[1].shape[1])
                )
            )
            C = cross_cov(views[0], views[1], rowvar=False)
            # Solve the smaller of the two possible eigenproblems
            if views[0].shape[1] <= views[1].shape[1]:
                self.primary_view = 0
                self.T = self.L0 @ C @ self.L1 @ self.L1.T @ C.T @ self.L0.T
            else:
                self.primary_view = 1
                self.T = self.L1 @ C.T @ self.L0 @ self.L0.T @ C @ self.L1.T
            return self.T

    def _D(self, views, **kwargs):
        # The two-view problem is reduced to a standard eigenvalue problem
        return None

    def _weights(self, eigvals, eigvecs, views):
        self.weights_ = [None] * 2
        if self.pca:
            B = [
                (1 - self.c[i]) * pc.singular_values_**2 / self.n_samples_ + self.c[i]
                for i, pc in enumerate(self.pca_models)
            ]
            C = np.cov(
                views[self.primary_view], views[1 - self.primary_view], rowvar=False
            )[
                0 : views[self.primary_view].shape[1],
                views[self.primary_view].shape[1] :,
            ]
            # Compute the weight matrix for the secondary view
            self.weights_[1 - self.primary_view] = (
                # Map from PCA space back to the original feature space
                self.pca_models[1 - self.primary_view].components_.T
                # Scale by the inverse of the secondary view's regularised variances
                @ np.diag(1 / B[1 - self.primary_view])
                # Multiply by the cross-covariance matrix
                @ C.T
                # Scale by the inverse square root of the primary view's regularised variances
                @ np.diag(1 / np.sqrt(B[self.primary_view]))
                # Multiply by the eigenvectors
                @ eigvecs
                # Scale by the inverse square root of the eigenvalues
                / np.sqrt(eigvals)
            )
            # Compute the weight matrix for the primary view
            self.weights_[self.primary_view] = (
                # Map from PCA space back to the original feature space
                self.pca_models[self.primary_view].components_.T
                # Scale by the inverse square root of the primary view's regularised variances
                @ np.diag(1 / np.sqrt(B[self.primary_view]))
                # Multiply by the eigenvectors
                @ eigvecs
            )
        else:
            if self.primary_view == 0:
                self.weights_[0] = self.L0.T @ eigvecs
                self.weights_[1] = (
                    (self.L1.T @ self.L1)
                    @ cross_cov(views[1], views[0], rowvar=False)
                    @ self.weights_[0]
                )
            else:
                self.weights_[1] = self.L1.T @ eigvecs
                self.weights_[0] = (
                    (self.L0.T @ self.L0)
                    @ cross_cov(views[0], views[1], rowvar=False)
                    @ self.weights_[1]
                )

    def _more_tags(self):
        # Inherit all tags from MCCA but override the multiview tag
        tags = super()._more_tags()
        tags["multiview"] = False
        return tags
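A sketch of the two-view reduction behind ``rCCA._C``, assuming ``c=0`` (plain
CCA) and following the textbook whitening identity rather than copying the
method line for line: whiten each view's covariance with the inverse Cholesky
factor, form ``T`` from the whitened cross-covariance, and the eigenvalues of
``T`` are the squared canonical correlations. All names below are illustrative.

>>> import numpy as np
>>> rng = np.random.RandomState(0)
>>> X1, X2 = rng.random((50, 3)), rng.random((50, 4))
>>> S11, S22 = np.cov(X1, rowvar=False), np.cov(X2, rowvar=False)
>>> S12 = np.cov(X1, X2, rowvar=False)[:3, 3:]      # cross-covariance block
>>> L0 = np.linalg.inv(np.linalg.cholesky(S11))     # whitens view 1
>>> L1 = np.linalg.inv(np.linalg.cholesky(S22))     # whitens view 2
>>> K = L0 @ S12 @ L1.T                             # whitened cross-covariance
>>> T = K @ K.T
>>> rho = np.sqrt(np.linalg.eigvalsh(T).max())      # first canonical correlation
>>> print(0 < float(rho) < 1)
True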
class CCA(rCCA):
    r"""
    A class used to fit a simple CCA model. This model finds the linear
    projections of two representations that maximize their correlation.

    The objective function of CCA is:

    .. math::

        w_{opt}=\underset{w}{\mathrm{argmax}}\{ w_1^TX_1^TX_2w_2 \}\\

        \text{subject to:}

        w_1^TX_1^TX_1w_1=n

        w_2^TX_2^TX_2w_2=n

    Parameters
    ----------
    latent_dimensions : int, optional
        Number of latent dimensions to use, by default 1
    copy_data : bool, optional
        Whether to copy the data, by default True
    random_state : int, optional
        Random seed for reproducibility, by default None

    References
    ----------
    Hotelling, Harold. "Relations between two sets of variates."
    Breakthroughs in Statistics. Springer, New York, NY, 1992. 162-190.

    Examples
    --------
    >>> import numpy as np
    >>> rng = np.random.RandomState(0)
    >>> X1 = rng.random((10, 5))
    >>> X2 = rng.random((10, 5))
    >>> model = CCA()
    >>> model.fit((X1, X2)).score((X1, X2))
    """

    def __init__(
        self,
        latent_dimensions: int = 1,
        copy_data=True,
        random_state=None,
        accept_sparse=None,
        eps: float = 1e-6,
        pca: bool = True,
    ):
        # Initialise the rCCA class with c set to 0 (no regularisation)
        super().__init__(
            latent_dimensions=latent_dimensions,
            copy_data=copy_data,
            random_state=random_state,
            c=0,
            accept_sparse=accept_sparse,
            eps=eps,
            pca=pca,
        )
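A short usage sketch for the fitted models (the shapes and the
transform-by-hand step are illustrative assumptions based on the code above,
not a documented API): after ``fit``, ``weights_`` holds one projection matrix
per view with shape ``(n_features, latent_dimensions)``, and projecting each
view onto its weights yields components whose pairwise correlations are the
canonical correlations, in descending order.

>>> import numpy as np
>>> rng = np.random.RandomState(0)
>>> X1, X2 = rng.random((100, 4)), rng.random((100, 6))
>>> model = CCA(latent_dimensions=2).fit((X1, X2))
>>> [w.shape for w in model.weights_]
[(4, 2), (6, 2)]
>>> z1, z2 = X1 @ model.weights_[0], X2 @ model.weights_[1]
>>> r = [abs(np.corrcoef(z1[:, k], z2[:, k])[0, 1]) for k in range(2)]
>>> print(r[0] >= r[1])  # correlations come out in descending order
True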