# -*- coding: utf-8 -*-
"""
Get and set config file options.
The functions defined here provide an easy way to access the config file
defined by CONFIG_FILE (default ~/.fyrd/config.txt) and the config.get('jobs',
'profile_file') (default ~/.fyrd/profiles.txt).
Both files are managed by Python's ConfigParser class.
To work with profiles, use the `get_profile()` and `set_profile()` functions.
Note that all options must be allowed in the `options.py` file before they can
be added to a profile.
Options will also be pre-sanitized before being added to profile. e.g. 'mem':
'2GB' will become 'mem': 2000.
Functions
---------
get_option: Get a single key or section.
set_option: Set a single key or section.
get_profile: Get a single profile.
set_profile: Write a profile to the profile file.
delete: Delete a config item.
create_config: Create an initial config from a dictionary and the defaults
create_config_interactive: Interact with the user to create a new config
Examples
--------
>>> import fyrd
>>> i = fyrd.conf.get_option('queue', 'sleep_len')
>>> c = fyrd.conf.set_option('queue', 'sleep_len', 0.125)
>>> fyrd.conf.get_option('queue', 'sleep_len')
0.125
"""
from __future__ import print_function
import re
import os as _os
import readline as _rl
from textwrap import dedent as _dnt
try:
import configparser as _configparser
except ImportError:
import ConfigParser as _configparser
import six
from . import run as _run
from . import logme as _logme
from . import batch_systems as _batch
_opt = _batch.options
###############################################################################
# Configurable Defaults #
###############################################################################
#: Directory where all configuration files are kept.
#: ``expanduser('~')`` is used rather than ``os.environ['HOME']`` so that
#: importing this module does not raise ``KeyError`` when HOME is unset.
CONFIG_PATH = _os.path.join(_os.path.expanduser('~'), '.fyrd')

