Source code for skada._pipeline

# Author: Theo Gnassounou <theo.gnassounou@inria.fr>
#         Remi Flamary <remi.flamary@polytechnique.edu>
#         Oleksii Kachaiev <kachayev@gmail.com>
#
# License: BSD 3-Clause

from collections import defaultdict
from typing import Callable, List, Optional, Union

from joblib import Memory
from sklearn.base import BaseEstimator
from sklearn.pipeline import Pipeline

from .base import BaseSelector, PerDomain, Shared

_DEFAULT_SELECTORS = {
    "shared": Shared,
    "per_domain": PerDomain,
}


# xxx(okachaiev): block 'fit_predict' as it is somewhat unexpected
[docs] def make_da_pipeline( *steps, memory: Optional[Memory] = None, verbose: bool = False, default_selector: Union[str, Callable[[BaseEstimator], BaseSelector]] = "shared", ) -> Pipeline: """Construct a :class:`~sklearn.pipeline.Pipeline` from the given estimators. This is a shorthand for the :class:`sklearn.pipeline.Pipeline` constructor; it does not require, and does not permit, naming the estimators. Instead, their names will be set to the lowercase of their types automatically. Parameters ---------- *steps : list of estimators or tuples of the form (name of step, estimator). List of the scikit-learn estimators that are chained together. memory : str or object with the joblib.Memory interface, default=None Used to cache the fitted transformers of the pipeline. The last step will never be cached, even if it is a transformer. By default, no caching is performed. If a string is given, it is the path to the caching directory. Enabling caching triggers a clone of the transformers before fitting. Therefore, the transformer instance given to the pipeline cannot be inspected directly. Use the attribute ``named_steps`` or ``steps`` to inspect estimators within the pipeline. Caching the transformers is advantageous when fitting is time consuming. verbose : bool, default=False If True, the time elapsed while fitting each step will be printed as it is completed. default_selector : str or callable, default = 'shared' Specifies a domain selector to wrap the estimator, if it is not already wrapped. Refer to :class:`~skada.base.BaseSelector` for an understanding of selector functionalities. The available options include 'shared' and 'per_domain'. For integrating a custom selector as the default, pass a callable that accepts :class:`~sklearn.base.BaseEstimator` and returns the estimator encapsulated within a domain selector. Returns ------- p : Pipeline Returns a scikit-learn :class:`~sklearn.pipeline.Pipeline` object. Examples -------- >>> from sklearn.naive_bayes import GaussianNB >>> from sklearn.preprocessing import StandardScaler >>> from skada import make_da_pipeline >>> make_da_pipeline(StandardScaler(), GaussianNB(priors=None)) Pipeline(steps=[('standardscaler', Shared(base_estimator=StandardScaler(), copy=True, with_mean=True, with_std=True)), ('gaussiannb', Shared(base_estimator=GaussianNB(), priors=None, var_smoothing=1e-09))]) """ if not steps: raise TypeError("Missing 1 required positional argument: 'steps'") names, estimators = [], [] for step in steps: name, estimator = step if isinstance(step, tuple) else (None, step) if isinstance(estimator, Pipeline) and isinstance(estimator[0], BaseSelector): # this means we got DA pipeline as a step in the pipeline for nested_name, nested_selector in estimator.steps: if name is not None: nested_name = f"{name}__{nested_name}" names.append(nested_name) estimators.append(nested_selector._unmark_as_final()) else: names.append(name) estimators.append(estimator) wrapped_estimators = _wrap_with_selectors(estimators, default_selector) steps = _name_estimators(wrapped_estimators) named_steps = [ (auto_name, step) if user_name is None else (user_name, step) for user_name, (auto_name, step) in zip(names, steps) ] named_steps[-1][1]._mark_as_final() return Pipeline(named_steps, memory=memory, verbose=verbose)
def _wrap_with_selector( estimator: BaseEstimator, selector: Union[str, Callable[[BaseEstimator], BaseSelector]], ) -> BaseSelector: if (estimator is not None) and not isinstance(estimator, BaseSelector): if callable(selector): estimator = selector(estimator) if not isinstance(estimator, BaseSelector): raise ValueError( "Callable `default_selector` has to return `BaseSelector` " # noqa: E501 f"instance, got {type(estimator)} instead." ) elif isinstance(selector, str): selector_cls = _DEFAULT_SELECTORS.get(selector) if selector_cls is None: raise ValueError( f"Unsupported `default_selector` name: {selector}." f"Use one of {_DEFAULT_SELECTORS.keys().join(', ')}" ) estimator = selector_cls(estimator) else: raise ValueError(f"Unsupported `default_selector` type: {type(selector)}") return estimator def _wrap_with_selectors( estimators: List[BaseEstimator], default_selector: Union[str, Callable[[BaseEstimator], BaseSelector]], ) -> List[BaseEstimator]: return [ (_wrap_with_selector(estimator, default_selector)) for estimator in estimators ] def _name_estimators(estimators): """Generate names for estimators.""" # From scikit-learn: https://github.com/scikit-learn/scikit-learn # Author: Edouard Duchesnay # Gael Varoquaux # Virgile Fritsch # Alexandre Gramfort # Lars Buitinck # License: BSD names = [] for estimator in estimators: # xxx(okachaiev): this logic gets progressively more # awkward. maybe we just need to make sure that default # 'Shared' selector does not get into a way of setting # parameters, but all others are just fine to be more # verbose if hasattr(estimator, "base_estimator"): name = type(estimator.base_estimator).__name__.lower() else: name = estimator.__class__.__name__.lower() if isinstance(estimator, PerDomain): name = "perdomain_" + name names.append(name) namecount = defaultdict(int) for est, name in zip(estimators, names): namecount[name] += 1 for k, v in list(namecount.items()): if v == 1: del namecount[k] for i in reversed(range(len(estimators))): name = names[i] if name in namecount: names[i] += "-%d" % namecount[name] namecount[name] -= 1 return list(zip(names, estimators))