# Source code for mlens.parallel.base

"""ML-Ensemble

:author: Sebastian Flennerhag
:copyright: 2017-2018
:license: MIT

Base classes for parallel estimation


Schedulers for global setups:
   0:
      Base setups - independent of other features:
         IndexMixin._setup_0_index

   1:
      Global setups - reserved for aggregating classes:
         Layer._setup_1_global

   2:
      Dependent on 0:
         ProbaMixin._setup_2_multiplier

   3:
      Dependent on 0, 2:
         OutputMixin._setup_3_output_columns

Note that schedulers are experimental and may change without a deprecation
cycle.
"""
import warnings
from abc import abstractmethod
import numpy as np

from ._base_functions import check_stack, check_params
from .. import config
from ..utils.exceptions import ParallelProcessingError
from ..externals.sklearn.base import clone, BaseEstimator as _BaseEstimator


class ParamMixin(_BaseEstimator, object):

    """Parameter Mixin

    Mixin for protecting static parameters from changes after fitting.

    .. Note::
       To use this mixin the instance inheriting it must set
       ``__static__=list()`` and ``_static_fit_params=dict()``
       in ``__init__``.
    """

    def _store_static_params(self):
        """Record current static params for future comparison."""
        if self.__static__:
            for key, val in self.get_params(deep=False).items():
                if key in self.__static__:
                    # Clone so the stored snapshot is decoupled from the
                    # live attribute (safe=False allows non-estimators).
                    self._static_fit_params[key] = clone(val, safe=False)

    def _check_static_params(self):
        """Check if current static params are identical to previous params.

        Returns
        -------
        bool
            result of :func:`check_params` comparing the stored snapshot
            against the instance's current static parameters.
        """
        current_static_params = {
            k: v for k, v in self.get_params(deep=False).items()
            if k in self.__static__}
        return check_params(self._static_fit_params, current_static_params)
class IndexMixin(object):

    """Indexer mixin

    Mixin for handling indexers.

    .. note::
       To use this mixin the instance inheriting it must set the
       ``indexer`` or ``indexers`` attribute in ``__init__`` (not both).
    """

    @property
    def __indexer__(self):
        """Flag for existence of indexer"""
        return hasattr(self, 'indexer') or hasattr(self, 'indexers')

    def _check_indexer(self, indexer):
        """Check consistent indexer classes.

        Raises
        ------
        ValueError
            if ``indexer`` is not a valid indexer type, or if it mixes
            blend-type and full-type indexing with existing indexers.
        """
        cls = indexer.__class__.__name__.lower()
        if 'index' not in cls:
            # Bug fix: the original built this ValueError without raising
            # it, so invalid indexers passed the check silently.
            raise ValueError(
                "Passed indexer does not appear to be valid indexer")

        lcls = [idx.__class__.__name__.lower()
                for idx in self._get_indexers()]
        if lcls:
            if 'blendindex' in lcls and cls != 'blendindex':
                raise ValueError(
                    "Instance has blendindex, but was passed full type")
            elif 'blendindex' not in lcls and cls == 'blendindex':
                raise ValueError(
                    "Instance has full type index, but was passed blendindex")

    def _get_indexers(self):
        """Return list of indexers.

        Falls back to the ``indexers`` attribute when ``indexer`` is
        absent or None.
        """
        if not self.__indexer__:
            raise AttributeError(
                "No indexer or indexers attribute available")
        indexers = [getattr(self, 'indexer', None)]
        if None in indexers:
            indexers = getattr(self, 'indexers', [None])
        return indexers

    def _setup_0_index(self, X, y, job):
        # Scheduler 0: fit all indexers before any dependent setup runs.
        indexers = self._get_indexers()
        for indexer in indexers:
            indexer.fit(X, y, job)
