# Source code for skada.datasets._base

# Author: Oleksii Kachaiev <kachayev@gmail.com>
#         Yanis Lalou <yanis.lalou@polytechnique.edu>
#
# License: BSD 3-Clause

import os
from functools import reduce
from typing import Dict, Iterable, List, Mapping, Optional, Tuple, Union

import numpy as np
from sklearn.utils import Bunch

_DEFAULT_HOME_FOLDER_KEY = "SKADA_DATA_FOLDER"
_DEFAULT_HOME_FOLDER = "~/skada_datasets"

# xxx(okachaiev): if we use -1 as a detector for targets,
# we should not allow non-labeled dataset or... we need
# to come up with a way to pack them properly
DomainDataType = Union[
    # (name, X, y)
    Tuple[str, np.ndarray, np.ndarray],
    # (X, y)
    Tuple[np.ndarray, np.ndarray],
    # (X,)
    Tuple[np.ndarray,],
]

PackedDatasetType = Union[Bunch, Tuple[np.ndarray, np.ndarray, np.ndarray]]


def get_data_home(data_home: Union[str, os.PathLike, None]) -> str:
    """Return the path of the `skada` data folder.

    This folder is used by some large dataset loaders to avoid downloading the
    data several times.

    By default the data directory is set to a folder named 'skada_datasets' in the
    user home folder.

    Alternatively, it can be set by the 'SKADA_DATA_FOLDER' environment
    variable or programmatically by giving an explicit folder path. The '~'
    symbol is expanded to the user home folder.

    If the folder does not already exist, it is automatically created.

    Parameters
    ----------
    data_home : str or path-like, default=None
        The path to `skada` data folder. If `None`, the default path
        is `~/skada_datasets`.

    Returns
    -------
    data_home: str
        The path to `skada` data folder.
    """
    # fall back to the environment variable, then the built-in default
    if data_home is None:
        data_home = os.environ.get(_DEFAULT_HOME_FOLDER_KEY, _DEFAULT_HOME_FOLDER)
    # expand '~' into the user home folder and make sure the folder exists
    resolved = os.path.expanduser(data_home)
    os.makedirs(resolved, exist_ok=True)
    return resolved


class DomainAwareDataset:
    """Container carrying all dataset domains.

    This class allows to store and manipulate datasets from multiple
    domains, keeping track of the domain information for each sample.

    Parameters
    ----------
    domains : list of tuple or dict of tuple or None, optional
        Domains to add at initialization. A list contains tuples of the
        form (X, y) or (X, y, domain_name). A dict maps a domain name
        to a tuple of the form (X,) or (X, y).

    Attributes
    ----------
    domains_ : list
        List of domains added, each as a tuple (X, y) or (X,).
    domain_names_ : dict
        Dictionary mapping each domain name to its internal identifier.
    """

    def __init__(
        self,
        # xxx(okachaiev): not sure if dictionary is a good format :thinking:
        domains: Union[List[DomainDataType], Dict[str, DomainDataType], None] = None,
    ):
        self.domains_ = []
        self.domain_names_ = {}
        # xxx(okachaiev): there should be a simpler way for adding those
        if domains is None:
            return
        if isinstance(domains, Mapping):
            # dict input: the key carries the domain name and the value
            # is (X,) or (X, y). The previous implementation iterated
            # over the keys only and failed to unpack them.
            for domain_name, domain in domains.items():
                y = domain[1] if len(domain) > 1 else None
                self.add_domain(domain[0], y=y, domain_name=domain_name)
            return
        for domain in domains:
            if len(domain) == 2:
                X, y = domain
                domain_name = None
            elif len(domain) == 3:
                X, y, domain_name = domain
            else:
                # previously a malformed entry fell through and raised an
                # opaque UnboundLocalError on the `add_domain` call below
                raise ValueError("Invalid definition for domain data")
            self.add_domain(X, y=y, domain_name=domain_name)