#: Path of the main config file.
CONFIG_FILE = _os.path.join(CONFIG_PATH, 'config.txt')
# Set default options
# Each top-level key is a section of the config file and each nested key is
# an option in that section mapped to its default value. load_config() adds
# any section or option listed here that is missing from the file.
DEFAULTS = {
    'queue': {
        'max_jobs': 1000,
        'sleep_len': 1,
        'queue_update': 2,
        'res_time': 2700,  # Time to wait if job is postponed before fail
        'queue_type': 'auto',
        'sbatch': None,  # Path to sbatch command
        'qsub': None,  # Path to qsub command
        'progressbar': True,
        'queue_maximums': _os.path.join(
            CONFIG_PATH, 'queue_maximums.json'
        ),
    },
    'jobs': {
        'clean_files': True,
        'clean_outputs': False,
        'file_block_time': 30,
        'scriptpath': None,
        'outpath': None,
        'suffix': 'cluster',
        'auto_submit': True,
        'generic_python': False,
        # Location of the profiles file read by load_profiles()
        'profile_file': _os.path.join(
            CONFIG_PATH, 'profiles.txt'
        )
    },
    'notify': {
        'mode': 'linux',  # Or SMTP
        'wait_time': 240,  # time in seconds
        'notify_address': None,
        'smtp_host': 'smtp.gmail.com',
        'smtp_port': 587,
        'smtp_tls': True,
        'smtp_from': None,
        'smtp_user': None,
        'smtp_passfile': None
    },
    'local': {
        'server_uri': None,
        'max_jobs': None,
        'local_clean_days': 7,
    }
}
# Human-readable help for every config section, keyed by section name (plus
# a general 'summary'). The text is dedented so it prints flush-left.
CONF_HELP = {
    'summary': _dnt(
        """
        The following options and sections are recognized and defined by the
        DEFAULTS dictionary in the config.py file. They can be updated in the
        config file.

        Any options added to the config file not present here will be ignored
        or deleted, any options added which do not match the types below will
        be overwritten with the defaults.
        """
    ),
    'queue': _dnt(
        """
        [queue]

        Define options for queue handling

        Options
        -------
        max_jobs : int
            sets the maximum number of running jobs before submission will
            pause and wait for the queue to empty
        sleep_len : int
            sets the amount of time the program will wait between submission
            attempts
        queue_update : int
            sets the amount of time between refreshes of the queue.
        res_time : int
            Time in seconds to wait if a job is in an uncertain state, usually
            preempted or suspended. These jobs often resolve into running or
            completed again after some time so it makes sense to wait a bit,
            but not forever. The default is 45 minutes: 2700 seconds.
        queue_type : str
            the type of queue to use, one of the batch systems (e.g. 'slurm')
            or 'auto'. Default is auto to auto-detect the queue.
        sbatch : str
            A path to the sbatch executable, only required for slurm mode if
            sbatch is not in the PATH.
        qsub : str
            A path to the qsub executable, only required for torque mode if
            qsub is not in the PATH.
        progressbar : bool
            Show a progress bar when waiting for jobs
        queue_maximums : str
            Path to a JSON file that sets the queue maximums, the file must
            be in the format {'<queue>': {'max_<keyword>': val}}, where
            <queue> is a batch system partition/queue and <keyword> is a valid
            keyword argument (e.g. cores, mem, time, etc)
        """
    ),
    'jobs': _dnt(
        """
        [jobs]

        Set the options for managing job submission and getting

        Options
        -------
        clean_files : bool
            means that by default files will be deleted when job completes
        clean_outputs : bool
            is the same but for output files (they are saved first)
        file_block_time : int
            Max amount of time to block after job completes in the queue while
            waiting for output files to appear. Some queues can take a long
            time to copy files under load, so it is worth setting this high, it
            won't block unless the files do not appear.
        scriptpath : str
            Path to write all script files by default, must be globally cluster
            accessible. Note: this is *not* the runtime path, just where files
            are written to.
        outpath : str
            Path to write all output files to by default, must be globally
            cluster accessible.
        suffix : str
            The suffix to use when writing scripts and output files
        auto_submit : bool
            If wait() or get() are called prior to submission, auto-submit the
            job. Otherwise throws an error and returns None
        generic_python : bool
            Use /usr/bin/env python instead of the current executable, not
            advised, but sometimes necessary.
        profile_file : str
            the config file where profiles are defined.
        """
    ),
    'notify': _dnt(
        """
        [notify]

        User notification option, used mostly to email completion info.

        Options
        -------
        mode : 'linux', 'smtp', or None
            If mode is linux, all other entries optional and `mail` is used.
            If mode is smtp, all other entries required, `smtp_passfile` must
            point to a read only file accessible only to the user that
            contains the password for smtp.
        wait_time : int
            Time to wait before auto-email, in seconds.
        notify_address : str
            The address to send an email to. Can also be specified at runtime
        smtp_host : str
            e.g. smtp.gmail.com
        smtp_port : int
            e.g. 587
        smtp_tls : bool
            Use TLS?
        smtp_from : str
            Server email address, e.g. a dedicated gmail address
        smtp_user : str, optional
            If not provided, smtp_from used
        smtp_passfile : str
            Path to a plain text, read-only, user accessible password file with
            the SMTP password. Permissions should be 400.
        """
    ),
    'local': _dnt(
        """
        [local]

        Set options just for the local queue batch system.

        Options
        -------
        server_uri : str, optional
            Hardcode the server_uri for the local queue server, this allows
            you to set the server as a remote machine if you want, but you
            must have the remote drives mounted in that case, these mounted
            drives need only be in the 'scriptpath' and 'outpath' directories.
        max_jobs : int, optional
            Set the maximum locally running jobs, defaults to all available
            cores minus 1.
        local_clean_days : int, optional
            The number of days to keep jobs in the local queue. Any jobs older
            than this will be purged from the database
        """
    )
}
# Pre-defined profiles, 'DEFAULT' is required.
# Each profile name maps to a dict of job keyword arguments. The 'DEFAULT'
# entry is also used as the `defaults` of the profiles ConfigParser.
DEFAULT_PROFILES = {
    'DEFAULT': {'nodes': 1,
                'cores': 1,
                'mem': 4000,
                'time': '04:00:00'},
    'large': {'nodes': 1,
              'cores': 16,
              'mem': 32000,
              'time': '24:00:00'},
    'small': {'nodes': 1,
              'cores': 1,
              'mem': 1000,
              'time': '01:00:00'},
    'long': {'nodes': 1,
             'cores': 1,
             'mem': 4000,
             'time': '96:00:00'},
    # Same resources as 'small', but script and output files are both cleaned
    'small_clean': {'nodes': 1,
                    'cores': 1,
                    'mem': 1000,
                    'time': '01:00:00',
                    'clean_files': True,
                    'clean_outputs': True},
}
"""
This defines the default profiles that will be stored in the profile file.
It is intended mostly as an example and should be edited in the profile file
to customize the profiles for each cluster.
The only required profile is 'DEFAULT', it must be set and is the fallback
profile if no profile is requested on job creation.
"""
###############################################################################
# Do Not Edit Below This Point #
###############################################################################
# Create the global config objects
config = _configparser.ConfigParser(allow_no_value=True)
"""
This is the globally accessible ConfigParser object for the config.txt file.
"""

# ConfigParser option values are expected to be strings; older parser
# versions store `defaults` verbatim, so the numeric profile defaults are
# converted to strings up front.
profiles = _configparser.ConfigParser(
    defaults={k: str(v) for k, v in DEFAULT_PROFILES['DEFAULT'].items()},
    allow_no_value=True,
)
"""
This is the globally accessible ConfigParser object for handling profiles.
"""

__all__ = ['set_option', 'get_option', 'delete', 'create_config',
           'create_config_interactive', 'set_profile',
           'get_profile', 'del_profile', 'create_profiles']
###############################################################################
# Config Manipulation Functions #
###############################################################################
def get_executable(section, key, default):
    """Get an executable (e.g. qsub) from the conf if it exists.

    Parameters
    ----------
    section : str
        The config section to use (e.g. queue).
    key : str
        The config key to get (e.g. 'qsub').
    default
        If the key does not exist, create it with this default value.

    Returns
    -------
    str or None
        Path to executable if found, else None
    """
    exe = get_option(section, key, default)
    # The option is frequently unset (None); os.path.dirname(None) would
    # raise a TypeError, so bail out early.
    if not exe:
        return None
    if not _os.path.dirname(exe):
        # Bare command name: look it up with the project's `which` helper
        # (presumably searches PATH; may return a falsy value - TODO confirm).
        exe = _run.which(exe)
    if exe and _run.is_exe(exe):
        return exe
    return None
