# Source code for mlens.config

"""ML-Ensemble

:author: Sebastian Flennerhag
:license: MIT
:copyright: 2017-2018

Global backend configurations.

Variables

1. ``DTYPE``: data type of prediction arrays. Must be a numpy dtype.
   Default is ``float32``.

2. ``TMPDIR``: path to directory where temporary caches will be hosted.
   Default is to use system ``tmp`` structure.

3. ``PREFIX``: cache prefix. Default is ``'.mlens_tmp_cache_'``

4. ``BACKEND``: global default backend. Default is ``'threading'``

5. ``START_METHOD``: global start method (if ``backend='multiprocessing'``)
   Default is ``'fork'`` (``'spawn'`` on Windows and Cygwin).

6. ``VERBOSE``: verbose import. Default is ``Y`` (verbose); any other value
   silences the settings message printed on import. Needs to be set before
   import (e.g. ``export MLENS_VERBOSE=0``).

7. ``IVALS``: load exception handling interval. Default is ``(0.01, 120)``.

Environmental variables can be set by ::

    export MLENS_[VARIABLE]=VALUE

For changing defaults during a session, use
``set_[variable]`` and ``get_[variable]``, where ``[variable]`` is replaced
with the lower case name of the environmental variable to change.

Changing global configurations in-session is experimental: Please report any
unexpected behavior.
"""
# pylint: disable=protected-access
# pylint: disable=global-statement
# pylint: disable=not-callable

from __future__ import print_function

import os
import sys
import shutil
import tempfile
import warnings
import sysconfig
import subprocess
from multiprocessing import current_process

import numpy

###############################################################################
# Variables

# Each setting is read once, at import time, from an ``MLENS_*`` environment
# variable; the second argument is the fallback used when it is not exported.

# dtype is looked up by name on the numpy namespace (e.g. 'float64').
_DTYPE = getattr(numpy, os.getenv('MLENS_DTYPE', 'float32'))
# Root directory for temporary caches; falls back to the system temp dir.
_TMPDIR = os.getenv('MLENS_TMPDIR', tempfile.gettempdir())
# Name prefix that identifies cache directories.
_PREFIX = os.getenv('MLENS_PREFIX', ".mlens_tmp_cache_")
_BACKEND = os.getenv('MLENS_BACKEND', 'threading')
# Empty string means "not set".
_START_METHOD = os.getenv('MLENS_START_METHOD', '')
_VERBOSE = os.getenv('MLENS_VERBOSE', 'Y')

# MLENS_IVALS is encoded as '<interval>_<limit>'; both parts become floats.
_IVALS = os.getenv('MLENS_IVALS', '0.01_120').split('_')
_IVALS = (float(_IVALS[0]), float(_IVALS[1]))

# NOTE(review): presumably a 'major.minor' string - confirm. Parsing it as a
# float maps a two-digit minor such as '3.10' to 3.1.
_PY_VERSION = float(sysconfig._PY_VERSION_SHORT)


###############################################################################
# dispatch configs

def get_ivals():
    """Return the load exception handling interval.

    Returns
    -------
    tuple
        the current ``(interval, limit)`` pair stored in ``_IVALS``.
    """
    return _IVALS
def get_dtype():
    """Return the dtype used for prediction arrays.

    Returns
    -------
    object
        the current value of ``_DTYPE``.
    """
    return _DTYPE
def get_prefix():
    """Return the cache prefix.

    Returns
    -------
    str
        the current value of ``_PREFIX``.
    """
    return _PREFIX
def get_backend():
    """Return the global default backend.

    Returns
    -------
    str
        the current value of ``_BACKEND``.
    """
    return _BACKEND
def get_start_method():
    """Return the global start method.

    Returns
    -------
    str
        the current value of ``_START_METHOD``.
    """
    return _START_METHOD
def get_tmpdir():
    """Return the root directory for temporary caches.

    Returns
    -------
    str
        the current value of ``_TMPDIR``.
    """
    return _TMPDIR
###############################################################################
# Configuration calls
def set_tmpdir(tmp):
    """Set the root directory for temporary caches during estimation.

    Parameters
    ----------
    tmp : str
        directory path
    """
    global _TMPDIR
    _TMPDIR = tmp
def set_prefix(prefix):
    """Set the prefix assigned to temporary directories during estimation.

    Parameters
    ----------
    prefix : str
        cache file name prefix
    """
    global _PREFIX
    _PREFIX = prefix
def set_dtype(dtype):
    """Set the dtype to use during estimation.

    Parameters
    ----------
    dtype : object
        numpy dtype
    """
    global _DTYPE
    _DTYPE = dtype
