Source code for skada.utils

# Author: Yanis Lalou <yanis.lalou@polytechnique.edu>
#         Antoine Collas <contact@antoinecollas.fr>
#
# License: BSD 3-Clause

import warnings
from itertools import chain
from typing import Optional, Sequence, Set

import numpy as np
from scipy.optimize import LinearConstraint, minimize
from sklearn.utils import check_array, check_consistent_length

from skada._utils import (
    _DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL,
    _DEFAULT_MASKED_TARGET_REGRESSION_LABEL,
    _DEFAULT_SOURCE_DOMAIN_LABEL,
    _DEFAULT_TARGET_DOMAIN_LABEL,
    _DEFAULT_TARGET_DOMAIN_ONLY_LABEL,
    _check_y_masking,
    Y_Type,
    _find_y_type
)


[docs] def check_X_y_domain( X, y, sample_domain=None, allow_source: bool = True, allow_multi_source: bool = True, allow_target: bool = True, allow_multi_target: bool = True, allow_common_domain_idx: bool = True, allow_auto_sample_domain: bool = True, allow_nd: bool = False, allow_label_masks: bool = True, ): """ Input validation for domain adaptation (DA) estimator. If we work in single-source and single target mode, return source and target separately to avoid additional scan for 'sample_domain' array. Parameters ---------- X : array-like of shape (n_samples, n_features) Input features y : array-like of shape (n_samples,) Target variable sample_domain : array-like, scalar, or None, optional (default=None) Array specifying the domain labels for each sample. A scalar value can be provided to assign all samples to the same domain. allow_source : bool, optional (default=True) Allow the presence of source domains. allow_multi_source : bool, optional (default=True) Allow multiple source domains. allow_target : bool, optional (default=True) Allow the presence of target domains. allow_multi_target : bool, optional (default=True) Allow multiple target domains. allow_common_domain_idx : bool, optional (default=True) Allow the same domain index to be used for source and target domains, e.g 1 for a source domain and -1 for a target domain. allow_auto_sample_domain : bool, optional (default=True) Allow automatic generation of sample_domain if not provided. allow_nd : bool, optional (default=False) Allow X and y to be N-dimensional arrays. allow_label_masks : bool, optional (default=True) Allow NaNs in y. Returns ------- X : array Input features y : array Target variable sample_domain : array Array specifying the domain labels for each sample. """ X = check_array(X, input_name='X', allow_nd=allow_nd) y = check_array(y, force_all_finite=not allow_label_masks, ensure_2d=False, input_name='y') check_consistent_length(X, y) if sample_domain is None and not allow_auto_sample_domain: raise ValueError("Either 'sample_domain' or 'allow_auto_sample_domain' " "should be set") elif sample_domain is None and allow_auto_sample_domain: y_type = _check_y_masking(y) sample_domain = _DEFAULT_SOURCE_DOMAIN_LABEL*np.ones_like(y) # labels masked with -1 (for classification) are recognized as targets, # labels masked with nan (for regression) are recognized as targets, # the rest is treated as a source if y_type == Y_Type.DISCRETE: mask = (y == _DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL) else: mask = (np.isnan(y)) sample_domain[mask] = _DEFAULT_TARGET_DOMAIN_LABEL if np.isscalar(sample_domain): sample_domain = sample_domain*np.ones_like(y) source_idx = extract_source_indices(sample_domain) # xxx(okachaiev): this needs to be re-written to accommodate for a # a new domain labeling convention without "intersections" n_sources = np.unique(sample_domain[source_idx]).shape[0] n_targets = np.unique(sample_domain[~source_idx]).shape[0] if not allow_source and n_sources > 0: raise ValueError(f"Number of sources provided is {n_sources} " "and 'allow_source' is set to False") if not allow_target and n_targets > 0: raise ValueError(f"Number of targets provided is {n_targets} " "and 'allow_target' is set to False") if not allow_multi_source and n_sources > 1: raise ValueError(f"Number of sources provided is {n_sources} " "and 'allow_multi_source' is set to False") if not allow_multi_target and n_sources > 1: raise ValueError(f"Number of targets provided is {n_targets} " "and 'allow_multi_target' is set to False") # Check for unique domain idx if not allow_common_domain_idx: unique_domain_idx = np.unique(sample_domain) unique_domain_idx_abs = np.abs(unique_domain_idx) if len(unique_domain_idx) != len(np.unique(unique_domain_idx_abs)): raise ValueError("Domain labels should be unique: the same domain " "index should not be used both for source and target") return X, y, sample_domain
# xxx(okachaiev): code duplication, just for testing def check_X_domain( X, sample_domain, *, allow_domains: Optional[Set[int]] = None, allow_source: bool = True, allow_multi_source: bool = True, allow_target: bool = True, allow_multi_target: bool = True, allow_common_domain_idx: bool = True, allow_auto_sample_domain: bool = True, allow_nd: bool = False, ): """ Input validation for domain adaptation (DA) estimator. If we work in single-source and single target mode, return source and target separately to avoid additional scan for 'sample_domain' array. Parameters ---------- X : array-like of shape (n_samples, n_features) Input features. sample_domain : array-like, scalar, or None, optional (default=None) Array specifying the domain labels for each sample. A scalar value can be provided to assign all samples to the same domain. allow_domains : set of int, optional (default=None) Set of allowed domain labels. If provided, only these domain labels are allowed. allow_source : bool, optional (default=True) Allow the presence of source domains. allow_multi_source : bool, optional (default=True) Allow multiple source domains. allow_target : bool, optional (default=True) Allow the presence of target domains. allow_multi_target : bool, optional (default=True) Allow multiple target domains. allow_common_domain_idx : bool, optional (default=True) Allow the same domain index to be used for source and target domains, e.g 1 for a source domain and -1 for a target domain. allow_auto_sample_domain : bool, optional (default=True) Allow automatic generation of sample_domain if not provided. allow_nd : bool, optional (default=False) Allow X and y to be N-dimensional arrays. Returns ------- X : array Input features. sample_domain : array Combined domain labels for source and target domains. """ X = check_array(X, input_name='X', allow_nd=allow_nd) if sample_domain is None and not allow_auto_sample_domain: raise ValueError("Either 'sample_domain' or 'allow_auto_sample_domain' " "should be set") elif sample_domain is None and allow_auto_sample_domain: # default target domain when sample_domain is not given # The idea is that with no labels we always assume # target domain (_DEFAULT_TARGET_DOMAIN_ONLY_LABEL) sample_domain = ( _DEFAULT_TARGET_DOMAIN_ONLY_LABEL * np.ones(X.shape[0], dtype=np.int32) ) if np.isscalar(sample_domain): sample_domain = sample_domain * np.ones(X.shape[0], dtype=np.int32) source_idx = extract_source_indices(sample_domain) check_consistent_length(X, sample_domain) if allow_domains is not None: for domain in np.unique(sample_domain): # xxx(okachaiev): re-definition of the wildcards wildcard = np.inf if domain >= 0 else -np.inf if domain not in allow_domains and wildcard not in allow_domains: raise ValueError(f"Unknown domain label '{domain}' given") n_sources = np.unique(sample_domain[source_idx]).shape[0] n_targets = np.unique(sample_domain[~source_idx]).shape[0] if not allow_source and n_sources > 0: raise ValueError(f"Number of sources provided is {n_sources} " "and 'allow_source' is set to False") if not allow_target and n_targets > 0: raise ValueError(f"Number of targets provided is {n_targets} " "and 'allow_target' is set to False") if not allow_multi_source and n_sources > 1: raise ValueError(f"Number of sources provided is {n_sources} " "and 'allow_multi_source' is set to False") if not allow_multi_target and n_sources > 1: raise ValueError(f"Number of targets provided is {n_targets} " "and 'allow_multi_target' is set to False") # Check for unique domain idx if not allow_common_domain_idx: unique_domain_idx = np.unique(sample_domain) unique_domain_idx_abs = np.abs(unique_domain_idx) if len(unique_domain_idx) != len(np.unique(unique_domain_idx_abs)): raise ValueError("Domain labels should be unique: the same domain " "index should not be used both for source and target") return X, sample_domain
[docs] def extract_source_indices(sample_domain): """Extract the indices of the source samples. Parameters ---------- sample_domain : array-like of shape (n_samples,) Array specifying the domain labels for each sample. Returns ------- source_idx : array Boolean array indicating source indices. """ sample_domain = check_array( sample_domain, dtype=np.int32, ensure_2d=False, input_name='sample_domain' ) source_idx = (sample_domain >= 0) return source_idx
[docs] def extract_domains_indices(sample_domain, split_source_target=False): """Extract the indices of the specific domain samples. Parameters ---------- sample_domain : array-like of shape (n_samples,) Array specifying the domain labels for each sample. split_source_target : bool, optional (default=False) Whether to split the source and target domains. Returns ------- domains_idx : dict A dictionary where keys are unique domain labels and values are indexes of the samples belonging to the corresponding domain. If split_source_target is True, then two dictionaries are returned: - source_idx: keys >= 0 - target_idx: keys < 0 """ sample_domain = check_array( sample_domain, dtype=np.int32, ensure_2d=False, input_name='sample_domain' ) domain_idx = {} unique_domains = np.unique(sample_domain) for domain in unique_domains: source_idx = (sample_domain == domain) domain_idx[domain] = np.flatnonzero(source_idx) if split_source_target: source_domain_idx = {k: v for k, v in domain_idx.items() if k >= 0} target_domain_idx = {k: v for k, v in domain_idx.items() if k < 0} return source_domain_idx, target_domain_idx else: return domain_idx
[docs] def source_target_split( *arrays, sample_domain ): r"""Split data into source and target domains Parameters ---------- *arrays : sequence of array-like of identical shape (n_samples, n_features) Input features and target variable(s), and or sample_weights to be split. All arrays should have the same length except if None is given then a couple of None variables are returned to allow for optional sample_weight. sample_domain : array-like of shape (n_samples,) Array specifying the domain labels for each sample. Returns ------- splits : list, length=2 * len(arrays) List containing source-target split of inputs. """ if len(arrays) == 0: raise ValueError("At least one array required as input") check_consistent_length(arrays) source_idx = extract_source_indices(sample_domain) return list(chain.from_iterable( (a[source_idx], a[~source_idx]) if a is not None else (None, None) for a in arrays ))
[docs] def per_domain_split( *arrays, sample_domain): r"""Split data into multiple source and target domains Parameters ---------- *arrays : sequence of array-like of identical shape (n_samples, n_features) Input features and target variable(s), and or sample_weights to be split. All arrays should have the same length except if None is given then a couple of None variables are returned to allow for optional sample_weight. sample_domain : array-like of shape (n_samples,) Array specifying the domain labels for each sample. split_source_target : bool, optional (default=False) Whether to split the source and target domains. Returns ------- source_dict : dict dict of source domains (contain tuple of subset of arrays). target_dict : dict dict of target domains (contain tuple of subset of arrays). """ if len(arrays) == 0: raise ValueError("At least one array required as input") check_consistent_length(arrays) domain_idx = extract_domains_indices(sample_domain, False) source_dict = {} target_dict = {} for domain, idx in domain_idx.items(): if domain >= 0: source_dict[domain] = list(chain.from_iterable( (a[idx],) if a is not None else (None,) for a in arrays )) else: target_dict[domain] = list(chain.from_iterable( (a[idx],) if a is not None else (None,) for a in arrays )) return source_dict, target_dict
[docs] def source_target_merge( *arrays, sample_domain: Optional[np.ndarray] = None ) -> Sequence[np.ndarray]: """Merge source and target domain data based on sample domain labels. Parameters ---------- *arrays : sequence of array-like (, n_features). The number of arrays must be even since we consider pairs of source-target arrays each time. Each pair should have at least one non-empty array. In each pair the first array is considered as the source and the second as the target. If one of the array is None or empty, it's value will be inferred from the other array and the sample_domain (depending on the type of the arrays, they'll have a value of {_DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL} or {_DEFAULT_MASKED_TARGET_REGRESSION_LABEL}). sample_domain : array-like of shape (n_samples,) Array specifying the domain labels for each sample. If None or empty the domain labels will be inferred from the the *arrays, being a default source and target domain (depending on the type of the arrays, they'll have a value of {_DEFAULT_SOURCE_DOMAIN_LABEL} or {_DEFAULT_TARGET_DOMAIN_LABEL}). Returns ------- merges : list, length=len(arrays)/2 List containing merged data based on the sample domain labels. sample_domain : array-like of shape (n_samples,) Array specifying the domain labels for each sample. Examples -------- >>> X_source = np.array([[1, 2], [3, 4], [5, 6]]) >>> X_target = np.array([[7, 8], [9, 10]]) >>> X, sample_domain = source_target_merge(X_source, X_target) >>> X array([[ 1, 2], [ 3, 4], [ 5, 6], [ 7, 8], [ 9, 10]]) >>> sample_domain array([ 1., 1., 1., -2., -2.]) >>> X_source = np.array([[1, 2], [3, 4], [5, 6]]) >>> X_target = np.array([[7, 8], [9, 10]]) >>> y_source = np.array([0, 1, 1]) >>> y_target = None >>> X, y, _ = source_target_merge(X_source, X_target, y_source, y_target) >>> X array([[ 1, 2], [ 3, 4], [ 5, 6], [ 7, 8], [ 9, 10]]) >>> y array([ 0, 1, 1, {_DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL}, {_DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL}]) """ arrays = list(arrays) # Convert to list to be able to modify it # ################ Check *arrays ################# if len(arrays) < 2: raise ValueError("At least two array required as input") if len(arrays) % 2 != 0: raise ValueError("Even number of arrays required as input") for i in range(0, len(arrays), 2): if arrays[i] is None or arrays[i].shape[0] == 0: arrays[i] = np.array([]) if arrays[i+1] is None or arrays[i+1].shape[0] == 0: arrays[i+1] = np.array([]) # Check no pair is empty if (np.size(arrays[i]) == 0 and np.size(arrays[i+1]) == 0): raise ValueError("Only one array can be None or empty in each pair") # Check consistent dim of arrays if (np.size(arrays[i]) != 0 and np.size(arrays[i+1]) != 0): if arrays[i].shape[1:] != arrays[i+1].shape[1:]: raise ValueError( "Inconsistent number of features in source-target arrays" ) # ################ Check sample_domain ################# # If sample_domain is None, we need to infer it from the target array if sample_domain is None or sample_domain.shape[0] == 0: # We need to infer the domain from the target array warnings.warn( "sample_domain is None or empty, it will be inferred from the arrays" ) # By assuming that the first array is the source and the second the target source_assumed_index = 0 target_assumed_index = 1 sample_domain = np.concatenate(( _DEFAULT_SOURCE_DOMAIN_LABEL*np.ones( arrays[source_assumed_index].shape[0] ), _DEFAULT_TARGET_DOMAIN_LABEL*np.ones( arrays[target_assumed_index].shape[0] ) )) # To test afterward if the number of samples in source-target arrays # and the number inferred in the sample_domain are consistent source_indices = extract_source_indices(sample_domain) source_samples = np.count_nonzero(source_indices) target_samples = np.count_nonzero(~source_indices) # ################ Merge arrays ################# merges = [] # List of merged arrays for i in range(0, len(arrays), 2): # If one of the array is empty, we need to infer its values index_is_empty = None # Index of the array that was None if np.size(arrays[i]) == 0: index_is_empty = i if np.size(arrays[i+1]) == 0: index_is_empty = i+1 if index_is_empty is not None: # We need to infer the value of the empty array in the pair # warnings.warn( # "One of the arrays in a pair is empty, it will be inferred" # ) pair_index = i+1 if index_is_empty == i else i # Infer the value of the empty array # only if the shape of the new array is not (0, ...) shape_to_complete = (sample_domain.shape[0] - arrays[pair_index].shape[0],) + arrays[pair_index].shape[1:] if shape_to_complete[0] > 0: y_type = _find_y_type(arrays[pair_index]) if y_type == Y_Type.DISCRETE: default_masked_label = _DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL elif y_type == Y_Type.CONTINUOUS: default_masked_label = _DEFAULT_MASKED_TARGET_REGRESSION_LABEL arrays[index_is_empty] = ( default_masked_label * np.ones(shape_to_complete) ) # Check consistent number of samples in source-target arrays # and the number inferred in the sample_domain if (sample_domain is not None) and (sample_domain.shape[0] != 0): if ( (arrays[i].shape[0] != source_samples) or (arrays[i+1].shape[0] != target_samples) ): raise ValueError( "Inconsistent number of samples in source-target arrays " "and the number inferred in the sample_domain" ) merges.append( _merge_arrays(arrays[i], arrays[i+1], sample_domain=sample_domain) ) return (*merges, sample_domain)
# Update the docstring to replace placeholders with actual values source_target_merge.__doc__ = source_target_merge.__doc__.format( _DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL=_DEFAULT_MASKED_TARGET_CLASSIFICATION_LABEL, _DEFAULT_MASKED_TARGET_REGRESSION_LABEL=_DEFAULT_MASKED_TARGET_REGRESSION_LABEL, _DEFAULT_SOURCE_DOMAIN_LABEL=_DEFAULT_SOURCE_DOMAIN_LABEL, _DEFAULT_TARGET_DOMAIN_LABEL=_DEFAULT_TARGET_DOMAIN_LABEL ) def _merge_arrays( array_source, array_target, sample_domain ): """Merge source and target domain data based on sample domain labels. Parameters ---------- array_source : sequence of array-like of identical number of samples (n_samples_source, n_features). array_target : sequence of array-like of identical number of samples (n_samples_target, n_features). sample_domain : array-like of shape (n_samples_source + n_samples_target,) Array specifying the domain labels for each sample. Returns ------- merge : sequence of array-like of identical number of samples (n_samples_source + n_samples_target, n_features) """ if array_source.shape[0] > 0 and array_target.shape[0] > 0: output = np.zeros_like( np.concatenate((array_source, array_target)), dtype=array_source.dtype ) output[sample_domain >= 0] = array_source output[sample_domain < 0] = array_target elif array_source.shape[0] > 0: output = np.zeros_like( array_source, dtype=array_source.dtype ) output[sample_domain >= 0] = array_source else: output = np.zeros_like( array_target, dtype=array_target.dtype ) output[sample_domain < 0] = array_target return output def qp_solve(Q, c=None, A=None, b=None, Aeq=None, beq=None, lb=None, ub=None, x0=None, tol=1e-6, max_iter=1000, verbose=False, log=False, solver="scipy"): r""" Solves a standard quadratic program Solve the following optimization problem: .. math:: \min_x \quad \frac{1}{2}x^TQx + x^Tc lb <= x <= ub Ax <= b A_{eq} x = b_{eq} Return val as None if optimization failed. All constraint parameters are optional, they will be ignored if set as None. Note that the Frank-wolfe solver can only be used to solve optimization problem defined as follows: .. math:: \min_x \quad \frac{1}{2}x^TQx + x^Tc A_{eq} x = b_{eq} x >= 0 Or, .. math:: \min_x \quad \frac{1}{2}x^TQx + x^Tc A x <= b x >= 0 With Aeq and beq of respective dimension (1,d) and (1,), and A and b of respective dimension (1,d) and (1,) or (2,d) and (2,), with A[0] = -A[1] and b[0] >= -b[1]. Parameters ---------- Q : (d,d) ndarray, float64, optional Quadratic cost matrix matrix c : (d,) ndarray, float64, optional Linear cost vector A : (n,d) ndarray, float64, optional Linear inequality constraint matrix. b : (n,) ndarray, float64, optional Linear inequality constraint vector. Aeq : (n,d) ndarray, float64, optional Linear equality constraint matrix . beq : (n,) ndarray, float64, optional Linear equality constraint vector. . lb : (d) ndarray, float64, optional Lower bound constraint, -np.inf if not provided. ub : (d) ndarray, float64, optional Upper bound constraint, np.inf if not provided. x0 : (d,) ndarray, float64, optional Initialization. Ones by default. tol : float, optional Tolerance for termination. max_iter : int, optional Maximum number of iterations to perform. verbose : boolean, optional Print optimization information. log : boolean, optional Return a dictionary with optim information in addition to x and val solver : str, optional default='scipy' Available solvers : 'scipy', 'frank-wolfe' Returns ------- x: (d,) ndarray Optimal solution x val: float optimal value of the objective (None if optimization error) log: dict Optional log output """ solver_list = ["scipy", "frank-wolfe"] if solver == "scipy": return _qp_solve_scipy(Q, c, A, b, Aeq, beq, lb, ub, x0, tol, max_iter, verbose, log) elif solver == "frank-wolfe": return _qp_solve_frank_wolfe(Q, c, A, b, Aeq, beq, x0, max_iter) else: raise ValueError("`solver` argument should be included in %s," " got '%s'" % (solver_list, str(solver))) def _qp_solve_scipy(Q, c=None, A=None, b=None, Aeq=None, beq=None, lb=None, ub=None, x0=None, tol=1e-6, max_iter=1000, verbose=False, log=False): r""" Solves a standard quadratic program Solve the following optimization problem: .. math:: \min_x \quad \frac{1}{2}x^TQx + x^Tc lb <= x <= ub Ax <= b A_{eq} x = b_{eq} All constraint parameters are optional, they will be ignored if set as None. Parameters ---------- Q : (d,d) ndarray, float64, optional Quadratic cost matrix matrix c : (d,) ndarray, float64, optional Linear cost vector A : (n,d) ndarray, float64, optional Linear inequality constraint matrix. b : (n,) ndarray, float64, optional Linear inequality constraint vector. Aeq : (n,d) ndarray, float64, optional Linear equality constraint matrix . beq : (n,) ndarray, float64, optional Linear equality constraint vector. . lb : (d) ndarray, float64, optional Lower bound constraint, -np.inf if not provided. ub : (d) ndarray, float64, optional Upper bound constraint, np.inf if not provided. x0 : (d,) ndarray, float64, optional Initialization. Ones by default. tol : float, optional Tolerance for termination. max_iter : int, optional Maximum number of iterations to perform. verbose : boolean, optional Print optimization information. log : boolean, optional Return a dictionary with optim information in addition to x and val Returns ------- x: (d,) ndarray Optimal solution x val: float optimal value of the objective (None if optimization error) log: dict Optional log output """ # Constraints constraints = [] if A is not None: constraints.append(LinearConstraint(A, ub=b)) if Aeq is not None: constraints.append(LinearConstraint(Aeq, lb=beq, ub=beq)) # Objective function if c is None: def func(x): return (1/2) * x @ (Q @ x) def jac(x): return Q @ x else: def func(x): return (1/2) * x @ (Q @ x) + x @ c def jac(x): return Q @ x + c if x0 is None: x0 = np.ones(Q.shape[0]) # Bounds if lb is None and ub is None: bounds = None else: if lb is None: bounds = [(-np.inf, b) for b in ub] elif ub is None: bounds = [(b, np.inf) for b in lb] else: bounds = [(b1, b2) for b1, b2 in zip(lb, ub)] # Optimization results = minimize(func, x0=x0, method="SLSQP", jac=jac, bounds=bounds, constraints=constraints, tol=tol, options={"maxiter": max_iter, "disp": verbose}) if not results.success: warnings.warn(results.message) outputs = (results['x'], results['fun']) if log: outputs += (results,) return outputs def _qp_solve_frank_wolfe(Q, c=None, A=None, b=None, Aeq=None, beq=None, x0=None, max_iter=1000): r""" Solves a quadratic program with Frank-Wolfe algorithm Solve the following optimization problem: .. math:: \min_x \quad \frac{1}{2}x^TQx + x^Tc A_{eq} x = b_{eq} x >= 0 Or, .. math:: \min_x \quad \frac{1}{2}x^TQx + x^Tc A x <= b x >= 0 With Aeq and beq of respective dimension (1,d) and (1,), and A and b of respective dimension (1,d) and (1,) or (2,d) and (2,), with A[0] = -A[1] and b[0] >= -b[1]. Parameters ---------- Q : (d,d) ndarray, float64, optional Quadratic cost matrix matrix c : (d,) ndarray, float64, optional Linear cost vector A : (n,d) ndarray, float64, optional Linear inequality constraint matrix. b : (n,) ndarray, float64, optional Linear inequality constraint vector. Aeq : (n,d) ndarray, float64, optional Linear equality constraint matrix . beq : (n,) ndarray, float64, optional Linear equality constraint vector. x0 : (d,) ndarray, float64, optional Initialization. Ones by default. max_iter : int, optional Maximum number of iterations to perform. Returns ------- x: (d,) ndarray Optimal solution x val: float optimal value of the objective (None if optimization error) """ if Aeq is not None and Aeq.shape[0] > 1: raise ValueError("`Aeq.shape[0]` must be equal to 1" " when using the 'frank-wolfe' solver," " got '%s'" % str(Aeq.shape[0])) if A is not None: if A.shape[0] > 2: raise ValueError("`A.shape[2]` must be lower than 2" " when using the 'frank-wolfe' solver," " got '%s'" % str(A.shape[0])) if A.shape[0] == 2: if not np.allclose(A[0], -A[1]): raise ValueError("`A[0]` must be equal to `-A[1]`" " when using the 'frank-wolfe' solver" " with A.shape[0]=2") if b[0] < -b[1]: raise ValueError("`b[0]` must be greater or equal to" " `-b[1]` when using the 'frank-wolfe'" " solver with A.shape[0]=2") if c is None: def func(x): return (1/2) * x @ (Q @ x) def jac(x): return Q @ x else: def func(x): return (1/2) * x @ (Q @ x) + x @ c def jac(x): return Q @ x + c if Aeq is not None: x = frank_wolfe(jac, Aeq[0], beq[0], beq[0], x0, max_iter) elif A is not None: if A.shape[0] > 1: x = frank_wolfe(jac, A[0], -b[1], b[0], x0, max_iter) else: x = frank_wolfe(jac, A[0], b[0], b[0], x0, max_iter) else: raise ValueError("`A` or `Aeq` must be given when" " using the 'frank-wolfe' solver") return x, func(x) def frank_wolfe(jac, c, clb=1., cub=1., x0=None, max_iter=1000): r"""Frank-Wolfe algorithm for convex programming Solve the following convex optimization problem: .. math:: \min_x \quad f(x) clb <= \\langle x, c \\rangle <= cub x >= 0 Parameters ---------- jac : callable The jacobian of f, return a vector of shape (d,). c : (d,) ndarray, float64 Linear constraint vector. clb : float64, optional, default=1. Lower bound of the linear constraint. cub : float64, optional, default=1. Upper bound of the linear constraint. max_iter : int, optional, default=1000 Maximum number of iterations to perform. Returns ------- x: (d,) ndarray Optimal solution x """ inv_c = 1. / c if x0 is None: x0 = inv_c * ((cub - clb) / 2) / c.shape[0] x = x0 for k in range(1, max_iter+1): grad = jac(x) product = grad * inv_c index = np.argmin(product) vect = np.zeros(c.shape[0]) if product[index] >= 0: vect[index] = inv_c[index] * clb else: vect[index] = inv_c[index] * cub lr = 2. / (k + 1.) x = (1 - lr) * x + lr * vect return x def torch_minimize(loss, x0, tol=1e-6, max_iter=1000, verbose=False): r""" Solves unconstrained optimization problem using pytorch Solve the following optimization problem: .. math:: \min_x \quad loss(x) Parameters ---------- loss : callable Objective function to be minimized. x0 : list of ndarrays or torch.Tensor Initialization. tol : float, optional Tolerance on the gradient for termination. max_iter : int, optional Maximum number of iterations to perform. verbose : bool, optional If True, print the final gradient norm. Returns ------- x: ndarray or list of ndarrays Optimal solution x val: float final value of the objective """ try: import torch except ImportError: raise ImportError("torch_minimize requires pytorch to be installed") if type(x0) not in (list, tuple): x0 = [x0] x0 = [torch.tensor(x, requires_grad=True, dtype=torch.float64) for x in x0] optimizer = torch.optim.LBFGS( x0, max_iter=max_iter, tolerance_grad=tol, line_search_fn="strong_wolfe" ) def closure(): optimizer.zero_grad() loss_value = loss(*x0) loss_value.backward() return loss_value optimizer.step(closure) grad_norm = torch.cat([x.grad.flatten() for x in x0]).abs().max() if verbose: print(f"Final gradient norm: {grad_norm:.2e}") if grad_norm > tol: warnings.warn( "Optimization did not converge. " f"Final gradient maximum value: {grad_norm:.2e} > {tol:.2e}" ) solution = [x.detach().numpy() for x in x0] if len(solution) == 1: solution = solution[0] loss_val = loss(*x0).item() return solution, loss_val