def get_option(section=None, key=None, default=None):
    """Get a single key or section.

    All args are optional, if they are missing, the parent section or entire
    config will be returned.

    Parameters
    ----------
    section : str
        The config section to use (e.g. queue), if None, all sections returned.
    key : str
        The config key to get (e.g. 'max_jobs'), if None, whole section
        returned.
    default
        If the key does not exist, create it with this default value. Any
        value other than None is honoured, including 0 and False.

    Returns
    -------
    option_value
        The typecast option value, a dict for a whole section, or a dict of
        dicts for the whole config.

    Raises
    ------
    ValueError
        If the section is in neither the config file nor DEFAULTS, or if the
        key is missing and no default is available for it.

    See Also
    --------
    set_option : Set an option
    get_config : Get the entire config
    """
    load_config()
    if not section:
        out = _config_to_dict(config)
    elif section not in _sections(config):
        _logme.log('{} not in the config file'.format(section), 'error')
        if section not in DEFAULTS:
            raise ValueError('Section not in the config file or DEFAULTS')
        _logme.log('Creating {} from DEFAULTS'.format(section), 'info')
        config.add_section(section)
        _config_from_dict(config, DEFAULTS[section], section=section)
        write_config()
        out = get_option(section, key, default)
    elif key:
        sect = _section_to_dict(config.items(section))
        if key in sect:
            out = sect[key]
        else:
            _logme.log('{} not in the {} section of the config file'
                       .format(key, section), 'warn')
            # User-defined sections have no entry in DEFAULTS.
            section_defaults = DEFAULTS.get(section, {})
            if default is not None:
                # `is not None` rather than truthiness so that falsy
                # defaults such as 0 or False are still written.
                new_val = default
            elif key in section_defaults:
                new_val = section_defaults[key]
            else:
                raise ValueError('Option not in the config file')
            _logme.log('Creating new config entry {}:{} with val {}'
                       .format(section, key, new_val))
            set_option(section, key, new_val)
            out = get_option(section, key)
    else:
        _logme.log('Getting the whole section: {}'.format(section), 'debug')
        out = _section_to_dict(config.items(section))
    return _typecast_items(out)
def set_option(section, key, value):
    """Write a config key to the config file.

    All three arguments are converted to strings before being stored.

    Parameters
    ----------
    section : str
        Section of the config file to use, must already exist.
    key : str
        Key to add.
    value
        Value to add for key.

    Returns
    -------
    ConfigParser or None
        The updated global config, or None if the section does not exist.
    """
    # Sanitize arguments
    section = str(section)
    key = str(key)
    value = str(value)
    # Edit the globals in this file
    load_config()
    if not config.has_section(section):
        _logme.log('The {} section is not in the config file and cannot be '
                   'created'.format(section), 'warn')
        return None
    config.set(section, key, value)
    write_config()
    return config
def delete(section, key):
    """Remove one option from the config and save the file.

    Parameters
    ----------
    section : str
        Section of the config file holding the option.
    key : str
        Name of the option to remove.

    Returns
    -------
    ConfigParser
        The global config object after the removal.
    """
    load_config()  # refresh the global parser from disk first
    config.remove_option(section, key)
    write_config()
    return config
def load_config():
    """Load config from the config file.

    If the file does not exist it is created from DEFAULTS. If any section or
    key from DEFAULTS is not present in the config, it is added back,
    enforcing a minimal configuration. The file is rewritten at most once,
    and only if something was added.

    Returns
    -------
    ConfigParser
        The global config object.
    """
    if _os.path.isfile(CONFIG_FILE):
        config.read(CONFIG_FILE)
    else:
        create_config()
    changed = False
    for section, defaults in DEFAULTS.items():
        if section not in _sections(config):
            config.add_section(section)
        # Option names only; avoids rebuilding the items dict for every key.
        present = set(config.options(section))
        for key, val in defaults.items():
            if key not in present:
                config.set(section, key, str(val))
                changed = True
    if changed:
        write_config()
    return config
def get_config():
    """Load the config and return all of it as a nested dictionary."""
    loaded = load_config()
    return _config_to_dict(loaded)
def write_config():
    """Save the global config object to CONFIG_FILE, replacing its contents."""
    with open(CONFIG_FILE, 'w') as handle:
        config.write(handle)
