# Source code for fyrd.basic

# -*- coding: utf-8 -*-
"""
Functions to allow simple job and file submission without the Job class.
"""
import os  as _os
import sys as _sys
from time import sleep as _sleep
from subprocess import CalledProcessError as _CalledProcessError

###############################################################################
#                               Import Ourself                                #
###############################################################################


from . import run   as _run
from . import conf  as _conf
from . import queue as _queue
from . import local as _local
from . import logme as _logme
from . import ClusterError as _ClusterError
from .job import Job

__all__ = ['submit', 'make_job', 'make_job_file', 'submit_file', 'clean_dir']

###############################################################################
#                            Submission Functions                             #
###############################################################################


def submit(command, args=None, kwargs=None, name=None, qtype=None,
           profile=None, **kwds):
    """Build, write, and submit a single job to the cluster.

    Args:
        command (function/str): The command or function to execute.
        args (tuple/dict):      Optional arguments to add to command,
                                particularly useful for functions.
        kwargs (dict):          Optional keyword arguments to pass to the
                                command, only used for functions.
        name (str):             Optional name of the job. If not defined,
                                guessed. If a job of the same name is already
                                queued, an integer job number (not the queue
                                number) will be added, ie. <name>.1
        qtype (str):            Override the default queue type
        profile (str):          The name of a profile saved in the conf

        *All other keywords are parsed into cluster keywords by the options
        system. For available keywords see `fyrd.option_help()`*

    Returns:
        Job object
    """
    # Abort early if the detected queue mode is not usable
    _queue.check_queue()

    new_job = Job(command=command, args=args, kwargs=kwargs, name=name,
                  qtype=qtype, profile=profile, **kwds)
    # Write the script to disk, hand it to the scheduler, then refresh state
    new_job.write()
    new_job.submit()
    new_job.update()
    return new_job
#########################
#  Job file generation  #
#########################
def make_job(command, args=None, kwargs=None, name=None, qtype=None,
             profile=None, **kwds):
    """Create a Job object for the chosen cluster without submitting it.

    If mode is local, the resulting script is just a simple shell script.

    Args:
        command (function/str): The command or function to execute.
        args (tuple/dict):      Optional arguments to add to command,
                                particularly useful for functions.
        kwargs (dict):          Optional keyword arguments to pass to the
                                command, only used for functions.
        name (str):             Optional name of the job. If not defined,
                                guessed. If a job of the same name is already
                                queued, an integer job number (not the queue
                                number) will be added, ie. <name>.1
        qtype (str):            Override the default queue type
        profile (str):          The name of a profile saved in the conf

        *All other keywords are parsed into cluster keywords by the options
        system. For available keywords see `fyrd.option_help()`*

    Returns:
        Job object
    """
    # Abort early if the detected queue mode is not usable
    _queue.check_queue()

    # Build the job but do not write or submit it; the caller decides when
    return Job(command=command, args=args, kwargs=kwargs, name=name,
               qtype=qtype, profile=profile, **kwds)
def make_job_file(command, args=None, kwargs=None, name=None, qtype=None,
                  profile=None, **kwds):
    """Write a job file compatible with the chosen cluster and return its path.

    Args:
        command (function/str): The command or function to execute.
        args (tuple/dict):      Optional arguments to add to command,
                                particularly useful for functions.
        kwargs (dict):          Optional keyword arguments to pass to the
                                command, only used for functions.
        name (str):             Optional name of the job. If not defined,
                                guessed. If a job of the same name is already
                                queued, an integer job number (not the queue
                                number) will be added, ie. <name>.1
        qtype (str):            Override the default queue type
        profile (str):          The name of a profile saved in the conf

        *All other keywords are parsed into cluster keywords by the options
        system. For available keywords see `fyrd.option_help()`*

    Returns:
        Path to the written submission script.
    """
    # Abort early if the detected queue mode is not usable
    _queue.check_queue()

    # Job.write() returns the Job itself, so the call can be chained
    written = Job(command=command, args=args, kwargs=kwargs, name=name,
                  qtype=qtype, profile=profile, **kwds).write()
    return written.submission