class OutputMixin(IndexMixin):

    """Output Mixin

    Mixin class for interfacing with ParallelProcessing when outputs are
    desired.

    .. note::
       To use this mixin the instance inheriting it must set the
       ``feature_span`` attribute and ``__no_output__`` flag in ``__init__``.
    """

    @abstractmethod
    def set_output_columns(self, X, y, job, n_left_concats=0):
        """Set output columns for prediction array"""
        pass

    def _setup_3_output_columns(self, X, y, job, n_left_concats=0):
        """Set output columns for prediction array. Used during setup"""
        if not self.__no_output__:
            self.set_output_columns(X, y, job, n_left_concats)

    def shape(self, job):
        """Prediction array shape.

        Raises
        ------
        ParallelProcessingError
            if the instance did not set ``feature_span`` in ``__init__``.
        ValueError
            if output columns have not been set yet.
        """
        if not hasattr(self, 'feature_span'):
            # Typo fix in error message: "dose not" -> "does not"
            raise ParallelProcessingError(
                "Instance does not set the feature_span attribute "
                "in the constructor.")

        if not self.feature_span:
            raise ValueError("Columns not set. Call set_output_columns.")
        return self.size(job), self.feature_span[1]

    def size(self, attr):
        """Get size of dim 0.

        ``attr`` may be a job name ('predict' maps to 'n_samples', any
        other job to 'n_test_samples') or an indexer size attribute name.
        """
        if attr not in ['n_test_samples', 'n_samples']:
            attr = 'n_test_samples' if attr != 'predict' else 'n_samples'

        indexers = self._get_indexers()
        sizes = list()
        for indexer in indexers:
            sizes.append(getattr(indexer, attr))

        sizes = np.unique(sizes)
        if not sizes.shape[0] == 1:
            # Indexers disagree on output size; pad shorter outputs.
            warnings.warn(
                "Inconsistent output sizes generated by indexers "
                "(sizes: %r from indexers %r).\n"
                "outputs will be zero-padded" % (sizes.tolist(), indexers))
            return max(sizes)
        return sizes[0]
class ProbaMixin(object):

    """Probability Mixin

    Mixin for probability features on objects interfacing with
    :class:`~mlens.parallel.backend.ParallelProcessing`

    .. note::
       To use this mixin the instance inheriting it must set the ``proba``
       and the ``_classes(=None)`` attribute in ``__init__``.
    """

    def _setup_2_multiplier(self, X, y, job=None):
        # Scheduler 2: infer class count from y (depends on scheduler 0).
        if self.proba and y is not None:
            self.classes_ = y

    def _get_multiplier(self, X, y, alt=1):
        """Return output-width multiplier: n classes if proba, else alt."""
        if self.proba:
            multiplier = self.classes_
        else:
            multiplier = alt
        return multiplier

    @property
    def _predict_attr(self):
        """Name of the estimator method to call during prediction."""
        return 'predict' if not self.proba else 'predict_proba'

    @property
    def classes_(self):
        """Prediction classes during proba"""
        return self._classes

    @classes_.setter
    def classes_(self, y):
        """Set classes given input y"""
        self._classes = np.unique(y).shape[0]
class BaseBackend(object):

    """Base class for parallel backend

    Implements default backend settings.
    """

    def __init__(self, backend=None, n_jobs=-1, dtype=None,
                 raise_on_exception=True):
        # Fall back to the global config for any unspecified setting.
        if dtype is None:
            dtype = config.get_dtype()
        if backend is None:
            backend = config.get_backend()

        self.n_jobs = n_jobs
        self.dtype = dtype
        self.backend = backend
        self.raise_on_exception = raise_on_exception

    @abstractmethod
    def __iter__(self):
        yield