###############################################################################
# Initialization Functions #
###############################################################################
def create_config_interactive(prompt=True):
    """Interact with the user to create a new config.

    Uses readline autocompletion to make setup easier. The answers are
    collected into a private copy of DEFAULTS and handed to create_config(),
    so the module-level DEFAULTS are never modified.

    Parameters
    ----------
    prompt : bool
        Ask for confirmation before beginning wizard.
    """
    import copy as _copy  # only needed by this wizard

    # Use tab completion
    t = _TabCompleter()
    _rl.set_completer_delims('\t')
    _rl.parse_and_bind("tab: complete")

    # Get permission
    if prompt:
        t.createListCompleter(['y', 'n'])
        _rl.set_completer(t.list_completer)
        print("Do you want to initialize your config at {}"
              .format(CONFIG_FILE))
        print("This will erase your current configuration (if it exists)")
        choice = _run.get_input("Initialize config? [y/N] ").strip().lower()
        if choice != 'y':
            return

    # Work on a copy: editing DEFAULTS in place would change the defaults
    # for the remainder of the session.
    cnf = _copy.deepcopy(DEFAULTS)

    # Get path
    # NOTE: a prompt for a job database location is planned here but is not
    # implemented yet.
    _rl.set_completer(_path_completer)
    print("We store job profile information in a small config file.")
    file_path = _run.get_input(
        'Where would you like that file to go? [{}]'
        .format(_run.exp_file(config.get('jobs', 'profile_file')))
    ).strip()
    # Paths are case sensitive on most filesystems, so the answer is not
    # lowercased; whitespace is stripped before '~' is expanded.
    if file_path:
        file_path = _os.path.expanduser(file_path)
    else:
        file_path = cnf['jobs']['profile_file']
    cnf['jobs']['profile_file'] = _run.exp_file(file_path)

    # Temp file directory
    print("\nIt is possible to write all temporary and output files to a single",
          "temp file directory, regardless of where the job is run from.\n" +
          "As this library allows you to retrieve output files from the class",
          "directly, this is a good way of keeping your work directory tidy.",
          "If you do not choose to have files auto-deleted though, you will",
          "need to keep the directory tidy yourself.\n"
          "The value of this option is that if you run from a machine where",
          "some places are not cluster-accessible, jobs will run anyway if",
          "just one directory is accessible to the cluster.\n"
          "This option can always be overridden at runtime with the filepath",
          "argument.\nIf you leave the below option blank, the default will",
          "be to use the runtime path, which is usually the current working",
          "directory.\n")
    t.createListCompleter(['y', 'n'])
    _rl.set_completer(t.list_completer)
    # get_yesno presumably returns a bool, with the second argument as the
    # default answer - TODO confirm against fyrd.run.
    if _run.get_yesno('Would you like to set a script file path?', 'y'):
        file_path = _get_set_path('Where would you like to put that file?')
        cnf['jobs']['scriptpath'] = file_path
    if _run.get_yesno('Would you like to set an output file path?', 'n'):
        file_path = _get_set_path('Where would you like to put that file?')
        cnf['jobs']['outpath'] = file_path

    # Cleaning
    t.createListCompleter(['y', 'n'])
    _rl.set_completer(t.list_completer)
    print('\nWe can automatically delete script and/or output files after',
          'results have been retrieved.\n'
          'This option can be overridden at run time on a per-job basis.\n'
          'Do you want to autoclean:\n')
    clean_files = _run.get_yesno('Autoclean script files?', 'y')
    cnf['jobs']['clean_files'] = clean_files
    clean_outs = _run.get_yesno('Autoclean output files (e.g. .out and .err)?',
                                'n')
    cnf['jobs']['clean_outputs'] = clean_outs

    # Wait times
    default_max = cnf['queue']['max_jobs']
    # The completer concatenates strings, so only offer string choices.
    t.createListCompleter([str(default_max)])
    _rl.set_completer(t.list_completer)
    while True:
        max_len = _run.get_input("\nWhat is the maximum number of jobs allowed " +
                                 "in your queue? [{}] "
                                 .format(default_max)).strip()
        if not max_len:
            max_jobs = int(default_max)
            break
        try:
            max_jobs = int(max_len)
            break
        except ValueError:
            # Ask again instead of aborting the whole wizard on a typo.
            print('Please enter a whole number.')
    cnf['queue']['max_jobs'] = max_jobs

    # Options
    print('\nIs there a default queue you wish to submit to if no other',
          'options are given?\nIf so enter the name below, or leave blank',
          'to ignore.\n')
    def_queue = _run.get_input('Default queue: ').strip()
    print('\nThank you, configuring options.\n'
          'Please edit the file directly to inspect or edit your config.\n')
    create_config(cnf, def_queue)
def create_config(cnf=None, def_queue=None):
    """Create an initial config file.

    Gets all information from the file-wide DEFAULTS constant and overwrites
    specific keys using the values in cnf.

    This means that any records in the cnf dict that are not present in
    DEFAULTS will be ignored, and any records that are absent will be
    populated from DEFAULTS. The cnf dictionary itself is not modified.

    Replaces the global config object and overwrites CONFIG_FILE.

    Parameters
    ----------
    cnf : dict
        A dictionary of config defaults, {'section': {'key': value}}.
    def_queue : str
        A name for a queue to add to the default profile.
    """
    global config
    config = _configparser.ConfigParser(allow_no_value=True)
    if _os.path.exists(CONFIG_FILE):
        _os.remove(CONFIG_FILE)
    if not isinstance(cnf, dict):
        cnf = {}
    init_conf = {}
    for section, items in DEFAULTS.items():
        # Read overrides without inserting missing sections into the
        # caller's dictionary.
        overrides = cnf.get(section)
        if not isinstance(overrides, dict):
            overrides = {}
        init_conf[section] = {
            key: str(overrides.get(key, value))
            for key, value in items.items()
        }
    _config_from_dict(config, init_conf)
    with open(CONFIG_FILE, 'w') as fout:
        config.write(fout)
    if def_queue:
        def_prof = get_profile('DEFAULT')
        def_prof.args.update({'partition': def_queue})
        def_prof.write()
