import warnings
from typing import Iterable
import numpy as np
from cca_zoo._utils._checks import _process_parameter
from cca_zoo.linear._mcca import MCCA
class GRCCA(MCCA):
    """
    Grouped Regularized Canonical Correlation Analysis.

    Features within each view are partitioned into groups; each view is
    augmented with scaled group-mean columns before the MCCA eigenproblem is
    solved, and the resulting weights are folded back onto the original
    features. ``c`` controls shrinkage towards the group means and ``mu``
    controls shrinkage of the group means themselves.

    Parameters
    ----------
    latent_dimensions: int, default=1
        Number of latent dimensions to use
    copy_data: bool, default=True
        Whether to copy the data
    random_state: int, default=None
        Random state for initialisation
    eps: float, default=1e-3
        Tolerance for convergence
    c: float, default=0
        Regularization parameter for the group means
    mu: float, default=0
        Regularization parameter for the group sizes

    References
    ----------
    Tuzhilina, Elena, Leonardo Tozzi, and Trevor Hastie. "Canonical correlation analysis in high dimensions with structured regularization." Statistical Modelling (2021): 1471082X211041033.
    """

    def __init__(
        self,
        latent_dimensions: int = 1,
        copy_data=True,
        random_state=None,
        eps=1e-3,
        c: float = 0,
        mu: float = 0,
    ):
        # GRCCA works on the raw augmented feature space, so the PCA
        # preprocessing of the MCCA base class is disabled.
        super().__init__(
            latent_dimensions=latent_dimensions,
            copy_data=copy_data,
            random_state=random_state,
            eps=eps,
            c=c,
            pca=False,
        )
        self.mu = mu

    def _check_params(self):
        # Broadcast scalar mu/c into one value per view.
        self.mu = _process_parameter("mu", self.mu, 0, self.n_views_)
        self.c = _process_parameter("c", self.c, 0, self.n_views_)

    def fit(self, views: Iterable[np.ndarray], y=None, feature_groups=None, **kwargs):
        """
        Fit the model to the given views.

        Parameters
        ----------
        views: iterable of numpy arrays, each with shape (n_samples, n_features)
            The views to fit the model to.
        y: ignored
            Present for API consistency.
        feature_groups: list of integer array-likes, each with shape (n_features,), optional
            Group label of every feature in the corresponding view. If None,
            every feature of a view is placed in a single group (a warning is
            emitted).
        """
        return super().fit(views, y=y, feature_groups=feature_groups, **kwargs)

    def _weights(self, eigvals, eigvecs, views, feature_groups=None, **kwargs):
        # Each view contributed n_features rows (plus n_groups appended
        # group-mean rows when c > 0) to the stacked eigenvector matrix.
        self.splits = [
            n_features + n_groups if c > 0 else n_features
            for n_features, n_groups, c in zip(
                self.n_features_in_, self.n_groups_, self.c
            )
        ]
        # Prepend zero and accumulate so consecutive entries delimit each view.
        self.splits = np.insert(np.cumsum(self.splits), 0, 0)
        # Slice the stacked eigenvectors back into per-view weight blocks.
        self.weights_ = [
            eigvecs[split:next_split]
            for split, next_split in zip(self.splits[:-1], self.splits[1:])
        ]
        # Undo the augmentation performed in _process_view: fold the
        # group-mean weights back onto the per-feature weights.
        for i, view in enumerate(views):
            if self.c[i] > 0:
                # Per-feature part and appended group-mean part of the block.
                weights_1 = self.weights_[i][: -self.n_groups_[i]]
                weights_2 = self.weights_[i][-self.n_groups_[i] :]
                ids, unique_inverse, unique_counts, group_means = self._group_mean(
                    weights_1.T, feature_groups[i]
                )
                # Centre the per-feature weights within their group and undo
                # the 1/c scaling applied during preprocessing.
                weights_1 = (weights_1 - group_means[:, unique_inverse].T) / self.c[i]
                # mu == 0 falls back to a unit scale (same convention as
                # _process_view).
                mu = 1 if self.mu[i] == 0 else self.mu[i]
                weights_2 = weights_2 / np.sqrt(
                    mu * np.expand_dims(unique_counts, axis=1)
                )
                # Broadcast each group-mean weight back onto its member features.
                self.weights_[i] = weights_1 + weights_2[unique_inverse]

    def _process_data(self, views, feature_groups=None, **kwargs):
        """Validate the feature groups and augment every view accordingly."""
        # Use a single group covering all features if none are provided.
        if feature_groups is None:
            warnings.warn("No feature groups provided, using all features")
            feature_groups = [np.ones(view.shape[1], dtype=int) for view in views]
        # Accept any array-like group labels, but insist they are integers.
        feature_groups = [np.asarray(feature_group) for feature_group in feature_groups]
        for feature_group in feature_groups:
            if not np.issubdtype(feature_group.dtype, np.integer):
                raise ValueError("feature groups must be integers")
        # Number of unique groups in each view.
        self.n_groups_ = [np.unique(group).shape[0] for group in feature_groups]
        # Process each view and return a list of processed representations
        return [
            self._process_view(view, group, mu, c)
            for view, group, mu, c in zip(views, feature_groups, self.mu, self.c)
        ]

    def _process_view(self, view, group, mu, c):
        """
        Process a single view by subtracting group means and adding them as new features.

        Parameters
        ----------
        view: numpy array or array like with shape (n_samples, n_features)
            The view to be processed.
        group: numpy array or array like with shape (n_features,)
            The feature group labels for the view.
        mu: float
            The regularization parameter for the group means.
        c: float
            The regularization parameter for the view features.

        Returns
        -------
        view: numpy array with shape (n_samples, n_features + n_groups)
            The processed view with group means added as new features
            (unchanged when c == 0).
        """
        if c > 0:
            (
                ids,
                unique_inverse,
                unique_counts,
                group_means,
            ) = self._group_mean(view, group)
            # mu == 0 falls back to a unit scale.
            mu = 1 if mu == 0 else mu
            # Within-group deviations, scaled by 1/c.
            view_1 = (view - group_means[:, unique_inverse]) / c
            # Group means, scaled by sqrt(count / mu) per group.
            view_2 = group_means / np.sqrt(mu / unique_counts)
            return np.hstack((view_1, view_2))
        # No group regularization requested for this view; leave it unchanged.
        return view

    def _more_tags(self):
        return {"multiview": True}

    @staticmethod
    def _group_mean(view, group):
        """
        Compute the mean of each feature group in a view.

        Parameters
        ----------
        view: numpy array or array like with shape (n_samples, n_features)
            The view to compute the group means from.
        group: numpy array or array like with shape (n_features,)
            The feature group labels for the view.

        Returns
        -------
        ids: numpy array with shape (n_groups,)
            The unique feature group ids.
        unique_inverse: numpy array with shape (n_features,)
            The indices to reconstruct the original group array from the unique ids.
        unique_counts: numpy array with shape (n_groups,)
            The number of occurrences of each unique id in the group array.
        group_means: numpy array with shape (n_samples, n_groups)
            The mean of each feature group in the view.
        """
        ids, unique_inverse, unique_counts = np.unique(
            group, return_inverse=True, return_counts=True
        )
        # Column-wise mean over the features belonging to each group.
        group_means = np.array(
            [view[:, group == group_id].mean(axis=1) for group_id in ids]
        ).T
        return ids, unique_inverse, unique_counts, group_means