[docs] def add_domain( self, X, y=None, domain_name: Optional[str] = None ) -> "DomainAwareDataset": """ Add a new domain to the dataset. Parameters ---------- X : np.ndarray Feature matrix for the domain. y : np.ndarray or None, optional Labels for the domain. If None, labels are not provided. domain_name : str, optional Name of the domain. If None, a unique name is autogenerated. Returns ------- self : DomainAwareDataset The updated dataset. """ if domain_name is not None: # check the name is unique # xxx(okachaiev): ValueError would be more appropriate assert domain_name not in self.domain_names_ else: domain_name = f"_{len(self.domain_names_)+1}" domain_id = len(self.domains_) + 1 self.domains_.append((X, y) if y is not None else (X,)) self.domain_names_[domain_name] = domain_id return self
[docs] def merge( self, dataset: "DomainAwareDataset", names_mapping: Optional[Mapping] = None ) -> "DomainAwareDataset": """ Merge another DomainAwareDataset into this one. Parameters ---------- dataset : DomainAwareDataset The dataset to merge. names_mapping : mapping, optional Mapping from old domain names to new domain names. Returns ------- self : DomainAwareDataset The updated dataset. """ for domain_name in dataset.domain_names_: # xxx(okachaiev): this needs to be more flexible # as it should be possible to pass only X with y=None # i guess best way of doing so is to change 'add_domain' API X, y = dataset.get_domain(domain_name) if names_mapping is not None: domain_name = names_mapping.get(domain_name, domain_name) self.add_domain(X, y, domain_name) return self
[docs] def get_domain(self, domain_name: str) -> Tuple[np.ndarray, Optional[np.ndarray]]: """ Retrieve the data and labels for a given domain. Parameters ---------- domain_name : str Name of the domain to retrieve. Returns ------- domain : tuple Tuple containing (X, y) or (X,) for the specified domain. """ domain_id = self.domain_names_[domain_name] return self.domains_[domain_id - 1]
[docs] def select_domain( self, sample_domain: np.ndarray, domains: Union[str, Iterable[str]] ) -> np.ndarray: """ Select samples belonging to one or more domains. Parameters ---------- sample_domain : np.ndarray Array of domain labels for each sample. domains : str or iterable of str Domain name(s) to select. Returns ------- mask : np.ndarray Boolean mask indicating selected samples. """ return select_domain(self.domain_names_, sample_domain, domains)
# xxx(okachaiev): i guess, if we are using names to pack domains into array, # we should not autogenerate them... otherwise it might be not obvious at all
    def pack(
        self,
        as_sources: List[str] = None,
        as_targets: List[str] = None,
        return_X_y: bool = True,
        train: bool = False,
        mask: Union[None, int, float] = None,
    ) -> PackedDatasetType:
        """Aggregates datasets from all domains into a unified domain-aware
        representation, ensuring compatibility with domain adaptation (DA)
        estimators.

        Parameters
        ----------
        as_sources : list
            List of domain names to be used as sources.
        as_targets : list
            List of domain names to be used as targets.
        return_X_y : bool, default=True
            When set to True, returns a tuple (X, y, sample_domain).
            Otherwise returns :class:`~sklearn.utils.Bunch` object with
            the structure described below.
        train: bool, default=False
            When set to True, masks labels for target domains with -1
            (or a `mask` given), so they are not available at train time.
        mask: int | float (optional), default=None
            Value to mask labels at training time.

        Returns
        -------
        data : :class:`~sklearn.utils.Bunch`
            Dictionary-like object, with the following attributes.

            X: ndarray
                Samples from all sources and all targets given.
            y : ndarray
                Labels from all sources and all targets.
            sample_domain : ndarray
                The integer label for domain the sample was taken from.
                By convention, source domains have non-negative labels,
                and target domain label is always < 0.
            domain_names : dict
                The names of domains and associated domain labels.

        (X, y, sample_domain) : tuple if `return_X_y=True`
            Tuple of (data, target, sample_domain), see the description above.
        """
        Xs, ys, sample_domains = [], [], []
        domain_labels = {}
        if as_sources is None:
            as_sources = []
        if as_targets is None:
            as_targets = []
        # --- source domains: labels kept as-is, sample_domain = +domain_id
        for domain_name in as_sources:
            domain_id = self.domain_names_[domain_name]
            source = self.get_domain(domain_name)
            if len(source) == 1:
                # unlabeled domain: fill labels with the -1 placeholder
                (X,) = source
                y = -np.ones(X.shape[0], dtype=np.int32)
            elif len(source) == 2:
                X, y = source
            else:
                raise ValueError("Invalid definition for domain data")
            # xxx(okachaiev): this is horribly inefficient, re-write when API is fixed
            Xs.append(X)
            ys.append(y)
            sample_domains.append(np.ones_like(y, dtype=int) * domain_id)
            domain_labels[domain_name] = domain_id
        # xxx(okachaiev): code duplication, re-write when API is fixed
        # --- target domains: sample_domain = -domain_id; labels masked if train
        dtype = None
        for domain_name in as_targets:
            domain_id = self.domain_names_[domain_name]
            target = self.get_domain(domain_name)
            if len(target) == 1:
                (X,) = target
                # xxx(okachaiev): for what it's worth, we should likely to
                # move the decision about dtype to the very end of the list
                y = -np.ones(X.shape[0], dtype=np.int32)
            elif len(target) == 2:
                X, y = target
            else:
                raise ValueError("Invalid definition for domain data")
            if train:
                if mask is not None:
                    # `mask`/`dtype` may have been set by a previous target
                    # domain in this loop (see the branches below), so later
                    # domains reuse the same masking value and dtype
                    y = np.array([mask] * X.shape[0], dtype=dtype)
                elif y.dtype in (np.int32, np.int64):
                    y = -np.ones(X.shape[0], dtype=y.dtype)
                    # make sure that the mask is reused on the next iteration
                    mask, dtype = -1, y.dtype
                elif y.dtype in (np.float32, np.float64):
                    y = np.array([np.nan] * X.shape[0], dtype=y.dtype)
                    # make sure that the mask is reused on the next iteration
                    mask, dtype = np.nan, y.dtype
                # NOTE(review): labels with any other dtype (e.g. object or
                # string labels) are left unmasked here even when train=True
                # — confirm this is intended
            # xxx(okachaiev): this is horribly inefficient, rewrite when API is fixed
            Xs.append(X)
            ys.append(y)
            sample_domains.append(-1 * domain_id * np.ones_like(y, dtype=int))
            domain_labels[domain_name] = -1 * domain_id
        # xxx(okachaiev): so far this only works if source and target has the same size
        Xs = np.concatenate(Xs)
        ys = np.concatenate(ys)
        sample_domain = np.concatenate(sample_domains)
        return (
            (Xs, ys, sample_domain)
            if return_X_y
            else Bunch(
                X=Xs,
                y=ys,
                sample_domain=sample_domain,
                domain_names=domain_labels,
            )
        )