###############################################################################
# Config Helper Functions #
###############################################################################
def get_job_paths(kwds):
    """Work out the run, output and script directories for a job.

    Precedence for the output path is: an 'outpath' keyword, then the
    config option jobs/outpath, then the run path. The script path follows
    the same pattern with 'scriptpath', falling back to the output path.

    Parameters
    ----------
    kwds : dict
        Keyword arguments accepted by fyrd.job.Job. Modified in place:
        'dir' is set to the absolute run path, and 'outpath' and
        'scriptpath' are removed if present.

    Returns
    -------
    kwds : dict
        The same dictionary, with path requests removed
    runpath : str
        The path where the job will execute
    outpath : str
        The path where job outputs will go
    scriptpath : str
        The path where job script files will be stored
    """
    runpath = _os.path.abspath(kwds.get('dir', '.'))
    kwds['dir'] = runpath

    # Output path: keyword beats config, config beats run path
    conf_out = get_option('jobs', 'outpath')
    if 'outpath' in kwds:
        chosen_out = kwds.pop('outpath')
    else:
        chosen_out = conf_out if conf_out else runpath
    outpath = _os.path.abspath(chosen_out)

    # Script path: keyword beats config, config beats output path
    conf_script = get_option('jobs', 'scriptpath')
    if 'scriptpath' in kwds:
        chosen_script = kwds.pop('scriptpath')
    else:
        chosen_script = conf_script if conf_script else outpath
    scriptpath = _os.path.abspath(chosen_script)

    return kwds, runpath, outpath, scriptpath
###############################################################################
# Profiles #
###############################################################################
class Profile(object):
    """A job submission profile. Just a thin wrapper around a dict.

    Items of the `args` dictionary can be read and written as attributes;
    every write is passed through the option checker first.

    Attributes
    ----------
    name : str
        Name of the profile, 'DEFAULT' for the default profile.
    args : dict
        Checked keyword arguments of the profile.

    Methods
    -------
    write : Write self to config file
    """

    def __init__(self, name, kwds):
        """Set up bare minimum attributes.

        Parameters
        ----------
        name : str
            Name of the profile
        kwds : dict
            Dictionary of keyword arguments (will be validated).
        """
        name = 'DEFAULT' if name.lower() == 'default' else name
        self.name = name
        self.args = kwds

    def write(self):
        """Write self to config file."""
        set_profile(self.name, self.args)

    def __getattr__(self, key):
        """Access dict items as attributes, None if the item is not set.

        Only called when normal attribute lookup fails.
        """
        # If `args` itself is missing (e.g. on the blank instance built by
        # copy or pickle) looking it up here would recurse without bound,
        # and protocol probes for special methods must see an
        # AttributeError rather than None.
        if key == 'args' or (key.startswith('__') and key.endswith('__')):
            raise AttributeError(key)
        return self.args.get(key)

    def __setattr__(self, key, value):
        """Force checking of keywords."""
        if key == 'name':
            object.__setattr__(self, key, value)
        elif key == 'args':
            if not isinstance(value, dict):
                raise TypeError('Keyword arguments must be a dict')
            value = _opt.check_arguments(value)
            object.__setattr__(self, key, value)
        else:
            # Any other attribute is treated as a job option.
            opt, arg = list(_opt.check_arguments({key: value}).items())[0]
            self.args[opt] = arg

    def __len__(self):
        """Return arg count."""
        return len(self.args)

    def __repr__(self):
        """Display useful info."""
        return "{}<{}>".format(self.name, self.args)

    def __str__(self):
        """Pretty print."""
        title = 'DEFAULT' if self.name.lower() == 'default' else self.name
        rows = ['{:<11}{:}'.format(i, j) for i, j in self.args.items()]
        return "{}:\n\t{}".format(title, '\n\t'.join(rows))
def get_profile(profile=None, allow_none=True):
    """Return a profile if it exists, if None, return all profiles.

    Parameters
    ----------
    profile : str
        The name of a profile to search for. 'default' is accepted in any
        case. If None, all profiles are returned via get_profiles().
    allow_none : bool
        If True, return None if no profile matches, otherwise raise a
        ValueError.

    Returns
    -------
    fyrd.conf.Profile, dict or None
        The requested profile, a dictionary of all profiles if `profile` is
        None, or None if the profile does not exist and allow_none is True.

    Raises
    ------
    ValueError
        If the profile does not exist and allow_none is False.
    """
    load_profiles()
    if profile is None:
        return get_profiles(allow_none=allow_none)
    # Allow lowercase default profile
    if profile.lower() == 'default':
        profile = 'DEFAULT'
    if profile in _sections(profiles):
        return Profile(profile, _section_to_dict(profiles.items(profile)))
    if profile == 'DEFAULT':
        _logme.log('default profile missing, recreating. You can '
                   'override the defaults by editing {}'
                   .format(_run.exp_file(config.get('jobs', 'profile_file'))),
                   'warn')
        # Copy so the Profile never shares state with the module constant.
        prof = Profile('DEFAULT', dict(DEFAULT_PROFILES['DEFAULT']))
        prof.write()
        return prof
    if allow_none:
        return None
    raise ValueError('Requested profile ({}) does not exist'
                     .format(profile))