##############
#  Cleaning  #
##############
def clean(jobs):
    """Delete all files in jobs list or single Job object.

    Args:
        jobs (Job/list/tuple): A single Job or a collection of Jobs whose
                               temporary files should be removed.

    Raises:
        ClusterError: If *jobs* is neither a Job nor a list/tuple.
    """
    # Normalize a lone Job into a one-element list so one loop handles both
    job_list = [jobs] if isinstance(jobs, Job) else jobs
    if not isinstance(job_list, (list, tuple)):
        raise _ClusterError('Job list must be a Job, list, or tuple')
    for job in job_list:
        job.clean()
###############################################################################
#                      Job Object Independent Functions                       #
###############################################################################
def _submit_with_retries(args, executable, parse_job_id):
    """Run a scheduler submission command, retrying up to five times.

    Args:
        args (list):             Full argument list to pass to _run.cmd.
        executable (str):        Name of the submission binary, used only in
                                 log messages ('sbatch' or 'qsub').
        parse_job_id (callable): Extracts the integer job number from the
                                 command's stdout.

    Returns:
        int: The parsed job number.

    Raises:
        CalledProcessError: If the command still fails after the retries.
    """
    count = 0
    while True:
        code, stdout, stderr = _run.cmd(args)
        if code == 0:
            return parse_job_id(stdout)
        if count == 5:
            # Fold all diagnostics into one message so the second positional
            # argument to log() is the level ('critical'), not extra text.
            _logme.log('{} failed with code {}\nstdout: {}\nstderr: {}'
                       .format(executable, code, stdout, stderr), 'critical')
            raise _CalledProcessError(code, args, stdout, stderr)
        _logme.log('{} failed with err {}. Resubmitting.'.format(
            executable, stderr), 'debug')
        count += 1
        _sleep(1)


def submit_file(script_file, dependencies=None, threads=None, qtype=None):
    """Submit a job file to the cluster.

    If qtype or queue.MODE is torque, qsub is used; if it is slurm, sbatch is
    used; if it is local, the file is executed with subprocess.

    This function is independent of the Job object and just submits a file.

    Args:
        script_file (str): Path of the script to submit.
        dependencies:      A job number or list of job numbers.
                           In slurm: `--dependency=afterok:` is used
                           For torque: `-W depend=afterok:` is used
        threads (int):     Total number of threads to use at a time, defaults
                           to all. ONLY USED IN LOCAL MODE
        qtype (str):       Override the detected queue type.

    Returns:
        job number for torque or slurm
        multiprocessing job object for local mode
    """
    _queue.check_queue()  # Make sure the queue.MODE is usable

    if not qtype:
        qtype = _queue.get_cluster_environment()

    # Normalize dependencies into a list of strings
    if dependencies:
        if isinstance(dependencies, (str, int)):
            dependencies = [dependencies]
        if not isinstance(dependencies, (list, tuple)):
            raise Exception('dependencies must be a list, int, or string.')
        dependencies = [str(i) for i in dependencies]

    if qtype == 'slurm':
        if dependencies:
            dep_arg = '--dependency=afterok:{}'.format(':'.join(dependencies))
            args = ['sbatch', dep_arg, script_file]
        else:
            args = ['sbatch', script_file]
        # sbatch reports the job number as the last word of its stdout
        return _submit_with_retries(
            args, 'sbatch', lambda out: int(out.split(' ')[-1]))

    elif qtype == 'torque':
        if dependencies:
            dep_arg = '-W depend={}'.format(
                ','.join(['afterok:' + d for d in dependencies]))
            args = ['qsub', dep_arg, script_file]
        else:
            args = ['qsub', script_file]
        # qsub prints '<jobid>.<server>'; the job number precedes the dot
        return _submit_with_retries(
            args, 'qsub', lambda out: int(out.split('.')[0]))

    elif qtype == 'local':
        # Normal mode dependency tracking uses only integer job numbers.
        # NOTE(review): Job instances cannot actually reach this branch (the
        # sanitization above stringifies everything), kept for compatibility.
        depends = []
        if dependencies:
            for depend in dependencies:
                if isinstance(depend, Job):
                    depends.append(int(depend.id))
                else:
                    depends.append(int(depend))
        command = 'bash {}'.format(script_file)
        # Make sure the global job pool exists and its runner thread is alive
        if not _local.JQUEUE or not _local.JQUEUE.runner.is_alive():
            _local.JQUEUE = _local.JobQueue(cores=threads)
        return _local.JQUEUE.add(_run.cmd, (command,), dependencies=depends)