[docs] def pack_train( self, as_sources: List[str], as_targets: List[str], return_X_y: bool = True, mask: Union[None, int, float] = None, ) -> PackedDatasetType: """ Aggregate source and target domains for training. This method is equivalent to :meth:`pack` with ``train=True``. It masks the labels for target domains (with -1 or a custom mask value) so that they are not available during training, as required for domain adaptation scenarios. Parameters ---------- as_sources : list of str List of domain names to be used as sources. as_targets : list of str List of domain names to be used as targets. return_X_y : bool, default=True If True, returns a tuple (X, y, sample_domain). Otherwise, returns a :class:`sklearn.utils.Bunch` object. mask : int or float, optional Value to mask labels at training time. If None, uses -1 for integers and np.nan for floats. Returns ------- data : :class:`sklearn.utils.Bunch` Dictionary-like object with attributes X, y, sample_domain, domain_names. (X, y, sample_domain) : tuple if `return_X_y=True` Tuple of (data, target, sample_domain). """ return self.pack( as_sources=as_sources, as_targets=as_targets, return_X_y=return_X_y, train=True, mask=mask, )
[docs] def pack_test( self, as_targets: List[str], return_X_y: bool = True, ) -> PackedDatasetType: """ Aggregate target domains for testing. This method is equivalent to :meth:`pack` with only target domains and ``train=False``. Labels are not masked. Parameters ---------- as_targets : list of str List of domain names to be used as targets. return_X_y : bool, default=True If True, returns a tuple (X, y, sample_domain). Otherwise, returns a :class:`sklearn.utils.Bunch` object. Returns ------- data : :class:`sklearn.utils.Bunch` Dictionary-like object with attributes X, y, sample_domain, domain_names. (X, y, sample_domain) : tuple if `return_X_y=True` Tuple of (data, target, sample_domain). """ return self.pack( as_sources=[], as_targets=as_targets, return_X_y=return_X_y, train=False, )
[docs] def pack_lodo(self, return_X_y: bool = True) -> PackedDatasetType: """Packages all domains in a format compatible with the Leave-One-Domain-Out cross-validator (refer to :class:`~skada.model_selection.LeaveOneDomainOut` for more details). To enable the splitter's dynamic assignment of source and target domains, data from each domain is included in the output twice — once as a source and once as a target. Exercise caution when using this output for purposes other than its intended use, as this could lead to incorrect results and data leakage. Parameters ---------- return_X_y : bool, default=True When set to True, returns a tuple (X, y, sample_domain). Otherwise returns :class:`~sklearn.utils.Bunch` object with the structure described below. Returns ------- data : :class:`~sklearn.utils.Bunch` Dictionary-like object, with the following attributes. X: ndarray Samples from all sources and all targets given. y : ndarray Labels from all sources and all targets. sample_domain : np.ndarray The integer label for domain the sample was taken from. By convention, source domains have non-negative labels, and target domain label is always < 0. domain_names : dict The names of domains and associated domain labels. (X, y, sample_domain) : tuple if `return_X_y=True` Tuple of (data, target, sample_domain), see the description above. """ return self.pack( as_sources=list(self.domain_names_.keys()), as_targets=list(self.domain_names_.keys()), return_X_y=return_X_y, train=True, )
def __str__(self) -> str: return f"DomainAwareDataset(domains={self._get_domain_representation()})" def __repr__(self) -> str: head = self.__str__() body = [f"Number of domains: {len(self.domains_)}"] body.append(f"Total size: {sum(len(tup[0]) for tup in self.domains_)}") output = "\n".join([head] + body) return output def _get_domain_representation(self, max_domains=5, max_length=50): domain_names = list(self.domain_names_.keys()) if len(domain_names) <= max_domains: # If the number of domains is small, include all names domain_str = str(domain_names) else: # If the number of domains is large, truncate the list and add ellipsis truncated_domains = domain_names[:max_domains] domain_str = str(truncated_domains)[:-1] + ", ...]" # Truncate the string representation if it exceeds max_length if len(domain_str) > max_length: domain_str = domain_str[: max_length - 3] + "...]" return domain_str
# xxx(okachaiev): putting `domain_names` first argument
# so it's compatible with `partial`
def select_domain(
    domain_names: Dict[str, int],
    sample_domain: np.ndarray,
    domains: Union[str, Iterable[str]],
) -> np.ndarray:
    """Return a boolean mask selecting samples belonging to `domains`.

    Parameters
    ----------
    domain_names : dict
        Mapping from domain name to its integer label.
    sample_domain : np.ndarray
        Array of integer domain labels, one per sample.
    domains : str or iterable of str
        Domain name(s) to select.

    Returns
    -------
    mask : np.ndarray
        Boolean mask, True where the sample belongs to any of `domains`.
    """
    # a single name is treated as a one-element collection
    if isinstance(domains, str):
        domains = [domains]
    # xxx(okachaiev): this version is not the most efficient
    per_domain_masks = (sample_domain == domain_names[name] for name in domains)
    return reduce(np.logical_or, per_domain_masks)