def get_profiles(profs=None, allow_none=True):
    """Build a dictionary of Profile objects keyed by profile name.

    Parameters
    ----------
    profs : list
        Names of the profiles to fetch. If empty or None, every profile in
        the global profiles parser is returned.
    allow_none : bool
        With `profs`: passed on to get_profile(). Without `profs`: must be
        True, otherwise a ValueError is raised.

    Returns
    -------
    dict
        A dictionary of {name: fyrd.conf.Profile}
    """
    if profs:
        return {
            wanted: get_profile(wanted, allow_none)
            for wanted in _run.listify(profs)
        }
    if not allow_none:
        raise ValueError('Profile required')
    # DEFAULT is always listed first, then every section the parser reports.
    found = {}
    for label in ['DEFAULT'] + list(_sections(profiles)):
        found[label] = Profile(label, _section_to_dict(profiles.items(label)))
    return found
def set_profile(name, kwds, update=True):
    """Write profile to config file.

    Parameters
    ----------
    name : str
        The name of the profile to add/edit.
    kwds : dict
        Keyword arguments to add to the profile.
    update : bool
        Update the profile rather than overwriting it.

    Returns
    -------
    fyrd.conf.Profile
        The profile as stored after writing.

    Raises
    ------
    Exception
        If kwds is not a dictionary.
    """
    load_profiles()
    name = 'DEFAULT' if name.lower() == 'default' else name
    if not isinstance(kwds, dict):
        raise Exception('Profile arguments must be a dictionary')
    kwds = _opt.check_arguments(kwds)
    if name in _sections(profiles):
        if not update:
            _logme.log('Profile {} already exists, overwriting'.format(name),
                       'debug')
            if name == 'DEFAULT':
                # ConfigParser can neither remove nor re-add the DEFAULT
                # section, so empty it option by option instead.
                for opt in list(profiles.defaults()):
                    profiles.remove_option('DEFAULT', opt)
            else:
                profiles.remove_section(name)
                profiles.add_section(name)
    elif name != 'DEFAULT':
        profiles.add_section(name)
    _config_from_dict(profiles, kwds, name)
    write_profiles()
    return get_profile(name)
def del_profile(name):
    """Remove a profile and save the profiles file.

    The default profile cannot be removed; asking to delete it resets it
    instead: options not in DEFAULT_PROFILES['DEFAULT'] are dropped and the
    remaining ones are set back to their default values.

    Parameters
    ----------
    name : str
        The name of the profile to delete.
    """
    load_profiles()
    if name.lower() == 'default':
        factory = DEFAULT_PROFILES['DEFAULT']
        for option in _section_to_dict(profiles.items('DEFAULT')):
            if option in factory:
                profiles.set('DEFAULT', option, str(factory[option]))
            else:
                profiles.remove_option('DEFAULT', option)
    elif name not in _sections(profiles):
        _logme.log('Profile {} does not exist, cannot delete'.format(name),
                   'warn')
    else:
        _logme.log('Removing profile {}'.format(name))
        profiles.remove_section(name)
    write_profiles()
def create_profiles(profs=None):
    """Create an initial profiles file.

    Gets all information from the file-wide DEFAULT_PROFILES constant and
    overwrites specific keys using the values in profs.

    This means that any records in the profs dict that are not present in
    DEFAULT_PROFILES will be ignored, and any records that are absent will be
    populated from DEFAULT_PROFILES. The profs dictionary itself is not
    modified.

    Replaces the global profiles object and overwrites the profile file.

    Parameters
    ----------
    profs : dict
        A dictionary of profile defaults, {'profile': {'key': value}}.
    """
    global profiles
    # ConfigParser option values are expected to be strings.
    profiles = _configparser.ConfigParser(
        defaults={k: str(v) for k, v in DEFAULT_PROFILES['DEFAULT'].items()},
        allow_no_value=True,
    )
    prof_file = _run.exp_file(config.get('jobs', 'profile_file'))
    if _os.path.exists(prof_file):
        _os.remove(prof_file)
    if not isinstance(profs, dict):
        profs = {}
    init_conf = {}
    for section, items in DEFAULT_PROFILES.items():
        # Read overrides without inserting missing sections into the
        # caller's dictionary.
        overrides = profs.get(section)
        if not isinstance(overrides, dict):
            overrides = {}
        init_conf[section] = {
            key: str(overrides.get(key, value))
            for key, value in items.items()
        }
    _config_from_dict(profiles, init_conf)
    with open(prof_file, 'w') as fout:
        profiles.write(fout)