def clean_dir(directory='.', suffix=None, qtype=None, confirm=False,
              delete_outputs=None):
    """Delete all files made by this module in directory.

    CAUTION: The clean() function will delete **EVERY** file with
             extensions matching those these::

                 .<suffix>.err
                 .<suffix>.out
                 .<suffix>.out.func.pickle
                 .<suffix>.sbatch & .<suffix>.script for slurm mode
                 .<suffix>.qsub for torque mode
                 .<suffix> for local mode
                 _func.<suffix>.py
                 _func.<suffix>.py.pickle.in
                 _func.<suffix>.py.pickle.out

    Args:
        directory (str):       The directory to run in, defaults to the
                               current directory.
        suffix (str):          Override the default suffix.
        qtype (str):           Only run on files of this qtype
        confirm (bool):        Ask the user before deleting the files
        delete_outputs (bool): Delete all output files too.

    Returns:
        A list of deleted file names (sorted).
    """
    _queue.check_queue(qtype)  # Make sure the queue.MODE is usable

    if delete_outputs is None:
        delete_outputs = _conf.get_option('jobs', 'clean_outputs')

    # Sanitize arguments
    if not directory:
        directory = '.'
    if not suffix:
        suffix = _conf.get_option('jobs', 'suffix')

    # Extension patterns to delete
    extensions = ['_func.' + suffix + '.py']
    if delete_outputs:
        extensions += ['.' + suffix + '.err', '.' + suffix + '.out',
                       '_func.' + suffix + '.py.pickle.out',
                       '.' + suffix + '.out.func.pickle']

    if qtype:
        if qtype == 'local':
            extensions.append('.' + suffix)
        elif qtype == 'slurm':
            extensions += ['.' + suffix + '.sbatch', '.' + suffix + '.script']
        elif qtype == 'torque':
            extensions.append('.' + suffix + '.qsub')
    else:
        # No qtype given: match every mode's script extensions
        extensions.append('.' + suffix)
        extensions.append('_func.' + suffix + '.py.pickle.in')
        extensions += ['.' + suffix + '.sbatch', '.' + suffix + '.script']
        extensions.append('.' + suffix + '.qsub')

    # Resolve paths against *directory*, not the CWD, so cleaning a
    # different directory actually finds (and removes) its files.
    directory = _os.path.abspath(directory)
    files = [i for i in _os.listdir(directory)
             if _os.path.isfile(_os.path.join(directory, i))]

    if not files:
        _logme.log('No files found.', 'debug')
        return []

    deleted = []
    for f in files:
        for extension in extensions:
            if f.endswith(extension):
                deleted.append(f)

    deleted = sorted(deleted)
    delete = False

    if confirm:
        if deleted:
            _sys.stdout.write('Files to delete::\n\t')
            _sys.stdout.write('\n\t'.join(deleted) + '\n')
            answer = _run.get_input(
                "Do you want to delete these files? [Y/n]", ['y', 'n'])
            if answer == 'y':
                delete = True
                _sys.stdout.write('Deleting...\n')
            else:
                _sys.stdout.write('Aborting\n')
                delete = False
                deleted = []
        else:
            _sys.stdout.write('No files to delete.\n')
    else:
        delete = True

    if delete and deleted:
        for f in deleted:
            _os.remove(_os.path.join(directory, f))
        if confirm:
            _sys.stdout.write('Done\n')

    return deleted