# -*- coding: utf-8 -*-
"""
File management and execution functions.
"""
import os
import re
import sys
import bz2
import gzip
import argparse
from subprocess import Popen
from subprocess import PIPE
from time import sleep
from . import logme
__all__ = ['cmd', 'which', 'open_zipped']
###############################################################################
# Useful Classes #
###############################################################################
class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter,
argparse.RawDescriptionHelpFormatter):
"""Custom argparse formatting."""
pass
class CommandError(Exception):
"""A custom exception."""
pass
###############################################################################
# Useful Functions #
###############################################################################
[docs]def open_zipped(infile, mode='r'):
"""Open a regular, gzipped, or bz2 file.
If infile is a file handle or text device, it is returned without
changes.
Returns:
text mode file handle.
"""
mode = mode[0] + 't'
if hasattr(infile, 'write'):
return infile
if isinstance(infile, str):
if infile.endswith('.gz'):
return gzip.open(infile, mode)
if infile.endswith('.bz2'):
if hasattr(bz2, 'open'):
return bz2.open(infile, mode)
else:
return bz2.BZ2File(infile, mode)
return open(infile, mode)
def opt_split(opt, split_on):
"""Split opt by chars in split_on, merge all into single list."""
if not isinstance(opt, (list, tuple, set)):
opt = [opt]
if not isinstance(split_on, (list, tuple, set)):
split_on = [split_on]
final_list = []
for o in opt:
final_list += re.split('[{}]'.format(''.join(split_on)), o)
return list(set(final_list)) # Return unique options only, order lost.
[docs]def cmd(command, args=None, stdout=None, stderr=None, tries=1):
"""Run command and return status, output, stderr.
Args:
command (str): Path to executable.
args (tuple): Tuple of arguments.
stdout (str): File or open file like object to write STDOUT to.
stderr (str): File or open file like object to write STDERR to.
tries (int): Number of times to try to execute. 1+
Returns:
tuple: exit_code, STDOUT, STDERR
"""
tries = int(tries)
assert tries > 0
count = 1
if isinstance(command, (list, tuple)):
if args:
raise Exception('Cannot submit list/tuple command as i' +
'well as args argument')
command = ' '.join(command)
assert isinstance(command, str)
if args:
if isinstance(args, (list, tuple)):
args = ' '.join(args)
args = command + args
else:
args = command
logme.log('Running {} as {}'.format(command, args), 'verbose')
while True:
try:
pp = Popen(args, shell=True, universal_newlines=True,
stdout=PIPE, stderr=PIPE)
except FileNotFoundError:
logme.log('{} does not exist'.format(command), 'critical')
raise
out, err = pp.communicate()
code = pp.returncode
if code == 0 or count == tries:
break
logme.log('Command {} failed with code {}, retrying.'
.format(command, code), 'warn')
sleep(1)
count += 1
logme.log('{} completed with code {}'.format(command, code), 'debug')
if stdout:
with open_zipped(stdout, 'w') as fout:
fout.write(out)
if stderr:
with open_zipped(stderr, 'w') as fout:
fout.write(err)
return code, out.rstrip(), err.rstrip()
[docs]def is_exe(fpath):
"""Return True is fpath is executable."""
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
[docs]def which(program):
"""Replicate the UNIX which command.
Taken verbatim from:
stackoverflow.com/questions/377017/test-if-executable-exists-in-python
Args:
program: Name of executable to test.
Returns:
Path to the program or None on failure.
"""
fpath, program = os.path.split(program)
if fpath:
if is_exe(program):
return os.path.abspath(program)
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
exe_file = os.path.join(path, program)
if is_exe(exe_file):
return os.path.abspath(exe_file)
return None
[docs]def file_type(infile):
"""Return file type after stripping gz or bz2."""
name_parts = infile.split('.')
if name_parts[-1] == 'gz' or name_parts[-1] == 'bz2':
name_parts.pop()
return name_parts[-1]
[docs]def is_file_type(infile, types):
"""Return True if infile is one of types.
Args:
infile: Any file name
types: String or list/tuple of strings (e.g ['bed', 'gtf'])
Returns:
True or False
"""
if hasattr(infile, 'write'):
return False
if isinstance(types, str):
types = [types]
if not isinstance(types, (list, tuple)):
raise Exception('types must be string list or tuple')
for typ in types:
if file_type(infile) == typ:
return True
return False
[docs]def write_iterable(iterable, outfile):
"""Write all elements of iterable to outfile."""
with open_zipped(outfile, 'w') as fout:
fout.write('\n'.join(iterable))
[docs]def indent(string, prefix=' '):
"""Replicate python3's textwrap.indent for python2.
Args:
string (str): Any string.
prefix (str): What to indent with.
Returns:
str: Indented string
"""
out = ''
for i in string.split('\n'):
out += '{}{}\n'.format(prefix, i)
return out
[docs]def check_pid(pid):
"""Check For the existence of a unix pid."""
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
def _get_input(message):
"""Run either input or raw input depending on python version."""
if sys.version_info.major == 2:
return raw_input(message)
else:
return input(message)
###############################################################################
# Scripts to Write to File #
###############################################################################
SCRP_RUNNER = """\
#!/bin/bash
{precmd}
mkdir -p $LOCAL_SCRATCH > /dev/null 2>/dev/null
if [ -f {script} ]; then
{command}
else
echo "{script} does not exist, make sure you set your filepath to a "
echo "directory that is available to the compute nodes."
exit 1
fi
"""
SCRP_RUNNER_TRACK = """\
#!/bin/bash
{precmd}
mkdir -p $LOCAL_SCRATCH > /dev/null 2>/dev/null
if [ -f {script} ]; then
cd {usedir}
date +'%y-%m-%d-%H:%M:%S'
echo "Running {name}"
{command}
exitcode=$?
echo Done
date +'%y-%m-%d-%H:%M:%S'
if [[ $exitcode != 0 ]]; then
echo Exited with code: $exitcode >&2
fi
else
echo "{script} does not exist, make sure you set your filepath to a "
echo "directory that is available to the compute nodes."
exit 1
fi
"""
CMND_RUNNER = """\
#!/bin/bash
{precmd}
mkdir -p $LOCAL_SCRATCH > /dev/null 2>/dev/null
cd {usedir}
{command}
exitcode=$?
if [[ $exitcode != 0 ]]; then
echo Exited with code: $exitcode >&2
fi
"""
CMND_RUNNER_TRACK = """\
#!/bin/bash
{precmd}
mkdir -p $LOCAL_SCRATCH > /dev/null 2>/dev/null
cd {usedir}
date +'%y-%m-%d-%H:%M:%S'
echo "Running {name}"
{command}
exitcode=$?
echo Done
date +'%y-%m-%d-%H:%M:%S'
if [[ $exitcode != 0 ]]; then
echo Exited with code: $exitcode >&2
fi
"""
FUNC_RUNNER = r"""\
'''
Run a function remotely and pickle the result.
To try and make this as resistent to failure as possible, we import everything
we can, this sometimes results in duplicate imports and always results in
unnecessary imports, but given the context we don't care, as we just want the
thing to run successfully on the first try, no matter what.
'''
import os
import sys
import socket
from subprocess import Popen, PIPE
# Try to use dill, revert to pickle if not found
try:
import dill as pickle
except ImportError:
try:
import cPickle as pickle # For python2
except ImportError:
import pickle
out = None
try:
{imports}
{modimpstr}
except Exception as e:
out = e
ERR_MESSAGE = '''\
Failed to import your function. This usually happens when you have a module
installed locally that is not available on the compute nodes.\n In this case
the module is {{}}.
However, I can only catch the first uninstalled module. To make sure all of
your modules are installed on the compute nodes, do this::
freeze --local | grep -v ^\-e | cut -d = -f 1 "> module_list.txt\n
Then, submit a job to the compute nodes with this command::
cat module_list.txt | xargs pip install --user
'''
def run_function(func_c, args=None, kwargs=None):
'''Run a function with arglist and return output.'''
if not hasattr(func_c, '__call__'):
raise Exception('{{}} is not a callable function.'
.format(func_c))
if args and kwargs:
ot = func_c(*args, **kwargs)
elif args:
try:
iter(args)
except TypeError:
args = (args,)
if isinstance(args, str):
args = (args,)
ot = func_c(*args)
elif kwargs:
ot = func_c(**kwargs)
else:
ot = func_c()
return ot
if __name__ == "__main__":
# If an Exception was raised during import, skip this
if not out:
with open('{pickle_file}', 'rb') as fin:
# Try to install packages first
try:
function_call, args, kwargs = pickle.load(fin)
except ImportError as e:
module = str(e).split(' ')[-1]
node = socket.gethostname()
sys.stderr.write(ERR_MESSAGE.format(module))
out = ImportError(('Module {{}} is not installed on compute '
'node {{}}').format(module, node))
try:
if not out:
out = run_function(function_call, args, kwargs)
except Exception as e:
out = e
with open('{out_file}', 'wb') as fout:
pickle.dump(out, fout)
"""