def load_profiles():
    """Load the profiles, create them if they don't exist.

    Any key of DEFAULT_PROFILES['DEFAULT'] missing from the DEFAULT profile
    is restored and the file rewritten.

    Returns
    -------
    ConfigParser
        profiles
    """
    prof_file = _run.exp_file(config.get('jobs', 'profile_file'))
    if not _os.path.isfile(prof_file):
        create_profiles()
    profiles.read(prof_file)
    # Recreate DEFAULT if necessary.
    def_prof = _config_to_dict(profiles).get('DEFAULT', {})
    changed = False
    for key, val in DEFAULT_PROFILES['DEFAULT'].items():
        if key not in def_prof:
            # ConfigParser.set() rejects non-string values on Python 3.
            profiles.set('DEFAULT', key, str(val))
            changed = True
    if changed:
        write_profiles()
    return profiles
def write_profiles():
    """Save the global profiles object to the profile file, then reload it.

    Returns
    -------
    ConfigParser
        profiles, as returned by load_profiles()
    """
    target = _run.exp_file(config.get('jobs', 'profile_file'))
    with open(target, 'w') as handle:
        profiles.write(handle)
    return load_profiles()
###############################################################################
# Helper Functions #
###############################################################################
def _sections(cnf, inc_anyway=False):
    """List the sections of a parser, counting DEFAULT when it is in use.

    Parameters
    ----------
    cnf : ConfigParser
        Any ConfigParser object
    inc_anyway : bool
        Append DEFAULT even if it holds no items

    Returns
    -------
    list
        Section names of cnf, followed by 'DEFAULT' if it has items or
        inc_anyway is set.
    """
    names = list(cnf.sections())
    if cnf.items('DEFAULT') or inc_anyway:
        names.append('DEFAULT')
    return names
def _config_from_dict(cnf, dct, section=None):
    """Copy a dictionary into a ConfigParser, creating sections as needed.

    Python 2 ConfigParsers cannot read dictionaries, hence this helper.
    All values are stored as their str() representation.

    Parameters
    ----------
    cnf : ConfigParser
        A ConfigParser object.
    dct : dict
        With `section`: a flat {'key': value} mapping without container
        values. Without `section`: {'section': {'key': value}}.
    section : str
        An optional section name to update

    Returns
    -------
    ConfigParser
        The original ConfigParser object, updated.
    """
    assert isinstance(cnf, _configparser.ConfigParser)
    assert isinstance(dct, dict)
    if section:
        assert not any(
            isinstance(item, (tuple, list, dict, set))
            for item in dct.values()
        )
        # Treat the flat case as a one-section nested mapping.
        layout = {section: dct}
    else:
        assert all(isinstance(item, dict) for item in dct.values())
        layout = dct
    for sect_name, options in layout.items():
        if sect_name not in cnf.sections() + ['DEFAULT']:
            cnf.add_section(sect_name)
        as_text = {opt: str(val) for opt, val in options.items()}
        for opt, text in as_text.items():
            cnf.set(sect_name, opt, text)
    return cnf
def _dict_to_strings(kwds):
    """Return a copy of a dictionary with every value passed through str()."""
    assert isinstance(kwds, dict)
    return {name: str(item) for name, item in kwds.items()}
def _section_to_dict(section):
    """Turn the (key, value) pairs of a section into a typed dictionary.

    Parameters
    ----------
    section : list
        Output of ConfigParser.items(section)

    Returns
    -------
    dict
        {key: value} with every value run through _typecast_items
    """
    return {
        name: _typecast_items(raw) for name, raw in dict(section).items()
    }
# Separators used when parsing stringified containers.
_LIST_SEP = re.compile(r', *')
_PAIR_SEP = re.compile(r': *')


def _typecast_items(x):
    """Try to convert a variable into the true type.

    Avoids eval() as we will be used on config files.

    For example: 'True' becomes True, '-5' becomes -5 and '0.125' becomes
    0.125. Non-string input is returned unchanged. Strings that cannot be
    converted are returned with surrounding quotes stripped.

    Note that anything float() or complex() accepts is converted, which
    includes words such as 'nan' and 'inf'.
    """
    if not isinstance(x, six.string_types):
        return x
    if x == 'True':
        return True
    if x == 'False':
        return False
    if x == 'None':
        return None
    # int() before float() so that negative integers keep their type; the
    # try/except also covers digit-like characters that int() rejects.
    for convert in (int, float, complex):
        try:
            return convert(x)
        except (ValueError, TypeError):
            pass
    try:
        if x.startswith('[') and x.endswith(']'):
            inner = x.strip('[]')
            if not inner.strip():
                return []
            return [_typecast_items(i) for i in _LIST_SEP.split(inner)]
        if x.startswith('{') and x.endswith('}'):
            inner = x.strip('{}')
            if not inner.strip():
                # str({}) is '{}', an empty set is written as 'set()'.
                return {}
            if ':' in x:
                return {
                    _typecast_items(k): _typecast_items(v) for k, v in [
                        _PAIR_SEP.split(i) for i in _LIST_SEP.split(inner)
                    ]
                }
            return {_typecast_items(i) for i in _LIST_SEP.split(inner)}
    except (ValueError, TypeError):
        # Malformed or unhashable contents: fall back to the plain string.
        pass
    return x.strip('\'"')
