Source code for fyrd.conf

# -*- coding: utf-8 -*-
"""
Get and set config file options.

The functions defined here provide an easy way to access the config file
defined by CONFIG_FILE (default ~/.fyrd/config.txt) and the profile file
defined by config.get('jobs', 'profile_file') (default ~/.fyrd/profiles.txt).

Both files are managed by Python's ConfigParser class.

To work with profiles, use the `get_profile()` and `set_profile()` functions.
Note that all options must be allowed in the `options.py` file before they can
be added to a profile.

Options will also be pre-sanitized before being added to a profile, e.g.
'mem': '2GB' will become 'mem': 2000.

Functions
---------
get_option: Get a single key or section.
set_option: Set a single key or section.
delete: Delete a config item.
create_config: Create an initial config from a dictionary and the defaults
create_config_interactive: Interact with the user to create a new config

Examples
--------
>>> import fyrd
>>> i = fyrd.conf.get_option('queue', 'sleep_len')
>>> c = fyrd.conf.set_option('queue', 'sleep_len', 0.125)
>>> fyrd.conf.get_option('queue', 'sleep_len')
0.125
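
Profiles can be fetched in a similar way; this example assumes the stock
'small' profile from DEFAULT_PROFILES has not been edited:

>>> p = fyrd.conf.get_profile('small')
>>> p.cores
1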

"""
from __future__ import print_function
import re
import os       as _os
import readline as _rl
from textwrap import dedent as _dnt
try:
    import configparser as _configparser
except ImportError:
    import ConfigParser as _configparser

import six

from . import run     as _run
from . import logme   as _logme
from . import batch_systems as _batch
_opt = _batch.options


###############################################################################
#                            Configurable Defaults                            #
###############################################################################

"""
Where configuration files will be kept
"""
CONFIG_PATH  = _os.path.join(_os.environ['HOME'], '.fyrd')

"""
Where the main config will be kept.
"""
CONFIG_FILE  = _os.path.join(CONFIG_PATH, 'config.txt')

# Set default options
DEFAULTS = {
    'queue': {
        'max_jobs':         1000,
        'sleep_len':        1,
        'queue_update':     2,
        'res_time':         2700, # Time to wait if job is postponed before fail
        'queue_type':       'auto',
        'sbatch':           None, # Path to sbatch command
        'qsub':             None, # Path to qsub command
        'progressbar':      True,
        'queue_maximums':   _os.path.join(
            CONFIG_PATH, 'queue_maximums.json'
        ),
    },
    'jobs': {
        'clean_files':     True,
        'clean_outputs':   False,
        'file_block_time': 30,
        'scriptpath':      None,
        'outpath':         None,
        'suffix':          'cluster',
        'auto_submit':     True,
        'generic_python':  False,
        'profile_file':    _os.path.join(
            CONFIG_PATH, 'profiles.txt'
        )
    },
    'notify': {
        'mode': 'linux',  # Or SMTP
        'wait_time': 240, # time in seconds
        'notify_address': None,
        'smtp_host': 'smtp.gmail.com',
        'smtp_port': 587,
        'smtp_tls': True,
        'smtp_from': None,
        'smtp_user': None,
        'smtp_passfile': None
    },
    'local': {
        'server_uri':      None,
        'max_jobs':        None,
        'local_clean_days': 7,
    }
}

CONF_HELP = {
    'summary': _dnt(
        """
        The following options and sections are recognized and defined by the
        DEFAULTS dictionary in this file (conf.py). They can be updated in the
        config file.

        Any options added to the config file that are not present here will
        be ignored or deleted; any options which do not match the types below
        will be overwritten with the defaults.
        """
    ),
    'queue': _dnt(
        """
        [queue]
        Define options for queue handling

        Options
        -------
        max_jobs : int
            sets the maximum number of running jobs before submission will
            pause and wait for the queue to empty
        sleep_len : int
            sets the amount of time the program will wait between submission
            attempts
        queue_update : int
            sets the amount of time between refreshes of the queue.
        res_time : int
            Time in seconds to wait if a job is in an uncertain state, usually
            preempted or suspended. These jobs often resolve into running or
            completed again after some time so it makes sense to wait a bit,
            but not forever.  The default is 45 minutes: 2700 seconds.
        queue_type : str
            the type of queue to use, one of the batch systems (e.g. 'slurm')
            or 'auto'. Default is auto to auto-detect the queue.
        sbatch : str
            A path to the sbatch executable, only required for slurm mode if
            sbatch is not in the PATH.
        qsub : str
            A path to the qsub executable, only required for torque mode if
            qsub is not in the PATH.
        progressbar : bool
            Show a progress bar when waiting for jobs
        queue_maximums : str
            Path to a JSON file that sets the queue maximums, the file must
            be in the format {'<queue>': {'max_<keyword>': val}}, where
            <queue> is a batch system partition/queue and <keyword> is a valid
            keyword argument (e.g. cores, mem, time, etc)
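            For example (illustrative values only):
            {"normal": {"max_cores": 16, "max_mem": 32000}}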
        """
    ),
    'jobs': _dnt(
        """
        [jobs]
        Set the options for managing job submission and output retrieval.

        Options
        -------
        clean_files : bool
            means that by default files will be deleted when job completes
        clean_outputs : bool
            is the same but for output files (they are saved first)
        file_block_time : int
            Max amount of time to block after job completes in the queue while
            waiting for output files to appear.  Some queues can take a long
            time to copy files under load, so it is worth setting this high;
            it won't block unless the files do not appear.
        scriptpath : str
            Path to write all script files by default, must be globally cluster
            accessible. Note: this is *not* the runtime path, just where files
            are written to.
        outpath : str
            Path to write all output files to by default, must be globally
            cluster accessible.
        suffix : str
            The suffix to use when writing scripts and output files
        auto_submit : bool
            If wait() or get() are called prior to submission, auto-submit the
            job. Otherwise throws an error and returns None
        generic_python : bool
            Use /usr/bin/env python instead of the current executable, not
            advised, but sometimes necessary.
        profile_file : str
            the config file where profiles are defined.
        """
    ),
    'notify': _dnt(
        """
        [notify]
        User notification option, used mostly to email completion info.

        Options
        -------
        mode : 'linux', 'smtp', or None
            If mode is linux, all other entries are optional and `mail` is
            used. If mode is smtp, all other entries are required and
            `smtp_passfile` must point to a read-only file, accessible only
            to the user, that contains the password for SMTP.
        wait_time : int
            Time to wait before auto-email, in seconds.
        notify_address : str
            The address to send an email to. Can also be specified at runtime
        smtp_host : str
            e.g. smtp.gmail.com
        smtp_port : int
            e.g. 587
        smtp_tls : bool
            Use TLS?
        smtp_from : str
            Server email address, e.g. a dedicated gmail address
        smtp_user : str, optional
            If not provided, smtp_from is used
        smtp_passfile : str
            Path to a plain text, read-only, user accessible password file with
            the SMTP password. Permissions should be 400.
        """
    ),
    'local': _dnt(
        """
        [local]
        Set options just for the local queue batch system.

        Options
        -------
        server_uri : str, optional
            Hardcode the server_uri for the local queue server. This allows
            you to set the server to a remote machine if you want, but in
            that case you must have the remote drives mounted; only the
            'scriptpath' and 'outpath' directories need to be on those
            mounted drives.
        max_jobs : int, optional
            Set the maximum locally running jobs, defaults to all available
            cores minus 1.
        local_clean_days : int, optional
            The number of days to keep jobs in the local queue. Any jobs older
            than this will be purged from the database
        """
    )
}

# Pre-defined profiles, 'DEFAULT' is required.
DEFAULT_PROFILES = {
    'DEFAULT':     {'nodes': 1,
                    'cores': 1,
                    'mem':   4000,
                    'time':  '04:00:00'},
    'large':       {'nodes': 1,
                    'cores': 16,
                    'mem':   32000,
                    'time':  '24:00:00'},
    'small':       {'nodes': 1,
                    'cores': 1,
                    'mem':   1000,
                    'time':  '01:00:00'},
    'long':        {'nodes': 1,
                    'cores': 1,
                    'mem':   4000,
                    'time':  '96:00:00'},
    'small_clean': {'nodes': 1,
                    'cores': 1,
                    'mem':   1000,
                    'time':  '01:00:00',
                    'clean_files':   True,
                    'clean_outputs': True},
}
"""
This defines the default profiles that will be stored in the profile file.
It is intended mostly as an example and should be edited in the profile file
to customize the profiles for each cluster.

The only required profile is 'DEFAULT'; it must be set and is the fallback
profile used when no profile is requested on job creation.
"""


###############################################################################
#                         Do Not Edit Below This Point                        #
###############################################################################

# Create the global config objects
config   = _configparser.ConfigParser(allow_no_value=True)
"""
This is the globally accessible ConfigParser object for the config.txt file.
"""
profiles = _configparser.ConfigParser(defaults=DEFAULT_PROFILES['DEFAULT'],
                                      allow_no_value=True)
"""
This is the globally accessible ConfigParser object for handling profiles.
"""

__all__ = ['set_option', 'get_option', 'delete', 'create_config',
           'create_config_interactive', 'set_profile',
           'get_profile', 'del_profile', 'create_profiles']


###############################################################################
#                        Config Manipulation Functions                        #
###############################################################################


def get_executable(section, key, default):
    """Get an executable (e.g. qsub) from the conf if it exists.

    Parameters
    ----------
    section : str
        The config section to search (e.g. 'queue').
    key : str
        The config key that holds the executable path (e.g. 'qsub').
    default
        If the key does not exist, create it with this default value.

    Returns
    -------
    str or None
        Path to executable if found, else None
    """
    exe = get_option(section, key, default)
    if not exe:
        return None
    if not _os.path.dirname(exe):
        exe = _run.which(exe)
    if _run.is_exe(exe):
        return exe
    return None
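
# Example (illustrative): look up the sbatch binary, falling back to a PATH
# search when the config stores only a bare command name:
#
#     sbatch = get_executable('queue', 'sbatch', 'sbatch')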


def get_option(section=None, key=None, default=None):
    """Get a single key or section.

    All args are optional, if they are missing, the parent section or entire
    config will be returned.

    Parameters
    ----------
    section : str
        The config section to use (e.g. queue), if None, all sections
        returned.
    key : str
        The config key to get (e.g. 'max_jobs'), if None, whole section
        returned.
    default
        If the key does not exist, create it with this default value.

    Returns
    -------
    option_value
        Option value if key exists, None if no key exists.

    See Also
    --------
    set_option : Set an option
    get_config : Get the entire config
    """
    load_config()
    if not section:
        out = _config_to_dict(config)
    elif section and section not in _sections(config):
        _logme.log('{} not in the config file'.format(section), 'error')
        if section in DEFAULTS:
            _logme.log('Creating {} from DEFAULTS'.format(section), 'info')
            config.add_section(section)
            _config_from_dict(config, DEFAULTS[section], section=section)
            write_config()
            out = get_option(section, key, default)
        else:
            raise ValueError('Section not in the config file or DEFAULTS')
    elif key:
        sect = _section_to_dict(config.items(section))
        if key in sect:
            out = sect[key]
        else:
            _logme.log('{} not in the {} section of the config file'
                       .format(key, section), 'warn')
            if default:
                _logme.log('Creating new config entry {}:{} with val {}'
                           .format(section, key, default))
                set_option(section, key, default)
                out = get_option(section, key)
            elif key in DEFAULTS[section]:
                _logme.log('Creating new config entry {}:{} with val {}'
                           .format(section, key, DEFAULTS[section][key]))
                set_option(section, key, DEFAULTS[section][key])
                out = get_option(section, key)
            else:
                raise ValueError('Option not in the config file')
    elif section:
        _logme.log('Getting the whole section: {}'.format(section), 'debug')
        out = _section_to_dict(config.items(section))
    else:
        _logme.log('No option specified, returning config dictionary',
                   'debug')
        out = _config_to_dict(config)
    return _typecast_items(out)


def set_option(section, key, value):
    """Write a config key to the config file.

    Parameters
    ----------
    section : str
        Section of the config file to use.
    key : str
        Key to add.
    value
        Value to add for key.

    Returns
    -------
    ConfigParser
    """
    # Sanitize arguments
    section = str(section)
    key = str(key)
    value = str(value)

    # Edit the globals in this file
    load_config()

    if not config.has_section(section):
        _logme.log('The {} section is not in the config file and cannot be '
                   'created'.format(section), 'warn')
        return None

    config.set(section, key, value)
    write_config()
    return config


def delete(section, key):
    """Delete a config item.

    Parameters
    ----------
    section : str
        Section of config file.
    key : str
        Key to delete

    Returns
    -------
    ConfigParser
    """
    load_config()
    config.remove_option(section, key)
    write_config()
    return config


def load_config():
    """Load config from the config file.

    If any section or key from DEFAULTS is not present in the config, it is
    added back, enforcing a minimal configuration.

    Returns
    -------
    ConfigParser
    """
    if _os.path.isfile(CONFIG_FILE):
        config.read(CONFIG_FILE)
    else:
        create_config()
    for section in DEFAULTS:
        if section not in _sections(config):
            config.add_section(section)
            _config_from_dict(config, DEFAULTS[section], section)
            write_config()
        for key, val in DEFAULTS[section].items():
            if key not in dict(config.items(section)):
                config.set(section, key, str(val))
                write_config()
    return config


def get_config():
    """Return a dictionary representation of the entire config."""
    return _config_to_dict(load_config())


def write_config():
    """Write the current config to CONFIG_FILE."""
    with open(CONFIG_FILE, 'w') as fout:
        config.write(fout)


###############################################################################
#                          Initialization Functions                           #
###############################################################################


def create_config_interactive(prompt=True):
    """Interact with the user to create a new config.

    Uses readline autocompletion to make setup easier.

    Parameters
    ----------
    prompt : bool
        Ask for confirmation before beginning the wizard.
    """
    # Use tab completion
    t = _TabCompleter()
    _rl.set_completer_delims('\t')
    _rl.parse_and_bind("tab: complete")

    # Get permission
    if prompt:
        t.createListCompleter(['y', 'n'])
        _rl.set_completer(t.list_completer)
        print("Do you want to initialize your config at {}"
              .format(CONFIG_FILE))
        print("This will erase your current configuration (if it exists)")
        choice = _run.get_input("Initialize config? [y/N] ").strip().lower()
        if not choice == 'y':
            return

    cnf = DEFAULTS

    # Get path
    _rl.set_completer(_path_completer)

    # Not implemented yet
    # print("\nThis module uses a database to store job information.",
    #       "This database should remain relatively small, but can get quite",
    #       "large if many jobs are submitted at once.\n"
    #       "It only needs to be accessible from the submit host, but should",
    #       "be somewhere with sufficient disk space (>500MB free).")
    # print("Where would you like to put the db file?\n")
    # file_path = _os.path.expanduser(
    #     _run.get_input('PATH: [{}] '.format(config.get('queue', 'db')))
    # ).strip(' ').lower()
    # file_path = file_path if file_path else cnf['queue']['db']
    # cnf['queue']['db'] = _os.path.expanduser(file_path)

    print("We store job profile information in a small config file.")
    file_path = _os.path.expanduser(
        _run.get_input('Where would you like that file to go? [{}]'
                       .format(
                           _run.exp_file(config.get('jobs', 'profile_file'))
                       ))
    ).strip(' ').lower()
    file_path = file_path if file_path else cnf['jobs']['profile_file']
    cnf['jobs']['profile_file'] = _run.exp_file(file_path)

    # Temp file directory
    print("\nIt is possible to write all temporary and output files to a "
          "single",
          "temp file directory, regardless of where the job is run from.\n" +
          "As this library allows you to retrieve output files from the class",
          "directly, this is a good way of keeping your work directory tidy.",
          "If you do not choose to have files auto-deleted though, you will",
          "need to keep the directory tidy yourself.\n"
          "The value of this option is that if you run from a machine where",
          "some places are not cluster-accessible, jobs will run anyway if",
          "just one directory is accessible to the cluster.\n"
          "This option can always be overridden at runtime with the filepath",
          "argument.\nIf you leave the below option blank, the default will",
          "be to use the runtime path, which is usually the current working",
          "directory.\n")
    t.createListCompleter(['y', 'n'])
    _rl.set_completer(t.list_completer)
    if _run.get_yesno('Would you like to set a script file path?', 'y'):
        file_path = _get_set_path('Where would you like to put that file?')
        cnf['jobs']['scriptpath'] = file_path
    if _run.get_yesno('Would you like to set an output file path?', 'n'):
        file_path = _get_set_path('Where would you like to put that file?')
        cnf['jobs']['outpath'] = file_path

    # Cleaning
    t.createListCompleter(['y', 'n'])
    _rl.set_completer(t.list_completer)
    print('\nWe can automatically delete script and/or output files after',
          'results have been retrieved.\n'
          'This option can be overridden at run time on a per-job basis.\n'
          'Do you want to autoclean:\n')
    clean_files = _run.get_yesno('Autoclean script files?', 'y')
    cnf['jobs']['clean_files'] = clean_files
    clean_outs = _run.get_yesno('Autoclean output files (e.g. .out and '
                                '.err)?', 'n')
    cnf['jobs']['clean_outputs'] = clean_outs

    # Wait times
    t.createListCompleter(cnf['queue'].values())
    _rl.set_completer(t.list_completer)
    max_len = _run.get_input("\nWhat is the maximum number of jobs allowed " +
                             "in your queue? [{}] "
                             .format(cnf['queue']['max_jobs']))
    max_len = max_len if max_len else cnf['queue']['max_jobs']
    cnf['queue']['max_jobs'] = int(max_len)

    # Options
    print('\nIs there a default queue you wish to submit to if no other',
          'options are given?\nIf so enter the name below, or leave blank',
          'to ignore.\n')
    def_queue = _run.get_input('Default queue: ').strip()

    print('\nThank you, configuring options.\n'
          'Please edit the file directly to inspect or edit your config.\n')
    create_config(cnf, def_queue)


def create_config(cnf=None, def_queue=None):
    """Create an initial config file.

    Gets all information from the file-wide DEFAULTS constant and overwrites
    specific keys using the values in cnf.

    This means that any records in the cnf dict that are not present in
    DEFAULTS will be ignored, and any records that are absent will be
    populated from DEFAULTS.

    Parameters
    ----------
    cnf : dict
        A dictionary of config defaults.
    def_queue : str
        A name for a queue to add to the default profile.
    """
    global config
    config = _configparser.ConfigParser(allow_no_value=True)
    if _os.path.exists(CONFIG_FILE):
        _os.remove(CONFIG_FILE)
    init_conf = {}
    if not cnf or not isinstance(cnf, dict):
        cnf = {}
    for section, items in DEFAULTS.items():
        init_conf[section] = {}
        if section not in cnf:
            cnf[section] = {}
        for key, value in items.items():
            val = cnf[section][key] if key in cnf[section] else value
            init_conf[section][key] = str(val)
    _config_from_dict(config, init_conf)
    with open(CONFIG_FILE, 'w') as fout:
        config.write(fout)
    if def_queue:
        def_prof = get_profile('DEFAULT')
        def_prof.args.update({'partition': def_queue})
        def_prof.write()


###############################################################################
#                           Config Helper Functions                           #
###############################################################################


def get_job_paths(kwds):
    """Parse keyword arguments to get important paths.

    Parameters
    ----------
    kwds : dict
        Keyword arguments accepted by fyrd.job.Job

    Returns
    -------
    kwds : dict
        A modified version of the input kwds dictionary with path requests
        removed
    runpath : str
        The path where the job will execute
    outpath : str
        The path where job outputs will go
    scriptpath : str
        The path where job script files will be stored
    """
    runpath = _os.path.abspath(kwds['dir'] if 'dir' in kwds else '.')
    kwds['dir'] = runpath
    # Set the output path
    cpath = get_option('jobs', 'outpath')
    if 'outpath' in kwds:
        outpath = kwds.pop('outpath')
    elif cpath:
        outpath = cpath
    else:
        outpath = runpath
    outpath = _os.path.abspath(outpath)
    # Set the script path
    cpath = get_option('jobs', 'scriptpath')
    if 'scriptpath' in kwds:
        scriptpath = kwds.pop('scriptpath')
    elif cpath:
        scriptpath = cpath
    else:
        scriptpath = outpath
    scriptpath = _os.path.abspath(scriptpath)
    return kwds, runpath, outpath, scriptpath
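

# Example (illustrative): with 'scriptpath' and 'outpath' unset in the
# config, all three returned paths fall back to the run path:
#
#     kwds, runpath, outpath, scriptpath = get_job_paths({'dir': '/tmp/work'})
#     # runpath == outpath == scriptpath == '/tmp/work'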


###############################################################################
#                                   Profiles                                  #
###############################################################################


class Profile(object):
    """A job submission profile. Just a thin wrapper around a dict.

    Attributes
    ----------
    name : str
    kwds : dict

    Methods
    -------
    write : Write self to config file
    """

    def __init__(self, name, kwds):
        """Set up bare minimum attributes.

        Parameters
        ----------
        name : str
            Name of the profile
        kwds : dict
            Dictionary of keyword arguments (will be validated).
        """
        name = 'DEFAULT' if name.lower() == 'default' else name
        self.name = name
        self.args = kwds

    def write(self):
        """Write self to config file."""
        set_profile(self.name, self.args)

    def __getattr__(self, key):
        """Access dict items as attributes."""
        if key in self.args:
            return self.args[key]

    def __setattr__(self, key, value):
        """Force checking of keywords."""
        if key == 'name':
            object.__setattr__(self, key, value)
        elif key == 'args':
            if not isinstance(value, dict):
                raise Exception('Keyword arguments must be a dict')
            value = _opt.check_arguments(value)
            object.__setattr__(self, key, value)
        else:
            opt, arg = list(_opt.check_arguments({key: value}).items())[0]
            self.args[opt] = arg

    def __len__(self):
        """Return arg count."""
        return len(self.args)

    def __repr__(self):
        """Display useful info."""
        return "{}<{}>".format(self.name, self.args)

    def __str__(self):
        """Pretty print."""
        title = 'DEFAULT' if self.name.lower() == 'default' else self.name
        return "{}:\n\t{}".format(
            title,
            '\n\t'.join(['{:<11}{:}'.format(i, j)
                         for i, j in self.args.items()])
        )


def get_profile(profile=None, allow_none=True):
    """Return a profile if it exists, if None, return all profiles.

    Will return None if profile is supplied but does not exist.

    Parameters
    ----------
    profile : str
        The name of a profile to search for.
    allow_none : bool
        If True, return None if no profile matches, otherwise raise a
        ValueError.

    Returns
    -------
    fyrd.conf.Profile
        The requested profile.
    """
    load_profiles()
    # Allow lowercase default profile
    if profile and profile.lower() == 'default':
        profile = 'DEFAULT'
    if profile in _sections(profiles):
        return Profile(profile, _section_to_dict(profiles.items(profile)))
    else:
        if profile and profile.lower() == 'default':
            _logme.log('default profile missing, recreating. You can '
                       'override the defaults by editing {}'
                       .format(CONFIG_FILE), 'warn')
            prof = Profile('default', DEFAULT_PROFILES['DEFAULT'])
            prof.write()
            return prof
        if allow_none:
            return None
        else:
            raise ValueError('Requested profile ({}) does not exist'
                             .format(profile))


def get_profiles(profs=None, allow_none=True):
    """Return a dictionary of profiles from profiles.

    Returns all profiles if the profs argument is None.

    Parameters
    ----------
    profs : list
        A list of profiles to get.
    allow_none : bool
        If True, return None if no profile matches, otherwise raise a
        ValueError.

    Returns
    -------
    dict
        A dictionary of fyrd.conf.Profile objects
    """
    if profs:
        profs = _run.listify(profs)
        pfls = {}
        for profile in profs:
            pfls[profile] = get_profile(profile, allow_none)
    else:
        if not allow_none:
            raise ValueError('Profile required')
        pfls = {
            'DEFAULT': Profile(
                'DEFAULT', _section_to_dict(profiles.items('DEFAULT'))
            )
        }
        for section in _sections(profiles):
            pfls[section] = Profile(
                section, _section_to_dict(profiles.items(section))
            )
    return pfls


def set_profile(name, kwds, update=True):
    """Write profile to config file.

    Parameters
    ----------
    name : str
        The name of the profile to add/edit.
    kwds : dict
        Keyword arguments to add to the profile.
    update : bool
        Update the profile rather than overwriting it.
    """
    load_profiles()
    name = 'DEFAULT' if name.lower() == 'default' else name
    if not isinstance(kwds, dict):
        raise Exception('Profile arguments must be a dictionary')
    kwds = _opt.check_arguments(kwds)
    if name in _sections(profiles):
        if not update:
            _logme.log('Profile {} already exists, overwriting'.format(name),
                       'debug')
            profiles.remove_section(name)
            profiles.add_section(name)
    else:
        profiles.add_section(name)
    _config_from_dict(profiles, kwds, name)
    write_profiles()
    return get_profile(name)


def del_profile(name):
    """Delete a profile.

    Parameters
    ----------
    name : str
        The name of the profile to delete.
    """
    load_profiles()
    if name.lower() == 'default':
        for key in _section_to_dict(profiles.items('DEFAULT')):
            if key not in DEFAULT_PROFILES['DEFAULT']:
                profiles.remove_option('DEFAULT', key)
            else:
                profiles.set('DEFAULT', key,
                             str(DEFAULT_PROFILES['DEFAULT'][key]))
    elif name in _sections(profiles):
        _logme.log('Removing profile {}'.format(name))
        profiles.remove_section(name)
    else:
        _logme.log('Profile {} does not exist, cannot delete'.format(name),
                   'warn')
    write_profiles()


def create_profiles(profs=None):
    """Create an initial profiles file.

    Gets all information from the file-wide DEFAULT_PROFILES constant and
    overwrites specific keys using the values in profs.

    This means that any records in the profs dict that are not present in
    DEFAULT_PROFILES will be ignored, and any records that are absent will be
    populated from DEFAULT_PROFILES.

    Parameters
    ----------
    profs : dict
        A dictionary of profile defaults.
    """
    global profiles
    profiles = _configparser.ConfigParser(
        defaults=DEFAULT_PROFILES['DEFAULT'],
        allow_no_value=True,
    )
    if _os.path.exists(_run.exp_file(config.get('jobs', 'profile_file'))):
        _os.remove(_run.exp_file(config.get('jobs', 'profile_file')))
    init_conf = {}
    if not profs or not isinstance(profs, dict):
        profs = {}
    for section, items in DEFAULT_PROFILES.items():
        init_conf[section] = {}
        if section not in profs:
            profs[section] = {}
        for key, value in items.items():
            val = profs[section][key] if key in profs[section] else value
            init_conf[section][key] = str(val)
    _config_from_dict(profiles, init_conf)
    with open(_run.exp_file(config.get('jobs', 'profile_file')), 'w') as fout:
        profiles.write(fout)


def load_profiles():
    """Load the profiles, create them if they don't exist.

    Returns
    -------
    ConfigParser
        profiles
    """
    if not _os.path.isfile(_run.exp_file(config.get('jobs', 'profile_file'))):
        create_profiles()
    profiles.read(_run.exp_file(config.get('jobs', 'profile_file')))
    # Recreate DEFAULT if necessary.
    def_prof = _config_to_dict(profiles)['DEFAULT']
    changed = False
    for key, val in DEFAULT_PROFILES['DEFAULT'].items():
        if key not in def_prof:
            profiles.set('DEFAULT', key, str(val))
            changed = True
    if changed:
        write_profiles()
    return profiles


def write_profiles():
    """Write the profiles to the file.

    Returns
    -------
    ConfigParser
        profiles
    """
    with open(_run.exp_file(config.get('jobs', 'profile_file')), 'w') as fout:
        profiles.write(fout)
    return load_profiles()


###############################################################################
#                               Helper Functions                              #
###############################################################################


def _sections(cnf, inc_anyway=False):
    """Include DEFAULT in sections if it has items.

    Parameters
    ----------
    cnf : ConfigParser
        Any ConfigParser object
    inc_anyway : bool
        Include the DEFAULT section even if empty

    Returns
    -------
    list
        A list of sections in cnf, including DEFAULT if defined.
    """
    merged_sections = cnf.sections() + ['DEFAULT']
    return merged_sections if cnf.items('DEFAULT') or inc_anyway \
        else cnf.sections()


def _config_from_dict(cnf, dct, section=None):
    """Python 2 ConfigParsers cannot handle dictionaries, so we do it here.

    Parameters
    ----------
    cnf : ConfigParser
        A ConfigParser object.
    dct : dict
        A dictionary of {'key': 'value'} if section or
        {'section': {'key': 'value'}}
    section : str
        An optional section name to update

    Returns
    -------
    ConfigParser
        The original ConfigParser object, updated.
    """
    assert isinstance(cnf, _configparser.ConfigParser)
    assert isinstance(dct, dict)
    if section:
        for v in dct.values():
            assert not isinstance(v, (tuple, list, dict, set))
    else:
        for v in dct.values():
            assert isinstance(v, dict)
    if section:
        if section not in cnf.sections() + ['DEFAULT']:
            cnf.add_section(section)
        for k, v in _dict_to_strings(dct).items():
            cnf.set(section, k, v)
    else:
        for sect, vals in dct.items():
            if sect not in cnf.sections() + ['DEFAULT']:
                cnf.add_section(sect)
            for k, v in _dict_to_strings(vals).items():
                cnf.set(sect, k, v)
    return cnf


def _dict_to_strings(kwds):
    """Convert all values in a dictionary to strings."""
    assert isinstance(kwds, dict)
    ot = {}
    for k, v in kwds.items():
        ot[k] = str(v)
    return ot


def _section_to_dict(section):
    """Convert a ConfigParser list of tuples to a dictionary.

    Parameters
    ----------
    section : list
        Output of ConfigParser.items(section)

    Returns
    -------
    dict
        A dictionary of the above with typed values
    """
    dct = dict(section)
    out = {}
    for key, val in dct.items():
        out[key] = _typecast_items(val)
    return out


def _typecast_items(x):
    """Try to convert a variable into the true type.

    Avoids eval() as we will be used on config files.

    For example: 'True' becomes True and '0.125' becomes 0.125
    """
    c = re.compile(r', *')
    r = re.compile(r': *')
    if not isinstance(x, (six.string_types, six.text_type)):
        return x
    if x == 'True':
        return True
    if x == 'False':
        return False
    if x == 'None':
        return None
    if x.isdigit():
        return int(x)
    try:
        return float(x)
    except (ValueError, TypeError):
        pass
    try:
        return complex(x)
    except (ValueError, TypeError):
        pass
    try:
        if x.startswith('[') and x.endswith(']'):
            return [_typecast_items(i) for i in c.split(x.strip('[]'))]
        if x.startswith('{') and x.endswith('}'):
            if ':' in x:
                return {
                    _typecast_items(k): _typecast_items(v) for k, v in [
                        r.split(i) for i in c.split(x.strip('{}'))
                    ]
                }
            return {_typecast_items(i) for i in c.split(x.strip('{}'))}
    except (ValueError, TypeError):
        pass
    if isinstance(x, six.string_types):
        return x.strip('\'"')
    return x


def _config_to_dict(cnf):
    """Return a dictionary of all items from a ConfigParser object."""
    out = {}
    def_items = cnf.items('DEFAULT')
    if def_items:
        out['DEFAULT'] = _section_to_dict(def_items)
    for sect in _sections(cnf):
        out[sect] = _section_to_dict(cnf.items(sect))
    return out


###############################################################################
#                                  Completion                                 #
###############################################################################


class _TabCompleter(object):
    """A tab completer that can complete from the filesystem or from a list.

    Taken from `https://gist.github.com/iamatypeofwalrus/5637895`_
    """

    list_completer = None

    def createListCompleter(self, ll):
        """Create a method that autocompletes from the given list.

        Since the autocomplete function can't be given a list to complete
        from, a closure is used to create the list_completer function with a
        list to complete from.
        """
        def list_completer(_, state):
            """Make a completer from a list."""
            line = _rl.get_line_buffer()
            if not line:
                return [c + " " for c in ll][state]
            else:
                return [c + " " for c in ll if c.startswith(line)][state]
        self.list_completer = list_completer


def _path_completer(_, state):
    """Tab completer for filesystem paths. Only tested on *nix systems."""
    line = _rl.get_line_buffer()
    if not line:
        return _complete_path('.')[state]
    else:
        return _complete_path(line)[state]


def _listdir(root):
    """List directory 'root' appending the path separator to subdirs."""
    res = []
    for name in _os.listdir(root):
        path = _os.path.join(root, name)
        if _os.path.isdir(path):
            name += _os.sep
        res.append(name)
    return res


def _complete_path(path=None):
    """Perform completion of filesystem path."""
    if not path:
        return _os.listdir('.')
    path = _os.path.expanduser(path)
    dirname, rest = _os.path.split(path)
    tmp = dirname if dirname else '.'
    res = [_os.path.join(dirname, p) for p in _os.listdir(tmp)
           if p.startswith(rest)]
    # more than one match, or single match which does not exist (typo)
    if len(res) > 1 or not _os.path.exists(path):
        return res
    # resolved to a single directory, so return list of files below it
    if _os.path.isdir(path):
        return [_os.path.join(path, p) for p in _os.listdir(path)]
    # exact file match terminates this completion
    return [path + ' ']


def _get_set_path(message):
    """Get a path from the user, make the directory if necessary."""
    t = _TabCompleter()
    while True:
        _rl.set_completer(_path_completer)
        file_path = _run.get_input(message + ' ')
        file_path = file_path.strip().lower()
        if file_path:
            file_path = _os.path.abspath(file_path)
            if _os.path.isdir(file_path):
                break
            else:
                t.createListCompleter(['y', 'n'])
                _rl.set_completer(t.list_completer)
                ans = _run.get_input(
                    'That directory does not exist, would you like to '
                    'try to create it? [y/N] ')
                if ans.lower() == 'y':
                    try:
                        _os.makedirs(file_path)
                        break
                    except OSError:
                        print("Failed to make {}".format(file_path),
                              "Please make it manually or choose another",
                              "directory\n")
                        continue
        else:
            continue
    return file_path


###############################################################################
#           Set Constants from config, initialize config on import            #
###############################################################################

# Create directory if it doesn't exist
if not _os.path.isdir(CONFIG_PATH):
    if _os.path.exists(CONFIG_PATH):
        _os.remove(CONFIG_PATH)
    _os.makedirs(CONFIG_PATH)

if not _os.path.isfile(CONFIG_FILE):
    create_config()

# Load config
config = load_config()

# Load profiles
profiles = load_profiles()