class BaseParallel(BaseBackend):

    """Base class for parallel objects

    Parameters
    ----------
    name : str
        name of instance. Should be unique.

    backend : str or object (default = 'threading')
        backend infrastructure to use during call to
        :class:`mlens.externals.joblib.Parallel`. See Joblib for further
        documentation. To set global backend, see
        :func:`~mlens.config.set_backend`.

    raise_on_exception : bool (default = True)
        whether to issue warnings on soft exceptions or raise error.
        Examples include lack of layers, bad inputs, and failed fit of an
        estimator in a layer. If set to ``False``, warnings are issued
        instead but estimation continues unless exception is fatal. Note
        that this can result in unexpected behavior unless the exception
        is anticipated.

    verbose : int or bool (default = False)
        level of verbosity.

    n_jobs : int (default = -1)
        Degree of concurrency in estimation. Set to -1 to maximize,
        1 runs on a single process (or thread).

    dtype : obj (default = np.float32)
        data type to use, must be compatible with a numpy array dtype.
    """

    def __init__(self, name, *args, **kwargs):
        super(BaseParallel, self).__init__(*args, **kwargs)
        self.name = name
        self.__no_output__ = False

    @abstractmethod
    def __iter__(self):
        """Iterator for process manager"""
        yield

    def setup(self, X, y, job, skip=None, **kwargs):
        """Setup instance for estimation"""
        skipped = ['_setup_%s' % s for s in skip] if skip else []
        # Sorting runs schedulers in numeric order (_setup_0_, _setup_1_, ...)
        scheduled = sorted(
            f for f in dir(self)
            if f.startswith('_setup_') and f not in skipped)
        for fname in scheduled:
            func = getattr(self, fname)
            # Forward only the keyword arguments the scheduler accepts.
            accepted = func.__func__.__code__.co_varnames
            fargs = {k: v for k, v in kwargs.items() if k in accepted}
            func(X, y, job, **fargs)
class BaseEstimator(ParamMixin, _BaseEstimator, BaseParallel):

    """Base Parallel Estimator class

    Modified Scikit-learn class to handle backend params that we want to
    protect from changes.
    """

    def __init__(self, *args, **kwargs):
        super(BaseEstimator, self).__init__(*args, **kwargs)
        self.__static__ = list()
        self._static_fit_params = dict()

    def get_params(self, deep=True):
        """Return instance params, augmented with backend settings."""
        params = super(BaseEstimator, self).get_params(deep=deep)
        # Backend arguments are not picked up by the sklearn machinery,
        # so add them explicitly from the BaseBackend signature.
        for name in BaseBackend.__init__.__code__.co_varnames:
            if name == 'self':
                continue
            params[name] = getattr(self, name)
        return params

    @property
    @abstractmethod
    def __fitted__(self):
        """Fit status"""
        return self._check_static_params()
class BaseStacker(BaseEstimator):

    """Base class for instances that stack job estimators"""

    def __init__(self, stack=None, verbose=False, *args, **kwargs):
        super(BaseStacker, self).__init__(*args, **kwargs)
        if stack and not isinstance(stack, list):
            raise ValueError("Stack must be a list. Got %r:" % type(stack))
        self.stack = stack if stack else list()
        self._verbose = verbose

    @abstractmethod
    def __iter__(self):
        yield

    def push(self, *stack):
        """Push onto stack.

        Each pushed item is also bound as an attribute named after the
        item's sanitized ``name``. Returns self for chaining.
        """
        check_stack(stack, self.stack)
        for item in stack:
            self.stack.append(item)
            attr = item.name.replace('-', '_').replace(' ', '').strip()
            setattr(self, attr, item)
        return self

    def replace(self, idx, item):
        """Replace a current member of the stack with a new instance"""
        attr = item.name.replace('-', '_').replace(' ', '').strip()
        setattr(self, attr, item)
        self.stack[idx] = item

    def pop(self, idx):
        """Pop a previous push with index idx"""
        return self.stack.pop(idx)

    def get_params(self, deep=True):
        """Get parameters for this estimator.

        Parameters
        ----------
        deep : boolean, optional
            whether to return nested parameters.
        """
        out = super(BaseStacker, self).get_params(deep=deep)
        if not deep:
            return out

        for item in self.stack:
            out[item.name] = item
            for key, val in item.get_params(deep=True).items():
                out['%s__%s' % (item.name, key)] = val
        return out

    @property
    def __fitted__(self):
        """Fitted status"""
        if not self.stack or not self._check_static_params():
            return False
        return all([g.__fitted__ for g in self.stack])

    @property
    def __stack__(self):
        """Check stack"""
        if not isinstance(self.stack, list):
            # Typo fix in error message: "Extected" -> "Expected"
            raise ValueError(
                "Stack corrupted. Expected list. Got %r" % type(self.stack))
        return len(self.stack) > 0

    @property
    def verbose(self):
        """Verbosity"""
        return self._verbose

    @verbose.setter
    def verbose(self, verbose):
        """Set verbosity"""
        self._verbose = verbose
        # Propagate verbosity to all stacked members.
        for g in self.stack:
            g.verbose = verbose