def _config_to_dict(cnf):
    """Convert a whole ConfigParser into {section: {key: typed value}}.

    DEFAULT is included, as the first entry, only when it holds items.
    """
    result = {}
    default_items = cnf.items('DEFAULT')
    if default_items:
        result['DEFAULT'] = _section_to_dict(default_items)
    result.update(
        (name, _section_to_dict(cnf.items(name))) for name in _sections(cnf)
    )
    return result
###############################################################################
# Completion #
###############################################################################
class _TabCompleter(object):
    """
    A tab completer that can either complete from the filesystem or from a
    list.

    Taken from `https://gist.github.com/iamatypeofwalrus/5637895`_
    """

    # Set by createListCompleter(); None until then.
    list_completer = None

    def createListCompleter(self, ll):
        """
        This is a closure that creates a method that autocompletes from
        the given list.

        Since the autocomplete function can't be given a list to complete from
        a closure is used to create the list_completer function with a list to
        complete from.

        Items are converted to strings and None items are skipped, so any
        iterable of values can be passed.
        """
        choices = [str(c) for c in ll if c is not None]

        def list_completer(_, state):
            """Return the state-th match, or None when there are no more."""
            line = _rl.get_line_buffer()
            if line:
                matches = [c + " " for c in choices if c.startswith(line)]
            else:
                matches = [c + " " for c in choices]
            # readline keeps calling with increasing state until it gets a
            # non-string back.
            try:
                return matches[state]
            except IndexError:
                return None

        self.list_completer = list_completer
def _path_completer(_, state):
    """
    This is the tab completer for systems paths.

    Only tested on *nix systems

    Returns the state-th completion of the current line buffer, or None
    when there are no more completions or the path cannot be listed.
    """
    line = _rl.get_line_buffer()
    try:
        return _complete_path(line if line else '.')[state]
    except (IndexError, OSError):
        # readline stops asking once it receives a non-string.
        return None
def _listdir(root):
    """Return the entries of 'root', with the path separator appended to
    every entry that is a directory."""
    return [
        entry + _os.sep if _os.path.isdir(_os.path.join(root, entry))
        else entry
        for entry in _os.listdir(root)
    ]
def _complete_path(path=None):
    """Return the possible completions of a partial filesystem path.

    An empty path lists the current directory. Several candidates, or a
    path that does not exist, give the list of candidates. A single
    existing directory gives its contents. A single existing file gives the
    path followed by a space.
    """
    if not path:
        return _os.listdir('.')
    path = _os.path.expanduser(path)
    parent, prefix = _os.path.split(path)
    candidates = [
        _os.path.join(parent, entry)
        for entry in _os.listdir(parent or '.')
        if entry.startswith(prefix)
    ]
    # Ambiguous, or nothing on disk matches exactly (e.g. a typo)
    if len(candidates) > 1 or not _os.path.exists(path):
        return candidates
    # A file that matches exactly ends the completion
    if not _os.path.isdir(path):
        return [path + ' ']
    # A single directory: offer everything inside it
    return [_os.path.join(path, entry) for entry in _os.listdir(path)]
def _get_set_path(message):
    """Get a path from the user, make the directory if necessary.

    Keeps asking until the user names a directory that exists or can be
    created.

    Parameters
    ----------
    message : str
        The prompt to show the user.

    Returns
    -------
    str
        Absolute path of an existing directory.
    """
    t = _TabCompleter()
    while True:
        _rl.set_completer(_path_completer)
        # Paths are case sensitive on most filesystems: do not lowercase.
        file_path = _run.get_input(message + ' ').strip()
        if not file_path:
            # Nothing entered, ask again rather than offering to create ''.
            continue
        file_path = _os.path.abspath(_os.path.expanduser(file_path))
        if _os.path.isdir(file_path):
            return file_path
        t.createListCompleter(['y', 'n'])
        _rl.set_completer(t.list_completer)
        ans = _run.get_input(
            'That directory does not exist, would you like to ' +
            'try to create it? [y/N] ')
        if ans.strip().lower() != 'y':
            continue
        try:
            _os.makedirs(file_path)
        except OSError:
            print("Failed to make {}".format(file_path),
                  "Please make it manually or choose another",
                  "directory\n")
            continue
        return file_path
###############################################################################
# Set Constants from config, initialize config on import #
###############################################################################
# Make sure the config directory exists. Anything at that path which is not
# a directory is removed first so that the directory can be created.
if _os.path.exists(CONFIG_PATH) and not _os.path.isdir(CONFIG_PATH):
    _os.remove(CONFIG_PATH)
if not _os.path.isdir(CONFIG_PATH):
    _os.makedirs(CONFIG_PATH)

# Write a default config file on first use
if not _os.path.isfile(CONFIG_FILE):
    create_config()

# Populate the global parser objects from disk
config = load_config()
profiles = load_profiles()