Source code for fyrd.basic

# -*- coding: utf-8 -*-
"""
Wrapper functions to make some common jobs easier.

Functions
---------
submit
    Submit a script to the cluster
make_job
    Make a job compatible with the chosen cluster but do not submit
make_job_file
    Make a job file compatible with the chosen cluster.
clean
    Delete all files in jobs list or single Job object.
submit_file
    Submit an existing job file to the cluster.
clean_work_dirs
    Clean all files in the scriptpath and outpath directories.
clean_dir
    Delete all files made by this module in directory.
wait
    Wait for jobs to finish.
get
    Get results of jobs when they complete.
"""
import os  as _os
import sys as _sys
from time import sleep as _sleep
from subprocess import CalledProcessError as _CalledProcessError

###############################################################################
#                               Import Ourself                                #
###############################################################################


from . import run   as _run
from . import conf  as _conf
from . import queue as _queue
from . import logme as _logme
from . import batch_systems as _batch
from . import ClusterError as _ClusterError
from .job import Job

__all__ = ['submit', 'make_job', 'make_job_file', 'submit_file', 'clean_dir',
           'clean_work_dirs', 'clean', 'wait', 'get']

###############################################################################
#                            Submission Functions                             #
###############################################################################


[docs]def submit(command, args=None, kwargs=None, name=None, qtype=None, profile=None, queue=None, **kwds): """Submit a script to the cluster. Parameters ---------- command : function/str The command or function to execute. args : tuple/dict, optional Optional arguments to add to command, particularly useful for functions. kwargs : dict, optional Optional keyword arguments to pass to the command, only used for functions. name : str, optional Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, ie. <name>.1 qtype : str, optional Override the default queue type profile : str, optional The name of a profile saved in the conf queue : fyrd.queue.Queue, optional An already initiated Queue class to use. kwds *All other keywords are parsed into cluster keywords by the options system.* For available keywords see `fyrd.option_help()` Returns ------- Job object """ _batch.check_queue() # Make sure the queue.MODE is usable job = Job(command=command, args=args, kwargs=kwargs, name=name, qtype=qtype, profile=profile, queue=queue, **kwds) job.write() job.submit() job.update() return job
######################### # Job file generation # #########################
[docs]def make_job(command, args=None, kwargs=None, name=None, qtype=None, profile=None, queue=None, **kwds): """Make a job compatible with the chosen cluster but do not submit. Parameters ---------- command : function/str The command or function to execute. args : tuple/dict, optional Optional arguments to add to command, particularly useful for functions. kwargs : dict, optional Optional keyword arguments to pass to the command, only used for functions. name : str, optional Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, ie. <name>.1 qtype : str, optional Override the default queue type profile : str, optional The name of a profile saved in the conf queue : fyrd.queue.Queue, optional An already initiated Queue class to use. kwds *All other keywords are parsed into cluster keywords by the options system.* For available keywords see `fyrd.option_help()` Returns ------- Job object """ _batch.check_queue() # Make sure the queue.MODE is usable job = Job(command=command, args=args, kwargs=kwargs, name=name, qtype=qtype, profile=profile, queue=queue, **kwds) # Return the path to the script return job
[docs]def make_job_file(command, args=None, kwargs=None, name=None, qtype=None, profile=None, queue=None, **kwds): """Make a job file compatible with the chosen cluster. Parameters ---------- command : function/str The command or function to execute. args : tuple/dict, optional Optional arguments to add to command, particularly useful for functions. kwargs : dict, optional Optional keyword arguments to pass to the command, only used for functions. name : str, optional Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, ie. <name>.1 qtype : str, optional Override the default queue type profile : str, optional The name of a profile saved in the conf queue : fyrd.queue.Queue, optional An already initiated Queue class to use. kwds *All other keywords are parsed into cluster keywords by the options system.* For available keywords see `fyrd.option_help()` Returns ------- str Path to job file """ _batch.check_queue() # Make sure the queue.MODE is usable job = Job(command=command, args=args, kwargs=kwargs, name=name, qtype=qtype, profile=profile, queue=queue, **kwds) job = job.write() # Return the path to the script return job.submission.file_name
############## # Cleaning # ##############
[docs]def clean(jobs, clean_outputs=False): """Delete all files in jobs list or single Job object. Parameters ---------- jobs : fyrd.job.Job or list of fyrd.job.Job Job objects to clean clean_outputs : bool Also clean outputs. """ jobs = _run.listify(jobs) for job in jobs: job.clean(delete_outputs=clean_outputs)
############################################################################### # Job Object Independent Functions # ###############################################################################
[docs]def submit_file(script_file, dependencies=None, qtype=None, submit_args=None): """Submit an existing job file to the cluster. This function is independent of the Job object and just submits a file using a cluster appropriate method. Parameters ---------- script_file : str The path to the file to submit dependencies: str or list of strings, optional A job number or list of job numbers to depend on qtype : str, optional The name of the queue system to use, auto-detected if not given. submit_args : dict A dictionary of keyword arguments for the submission script. Returns ------- job_number : str """ qtype = qtype if qtype else _batch.get_cluster_environment() _batch.check_queue(qtype) dependencies = _run.listify(dependencies) batch = _batch.get_batch_system(qtype) return batch.submit(script_file, dependencies)
def clean_work_dirs(outputs=False, confirm=False): """Clean all files in the scriptpath and outpath directories. Parameters ---------- outputs : bool Also delete output files. confirm : bool Confirm on command line before deleteing. Returns ------- files : list A list of deleted files. """ files = [] scriptpath = _conf.get_option('jobs', 'scriptpath') outpath = _conf.get_option('jobs', 'outpath') if scriptpath: files += clean_dir(scriptpath, delete_outputs=outputs, confirm=confirm) if outputs and outpath and outpath != scriptpath: files += clean_dir(outpath, delete_outputs=True, confirm=confirm) return files
[docs]def clean_dir(directory=None, suffix=None, qtype=None, confirm=False, delete_outputs=None): """Delete all files made by this module in directory. CAUTION: The clean() function will delete **EVERY** file with extensions matching those these:: .<suffix>.err .<suffix>.out .<suffix>.out.func.pickle .<suffix>.sbatch & .<suffix>.script for slurm mode .<suffix>.qsub for torque mode .<suffix>.job for local mode _func.<suffix>.py _func.<suffix>.py.pickle.in _func.<suffix>.py.pickle.out .. note:: This function will change in the future to use batch system defined paths. Parameters ---------- directory : str The directory to run in, defaults to the current directory. suffix : str Override the default suffix. qtype : str Only run on files of this qtype confirm : bool Ask the user before deleting the files delete_outputs : bool Delete all output files too. Returns ------- list A set of deleted files """ _batch.check_queue(qtype) # Make sure the queue.MODE is usable if delete_outputs is None: delete_outputs = _conf.get_option('jobs', 'clean_outputs') # Sanitize arguments directory = _os.path.abspath(directory if directory else '.') suffix = suffix if suffix else _conf.get_option('jobs', 'suffix') # Extension patterns to delete extensions = ['_func.' + suffix + '.py'] if delete_outputs: extensions += ['.' + suffix + '.err', '.' + suffix + '.out', '_func.' + suffix + '.py.pickle.out', '.' + suffix + '.out.func.pickle', '.' + suffix + '.job'] if qtype: if qtype == 'local': extensions.append('.' + suffix) elif qtype == 'slurm': extensions += ['.' + suffix + '.sbatch', '.' + suffix + '.script'] elif qtype== 'torque': extensions.append('.' + suffix + '.qsub') else: extensions.append('.' + suffix) extensions.append('_func.' + suffix + '.py.pickle.in') extensions += ['.' + suffix + '.sbatch', '.' + suffix + '.script'] extensions.append('.' + suffix + '.qsub') files = [_os.path.join(directory, i) for i in _os.listdir(directory)] files = [i for i in files if _os.path.isfile(i)] if not files: _logme.log('No files found.', 'debug') return [] deleted = [] for f in files: for extension in extensions: if f.endswith(extension): deleted.append(f) deleted = sorted(deleted) delete = False if confirm: if deleted: prompt = [_os.path.basename(i) for i in deleted] _sys.stdout.write('Directory: {}\n'.format(directory)) _sys.stdout.write('Files to delete::\n\t') _sys.stdout.write('\n\t'.join(prompt) + '\n') answer = _run.get_input("Do you want to delete these files? [Y/n]", 'yesno', 'y') if answer == 'y': delete = True _sys.stdout.write('Deleting...\n') else: _sys.stdout.write('Aborting\n') delete = False deleted = [] else: _sys.stdout.write('No files to delete.\n') else: delete = True if delete and deleted: for f in deleted: _os.remove(f) if confirm: _sys.stdout.write('Done\n') return deleted
############################################################################### # Simple Wrapper to Wait on Queue and Get Outputs # ############################################################################### def wait(jobs, notify=True, queue=None): """Wait for jobs to finish. Only works on user jobs by default. To work on jobs so someone else, initialize a fyrd.queue.Queue class with their user info and pass as an argument to queue. Parameters ---------- jobs : fyrd.job.Job or str or list of either (mixed list fine) A single job or list of jobs, either Job objects or job numbers notify : str, True, or False, optional If True, both notification address and wait_time must be set in the [notify] section of the config. A notification email will be sent if the time exceeds this time. This is the default. If a string is passed, notification is forced and the string must be the to address. False means no notification queue : fyrd.queue.Queue, optional An already initiated Queue class to use. Returns ------- success : bool True if all jobs successful, false otherwise """ q = queue if queue else _queue.default_queue() return q.wait(jobs, notify=notify) def get(jobs, queue=None): """Get results of jobs when they complete. Only works on user jobs by default. To work on jobs so someone else, initialize a fyrd.queue.Queue class with their user info and pass as an argument to queue. Parameters ---------- jobs : fyrd.job.Job or list of fyrd.job.Job Returns ------- list Outputs (STDOUT or return value) of jobs queue : fyrd.queue.Queue, optional An already initiated Queue class to use. .. note:: This function also modifies the input Job objects, so they will contain all outputs and state information. """ q = queue if queue else _queue.default_queue() return q.get(jobs)