
"""
DASVM estimator
===============

The DASVM method comes from [11].

.. [11] Bruzzone, L., & Marconcini, M. 'Domain adaptation problems: A DASVM
        classification technique and a circular validation strategy.'
        IEEE Transactions on Pattern Analysis and Machine Intelligence, (2009).
"""
# Author: Ruben Bueno <ruben.bueno@polytechnique.edu>
#         Yanis Lalou <yanis.lalou@polytechnique.edu>
#
# License: BSD 3-Clause

import math
import warnings

import numpy as np
from sklearn.base import clone
from sklearn.svm import SVC
from sklearn.utils.metaestimators import available_if
from sklearn.utils.validation import check_is_fitted

from skada.utils import check_X_y_domain, source_target_split

from .base import DAEstimator


class DASVMClassifier(DAEstimator):
    """DASVM estimator.

    Parameters
    ----------
    base_estimator : BaseEstimator
        The estimator that will be used in the algorithm. It is an SVC by
        default, but any classifier equipped with a `decision_function`
        method can be used.
    k : int > 0
        The number of points per class that will be discarded/added at each
        step of the algorithm.
    max_iter : int
        The maximal number of iterations of the algorithm when using `fit`.
    save_estimators : bool
        True if this object should remember all the fitted estimators.
    save_indices : bool
        True if this object should remember all the values of
        `index_source_deleted` and `index_target_added`.

    References
    ----------
    .. [11] Bruzzone, L., & Marconcini, M. 'Domain adaptation problems: A DASVM
            classification technique and a circular validation strategy.'
            IEEE Transactions on Pattern Analysis and Machine Intelligence, (2009).
    """

    __metadata_request__fit = {"sample_domain": True}
    __metadata_request__partial_fit = {"sample_domain": False}
    __metadata_request__predict = {"sample_domain": False, "allow_source": False}
    __metadata_request__predict_proba = {"sample_domain": False, "allow_source": False}
    __metadata_request__predict_log_proba = {
        "sample_domain": False,
        "allow_source": False,
    }
    __metadata_request__score = {"sample_domain": False, "allow_source": False}
    __metadata_request__decision_function = {
        "sample_domain": False,
        "allow_source": False,
    }

    def __init__(
        self,
        base_estimator=None,
        k=3,
        max_iter=1_000,
        save_estimators=False,
        save_indices=False,
        **kwargs,
    ):
        super().__init__()
        if base_estimator is None:
            self.base_estimator = SVC(probability=True)
        else:
            self.base_estimator = base_estimator
        self.max_iter = max_iter
        self.save_estimators = save_estimators
        self.save_indices = save_indices
        self.k = k

    def _find_points_next_step(self, indices_list, d, cond_array):
        """Find the next points to add/discard.

        This is an inplace method, changing `indices_list`.
        """
        # We should take k points for each of the c classes,
        # depending on the values of d
        condition = np.logical_and(~indices_list, cond_array)
        for _ in range(min(self.k, math.ceil(sum(condition) / self.n_class))):
            idx = np.unique(np.argmax(d[condition], axis=0))
            # We need to shift those indices to take into account
            # the fact that some previous points weren't in the list
            for ll in range(condition.shape[0]):
                if ~condition[ll]:
                    idx[idx >= ll] += 1
            # We finally only need to change the list
            for ll in idx:
                # indices_list[ll] is False at that point
                indices_list[ll] = True

    def _get_X_y(
        self, new_estimator, index_target_added, index_source_deleted, Xs, Xt, ys
    ):
        """Get the X and y arrays at a given state of the algorithm.

        We take the source datapoints that have not been deleted, and the
        target points that have been added.
        """
        X = np.concatenate((Xs[~index_source_deleted], Xt[index_target_added]))
        semi_labels = new_estimator.predict(Xt[index_target_added])
        y = np.concatenate((ys[~index_source_deleted], semi_labels))
        return X, y

    def _get_decision(self, new_estimator, X, indices_list):
        """Compute the decision function on the points that have either not
        been discarded (source) or not been added (target) yet.
        """
        if sum(~indices_list) > 0:
            df = new_estimator.decision_function(X[~indices_list])
            # df.ndim allows us to know if we are in the
            # `binary` case or the `multiclass` one
            decisions = np.ones(X.shape[0])
            decisions[~indices_list] = df
            decisions = np.array([-decisions + 1, decisions + 1]).T
        else:
            decisions = np.ones(X.shape[0])
        return decisions
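
    # Editorial note (an interpretation added here, not part of the original
    # source): in the binary case a raw SVC decision value `d` is expanded
    # into a pseudo per-class pair (`[-d, d]` on the first pass of `fit`,
    # `[1 - d, 1 + d]` afterwards), and a target point counts as "in the
    # margin" when one coordinate lies strictly between n_class - 2 = 0 and
    # n_class - 1 = 1, which both times amounts to 0 < |d| < 1. The
    # `decisions_target_` scores negate the absolute offset from that band,
    # so the argmax in `_find_points_next_step` favours in-margin points
    # rather than the most confidently classified ones.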
    def fit(self, X, y=None, sample_domain=None):
        """Fit adaptation parameters.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            The source and training data.
        y : array-like, shape (n_samples,)
            The source labels, followed by some labels that won't be
            looked at.
        sample_domain : array-like, shape (n_samples,)
            The domain labels.

        Returns
        -------
        self : object
            Returns self.
        """
        X, y, sample_domain = check_X_y_domain(X, y, sample_domain)
        Xs, Xt, ys, _ = source_target_split(X, y, sample_domain=sample_domain)

        n = Xs.shape[0]
        m = Xt.shape[0]
        self.n_class = 2  # number of classes
        self.estimators = []
        self.indices_source_deleted = []
        self.indices_target_added = []

        index_source_deleted = np.array([False] * n)
        index_target_added = np.array([False] * m)
        if self.save_indices:
            self.indices_source_deleted.append(np.copy(index_source_deleted))
            self.indices_target_added.append(np.copy(index_target_added))

        X_train = Xs
        y_train = ys
        new_estimator = self.base_estimator
        new_estimator.fit(X_train, y_train)
        if self.save_estimators:
            self.estimators.append(new_estimator)

        decisions_source = new_estimator.decision_function(Xs)
        if self.n_class == 2:
            decisions_source = np.array([-decisions_source, decisions_source]).T
        decisions_target = new_estimator.decision_function(Xt)
        if self.n_class == 2:
            decisions_target = np.array([-decisions_target, decisions_target]).T
        decisions_target_ = -np.abs(decisions_target - self.n_class + 1)

        self._find_points_next_step(
            index_source_deleted,
            decisions_source,
            np.ones(index_source_deleted.shape[0], dtype=bool),
        )
        in_margin_target = (
            np.sum(
                np.logical_and(
                    decisions_target < self.n_class - 1,
                    decisions_target > self.n_class - 2,
                ),
                axis=1,
            )
            > 0
        )
        self._find_points_next_step(
            index_target_added, decisions_target_, in_margin_target
        )

        if self.save_indices:
            self.indices_source_deleted.append(np.copy(index_source_deleted))
            self.indices_target_added.append(np.copy(index_target_added))

        i = 0
        for i in range(1, self.max_iter):
            if sum(in_margin_target) == 0:
                break
            old_estimator = new_estimator
            X_train, y_train = self._get_X_y(
                new_estimator, index_target_added, index_source_deleted, Xs, Xt, ys
            )
            new_estimator = clone(self.base_estimator)
            new_estimator.fit(X_train, y_train)
            if self.save_estimators:
                self.estimators.append(new_estimator)

            for j in range(len(index_target_added)):
                if index_target_added[j]:
                    x = Xt[j]
                    if new_estimator.predict([x]) != old_estimator.predict([x]):
                        # the semi-label changed between iterations, so the
                        # point goes back to the untouched target pool
                        index_target_added[j] = False

            decisions_source = self._get_decision(
                new_estimator, Xs, index_source_deleted
            )
            decisions_target = self._get_decision(
                new_estimator, Xt, index_target_added
            )
            if decisions_target.ndim > 1:
                decisions_target_ = -np.abs(decisions_target - 1)

            self._find_points_next_step(
                index_source_deleted,
                decisions_source,
                np.ones(index_source_deleted.shape[0], dtype=bool),
            )
            in_margin_target = (
                np.sum(
                    np.logical_and(decisions_target < 1, decisions_target > 0),
                    axis=1,
                )
                > 0
            )
            self._find_points_next_step(
                index_target_added, decisions_target_, in_margin_target
            )

            if self.save_indices:
                self.indices_source_deleted.append(np.copy(index_source_deleted))
                self.indices_target_added.append(np.copy(index_target_added))

        old_estimator = new_estimator
        X_train, y_train = Xt, old_estimator.predict(Xt)
        new_estimator = clone(self.base_estimator)
        new_estimator.fit(X_train, y_train)
        if self.save_estimators:
            self.estimators.append(new_estimator)
        if self.save_indices:
            self.indices_source_deleted.append(np.ones(n, dtype=bool))
            self.indices_target_added.append(np.ones(m, dtype=bool))
        self.base_estimator_ = new_estimator
        return self
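
    # The prediction methods below all delegate to `base_estimator_`, which
    # at the end of `fit` has been trained only on the (pseudo-labeled)
    # target data; this is why `score` warns when source samples are passed.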
"""Return predicted value by the fitted estimator for `X` `predict` method from the estimator we fitted """ return self.base_estimator_.predict(X, **kwargs) def _check_proba(self): if hasattr(self.base_estimator, "predict_proba"): return True else: raise AttributeError( "The base estimator does not have a predict_proba method" ) @available_if(_check_proba) def predict_proba(self, X, **kwargs): """Return predicted probabilities by the fitted estimator for `X` `predict_proba` method from the estimator we fitted """ check_is_fitted(self) return self.base_estimator.predict_proba(X, **kwargs) def decision_function(self, X): """Return values of the decision function of the fitted estimator for `X` `decision_function` method from the base_estimator_ we fitted """ return self.base_estimator_.decision_function(X) def score(self, X, y, sample_domain=None, *, sample_weight=None, **kwargs): """Return the scores of the prediction""" check_is_fitted(self) if sample_domain is not None and np.any(sample_domain >= 0): warnings.warn( "Source domain detected. Predictor is trained on target" "and score might be biased." ) return self.base_estimator_.score(X, y, sample_weight=sample_weight, **kwargs)