def set_backend(backend):
    """Set the parallel backend to use during estimation.

    Parameters
    ----------
    backend : str
        backend type, one of 'multiprocessing', 'threading', 'sequential'
    """
    global _BACKEND
    _BACKEND = backend
def set_start_method(method):
    """Set the method for starting multiprocess worker pool.

    The value is stored as the global start method and is also exported
    through the ``JOBLIB_START_METHOD`` environment variable.

    Parameters
    ----------
    method : str
        Methods available: 'fork', 'spawn', 'forkserver'.
    """
    global _START_METHOD
    _START_METHOD = method
    os.environ['JOBLIB_START_METHOD'] = _START_METHOD
def set_ivals(interval, limit):
    """Set the load exception handling interval.

    The two values are stored, unmodified, as the ``(interval, limit)``
    tuple returned by ``get_ivals``.

    Parameters
    ----------
    interval : float
        number of seconds between each check

    limit : float
        number of seconds to wait.
    """
    global _IVALS
    _IVALS = (interval, limit)
def __get_default_start_method(method):
    """Determine the default start method.

    Parameters
    ----------
    method : str
        requested start method. An empty string means "not set".

    Returns
    -------
    str
        ``method`` unchanged if it is non-empty; otherwise ``'spawn'`` when
        ``sys.platform`` starts with ``'win'`` or ``'cygwin'`` and ``'fork'``
        on every other platform.
    """
    if method == '':
        win = sys.platform.startswith(('win', 'cygwin'))
        method = 'spawn' if win else 'fork'
    return method


###############################################################################
# Handlers
def clear_cache(tmp):
    """Remove residual ML-Ensemble caches found under a directory.

    Walks ``tmp`` (including ``tmp`` itself) and collects every directory
    whose name starts with the cache prefix. Found directories are listed on
    ``stderr`` and then removed. A warning is issued for each directory that
    could not be removed.

    Parameters
    ----------
    tmp : str
        the directory to check for residual caches in.
    """
    residuals = []
    for root, subdirs, _ in os.walk(tmp):
        if os.path.basename(root).startswith(_PREFIX):
            residuals.append(root)
            # The whole tree below a cache is deleted with it, so do not
            # descend: nested matches would otherwise be reported twice and
            # fail to delete once their parent is gone.
            del subdirs[:]

    if not residuals:
        return

    print("[MLENS] Found %i residual cache(s):" % len(residuals),
          file=sys.stderr)

    total = 0
    for idx, path in enumerate(residuals, 1):
        # Size of the directory entry itself, not of its contents.
        size = os.path.getsize(path)
        total += size
        print(" %i (%i): %s" % (idx, size, path), file=sys.stderr)
    print(" Total size: %i\n[MLENS] Removing..." % total,
          end=" ", file=sys.stderr)

    for path in residuals:
        _remove_cache_dir(path)
    print("done.", file=sys.stderr)


def _remove_cache_dir(path):
    """Delete one cache directory, warning if it is still there afterwards."""
    try:
        shutil.rmtree(path)
    except OSError:
        pass
    else:
        return

    if sys.platform.startswith('win') and os.path.exists(path):
        # ``rmdir /S /Q`` is a cmd.exe builtin, so it needs a shell and is
        # only meaningful on Windows. The path is quoted so that directories
        # containing spaces are handled, and the child is waited for so that
        # the result can be checked below.
        try:
            proc = subprocess.Popen('rmdir /S /Q "%s"' % path, shell=True,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            proc.communicate()
        except OSError:
            pass

    if os.path.exists(path):
        warnings.warn("Failed to delete cache at %s." % path)
###############################################################################
# Set up

def print_settings():
    """Print package settings on system.

    Writes the configured backend to ``stderr``, and the start method as well
    unless the backend is ``'threading'``. Does nothing when ``_VERBOSE`` is
    not ``'Y'``.
    """
    if _VERBOSE != 'Y':
        return

    if _BACKEND == 'threading':
        msg = "[MLENS] backend: %s"
        arg = (_BACKEND,)
    else:
        msg = "[MLENS] backend: %s | start method: %s"
        arg = (_BACKEND, _START_METHOD)
    print(msg % arg, file=sys.stderr)


# Import-time set-up, executed only when the importing process is named
# 'MainProcess': resolve and publish the start method, report the settings
# and remove residual caches from the temporary directory.
if current_process().name == 'MainProcess':
    _START_METHOD = __get_default_start_method(_START_METHOD)
    set_start_method(_START_METHOD)
    print_settings()
    clear_cache(_TMPDIR)