ML-Ensemble is designed to provide an easy user interface. But it is also designed to be extremely flexible, all the wile providing maximum concurrency at minimal memory consumption. The lower-level API that builds the ensemble and manages the computations is constructed in as modular a fashion as possible.
The low-level API introduces a computational graph-like environment that you can
directly exploit to gain further control over your ensemble. In fact, building
your ensemble through the low-level API is almost as straight forward as using the
high-level API. In this tutorial, we will walk through the core
The purpose of the
ParallelProcessing class is to provide a streamlined
interface for scheduling and allocating jobs in a nested sequence of tasks. The
typical case is a sequence of
Layer instances where the output of one layer
becomes the input to the next. While the layers must therefore be fitted sequentially,
each layer should be fitted in parallel. We might be interested in propagating some of the
features from one layer to the next, in which case we need to take care of the array allocation.
Basic map ¨¨¨¨¨¨¨¨¨
In the simplest case, we have a
caller that has a set of
task``s that needs to be
evaluated in parallel. For instance, the ``caller might be a
each task being a fit job for a given cross-validation fold. In this simple case,
we want to perform an embarrassingly parallel for-loop of each fold, which we can
achieve with the
map method of the
from mlens.parallel import ParallelProcessing, Job, Learner from mlens.index import FoldIndex from mlens.utils.dummy import OLS import numpy as np np.random.seed(2) X = np.arange(20).reshape(10, 2) y = np.random.rand(10) indexer = FoldIndex(folds=2) learner = Learner(estimator=OLS(), indexer=indexer, name='ols') manager = ParallelProcessing(n_jobs=-1) out = manager.map(learner, 'fit', X, y, return_preds=True) print(out)
[[0.3665536 ] [0.36335272] [0.3601518 ] [0.3569509 ] [0.35375 ] [0.48689735] [0.52471155] [0.56252575] [0.60033995] [0.63815415]]
Stacking a set of parallel jobs¶
Suppose instead that we have a sequence of learners, where we want to fit
each on the errors of the previous learner. We can achieve this by using
stack method and a preprocessing pipeline for computing the errors.
First, we need to construct a preprocessing class to transform the input,
which will be the preceding learner’s predictions, into errors.
from mlens.parallel import Transformer, Pipeline from mlens.utils.dummy import Scale from sklearn.base import BaseEstimator, TransformerMixin def error_scorer(p, y): return np.abs(p - y) class Error(BaseEstimator, TransformerMixin): """Transformer that computes the errors of a base learners""" def __init__(self, scorer): self.scorer = scorer def fit(self, X, y): return self def transform(self, X, y): return self.scorer(X, y), y
Now, we construct a sequence of tasks to compute, where the output of one
task will be the input to the next. Hence, we want a sequence of the form
[learner, transformer, ..., learner]:
tasks =  for i in range(3): if i != 0: pipeline = Pipeline([('err', Error(error_scorer))], return_y=True) transformer = Transformer( estimator=pipeline, indexer=indexer, name='sc-%i' % (i + 1) ) tasks.append(transformer) learner = Learner( estimator=OLS(), preprocess='sc-%i' % (i+1) if i != 0 else None, indexer=indexer, name='ols-%i' % (i + 1) ) tasks.append(learner)
To fit the stack, we call the
stack method on the
manager, and since
each learner must have access to their transformer, we set
(otherwise each task will have a separate sub-cache, sealing them off
from each other).
out = manager.stack( tasks, 'fit', X, y, return_preds=True, split=False) print(out)
[[0.22054635] [0.22811265] [0.235679 ] [0.24324532] [0.25081167] [0.27373537] [0.23783192] [0.20192847] [0.16602501] [0.09709236]]
If we instead want to append these errors as features, we can simply
alter our transformer to concatenate the errors to the original data.
Alternatively, we can automate the process by instead using the
Manual initialization and processing¶
Under the hood, both
stack first call
initialize on the
manager, followed by a call to
process with some default arguments.
For maximum control, we can manually do the initialization and processing step.
When we initialize, an instance of
Job is created that collect arguments
relevant for of the job as well as handles for data to be used. For instance,
we can specify that we want the predictions of all layers, as opposed to just the
out = manager.initialize( 'fit', X, y, None, return_preds=['ols-1', 'ols-3'], stack=True, split=False)
initialize method primarily allocates memory of input data and
puts it on the
job instance. Not that if the input is a string pointing
to data on disk,
initialize will attempt to load the data into memory.
If the backend of the manger is
threading, keeping the data on the parent
process is sufficient for workers to reach it. With
the backend, data will be memory-mapped to avoid serialization.
initialize method returns an
out dictionary that specified
what type of output we want when running the manager on the assigned job.
To run the manager, we call
process with out
out = manager.process(tasks, out) print(out)
[array([[0.3665536 ], [0.36335272], [0.3601518 ], [0.3569509 ], [0.35375 ], [0.48689735], [0.52471155], [0.56252575], [0.60033995], [0.63815415]], dtype=float32), array([[0.22054635], [0.22811265], [0.235679 ], [0.24324532], [0.25081167], [0.27373537], [0.23783192], [0.20192847], [0.16602501], [0.09709236]], dtype=float32)]
The output now is a list of arrays, the first contains the same predictions
as we got in the
map call, the last is the equivalent to the predicitons
we got in the
stack call. Note that this functionality is available
also in the
When running the manager, it will read and write to memory buffers. This is
less of a concern when the
threading backend is used, as data is kept
in the parent process. But when data is loaded from file path, or when
multiprocessing is used, we want to clean up after us. Thus, when we
are through with the
manager, it is important to call the
method. This will however destroy any ephemeral data stored on the instance.
clear method will remove any files in the specified path.
If the path specified in the
initialize call includes files other than
those generated in the
process call, these will ALSO be removed.
ALWAYS use a clean temporary cache for processing jobs.
To minimize the risk of forgetting this last step, the
class can be used as context manager, automatically cleaning up the cache
when exiting the context:
learner = Learner(estimator=OLS(), indexer=indexer) with ParallelProcessing() as mananger: manager.stack(learner, 'fit', X, y, split=False) out = manager.stack(learner, 'predict', X, split=False)
Total running time of the script: ( 0 minutes 0.817 seconds)