Source code for cca_zoo.linear._iterative._elastic

from typing import Union, Iterable

import numpy as np
from sklearn.linear_model import ElasticNet, Lasso, Ridge, SGDRegressor

from cca_zoo._utils._checks import _process_parameter
from cca_zoo.linear._iterative._base import _BaseIterative
from cca_zoo.linear._iterative._deflation import _DeflationMixin


# Define SCCA_Elastic class
[docs] class ElasticCCA(_DeflationMixin, _BaseIterative): def __init__( self, latent_dimensions: int = 1, copy_data=True, random_state=None, tol=1e-3, accept_sparse=None, epochs=100, initialization: Union[str, callable] = "uniform", early_stopping=False, verbose=None, alpha=None, # regularization parameter for Elastic l1_ratio=None, # ratio of L1 to L2 penalty for Elastic positive=None, # whether to enforce positive coefficients for Elastic stochastic=False, # whether to use stochastic gradient descent for Elastic ): super().__init__( latent_dimensions=latent_dimensions, copy_data=copy_data, random_state=random_state, tol=tol, accept_sparse=accept_sparse, epochs=epochs, initialization=initialization, early_stopping=early_stopping, verbose=verbose, ) self.alpha = alpha self.l1_ratio = l1_ratio self.positive = positive self.stochastic = stochastic def _check_params(self): self.alpha = _process_parameter("alpha", self.alpha, 0, self.n_views_) self.l1_ratio = _process_parameter("l1_ratio", self.l1_ratio, 0, self.n_views_) self.positive = _process_parameter( "positive", self.positive, False, self.n_views_ ) self.regressors = initialize_regressors( self.alpha, self.l1_ratio, self.positive, self.stochastic, self.tol, self.random_state, ) def _update_weights(self, views: Iterable[np.ndarray], i: int): # Update the weights_ for the current view using Elastic # Get the scores of all representations scores = np.stack(self.transform(views)) # Compute the target by summing the scores along dim 0 and dividing by the square root of the covariance of # the target target = np.sum(scores, axis=0) target = target / np.linalg.norm(target) # Loop over the representations and fit each regressor to the view and the target self.regressors[i] = self.regressors[i].fit(views[i], target) # Update the weights_ with the coefficients of each regressor new_weights = np.atleast_2d(self.regressors[i].coef_).T # Return the updated weights_ return new_weights def _objective(self, views: Iterable[np.ndarray]): scores = np.stack(self.transform(views)) objective = 0 for view_index, view in enumerate(views): # create a mask that is True for elements not equal to k along dim k mask = np.arange(scores.shape[0]) != view_index # apply the mask to scores and sum along dim k target = np.sum(scores[mask], axis=0) objective += elastic_objective( scores[view_index], target, self.regressors[view_index].coef_, self.alpha[view_index], self.l1_ratio[view_index], ) return objective
# Define SCCA_IPLS class
[docs] class SCCA_IPLS(_DeflationMixin, _BaseIterative): def __init__( self, latent_dimensions: int = 1, copy_data=True, random_state=None, tol=1e-3, accept_sparse=None, epochs=100, initialization: Union[str, callable] = "uniform", early_stopping=False, verbose=True, alpha=None, # regularization parameter for Elastic l1_ratio=1, # ratio of L1 to L2 penalty for Elastic positive=None, # whether to enforce positive coefficients for Elastic stochastic=False, # whether to use stochastic gradient descent for Elastic ): super().__init__( latent_dimensions=latent_dimensions, copy_data=copy_data, random_state=random_state, tol=tol, accept_sparse=accept_sparse, epochs=epochs, initialization=initialization, early_stopping=early_stopping, verbose=verbose, ) self.alpha = alpha self.l1_ratio = l1_ratio self.positive = positive self.stochastic = stochastic def _check_params(self): self.alpha = _process_parameter("alpha", self.alpha, 0, self.n_views_) self.positive = _process_parameter( "positive", self.positive, False, self.n_views_ ) self.l1_ratio = _process_parameter("l1_ratio", self.l1_ratio, 1, self.n_views_) self.regressors = initialize_regressors( self.alpha, self.l1_ratio, self.positive, self.stochastic, self.tol, self.random_state, ) def _update_weights(self, views: Iterable[np.ndarray], i: int): # Update the weights_ for the current view using IPLS # Get the scores of all representations scores = np.stack(self.transform(views)) # Create a mask that is True for elements not equal to j along dim j mask = np.arange(scores.shape[0]) != i # Apply the mask to scores and sum along dim j target = np.sum(scores[mask], axis=0) # Solve the elastic regression self.regressors[i] = self.regressors[i].fit(views[i], target) # Normalize the coefficients by dividing by the square root of the covariance of the view and the coefficients self.regressors[i].coef_ /= np.sqrt(np.cov(views[i] @ self.regressors[i].coef_)) # Update the weights_ with the coefficients of each regressor new_weights = self.regressors[i].coef_ # Return the updated weights_ return new_weights[:, None] def _objective(self, views: Iterable[np.ndarray]): scores = np.stack(self.transform(views)) objective = 0 for view_index, view in enumerate(views): # create a mask that is True for elements not equal to k along dim k mask = np.arange(scores.shape[0]) != view_index # apply the mask to scores and sum along dim k target = np.sum(scores[mask], axis=0) objective += elastic_objective( scores[view_index], target, self.regressors[view_index].coef_, self.alpha[view_index], self.l1_ratio[view_index], ) return objective
def initialize_regressors(alpha, l1_ratio, positive, stochastic, tol, random_state): regressors = [] for alpha, l1_ratio, positive in zip(alpha, l1_ratio, positive): if stochastic: regressors.append( SGDRegressor( penalty="elasticnet", alpha=alpha, l1_ratio=l1_ratio, fit_intercept=False, tol=tol, warm_start=True, random_state=random_state, ) ) elif l1_ratio == 0: regressors.append( Ridge( alpha=alpha, fit_intercept=False, positive=positive, random_state=random_state, tol=tol, ) ) elif l1_ratio == 1: regressors.append( Lasso( alpha=alpha, fit_intercept=False, warm_start=True, positive=positive, random_state=random_state, tol=tol, selection="random", ) ) else: regressors.append( ElasticNet( alpha=alpha, l1_ratio=l1_ratio, fit_intercept=False, warm_start=True, positive=positive, random_state=random_state, tol=tol, selection="random", ) ) return regressors def elastic_objective(z, y, w, alpha, l1_ratio): n = len(y) objective = np.linalg.norm(z - y) ** 2 / (2 * n) l1_pen = alpha * l1_ratio * np.linalg.norm(w, ord=1) l2_pen = alpha * (1 - l1_ratio) * np.linalg.norm(w, ord=2) return objective + l1_pen + l2_pen