# -*- coding: utf-8 -*-
"""
High level functions to make complex tasks easier.
Functions
---------
jobify
    Decorator to run any function as a cluster job
parapply
    Run a function on a pandas DataFrame in parallel on a cluster
parapply_summary
    Run parapply and merge the results (don't concatenate).
splitrun
    Split a file, run a command in parallel, return the result.
"""
import os as _os
from six import text_type as _txt
from six import string_types as _str
from six import integer_types as _int
###############################################################################
# Import Ourself #
###############################################################################
from . import run as _run
from . import conf as _conf
from . import logme as _logme
from . import batch_systems as _batch
from .job import Job as _Job
_options = _batch.options
###############################################################################
# Try Import Non-Required Modules #
###############################################################################
try:
import numpy as _np
import pandas as _pd
except ImportError:
_logme.log('Could not import numpy and pandas for helpers', 'debug')
__all__ = ['jobify', 'parapply', 'parapply_summary', 'splitrun']
###############################################################################
# Decorators #
###############################################################################
def jobify(name=None, profile=None, qtype=None, submit=True, **kwds):
"""Decorator to make any function a job.
Will make any function return a Job object that will execute the function
on the cluster.
If `submit` is `True`, the job will be submitted when it is returned.
Usage:
.. code:: python
@fyrd.jobify(name='my_job', profile='small', mem='8GB',
time='00:10:00', imports=['from time import sleep'])
def do_something(file_path, iteration_count=24):
for i in range(iteration_count):
                print(file_path + str(i))
sleep(1)
return file_path
job = do_something('my_file.txt')
out = job.get()
Parameters
----------
name : str, optional
        Optional name of the job. If not defined, the name is guessed. If a
        job of the same name is already queued, an integer job number (not
        the queue number) will be added, i.e. <name>.1
qtype : str, optional
Override the default queue type
profile : str, optional
The name of a profile saved in the conf
submit : bool, optional
Submit the Job before returning it
kwds
*All other keywords are parsed into cluster keywords by the options
system.* For available keywords see `fyrd.option_help()`
Returns
-------
fyrd.job.Job
A Job class initialized with the decorated function.
Examples
--------
>>> import fyrd
>>> @fyrd.jobify(name='test_job', mem='1GB')
... def test(string, iterations=4):
... \"\"\"This does basically nothing!\"\"\"
... outstring = ""
... for i in range(iterations):
... outstring += "Version {0}: {1}".format(i, string)
... return outstring
>>> j = test('hi')
>>> j.get()
    'Version 0: hiVersion 1: hiVersion 2: hiVersion 3: hi'
"""
def deco(func):
"""This will be the actual decorator for the function."""
# Munge the job docstring to preserve function documentation
fdoc = func.__doc__
if fdoc:
fdoc = "\nOriginal Docstring:\n\n{0}".format(fdoc)
else:
fdoc = ""
def wrapper(*args, **kwargs):
"""This will convert the function into a Job object."""
job = _Job(func, args=args, kwargs=kwargs, name=name, qtype=qtype,
profile=profile, **kwds)
jdoc = job.__doc__
if jdoc:
jdoc = "\n\nJob documentation:\n\n{0}".format(jdoc)
else:
jdoc = ""
job.__doc__ = (
"This is a fyrd.job.Job decorated function, " +
"to get results run `.get()`.\n" + fdoc + jdoc
)
if submit:
job.submit()
return job
wrapper.__doc__ = (
"This is a fyrd.job.Job decorated function.\n" +
"\nWhen you call it it will return a Job object from " +
"which you can get\nthe results with the `.get()` method.\n" +
fdoc
)
return wrapper
return deco
###############################################################################
# parapply #
###############################################################################
def parapply(jobs, df, func, args=(), profile=None, applymap=False,
merge_axis=0, merge_apply=False, name='parapply', imports=None,
direct=True, **kwds):
"""Split a dataframe, run apply in parallel, return result.
This function will split a dataframe into however many pieces are requested
with the jobs argument, run apply in parallel by submitting the jobs to the
cluster, and then recombine the outputs.
If the 'clean_files' and 'clean_outputs' arguments are not passed, we
delete all intermediate files and output files by default.
    This function will take any keyword arguments accepted by Job, which can
    be found by running fyrd.options.option_help(). It also accepts any of
    the keywords accepted by pandas.DataFrame.apply(), found
    `here <http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.apply.html>`_
Parameters
----------
jobs : int
Number of pieces to split the dataframe into
    df : DataFrame
        Any pandas DataFrame
    func : function
        The function to apply to each piece of the DataFrame
args : tuple
Positional arguments to pass to the function, keyword arguments can
just be passed directly.
profile : str
A fyrd cluster profile to use
applymap : bool
Run applymap() instead of apply()
    merge_axis : int
        Which axis to merge on, 0 or 1; the default is 0
merge_apply : bool
Apply the function on the merged dataframe also
name : str
A prefix name for all of the jobs
imports : list
A list of imports in any format, e.g.
`['import numpy', 'scipy', 'from numpy import mean']`
direct : bool
Whether to run the function directly or to return a Job. Default True.
Any keyword arguments recognized by fyrd will be used for job
submission.
*Additional keyword arguments will be passed to DataFrame.apply()*
Returns
-------
DataFrame
A recombined DataFrame: concatenated version of original split
DataFrame
Example
-------
>>> import numpy
>>> import pandas
>>> import fyrd
>>> df = pandas.DataFrame([[0, 1], [2, 6], [9, 24], [13, 76], [4, 12]])
>>> df['sum'] = fyrd.helpers.parapply(2, df, lambda x: x[0]+x[1], axis=1)
>>> df
0 1 sum
0 0 1 1
1 2 6 8
2 9 24 33
3 13 76 89
4 4 12 16
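
    To submit the whole parapply run as a single non-blocking job instead,
    pass `direct=False`. A minimal sketch, assuming the DataFrame `df` from
    the example above:

    .. code:: python

        job = fyrd.helpers.parapply(2, df, numpy.mean, direct=False)
        job.submit()
        result = job.get()
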
See Also
--------
parapply_summary: Merge results of parapply using applied function
splitrun: Run a command in parallel on a split file
"""
if direct:
return _parapply(
jobs, df, func, args=args, profile=profile, applymap=applymap,
merge_axis=merge_axis, merge_apply=merge_apply, name=name,
imports=imports, **kwds
)
kwargs = dict(
args=args, profile=profile, applymap=applymap,
merge_axis=merge_axis, merge_apply=merge_apply, name=name,
imports=imports
)
kwds = _options.sanitize_arguments(kwds)
kwargs.update(kwds)
kwargs['imports'] = _run.get_all_imports(func, kwargs)
kwargs['syspaths'] = _run.update_syspaths(func, kwargs)
return _wrap_runner(
_parapply,
*(jobs, df, func),
**kwargs
)
def _parapply(jobs, df, func, args=(), profile=None, applymap=False,
merge_axis=0, merge_apply=False, name='parapply', imports=None,
**kwds):
"""Direct running function for parapply, see parapply docstring."""
# Handle arguments
if not isinstance(jobs, _int):
raise ValueError('Jobs argument must be an integer.')
if not isinstance(df, (_pd.core.frame.DataFrame, _np.ndarray)):
raise ValueError('df must be a dataframe or numpy array, is {}'
.format(type(df)))
if not callable(func):
raise ValueError('function must be callable, current type is {}'
.format(type(func)))
if profile is not None and not isinstance(profile, (_str, _txt)):
raise ValueError('Profile must be a string, is {}'
.format(type(profile)))
fyrd_kwds, pandas_kwds = _options.split_keywords(kwds)
# Get name
    name = name if name else 'parapply'
# Set up auto-cleaning
if 'clean_files' not in fyrd_kwds:
fyrd_kwds['clean_files'] = True
if 'clean_outputs' not in fyrd_kwds:
fyrd_kwds['clean_outputs'] = True
# Split dataframe
_logme.log('Splitting dataframe', 'debug')
dfs = _np.array_split(df, jobs)
assert len(dfs) == jobs
# Pick function
sub_func = _run_applymap if applymap else _run_apply
# Some sane imports
imports = _run.listify(imports) if imports else []
    imports += ['import numpy as np', 'import numpy', 'import scipy as sp',
                'import scipy', 'import pandas as pd', 'import pandas',
                'from matplotlib import pyplot as plt',
                'from scipy import stats']
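    # Merge the explicit list above with imports detected from the function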
imports = _run.export_imports(func, {'imports': imports})
# Run the functions
_logme.log('Submitting jobs', 'debug')
outs = []
count = 1
for d in dfs:
nm = '{}_{}_of_{}'.format(name, count, jobs)
outs.append(
_Job(sub_func, (d, func, args, pandas_kwds),
profile=profile, name=nm, imports=imports,
**fyrd_kwds).submit()
)
count += 1
# Get the results
_logme.log('Waiting for results', 'debug')
results = []
for out in outs:
try:
results.append(out.get())
except IOError:
_logme.log('Result getting failed, most likely one of the child ' +
'jobs crashed, check the error files', 'critical')
raise
# Return the recombined DataFrame
    _logme.log('Done, joining', 'debug')
try:
out = _pd.concat(results, axis=merge_axis)
except ValueError:
_logme.log('DataFrame concatenation failed!', 'critical')
raise
if merge_apply:
out = sub_func(out, func, args, pandas_kwds)
return out
def parapply_summary(jobs, df, func, args=(), profile=None, applymap=False,
name='parapply', imports=None, direct=True, **kwds):
"""Run parapply for a function with summary stats.
Instead of returning the concatenated result, merge the result using
the same function as was used during apply.
This works best for summary functions like `.mean()`, which do a linear
operation on a whole dataframe or series.
Parameters
----------
jobs : int
Number of pieces to split the dataframe into
    df : DataFrame
        Any pandas DataFrame
    func : function
        The summary function to apply to each piece of the DataFrame
    args : tuple
        Positional arguments to pass to the function, keyword arguments can
        just be passed directly.
    profile : str
        A fyrd cluster profile to use
    applymap : bool
        Run applymap() instead of apply()
name : str
A prefix name for all of the jobs
imports : list
A list of imports in any format, e.g.
`['import numpy', 'scipy', 'from numpy import mean']`
direct : bool
Whether to run the function directly or to return a Job. Default True.
Any keyword arguments recognized by fyrd will be used for job
submission.
*Additional keyword arguments will be passed to DataFrame.apply()*
Returns
-------
DataFrame
A recombined DataFrame
Example
-------
>>> import numpy
>>> import pandas
>>> import fyrd
>>> df = pandas.DataFrame([[0, 1], [2, 6], [9, 24], [13, 76], [4, 12]])
>>> df = fyrd.helpers.parapply_summary(2, df, numpy.mean)
>>> df
0 6.083333
1 27.166667
dtype: float64
See Also
--------
parapply: Run a command in parallel on a DataFrame without merging the
result
"""
if direct:
return _parapply_summary(
jobs, df, func, args=args, profile=profile, applymap=applymap,
name=name, imports=imports, **kwds
)
kwargs = dict(
args=args, profile=profile, applymap=applymap,
name=name, imports=imports
)
kwds = _options.sanitize_arguments(kwds)
kwargs.update(kwds)
kwargs['imports'] = _run.get_all_imports(func, kwargs)
kwargs['syspaths'] = _run.update_syspaths(func, kwargs)
return _wrap_runner(
_parapply_summary,
*(jobs, df, func),
**kwargs
)
def _parapply_summary(jobs, df, func, args=(), profile=None, applymap=False,
name='parapply', imports=None, **kwds):
"""Direct running function for parapply_sumary, see that docstring."""
imports = _run.export_imports(func, {'imports': imports})
out = parapply(jobs, df, func, args, profile, applymap, name=name,
merge_axis=1, imports=imports, **kwds)
# Pick function and get args
sub_func = _run_applymap if applymap else _run_apply
pandas_kwds = _options.split_keywords(kwds)[1]
# Transpose
out = out.T
# Run the function again
return sub_func(out, func, args, pandas_kwds)
def _run_apply(df, func, args=None, pandas_kwds=None):
"""Run DataFrame.apply().
Parameters
----------
    df : DataFrame
        Any pandas DataFrame
    func : function
        The function to run with DataFrame.apply()
args : tuple
A tuple of arguments to submit to the function
pandas_kwds : dict
A dictionary of keyword arguments to pass to DataFrame.apply()
Returns
-------
DataFrame
The result of apply()
"""
apply_kwds = {'args': args} if args else {}
if pandas_kwds:
apply_kwds.update(pandas_kwds)
return df.apply(func, **apply_kwds)
def _run_applymap(df, func, args=None, pandas_kwds=None):
"""Run DataFrame.applymap().
Parameters
----------
    df : DataFrame
        Any pandas DataFrame
    func : function
        The function to run with DataFrame.applymap()
    args : tuple
        A tuple of arguments to submit to the function
    pandas_kwds : dict
        A dictionary of keyword arguments to pass to DataFrame.applymap()
    Returns
    -------
    DataFrame
        The result of applymap()
"""
apply_kwds = {'args': args} if args else {}
if pandas_kwds:
apply_kwds.update(pandas_kwds)
return df.applymap(func, **apply_kwds)
###############################################################################
# Split Run #
###############################################################################
def splitrun(jobs, infile, inheader, command, args=None, kwargs=None,
name=None, qtype=None, profile=None, outfile=None,
outheader=False, merge_func=None, direct=True, **kwds):
"""Split a file, run command in parallel, return result.
This function will split a file into however many pieces are requested
with the jobs argument, and run command on each.
Accepts exactly the same arguments as the Job class, with the exception of
the first three and last four arguments, which are:
- the number of jobs
- the file to work on
- whether the input file has a header
- an optional output file
- whether the output file has a header
- an optional function to use to merge the resulting list, only used
if there is no outfile.
- whether to run directly or to return a Job. If direct is True, this
function will just run and thus block until complete, if direct is
False, the function will submit as a Job and return that Job.
    **Note**: If command is a string, `.format(file={file})` will be called on
    it, where file is each split file. If command is a function, there must
    be an argument in either args or kwargs that contains `{file}`. It will be
    replaced with the *path to the split file*, again by the format command.
If outfile is specified, there must also be an '{outfile}' line in any
script or an '{outfile}' argument in either args or kwargs. When this
function completes, the file at outfile will contain the concatenated
output files of all of the jobs.
If the 'clean_files' and 'clean_outputs' arguments are not passed, we
delete all intermediate files and output files by default.
The intermediate files will be stored in the 'scriptpath' directory.
Any header line is kept at the top of the file.
Primary return value varies and is decided in this order:
If outfile:
the absolute path to that file
If merge_func:
the result of merge_func(list), where list is the list of outputs.
Else:
a list of results
If direct is False, this function returns a fyrd.job.Job object which
will return the results described above on get().
Parameters
----------
    jobs : int
        Number of pieces to split the file into
infile : str
The path to the file to be split.
inheader : bool
Does the input file have a header?
command : function/str
The command or function to execute.
args : tuple/dict
Optional arguments to add to command, particularly useful for
functions.
kwargs : dict
Optional keyword arguments to pass to the command, only used for
functions.
name : str
        Optional name of the job. If not defined, the name is guessed. If a
        job of the same name is already queued, an integer job number (not
        the queue number) will be added, i.e. <name>.1
qtype : str
Override the default queue type
profile : str
The name of a profile saved in the conf
outfile : str
The path to the expected output file.
outheader : bool
        Does the output file have a header?
merge_func : function
An optional function used to merge the output list if there is no
outfile.
direct : bool
Whether to run the function directly or to return a Job. Default True.
    *All other keywords are parsed into cluster keywords by the options
    system. For available keywords see `fyrd.option_help()`*
Returns
-------
Varies
See description above
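
    Example
    -------
    A minimal sketch using a string command; `input.txt` and `wc -l` are
    illustrative assumptions, not part of fyrd:

    .. code:: python

        import fyrd

        # Split input.txt (no header) into two pieces, run 'wc -l' on each
        # piece as a cluster job, and collect the outputs as a list, since
        # neither outfile nor merge_func is given.
        results = fyrd.helpers.splitrun(
            2, 'input.txt', False, 'wc -l {file}'
        )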
"""
kwds = _options.check_arguments(kwds)
if direct:
return _splitrun(
jobs, infile, inheader, command, args=args, kwargs=kwargs,
name=name, qtype=qtype, profile=profile, outfile=outfile,
outheader=outheader, merge_func=merge_func, **kwds
)
else:
kk = dict(
args=args, kwargs=kwargs, name=name, qtype=qtype, profile=profile,
outfile=outfile, outheader=outheader, merge_func=merge_func
)
kwds = _options.sanitize_arguments(kwds)
kk.update(_options.check_arguments(kwds))
if callable(command):
kk['imports'] = _run.get_all_imports(command, kk)
kk['syspaths'] = _run.update_syspaths(command, kk)
return _wrap_runner(
_splitrun,
*(jobs, infile, inheader, command),
**kk
)
def _splitrun(jobs, infile, inheader, command, args=None, kwargs=None,
name=None, qtype=None, profile=None, outfile=None,
outheader=False, merge_func=None, **kwds):
"""This is the direct running function for `splitrun()`.
Please see that function's docstring for information and do not call this
function directly.
"""
# Handle arguments
if not isinstance(jobs, _int):
raise ValueError('Jobs argument must be an integer.')
if profile is not None and not isinstance(profile, (_str, _txt)):
raise ValueError('Profile must be a string, is {}'
.format(type(profile)))
kwds = _options.check_arguments(kwds)
# Get name
name = name if name else 'split_file'
# Check file
infile = _os.path.abspath(_os.path.expandvars(_os.path.expanduser(infile)))
if not _os.path.isfile(infile):
raise OSError('Cannot find file {}'.format(infile))
# Set up auto-cleaning
if 'clean_files' not in kwds:
kwds['clean_files'] = True
if 'clean_outputs' not in kwds:
kwds['clean_outputs'] = True
# Get script path, returns: kwds, runpath, outpath, scriptpath
cpath = _conf.get_job_paths(kwds)[3]
# Prep function
if callable(command):
# Add all imports from the function file to globals
kwds['imports'] = _run.export_imports(command, kwds)
# Split file
_logme.log('Splitting file', 'debug')
files = _run.split_file(infile, jobs, outpath=cpath, keep_header=inheader)
assert len(files) == jobs
# Run the functions
_logme.log('Submitting jobs', 'debug')
outs = []
count = 1
for f in files:
if outfile:
o = f + '.out'
nm = '{}_{}_of_{}'.format(name, count, jobs)
if callable(command):
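            # Substitute this piece's path for the '{file}' placeholder in
            # args/kwargs (and '{outfile}' below if an outfile was given)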
try:
runargs, runkwargs = _run.replace_argument(
[args, kwargs], '{file}', f)
except ValueError:
raise ValueError("No '{file}' argument in either args or " +
"kwargs")
if outfile:
runargs, runkwargs = _run.replace_argument(
[runargs, runkwargs], '{outfile}', o)
outs.append(
_Job(command, args=runargs, kwargs=runkwargs, name=nm,
qtype=qtype, profile=profile, **kwds).submit()
)
else:
if outfile:
cmnd = command.format(file=f, outfile=o)
else:
cmnd = command.format(file=f)
outs.append(
_Job(cmnd, name=nm, qtype=qtype, profile=profile,
**kwds).submit()
)
count += 1
# Get the results
_logme.log('Waiting for results', 'debug')
results = []
for out in outs:
try:
results.append(out.get())
except IOError:
_logme.log('Result getting failed, most likely one of the child ' +
'jobs crashed, check the error files', 'critical')
raise
# Delete intermediate files
_logme.log('Removing intermediate files', 'debug')
for f in files:
assert _os.path.isfile(f)
_os.remove(f)
    # Recombine and return the results
_logme.log('Done, joining', 'debug')
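    # If an outfile was requested, concatenate the per-piece '.out' files,
    # keeping a single copy of the header line when outheader is True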
if outfile:
with open(outfile, 'w') as fout:
if outheader:
                with open(files[0] + '.out') as fin:
fout.write(fin.readline())
for f in files:
with open(f + '.out') as fin:
if outheader:
fin.readline()
fout.write(fin.read())
out = _os.path.abspath(outfile)
elif merge_func:
assert callable(merge_func)
out = merge_func(results)
elif isinstance(results[0], list):
out = _run.merge_lists(results)
else:
out = results
return out
###############################################################################
# Helper Functions #
###############################################################################
def _wrap_runner(command, *args, **kkk):
"""Run a command as a job, for use with the functions in this file.
    *Note*: Only uses the 'profile', 'imports', 'syspaths', 'mem',
    'clean_files', 'clean_outputs', and 'partition' keyword arguments for job
    submission; all other keywords are passed through to the wrapped command.
Returns
-------
fyrd.job.Job
A Job class (not submitted) for the command.
"""
    mem = kkk.get('mem', 2000)
    cln_f = kkk.get('clean_files', True)
    cln_o = kkk.get('clean_outputs', True)
    kkk['clean_files'] = cln_f
    kkk['clean_outputs'] = cln_o
add_args = dict(
cores=1, mem=mem, clean_files=cln_f, clean_outputs=cln_o
)
if 'partition' in kkk:
add_args.update(dict(partition=kkk['partition']))
if 'profile' in kkk:
add_args.update(dict(profile=kkk['profile']))
if callable(command):
_logme.log('Running function {} with export run'
.format(command.__name__), 'debug')
kkk['imports'] = _run.export_imports(command, kkk)
spth = [_run.get_function_path(command)]
        if 'syspaths' in kkk:
            kkk['syspaths'] = spth + _run.listify(kkk['syspaths'])
        else:
            kkk['syspaths'] = spth
add_args.update(dict(imports=kkk['imports'], syspaths=kkk['syspaths']))
_logme.log('export_run imports: {}'.format(kkk['imports']), 'debug')
_logme.log('export_run syspath: {}'.format(kkk['syspaths']), 'debug')
else:
_logme.log('Running command {} with export run'
.format(command), 'debug')
return _Job(
_run.export_run, (command, args, kkk), **add_args
)