Source code for tissue_purifier.models.classifier_regressor.scikit_learn_interface

from typing import List, Union, Tuple, Any
import torch
import torch.utils.data
import torch.nn.functional
import numpy
from abc import ABC
from pytorch_lightning import LightningModule
from pytorch_lightning.trainer import Trainer
from sklearn.metrics import r2_score, accuracy_score
from ._pl_clean import PlMlpClean
from ._pl_noisy import PlMlpNoisy


class BaseEstimator(ABC):
    """
    Abstract Base Class which implements an interface similar to the MLP classifier/regressor in scikit-learn.
    The classes :class:`MlpRegressor` and :class:`MlpClassifier` inherit from this class.
    """

    def __init__(
            self,
            # architecture
            hidden_dims: List[int] = None,
            hidden_activation: str = 'relu',
            # training
            batch_size: int = 256,
            # optimizers
            solver: str = 'adam',
            alpha: float = 0.99,
            momentum: float = 0.9,
            betas: Tuple[float, float] = (0.9, 0.999),
            # protocol
            warm_up_epochs: int = 0,
            warm_down_epochs: int = 0,
            max_epochs: int = 200,
            min_learning_rate: float = 1.0E-4,
            max_learning_rate: float = 1.0E-3,
            min_weight_decay: float = 1.0E-4,
            max_weight_decay: float = 1.0E-4,
            **kargs,
    ):
        """
        Args:
            hidden_dims: the sizes of the intermediate layers of the MLP.
                Defaults to None (no hidden layers), corresponding to a linear prediction.
            hidden_activation: either 'relu' or 'leaky_relu'. The activation of the hidden layers.
            batch_size: batch size
            solver: either 'adam' (default), 'sgd' or 'rmsprop'. The type of optimizer to use.
            alpha: parameter for the rmsprop optimizer (used only if :attr:`solver` is 'rmsprop')
            momentum: parameter for the sgd optimizer (used only if :attr:`solver` is 'sgd')
            betas: parameters for the adam optimizer (used only if :attr:`solver` is 'adam')
            warm_up_epochs: epochs during which the learning rate is linearly increased (at the beginning of training)
            warm_down_epochs: epochs during which the learning rate is annealed with a cosine protocol
                (at the end of training)
            max_epochs: total number of epochs
            min_learning_rate: minimum learning rate (at the very beginning and end of training)
            max_learning_rate: maximum learning rate (after the linear ramp)
            min_weight_decay: minimum weight decay (during the entirety of the linear ramp)
            max_weight_decay: maximum weight decay (reached at the end of training)
            kargs: unused parameters
        """
        super().__init__()

        assert hidden_dims is None or isinstance(hidden_dims, List), \
            "Error. hidden_dims must be None or a list of int. Received {0}".format(hidden_dims)
        self.hidden_dims = hidden_dims

        if hidden_activation == 'relu':
            self.hidden_activation = torch.nn.ReLU(inplace=True)
        elif hidden_activation == 'leaky_relu':
            self.hidden_activation = torch.nn.LeakyReLU(negative_slope=0.01, inplace=True)
        else:
            raise NotImplementedError

        # optimizer stuff
        self.solver = solver
        self.batch_size = batch_size
        self.betas = betas
        self.alpha = alpha
        self.momentum = momentum

        # protocol
        self.max_epochs = max_epochs
        self.warm_up_epochs = warm_up_epochs
        self.warm_down_epochs = warm_down_epochs
        self.min_learning_rate = min_learning_rate
        self.max_learning_rate = max_learning_rate
        self.min_weight_decay = min_weight_decay
        self.max_weight_decay = max_weight_decay

        # loss
        self._pl_net = None
        self._is_fit = False

    def create_trainer(self):
        return Trainer(
            logger=False,
            num_nodes=1,  # uses a single machine, possibly with many gpus
            gpus=1 if torch.cuda.device_count() > 0 else None,
            check_val_every_n_epoch=-1,
            num_sanity_val_steps=0,
            max_epochs=self.max_epochs,
            num_processes=1,
            accelerator=None)

    @property
    def pl_net(self) -> LightningModule:
        assert self._pl_net is not None, "Error. You need to initialize the mlp before accessing it."
        return self._pl_net

    @property
    def loss_(self):
        return None if self._pl_net is None else self._pl_net.loss_

    @property
    def loss_curve_(self):
        return None if self._pl_net is None else self._pl_net.loss_curve_

    @torch.no_grad()
    def _to_torch_tensor(self, x):
        """ Convert stuff to torch tensors. Useful for training with pytorch, possibly on GPUs. """
        if isinstance(x, torch.Tensor):
            return x
        elif isinstance(x, numpy.ndarray):
            return torch.from_numpy(x)
        elif isinstance(x, list):
            return torch.Tensor(x)
        else:
            raise Exception("unexpected type in _to_torch_tensor", type(x))

    @torch.no_grad()
    def _to_numpy(self, x):
        """ Convert stuff to numpy arrays. Useful for labels (which might be strings) and for saving results. """
        if isinstance(x, torch.Tensor):
            return x.detach().cpu().numpy()
        elif isinstance(x, numpy.ndarray):
            return x
        elif isinstance(x, list):
            return numpy.array(x)
        else:
            raise Exception("unexpected type in _to_numpy", type(x))

    @torch.no_grad()
    def _make_integer_labels(
            self,
            labels,
            classes: Union[List[Any], numpy.ndarray] = None) -> (torch.Tensor, numpy.ndarray):
        """
        Returns:
            integer_labels: torch.Tensor with the labels converted to integers
            classes: numpy.ndarray with the classes
        """
        classes_np = numpy.unique(self._to_numpy(labels)) if classes is None else self._to_numpy(classes)
        assert isinstance(classes_np, numpy.ndarray) and len(classes_np.shape) == 1

        # mapping labels to int_labels
        class_to_int_dict = dict(zip(classes_np, range(classes_np.shape[0])))
        labels_np = self._to_numpy(labels)
        integer_labels_torch = torch.tensor([class_to_int_dict[label] for label in labels_np])
        return integer_labels_torch, classes_np

    @property
    def is_classifier(self) -> bool:
        """ Return True if it is a classifier. For compatibility with the scikit-learn interface. """
        raise NotImplementedError

    @property
    def is_regressor(self) -> bool:
        """ Return True if it is a regressor. For compatibility with the scikit-learn interface. """
        raise NotImplementedError

    def create_pl_net(self, input_dim, output_dim) -> LightningModule:
        raise NotImplementedError

    def fit(self, X, y):
        raise NotImplementedError

    def predict(self, X) -> numpy.ndarray:
        raise NotImplementedError

    def score(self, X, y) -> float:
        raise NotImplementedError
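# Illustrative sketch (not part of the library): how the warm_up / warm_down hyper-parameters
# above are intended to shape the learning-rate protocol described in the docstring. The
# learning rate ramps linearly from min_learning_rate to max_learning_rate over warm_up_epochs,
# stays flat, and is annealed back with a cosine schedule over the last warm_down_epochs.
# The helper below only mimics that description; it is a hypothetical stand-alone function,
# not the scheduler actually used by PlMlpClean / PlMlpNoisy.
def _sketch_lr_schedule(epoch: int,
                        warm_up_epochs: int = 10,
                        warm_down_epochs: int = 50,
                        max_epochs: int = 200,
                        min_lr: float = 1.0E-4,
                        max_lr: float = 1.0E-3) -> float:
    import math
    if warm_up_epochs > 0 and epoch < warm_up_epochs:
        # linear ramp up at the beginning of training
        return min_lr + (max_lr - min_lr) * epoch / warm_up_epochs
    warm_down_start = max_epochs - warm_down_epochs
    if warm_down_epochs > 0 and epoch >= warm_down_start:
        # cosine anneal back to min_lr at the end of training
        t = (epoch - warm_down_start) / warm_down_epochs
        return min_lr + 0.5 * (max_lr - min_lr) * (1.0 + math.cos(math.pi * t))
    return max_lr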
class MlpRegressor(BaseEstimator):
    """ Mlp regressor with an interface similar to scikit-learn but able to run on GPUs. """

    def __init__(self, output_activation: torch.nn.Module = torch.nn.Identity(), **kargs):
        """
        Args:
            output_activation: the activation to apply to the output to make the prediction.
                If `y` is unbounded use the Identity. If :math:`y \\in (0, 1)` use a Sigmoid function, etc.
            kargs: parameters passed to :class:`BaseEstimator`
        """
        self.output_activation = output_activation
        super().__init__(**kargs)

    @property
    def is_classifier(self):
        """ Returns False. For compatibility with the scikit-learn interface. """
        return False

    @property
    def is_regressor(self):
        """ Returns True. For compatibility with the scikit-learn interface. """
        return True

    def create_pl_net(self, input_dim, output_dim):
        return PlMlpClean(
            criterium=torch.nn.MSELoss(reduction='mean'),
            input_dim=input_dim,
            output_dim=output_dim,
            hidden_dims=self.hidden_dims,
            hidden_activation=self.hidden_activation,
            output_activation=self.output_activation,
            # optimizer
            solver=self.solver,
            betas=self.betas,
            momentum=self.momentum,
            alpha=self.alpha,
            # protocol
            max_epochs=self.max_epochs,
            warm_up_epochs=self.warm_up_epochs,
            warm_down_epochs=self.warm_down_epochs,
            min_learning_rate=self.min_learning_rate,
            max_learning_rate=self.max_learning_rate,
            min_weight_decay=self.min_weight_decay,
            max_weight_decay=self.max_weight_decay,
        )

    def fit(self, X, y) -> None:
        """
        Fit the model.

        Args:
            X: independent variable of shape :math:`(n, *)`
            y: dependent variable of shape :math:`(n)`
        """
        X = self._to_torch_tensor(X)
        y = self._to_torch_tensor(y)
        if len(y.shape) == 1:
            y.unsqueeze_(dim=-1)
        assert X.shape[:-1] == y.shape[:-1]

        index = torch.arange(y.shape[0], dtype=torch.long, device=y.device)
        if torch.cuda.device_count():
            X = X.cuda()
            y = y.cuda()
            index = index.cuda()

        train_dataset = torch.utils.data.TensorDataset(X, y, index)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)

        self._pl_net = self.create_pl_net(input_dim=X.shape[-1], output_dim=y.shape[-1])
        trainer = self.create_trainer()
        trainer.fit(model=self.pl_net, train_dataloaders=train_loader)
        self._is_fit = True

    @torch.no_grad()
    def predict(self, X) -> numpy.ndarray:
        """
        Run the model forward to obtain the predictions, i.e. :math:`y_\\text{pred} = \\text{model}(X)`.

        Args:
            X: independent variable of shape :math:`(n, *)`

        Returns:
            y: the predicted values of shape :math:`(n)`
        """
        assert self._is_fit, "Error. Need to run the fit method before you can use the predict method."
        X = self._to_torch_tensor(X)
        assert X.shape[-1] == self.pl_net.input_dim, \
            "Dimension mismatch {0} vs {1}".format(X.shape[-1], self.pl_net.input_dim)

        if torch.cuda.device_count():
            X = X.cuda()
            pl_net_tmp = self.pl_net.cuda()
        else:
            pl_net_tmp = self.pl_net

        predictions = []
        n1, n_max = 0, X.shape[0]
        while n1 < n_max:
            n2 = min(n_max, n1 + self.batch_size)
            y_hat = pl_net_tmp(X[n1:n2])
            n1 = n2
            predictions.append(y_hat)
        return torch.cat(predictions, dim=0).squeeze(dim=-1).cpu().numpy()

    @torch.no_grad()
    def score(self, X, y) -> float:
        """
        Compute the predictions, i.e. :math:`y_\\text{pred} = \\text{model}(X)`,
        and score them against the true values `y`.

        Args:
            X: independent variable of shape :math:`(n, *)`
            y: dependent variable of shape :math:`(n)`

        Returns:
            score: :math:`R^2` (coefficient of determination) between :math:`y_\\text{pred}` and `y`.
        """
        assert self._is_fit, "Error. Need to run the fit method before you can use the score method."
        X = self._to_torch_tensor(X)
        y = self._to_torch_tensor(y)
        if len(y.shape) == 1:
            y.unsqueeze_(dim=-1)

        assert X.shape[0] == y.shape[0], "Dimension mismatch X={0}, y={1}".format(X.shape, y.shape)
        assert X.shape[-1] == self.pl_net.input_dim, \
            "Dimension mismatch {0} vs {1}".format(X.shape[-1], self.pl_net.input_dim)
        assert y.shape[-1] == self.pl_net.output_dim, \
            "Dimension mismatch {0} vs {1}".format(y.shape[-1], self.pl_net.output_dim)

        with torch.no_grad():
            y_pred = self.predict(X)
            return r2_score(
                y_true=y.squeeze(-1).detach().cpu().numpy(),
                y_pred=y_pred)
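# Minimal usage sketch (not part of the library): MlpRegressor follows the familiar
# scikit-learn fit / predict / score pattern but trains with pytorch-lightning, on GPU when
# one is available. The data, shapes and hyper-parameters below are illustrative only.
def _example_regressor_usage():
    import numpy as np
    x = np.random.randn(1000, 32).astype(np.float32)    # features of shape (n, d)
    y = (x.sum(axis=-1) + 0.1 * np.random.randn(1000)).astype(np.float32)  # noisy target of shape (n,)

    regressor = MlpRegressor(hidden_dims=[64, 64], max_epochs=50, solver='adam')
    regressor.fit(x, y)                   # trains an MLP with MSE loss
    y_pred = regressor.predict(x)         # numpy array of shape (n,)
    print("R^2 on the training data:", regressor.score(x, y))
    return y_pred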
class MlpClassifier(BaseEstimator):
    """
    Mlp classifier with an interface similar to scikit-learn but able to run on GPUs.

    It can perform classification with noisy labels following the method described in
    `Unsupervised Label Noise Modeling and Loss Correction <https://arxiv.org/abs/1904.11238>`_.

    According to this method, the labels are dynamically corrected according to the formula:

    :math:`l_\\text{new} = (1.0 - w) \\times l_\\text{old} + w \\times p_\\text{net}`

    where :math:`l_\\text{old}` are the noisy (and one-hot) original labels, :math:`p_\\text{net}` are the
    probabilities computed by the neural network and `w` is the probability of the label being incorrect.
    `w` is computed by solving the assignment problem for a 2-component Mixture Model.
    This is based on the idea that correct (incorrect) labels lead to small (large) losses.
    Therefore correct labels will belong to the low-loss component and incorrect labels will belong
    to the high-loss component.
    """

    def __init__(
            self,
            # special parameters for the noisy label situation
            noisy_labels: bool = False,
            bootstrap_epoch_start: int = 100,
            lambda_reg: float = 1.0,
            hard_bootstrapping: bool = False,
            **kargs):
        """
        Args:
            noisy_labels: if True (default is False) performs classification with noisy labels.
            bootstrap_epoch_start: used only if :attr:`noisy_labels` == True.
                The epoch at which to start dynamically correcting the labels.
            lambda_reg: used only if :attr:`noisy_labels` == True.
                Strength of the regularization which prevents the corrected labels from collapsing
                to a single class.
            hard_bootstrapping: used only if :attr:`noisy_labels` == True. If True (default is False)
                the network probabilities are made one-hot before using them to update the classification labels.
            kargs: any parameter passed to :class:`BaseEstimator` such as max_epochs, solver, ...
        """
        # special parameters which will be used only if noisy_labels == True
        self.noisy_labels = noisy_labels
        self.bootstrap_epoch_start = bootstrap_epoch_start
        self.lambda_reg = lambda_reg
        self.hard_bootstrapping = hard_bootstrapping

        # standard parameters
        self._classes_np = None
        self.output_activation = torch.nn.Identity()  # return the raw logits
        super().__init__(**kargs)

    @property
    def is_classifier(self):
        """ Returns True. For compatibility with the scikit-learn interface. """
        return True

    @property
    def is_regressor(self):
        """ Returns False. For compatibility with the scikit-learn interface. """
        return False

    def create_mlp(self, input_dim, output_dim):
        if self.noisy_labels:
            return PlMlpNoisy(
                input_dim=input_dim,
                output_dim=output_dim,
                hidden_dims=self.hidden_dims,
                hidden_activation=self.hidden_activation,
                # optimizer
                solver=self.solver,
                betas=self.betas,
                momentum=self.momentum,
                alpha=self.alpha,
                # loss
                lambda_reg=self.lambda_reg,
                hard_bootstrapping=self.hard_bootstrapping,
                bootstrap_epoch_start=self.bootstrap_epoch_start,
                # protocol
                max_epochs=self.max_epochs,
                warm_up_epochs=self.warm_up_epochs,
                warm_down_epochs=self.warm_down_epochs,
                min_learning_rate=self.min_learning_rate,
                max_learning_rate=self.max_learning_rate,
                min_weight_decay=self.min_weight_decay,
                max_weight_decay=self.max_weight_decay)
        else:
            return PlMlpClean(
                criterium=torch.nn.CrossEntropyLoss(reduction='mean'),
                input_dim=input_dim,
                output_dim=output_dim,
                hidden_dims=self.hidden_dims,
                hidden_activation=self.hidden_activation,
                output_activation=self.output_activation,
                # optimizer
                solver=self.solver,
                betas=self.betas,
                momentum=self.momentum,
                alpha=self.alpha,
                # protocol
                max_epochs=self.max_epochs,
                warm_up_epochs=self.warm_up_epochs,
                warm_down_epochs=self.warm_down_epochs,
                min_learning_rate=self.min_learning_rate,
                max_learning_rate=self.max_learning_rate,
                min_weight_decay=self.min_weight_decay,
                max_weight_decay=self.max_weight_decay)

    def fit(self, X, y):
        """
        Fit the model.

        Args:
            X: independent variable of shape :math:`(n, *)`
            y: dependent variable of shape :math:`(n)`
        """
        X = self._to_torch_tensor(X)
        labels_torch, self._classes_np = self._make_integer_labels(y)
        self._pl_net = self.create_mlp(input_dim=X.shape[-1], output_dim=self._classes_np.shape[0])

        index = torch.arange(labels_torch.shape[0], dtype=torch.long, device=labels_torch.device)
        if torch.cuda.device_count():
            X = X.cuda()
            labels_torch = labels_torch.cuda()
            index = index.cuda()

        train_dataset = torch.utils.data.TensorDataset(X.float(), labels_torch.long(), index)
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)

        trainer = self.create_trainer()
        trainer.fit(model=self.pl_net, train_dataloaders=train_loader)
        self._is_fit = True

    @torch.no_grad()
    def get_all_logits(self, X) -> torch.Tensor:
        raw_logit_list = []
        n1, n_max = 0, X.shape[0]

        if torch.cuda.is_available():
            pl_net_tmp = self.pl_net.cuda()
            X = X.cuda()
        else:
            pl_net_tmp = self.pl_net

        while n1 < n_max:
            n2 = min(n_max, n1 + self.batch_size)
            raw_logit = pl_net_tmp(X[n1:n2])
            n1 = n2
            raw_logit_list.append(raw_logit)
        raw_logit_all_torch = torch.cat(raw_logit_list, dim=0)
        return raw_logit_all_torch

    @torch.no_grad()
    def predict(self, X) -> numpy.ndarray:
        """
        Run the model forward to obtain the predictions, i.e. :math:`y_\\text{pred} = \\text{model}(X)`.

        Args:
            X: independent variable of shape :math:`(n, *)`

        Returns:
            y: the predicted values of shape :math:`(n)`
        """
        assert self._is_fit, "Error. Need to run the fit method before you can use the predict method."
        X = self._to_torch_tensor(X).float()
        assert X.shape[-1] == self.pl_net.input_dim, "Dimension mismatch"

        raw_logit_all_torch = self.get_all_logits(X)
        labels = torch.argmax(raw_logit_all_torch, dim=-1).cpu().numpy()
        return self._classes_np[labels]

    @torch.no_grad()
    def score(self, X, y) -> float:
        """
        Compute the predictions, i.e. :math:`y_\\text{pred} = \\text{model}(X)`,
        and score them against the true values `y`.

        Args:
            X: independent variable of shape :math:`(n, *)`
            y: dependent variable of shape :math:`(n)`

        Returns:
            accuracy: accuracy classification score
        """
        assert self._is_fit, "Error. Need to run the fit method before you can use the score method."
        X = self._to_torch_tensor(X)
        y_true_np = self._to_numpy(y)
        assert X.shape[0] == y_true_np.shape[0], \
            "Dimension mismatch X={0}, labels={1}".format(X.shape, y_true_np.shape)
        assert X.shape[-1] == self.pl_net.input_dim, \
            "Dimension mismatch {0} vs {1}".format(X.shape[-1], self.pl_net.input_dim)

        y_pred_np = self.predict(X)
        return accuracy_score(y_true_np, y_pred_np)

    @torch.no_grad()
    def predict_proba(self, X) -> numpy.ndarray:
        """
        Compute the probabilities for all the classes.

        Args:
            X: independent variable of shape :math:`(n, *)`

        Returns:
            prob: probability of all the classes of shape :math:`(n, C)` where `C` is the number of classes.
        """
        assert self._is_fit, "Error. Need to run the fit method before you can use the predict_proba method."
        X = self._to_torch_tensor(X).float()
        assert X.shape[-1] == self.pl_net.input_dim, "Dimension mismatch"

        raw_logit_all = self.get_all_logits(X)
        prob = torch.nn.functional.softmax(raw_logit_all, dim=-1)
        return prob.cpu().numpy()

    @torch.no_grad()
    def predict_log_proba(self, X) -> numpy.ndarray:
        """
        Compute the log_probabilities for all the classes.

        Args:
            X: independent variable of shape :math:`(n, *)`

        Returns:
            log_p: log_probability of all the classes of shape :math:`(n, C)` where `C` is the number of classes.
        """
        assert self._is_fit, "Error. Need to run the fit method before you can use the predict_log_proba method."
        X = self._to_torch_tensor(X).float()
        assert X.shape[-1] == self.pl_net.input_dim, "Dimension mismatch"

        raw_logit_all = self.get_all_logits(X)
        prob = torch.nn.functional.log_softmax(raw_logit_all, dim=-1)
        return prob.cpu().numpy()
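# Minimal usage sketch (not part of the library): MlpClassifier accepts arbitrary (e.g. string)
# labels, maps them to integers internally, and exposes predict, predict_proba and score.
# With noisy_labels=True it switches to the loss-correction scheme described in the class
# docstring. The data and hyper-parameters below are illustrative only.
def _example_classifier_usage():
    import numpy as np
    x = np.random.randn(500, 16).astype(np.float32)
    y = np.where(x[:, 0] > 0, "tumor", "normal")      # string labels are allowed

    clf = MlpClassifier(hidden_dims=[32], max_epochs=30, noisy_labels=False)
    clf.fit(x, y)
    labels = clf.predict(x)                            # array of the original class labels
    probs = clf.predict_proba(x)                       # class probabilities of shape (n, C)
    print("accuracy:", clf.score(x, y), "prob shape:", probs.shape)
    return labels


# Sketch of the label-correction rule quoted in the MlpClassifier docstring,
# l_new = (1 - w) * l_old + w * p_net, written out for a single sample. This only illustrates
# the formula; the actual update is implemented inside PlMlpNoisy.
def _sketch_label_correction(l_old: torch.Tensor, p_net: torch.Tensor, w: float) -> torch.Tensor:
    # l_old: one-hot (possibly noisy) label, p_net: network probabilities,
    # w: probability that the label is incorrect (estimated from a 2-component mixture over the losses)
    return (1.0 - w) * l_old + w * p_net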