API Reference¶
fyrd.queue¶
The core class in this file is the Queue() class, which does most of the queue management. In addition, get_cluster_environment() attempts to autodetect the cluster type (torque, slurm, normal) and sets the global cluster type for the whole file. Finally, the wait() function accepts a list of jobs and will block until those jobs are complete.
The Queue class relies on a few simple queue parsers defined by the torque_queue_parser and slurm_queue_parser functions. These call qstat -x or squeue and sacct to get job information, and yield a simple tuple of that data with the following members:
job_id, name, userid, partition, state, node-list, node-count, cpu-per-node, exit-code
The Queue class then converts this information into Queue.QueueJob objects and adds them to the internal jobs dictionary within the Queue class. This dictionary is the basis for all of the other functionality encoded by the Queue class. It can be accessed directly, or sliced by accessing the completed, queued, and running attributes of the Queue class; these simply divide up the jobs dictionary to make finding information easy.
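This bookkeeping can be sketched in a few lines (hypothetical helper names; the real QueueJob objects carry more metadata than a plain dict):

```python
# Sketch: parser tuples are keyed by job_id, then sliced into
# state-based views the way completed/queued/running do.
FIELDS = ('job_id', 'name', 'userid', 'partition', 'state',
          'nodelist', 'numnodes', 'ntpernode', 'exit_code')

def build_jobs(parser_tuples):
    """Convert parser tuples into a {job_id: info_dict} mapping."""
    return {t[0]: dict(zip(FIELDS, t)) for t in parser_tuples}

def by_state(jobs, state):
    """Slice the jobs dict by job state."""
    return {jid: j for jid, j in jobs.items() if j['state'] == state}

jobs = build_jobs([
    (1, 'align', 'me', 'normal', 'running', 'node1', 1, 4, None),
    (2, 'sort',  'me', 'normal', 'completed', 'node2', 1, 1, 0),
])
running = by_state(jobs, 'running')
```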
fyrd.queue.Queue¶
class fyrd.queue.Queue(user=None, partition=None, qtype=None)
Bases: object
A wrapper for torque, slurm, or local queues.
jobs (dict) – {jobid: Queue.QueueJob}
max_jobs (int) – The maximum number of jobs allowed in the queue
job_states (list) – A list of the different states of jobs in this queue
active_job_count (int) – A count of all jobs that are either queued or running in the current queue
can_submit (bool) – True if the total number of active jobs is less than max_jobs
Can filter by user, queue type or partition on initialization.
Parameters: - user (str) – Optional username to filter the queue with. If user=’self’ or ‘current’, the current user will be used.
- partition (str) – Optional partition to filter the queue with.
- qtype (str) – ‘torque’, ‘slurm’, or ‘local’, defaults to auto-detect.
Methods¶
Queue.wait(jobs)
Block until all jobs in jobs are complete.
Update time is dependent upon the queue_update parameter in your ~/.fyrd file.
In addition, wait() will not return until between 1 and 3 seconds after a job has completed, irrespective of queue_update time. This allows time for any copy operations to complete after the job exits.
Parameters: jobs – A job or list of jobs to check. Can be a Job or multiprocessing.pool.ApplyResult object, a job ID (int/str), or a list/tuple of multiple Jobs or job IDs.
Returns: True on success, False or None on failure.
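The blocking behaviour described here can be sketched as a simple polling loop (hypothetical names; the real method also normalizes the many accepted job types):

```python
import time

def wait_sketch(job_ids, is_done, queue_update=2, settle=1):
    """Poll until every job reports done, then pause briefly so that
    post-job copy operations can finish (the grace period above)."""
    pending = set(job_ids)
    while pending:
        pending = {j for j in pending if not is_done(j)}
        if pending:
            time.sleep(queue_update)  # re-check interval from the config
    time.sleep(settle)  # allow copy operations to complete
    return True
```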
Queue.wait_to_submit(max_jobs=None)
Block until there are fewer running/queued jobs in the queue than max_jobs.
Parameters: max_jobs (int) – Override self.max_jobs
fyrd.queue functions¶
parsers¶
fyrd.queue.queue_parser(qtype=None, user=None, partition=None)
Call either the torque or the slurm queue parser depending on qtype.
Parameters: - qtype – Either ‘torque’ or ‘slurm’, defaults to current MODE
- user – optional user name to pass to queue to filter queue with
Yields: tuple – job_id, name, userid, partition, state, nodelist, numnodes, ntpernode, exit_code
fyrd.queue.torque_queue_parser(user=None, partition=None)
Iterator for torque queues.
Use the qstat -x command to get an XML queue for compatibility.
Parameters: - user – optional user name to pass to qstat to filter queue with
- partition – optional partition to filter the queue with
Yields: tuple – job_id, name, userid, partition, state, nodelist, numnodes, ntpernode, exit_code
numcpus is currently always 1 as most torque queues treat every core as a node.
fyrd.queue.slurm_queue_parser(user=None, partition=None)
Iterator for slurm queues.
Use the squeue -O command to get standard data across implementations, and supplement this data with the results of sacct. sacct returns data only for the current user but retains a much longer job history. Only jobs not returned by squeue are added with sacct, and they are added to the end of the returned queue, i.e. out of order with respect to the actual queue.
Parameters: - user – optional user name to filter queue with
- partition – optional partition to filter queue with
Yields: tuple – job_id, name, userid, partition, state, nodelist, numnodes, ntpernode, exit_code
fyrd.job¶
Job management is handled by the Job() class. This is a very large class that defines all the methods required to build and submit a job to the cluster.
It accepts keyword arguments defined in fyrd.options on initialization, which are then fleshed out using profile information from the config files defined by fyrd.conf.
The primary argument on initialization is the function or script to submit.
Examples:
Job('ls -lah | grep myfile')
Job(print, ('hi',))
Job('echo hostname', profile='tiny')
Job(huge_function, args=(1,2), kwargs={'hi': 'there'},
    profile='long', cores=28, mem='200GB')
fyrd.job.Job¶
class fyrd.Job(command, args=None, kwargs=None, name=None, qtype=None, profile=None, **kwds)
Bases: object
Information about a single job on the cluster.
Holds information about submit time, number of cores, the job script, and more.
Below are the core attributes and methods required to use this class.
out (str) – The output of the function, or a copy of stdout for a script
stdout (str) – Any output to STDOUT
stderr (str) – Any output to STDERR
exitcode (int) – The exitcode of the running process (the script runner if the Job is a function)
start (datetime) – A datetime object containing the time execution started on the remote node
end (datetime) – Like start, but when execution ended
runtime (timedelta) – A timedelta object containing the runtime
files (list) – A list of script files associated with this class
done (bool) – True if the job has completed
submit() – Submit the job if it is ready
wait() – Block until the job is done
get() – Block until the job is done, then return the output (stdout if the job is a script). By default, saves all outputs to self (i.e. .out, .stdout, .stderr) and deletes all intermediate files before returning. If the save argument is False, the output files are not deleted by default.
clean() – Delete any files created by this object
Printing or reproducing the class will display detailed job information.
Both wait() and get() will update the queue every few seconds (defined by the queue_update item in the config) and add queue information to the job as they go.
If the job disappears from the queue with no information, it will be listed as ‘completed’.
All jobs have a .submission attribute, which is a Script object containing the submission script for the job and the file name, plus a ‘written’ bool that checks if the file exists.
In addition, SLURM jobs have a .exec_script attribute, which is a Script object containing the shell command to run. This difference is due to the fact that some SLURM systems execute multiple lines of the submission file at the same time.
Finally, if the job command is a function, this object will also contain a .function attribute, which contains the script to run the function.
Initialization function arguments.
Parameters: - command (function/str) – The command or function to execute.
- args (tuple/dict) – Optional arguments to add to command, particularly useful for functions.
- kwargs (dict) – Optional keyword arguments to pass to the command, only used for functions.
- name (str) – Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, i.e. <name>.1
- qtype (str) – Override the default queue type
- profile (str) – The name of a profile saved in the conf
- All other keywords are parsed into cluster options by the option system. For available keywords see fyrd.option_help().
Methods¶
Job.write(overwrite=True)
Write all scripts.
Parameters: overwrite (bool) – Overwrite existing files, defaults to True.
Job.clean(delete_outputs=None, get_outputs=True)
Delete all scripts created by this module, if they were written.
Parameters: - delete_outputs (bool) – also delete all output and err files, but get their contents first.
- get_outputs (bool) – if delete_outputs, save outputs before deleting.
Job.submit(wait_on_max_queue=True)
Submit this job.
Parameters: wait_on_max_queue (bool) – Block until the queue length is below the maximum before submitting. To disable max_queue_len, set it to 0. None allows override by the default settings in the config file, and any positive integer will be interpreted as the maximum queue length.
Returns: self
Job.get(save=True, cleanup=None, delete_outfiles=None, del_no_save=None)
Block until the job completes, then return the output of the script/function.
By default saves all outputs to this class and deletes all intermediate files.
Parameters: - save (bool) – Save all outputs to the class also (advised)
- cleanup (bool) – Clean all intermediate files after job completes.
- delete_outfiles (bool) – Clean output files after job completes.
- del_no_save (bool) – Delete output files even if save is False
Returns: Function output if Function, else STDOUT
Return type: str
Job.get_output(save=True, delete_file=None, update=True)
Get output of function or script.
This is the same as stdout for a script, or the function output for a function.
By default, output file is kept unless delete_file is True or self.auto_delete is True.
Parameters: - save (bool) – Save the output to self.out, default True. Would be a good idea to set to False if the output is huge.
- delete_file (bool) – Delete the output file when getting
- update (bool) – Update job info from queue first.
Returns: The output of the script or function. Always a string if script.
Job.get_stdout(save=True, delete_file=None, update=True)
Get stdout of function or script, same for both.
By default, output file is kept unless delete_file is True or self.auto_delete is True.
Parameters: - save (bool) – Save the output to self.stdout, default True. Would be a good idea to set to False if the output is huge.
- delete_file (bool) – Delete the stdout file when getting
- update (bool) – Update job info from queue first.
Returns: The contents of STDOUT, with runtime info and trailing newline removed.
Return type: str
Also sets self.start and self.end from the contents of STDOUT if possible.
Job.get_stderr(save=True, delete_file=None, update=True)
Get stderr of function or script, same for both.
By default, output file is kept unless delete_file is True or self.auto_delete is True.
Parameters: - save (bool) – Save the output to self.stderr, default True. Would be a good idea to set to False if the output is huge.
- delete_file (bool) – Delete the stderr file when getting
- update (bool) – Update job info from queue first.
Returns: The contents of STDERR, with trailing newline removed.
Return type: str
Job.get_times(update=True)
Get the start and end times of the job.
Parameters: update (bool) – Update job info from queue first.
Returns: start, end as two datetime objects.
Return type: tuple
Also sets self.start and self.end from the contents of STDOUT if possible.
Job.get_exitcode(update=True)
Try to get the exitcode.
Parameters: update (bool) – Update job info from queue first.
Returns: The exitcode of the running process.
Return type: int
Job.fetch_outputs(save=True, delete_files=None)
Save all outputs in their current state. No return value.
This method does not wait for job completion, but merely gets the outputs. To wait for job completion, use get() instead.
Parameters: - save (bool) – Save all outputs to the class also (advised)
- delete_files (bool) – Delete the output files when getting, only used if save is True
fyrd.submission_scripts¶
This module defines two classes that are used to build the actual jobs for submission, including writing the files. Function is a child class of Script.
class fyrd.submission_scripts.Script(file_name, script)
Bases: object
A script string plus a file name.
Initialize the script and file name.
class fyrd.submission_scripts.Function(file_name, function, args=None, kwargs=None, imports=None, pickle_file=None, outfile=None)
Bases: fyrd.submission_scripts.Script
A special Script used to run a function.
Create a function wrapper.
NOTE: Function submission will fail if the parent file’s code is not wrapped in an if __name__ == ‘__main__’ guard.
Parameters: - file_name (str) – A root name to the outfiles
- function (callable) – Function handle.
- args (tuple) – Arguments to the function as a tuple.
- kwargs (dict) – Named keyword arguments to pass in the function call
- imports (list) – A list of imports, if not provided, defaults to all current imports, which may not work if you use complex imports. The list can include the import call, or just be a name, e.g [‘from os import path’, ‘sys’]
- pickle_file (str) – The file to hold the function.
- outfile (str) – The file to hold the output.
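The pickle-based mechanism behind Function can be sketched with the standard library (hypothetical helper names; the real class also generates the runner script and handles imports):

```python
import os
import pickle
import tempfile

def write_function_job(function, args=(), kwargs=None, pickle_file=None):
    """Pickle (function, args, kwargs) so a runner script can execute it."""
    kwargs = kwargs or {}
    if pickle_file is None:
        fd, pickle_file = tempfile.mkstemp(suffix='.pickle.in')
        os.close(fd)
    with open(pickle_file, 'wb') as fh:
        pickle.dump((function, args, kwargs), fh)
    return pickle_file

def run_function_job(pickle_file, outfile):
    """What a generated runner does: unpickle, call, pickle the result."""
    with open(pickle_file, 'rb') as fh:
        function, args, kwargs = pickle.load(fh)
    result = function(*args, **kwargs)
    with open(outfile, 'wb') as fh:
        pickle.dump(result, fh)
    return result

pf = write_function_job(abs, (-3,))
result = run_function_job(pf, pf + '.out')
```

Only picklable top-level functions work this way, which is why the __main__ caveat above exists.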
fyrd.options¶
All keyword arguments are defined in dictionaries in the options.py file, alongside functions to manage those dictionaries. Of particular importance is option_help(), which can display all of the keyword arguments as a string or a table. check_arguments() checks a dictionary to make sure that the arguments are allowed (i.e. defined); it is called on all keyword arguments in the package.
To see keywords, run fyrd keywords from the console or fyrd.option_help() from a python session.
Option handling works as follows: every hard-coded keyword argument must contain a dictionary entry for ‘torque’ and ‘slurm’, as well as a type declaration. If the type is NoneType, the option is assumed to be a boolean option. If it has a type, check_argument() attempts to cast the type, and specific idiosyncrasies are handled in this step, e.g. memory is converted into an integer number of MB. Once the arguments are sanitized, format() is called on the string held in either the ‘torque’ or the ‘slurm’ value, and the formatted string is then used as an option. If the type is a list/tuple, the ‘sjoin’ and ‘tjoin’ dictionary keys must exist and are used to handle joining.
The following two functions are used to manage this formatting step.
option_to_string() will take an option/value pair and return an appropriate string that can be used in the current queue mode. If the option is not implemented in the current mode, a debug message is printed to the console and an empty string is returned.
options_to_string() is a wrapper around option_to_string() that can handle a whole dictionary of arguments; it explicitly handles arguments that cannot be managed using a simple string format.
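The formatting step might be sketched like this (illustrative option table, not the real contents of options.py):

```python
# Illustrative option table in the style described above: each option
# maps to per-queue format strings plus a type used for casting.
OPTIONS = {
    'cores': {'type': int, 'slurm': '--ntasks {}',
              'torque': '-l nodes=1:ppn={}'},
    'time':  {'type': str, 'slurm': '--time={}',
              'torque': '-l walltime={}'},
}

def option_to_string_sketch(option, value, qtype='slurm'):
    """Cast the value, then format it for the active queue type."""
    info = OPTIONS[option]
    value = info['type'](value)       # cast, as check_arguments() would
    return info[qtype].format(value)  # fill the per-queue template
```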
fyrd.options.option_help(mode='string', qtype=None, tablefmt='simple')
Print a string to stdout displaying information on all options.
Parameters: - mode (str) – One of ‘string’ (return a formatted string), ‘print’ (print the string to stdout), ‘list’ (return a simple list of keywords), ‘table’ (return a table of lists), or ‘merged_table’ (combine all keywords into a single table)
- qtype (str) – If provided only return info on that queue type.
- tablefmt (str) –
A tabulate-style table format, one of:
'plain', 'simple', 'grid', 'pipe', 'orgtbl', 'rst', 'mediawiki', 'latex', 'latex_booktabs'
Returns: A formatted string
Return type: str
fyrd.options.split_keywords(kwargs)
Split a dictionary of keyword arguments into two dictionaries.
The first dictionary will contain valid arguments for fyrd, the second will contain all others.
Returns: (dict, dict) — valid args for fyrd, other args Return type: tuple
fyrd.options.check_arguments(kwargs)
Make sure all keywords are allowed.
Raises OptionsError on error, returns sanitized dictionary on success.
Note: If an argument is not recognized, SYNONYMS is checked; an OptionsError is raised if it is not found there either.
fyrd.options.options_to_string(option_dict, qtype=None)
Return a multi-line string for slurm or torque job submission.
Parameters: - option_dict (dict) – Dict in format {option: value} where value can be None. If value is None, default used.
- qtype (str) – ‘torque’, ‘slurm’, or ‘local’: override queue.MODE
Returns: A multi-line string of torque or slurm options.
Return type: str
fyrd.options.option_to_string(option, value=None, qtype=None)
Return a string with an appropriate flag for slurm or torque.
Parameters: - option – An allowed option defined in options.all_options
- value – A value for that option if required (if None, default used)
- qtype – ‘torque’, ‘slurm’, or ‘local’: override queue.MODE
Returns: A string with the appropriate flags for the active queue.
Return type: str
fyrd.conf¶
fyrd.conf handles the config (~/.fyrd/config.txt) file and the profiles (~/.fyrd/profiles.txt) file.
Profiles are combinations of keyword arguments that can be called in any of the submission functions. Both the config and profiles are just ConfigParser objects, conf.py merely adds an abstraction layer on top of this to maintain the integrity of the files.
config¶
The config has three sections (and no defaults):
- queue — sets options for handling the queue
- jobs — sets options for submitting jobs
- jobqueue — local option handling, will be removed in the future
For a complete reference, see the config documentation: Configuration
Options can be managed with the get_option() and set_option() functions, but it is actually easier to use the console script:
fyrd conf list
fyrd conf edit max_jobs 3000
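Under the hood these are plain ConfigParser operations; a rough sketch with a hypothetical path and keys:

```python
import configparser
import os
import tempfile

def set_option_sketch(path, section, key, value):
    """Write a key to a config file, creating the section if needed."""
    conf = configparser.ConfigParser()
    conf.read(path)
    if not conf.has_section(section):
        conf.add_section(section)
    conf.set(section, key, str(value))
    with open(path, 'w') as fh:
        conf.write(fh)
    return conf

def get_option_sketch(path, section, key, default=None):
    """Read a key back, falling back to a default if absent."""
    conf = configparser.ConfigParser()
    conf.read(path)
    return conf.get(section, key, fallback=default)

path = os.path.join(tempfile.mkdtemp(), 'config.txt')
set_option_sketch(path, 'queue', 'max_jobs', 3000)
value = get_option_sketch(path, 'queue', 'max_jobs')
```

Note that ConfigParser stores everything as strings, so numeric values come back as e.g. '3000'.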
fyrd.conf.get_option(section=None, key=None, default=None)
Get a single key or section.
All args are optional, if they are missing, the parent section or entire config will be returned.
Parameters: - section (str) – The config section to use (e.g. queue), if None, all sections returned.
- key (str) – The config key to get (e.g. ‘max_jobs’), if None, whole section returned.
- default – If the key does not exist, create it with this default value.
Returns: Option value if key exists, None if no key exists.
fyrd.conf.set_option(section, key, value)
Write a config key to the config file.
Parameters: - section (str) – Section of the config file to use.
- key (str) – Key to add.
- value – Value to add for key.
Returns: ConfigParser
fyrd.conf.delete(section, key)
Delete a config item.
Parameters: - section (str) – Section of config file.
- key (str) – Key to delete
Returns: ConfigParser
fyrd.conf.load_config()
Load config from the config file.
If any section or key from DEFAULTS is not present in the config, it is added back, enforcing a minimal configuration.
Returns: Config options. Return type: ConfigParser
fyrd.conf.create_config(cnf=None, def_queue=None)
Create an initial config file.
Gets all information from the file-wide DEFAULTS constant and overwrites specific keys using the values in cnf.
This means that any records in the cnf dict that are not present in DEFAULTS will be ignored, and any records that are absent will be populated from DEFAULTS.
Parameters: - cnf (dict) – A dictionary of config defaults.
- def_queue (str) – A name for a queue to add to the default profile.
profiles¶
Profiles are wrapped in a Profile() class to make attribute access easy, but they are fundamentally just dictionaries of keyword arguments. They can be created with fyrd.conf.Profile(name, {keywds}) and then written to a file with the write() method.
The easiest way to interact with profiles is not with the class directly but with the get_profile(), set_profile(), and del_profile() functions. These make it very easy to go from a dictionary of keywords to a profile.
Profiles can then be called with the profile= keyword in any submission function or Job class.
As with the config, profile management is the easiest and most stable when using the console script:
fyrd profile list
fyrd profile add very_long walltime:120:00:00
fyrd profile edit default partition:normal cores:4 mem:10GB
fyrd profile delete small
fyrd.conf.Profile¶
class fyrd.conf.Profile(name, kwds)
Bases: object
A job submission profile. Just a thin wrapper around a dict.
Set up bare minimum attributes.
Parameters: - name (str) – Name of the profile
- kwds (dict) – Dictionary of keyword arguments (will be validated).
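Since a profile is fundamentally a named dict of keywords, the wrapper can be sketched as (illustrative, not the real class):

```python
class ProfileSketch:
    """Thin wrapper: attribute access over a keyword dict (illustrative)."""

    def __init__(self, name, kwds):
        self.name = name
        self.kwds = dict(kwds)

    def __getattr__(self, key):
        # Called only when normal attribute lookup fails, so it falls
        # through to the keyword dict.
        try:
            return self.kwds[key]
        except KeyError:
            raise AttributeError(key)

tiny = ProfileSketch('tiny', {'cores': 1, 'mem': '1GB'})
```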
fyrd.helpers¶
The helpers are high-level functions that are not required by the library but make difficult jobs easy, in keeping with the goal of trivially easy cluster submission.
The functions in fyrd.basic below are different in that they provide simple job submission and management, while the functions in fyrd.helpers allow the submission of many jobs.
fyrd.helpers.parapply(jobs, df, func, args=(), profile=None, applymap=False, merge_axis=0, merge_apply=False, name='parapply', imports=None, **kwds)
Split a dataframe, run apply in parallel, return the result.
This function will split a dataframe into however many pieces are requested with the jobs argument, run apply in parallel by submitting the jobs to the cluster, and then recombine the outputs.
If the ‘clean_files’ and ‘clean_outputs’ arguments are not passed, we delete all intermediate files and output files by default.
This function will take any keyword arguments accepted by Job, which can be found by running fyrd.options.option_help(). It also accepts any of the keywords accepted by pandas.DataFrame.apply(); see the pandas documentation.
Parameters: - jobs (int) – Number of pieces to split the dataframe into
- df (DataFrame) – Any pandas DataFrame
- args (tuple) – Positional arguments to pass to the function, keyword arguments can just be passed directly.
- profile (str) – A fyrd cluster profile to use
- applymap (bool) – Run applymap() instead of apply()
- merge_axis (int) – Which axis to merge on, 0 or 1, default is 0
- merge_apply (bool) – Apply the function on the merged dataframe also
- name (str) – A prefix name for all of the jobs
- imports (list) – A list of imports in any format, e.g. [‘import numpy’, ‘scipy’, ‘from numpy import mean’]
- Any keyword arguments recognized by fyrd will be used for job submission; all additional keyword arguments will be passed to DataFrame.apply()
Returns: A recombined DataFrame
Return type: DataFrame
fyrd.helpers.parapply_summary(jobs, df, func, args=(), profile=None, applymap=False, name='parapply', imports=None, **kwds)
Run parapply for a function with summary stats.
Instead of returning the concatenated result, merge the result using the same function as was used during apply.
This works best for summary functions like .mean(), which do a linear operation on a whole dataframe or series.
Parameters: - jobs (int) – Number of pieces to split the dataframe into
- df (DataFrame) – Any pandas DataFrame
- args (tuple) – Positional arguments to pass to the function, keyword arguments can just be passed directly.
- profile (str) – A fyrd cluster profile to use
- applymap (bool) – Run applymap() instead of apply()
- merge_axis (int) – Which axis to merge on, 0 or 1, default is 1 as apply transposes columns
- merge_apply (bool) – Apply the function on the merged dataframe also
- name (str) – A prefix name for all of the jobs
- imports (list) – A list of imports in any format, e.g. [‘import numpy’, ‘scipy’, ‘from numpy import mean’]
- Any keyword arguments recognized by fyrd will be used for job submission; all additional keyword arguments will be passed to DataFrame.apply()
Returns: A recombined DataFrame
Return type: DataFrame
fyrd.helpers.split_file(infile, parts, outpath='', keep_header=True)
Split a file in parts and return a list of paths.
NOTE: Linux specific (uses wc).
Parameters: - infile – The path of the file to split.
- parts (int) – The number of pieces to split the file into.
- outpath – The directory to save the split files.
- keep_header – Add the header line to the top of every file.
Returns: Paths to split files.
Return type: list
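The same idea can be sketched portably in pure Python (hypothetical implementation; the real function shells out to wc):

```python
import os

def split_file_sketch(infile, parts, outpath='', keep_header=True):
    """Split infile into up to `parts` files; optionally repeat the header."""
    with open(infile) as fh:
        lines = fh.readlines()
    header, body = (lines[:1], lines[1:]) if keep_header else ([], lines)
    size = -(-len(body) // parts)  # ceiling division
    outfiles = []
    for i in range(parts):
        chunk = body[i * size:(i + 1) * size]
        if not chunk:
            break
        out = os.path.join(outpath, '{}.split_{:04d}'.format(
            os.path.basename(infile), i))
        with open(out, 'w') as fh:
            fh.writelines(header + chunk)  # header prepended to every part
        outfiles.append(out)
    return outfiles
```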
fyrd.basic¶
This module holds high level functions to make job submission easy, allowing the user to skip multiple steps and to avoid using the Job class directly.
submit(), make_job(), and make_job_file() all create Job objects in the background and allow users to submit jobs. All of these functions accept the exact same arguments as the Job class does, and all of them return a Job object.
submit_file() is different: it simply submits a pre-formed job file, either one that has been written by this software or by any other method. The function makes no attempt to fix arguments to allow submission on multiple clusters; it just submits the file.
clean() takes a list of job objects and runs the clean() method on all of them, clean_dir() uses known directory and suffix information to clean out all job files from any directory.
fyrd.basic.submit()
Submit a script to the cluster.
Parameters: - command (function/str) – The command or function to execute.
- args (tuple/dict) – Optional arguments to add to command, particularly useful for functions.
- kwargs (dict) – Optional keyword arguments to pass to the command, only used for functions.
- name (str) – Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, i.e. <name>.1
- qtype (str) – Override the default queue type
- profile (str) – The name of a profile saved in the conf
- All other keywords are parsed into cluster options by the option system. For available keywords see fyrd.option_help().
Returns: Job object
fyrd.basic.make_job()
Make a job file compatible with the chosen cluster.
If mode is local, this is just a simple shell script.
Parameters: - command (function/str) – The command or function to execute.
- args (tuple/dict) – Optional arguments to add to command, particularly useful for functions.
- kwargs (dict) – Optional keyword arguments to pass to the command, only used for functions.
- name (str) – Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, i.e. <name>.1
- qtype (str) – Override the default queue type
- profile (str) – The name of a profile saved in the conf
- All other keywords are parsed into cluster options by the option system. For available keywords see fyrd.option_help().
Returns: Job object
fyrd.basic.make_job_file()
Make a job file compatible with the chosen cluster.
Parameters: - command (function/str) – The command or function to execute.
- args (tuple/dict) – Optional arguments to add to command, particularly useful for functions.
- kwargs (dict) – Optional keyword arguments to pass to the command, only used for functions.
- name (str) – Optional name of the job. If not defined, guessed. If a job of the same name is already queued, an integer job number (not the queue number) will be added, i.e. <name>.1
- qtype (str) – Override the default queue type
- profile (str) – The name of a profile saved in the conf
- All other keywords are parsed into cluster options by the option system. For available keywords see fyrd.option_help().
Returns: Job object
fyrd.basic.submit_file()
Submit a job file to the cluster.
If qtype or queue.MODE is torque, qsub is used; if it is slurm, sbatch is used; if it is local, the file is executed with subprocess.
This function is independent of the Job object and just submits a file.
Parameters: - dependencies – A job number or list of job numbers. In slurm, --dependency=afterok: is used; for torque, -W depend=afterok: is used.
- threads – Total number of threads to use at a time, defaults to all. ONLY USED IN LOCAL MODE
Returns: Job number for torque or slurm; multiprocessing job object for local mode.
fyrd.basic.clean_dir()
Delete all files made by this module in a directory.
CAUTION: The clean() function will delete EVERY file with extensions matching these:
.<suffix>.err, .<suffix>.out, .<suffix>.out.func.pickle, .<suffix>.sbatch & .<suffix>.script for slurm mode, .<suffix>.qsub for torque mode, .<suffix> for local mode, _func.<suffix>.py, _func.<suffix>.py.pickle.in, _func.<suffix>.py.pickle.out
Parameters: - directory (str) – The directory to run in, defaults to the current directory.
- suffix (str) – Override the default suffix.
- qtype (str) – Only run on files of this qtype
- confirm (bool) – Ask the user before deleting the files
- delete_outputs (bool) – Delete all output files too.
Returns: A set of deleted files
fyrd.local¶
The local queue implementation is based on the multiprocessing library and is not intended to be used directly; it should always be used via the Job class because it is somewhat temperamental. The essential idea behind it is that we can have one JobQueue class that is bound to the parent process; it exclusively manages a single child process that runs the job_runner() function. The two processes communicate using a multiprocessing.Queue object and pass fyrd.local.Job objects back and forth between them.
The Job objects (different from the Job objects in job.py) contain information about the task to run, including the number of cores required. The job runner manages a pool of multiprocessing.Pool tasks directly, and keeps the total running cores below the total allowed (default is the system max, can be set with the threads keyword). It backfills smaller jobs and holds on to larger jobs until there is enough space free.
This is close to what torque and slurm do, but vastly more crude. It serves as a stopgap to allow parallel software written for compute clusters to run on a single machine in a similar fashion, without the need for a pipeline alteration. The reason I have reimplemented a process pool is that I need dependency tracking and I need to allow some processes to run on multiple cores (e.g. 6 of the available 24 on the machine).
The job_runner() and Job objects should never be accessed except by the JobQueue. Only one JobQueue should run at a time (not enforced), and by default it is bound to fyrd.local.JQUEUE. That is the interface used by all other parts of this package.
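The scheduling policy described above (dependency tracking plus core-aware backfill) can be sketched sequentially, leaving out the multiprocessing machinery entirely (hypothetical names):

```python
def schedule_sketch(jobs, total_cores):
    """jobs: {id: (cores, [dependency_ids])}. Return batches of job ids
    that could run together under the core limit, dependencies first."""
    done, batches = set(), []
    while len(done) < len(jobs):
        free, batch = total_cores, []
        for jid, (cores, deps) in jobs.items():
            if jid in done or not set(deps) <= done:
                continue  # already run, or a dependency is still pending
            if cores <= free:  # backfill: fit jobs into the free cores
                batch.append(jid)
                free -= cores
        if not batch:
            raise RuntimeError('deadlock: unsatisfiable dependencies')
        batches.append(batch)
        done.update(batch)
    return batches
```

Larger jobs that do not fit in the free cores are simply held until a later batch, which mirrors the backfill behaviour described above.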
fyrd.local.JobQueue¶
class fyrd.local.JobQueue(cores=None)
Bases: object
Monitor and submit multiprocessing.Pool jobs with dependencies.
Spawn a job_runner process to interact with.
JobQueue.add(function, args=None, kwargs=None, dependencies=None, cores=1)
Add function to local job queue.
Parameters: - function – A function object. To run a command, use the run.cmd function here.
- args – A tuple of args to submit to the function.
- kwargs – A dict of keyword arguments to submit to the function.
- dependencies – A list of job IDs that this job will depend on.
- cores – The number of threads required by this job.
Returns: A job ID
Return type: int
fyrd.local.job_runner¶
fyrd.local.job_runner(jobqueue, outputs, cores=None, jobno=None)
Run jobs with dependency tracking.
Must be run as a separate multiprocessing.Process to function correctly.
Parameters: - jobqueue – A multiprocessing.Queue object into which Job objects must be added. The function continually searches this Queue for new jobs. Note that function must be a function object, nothing else; it is the only required argument, the rest are optional, and args must be a tuple.
- outputs – A multiprocessing.Queue object that will take outputs. A dictionary of job objects will be output here with the format {job_no: Job}. NOTE: the function return value must be picklable, otherwise an exception will be raised when it is put into the Queue object.
- cores – Number of cores to use in the multiprocessing pool. Defaults to all.
- jobno – What number to start counting jobs from, default 1.
fyrd.run¶
Some other wrapper functions are defined in run.py, these are just little useful knick-knacks that make function submission and queue management possible on both python 2 and python 3.
-
fyrd.run.
cmd
(command, args=None, stdout=None, stderr=None, tries=1)[source]¶ Run command and return status, output, stderr.
Parameters: - command (str) – Path to executable.
- args (tuple) – Tuple of arguments.
- stdout (str) – File or open file like object to write STDOUT to.
- stderr (str) – File or open file like object to write STDERR to.
- tries (int) – Number of times to try to execute; must be at least 1.
Returns: exit_code, STDOUT, STDERR
Return type: tuple
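The behavior documented above, running a command with retries and returning an (exit_code, STDOUT, STDERR) tuple, can be approximated with `subprocess`. This is a stdlib sketch of the documented contract, not fyrd's implementation, and `cmd_sketch` is a hypothetical name:

```python
import subprocess
import sys

def cmd_sketch(command, args=None, tries=1):
    """Approximation of run.cmd: run a command, retrying up to
    `tries` times on failure, and return (exit_code, stdout, stderr)."""
    argv = [command] + list(args or ())
    for attempt in range(tries):
        proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        out, err = proc.communicate()
        if proc.returncode == 0 or attempt == tries - 1:
            return proc.returncode, out, err

# Usage: run the current Python interpreter as the command.
code, out, err = cmd_sketch(sys.executable, ['-c', 'print("hi")'])
print(code, out.strip())  # 0 hi
```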
-
fyrd.run.
which
(program)[source]¶ Replicate the UNIX which command.
- Taken verbatim from:
- stackoverflow.com/questions/377017/test-if-executable-exists-in-python
Parameters: program – Name of executable to test. Returns: Path to the program or None on failure.
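The PATH-scanning approach from the cited StackOverflow recipe looks roughly like the following; this is a sketch of that recipe (`which_sketch` is a hypothetical name), not fyrd's exact code:

```python
import os

def which_sketch(program):
    """Stdlib sketch of run.which: if `program` contains a path
    component, test it directly; otherwise scan each PATH entry for
    an executable file. Return the path found, or None."""
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
    fpath, _ = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ.get('PATH', '').split(os.pathsep):
            candidate = os.path.join(path, program)
            if is_exe(candidate):
                return candidate
    return None
```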
-
fyrd.run.
get_input
(message, valid_answers=None)[source]¶ Get input from the command line and check answers.
Allows input to work on both Python 2 and Python 3.
Parameters: - message (str) – A message to print, an additional space will be added.
- valid_answers (list) – A list of answers to accept, if None, ignored. Case insensitive.
Returns: The response
Return type: str
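The prompt-until-valid loop described above can be sketched as follows. The `reader` argument is a testing convenience added here, not part of fyrd's signature; the real function reads from the terminal:

```python
def get_input_sketch(message, valid_answers=None, reader=input):
    """Sketch of get_input: prompt until the answer is in
    valid_answers (case-insensitive). If valid_answers is None,
    accept anything."""
    while True:
        answer = reader(message + ' ')
        if valid_answers is None:
            return answer
        if answer.lower() in [a.lower() for a in valid_answers]:
            return answer

# Usage: simulate a user typing an invalid answer, then a valid one.
replies = iter(['maybe', 'Yes'])
result = get_input_sketch('Continue? [y/n]', ['yes', 'no'],
                          reader=lambda m: next(replies))
print(result)  # Yes
```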
-
fyrd.run.
indent
(string, prefix=' ')[source]¶ Replicate python3’s textwrap.indent for python2.
Parameters: - string (str) – Any string.
- prefix (str) – What to indent with.
Returns: Indented string
Return type: str
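A Python-2-compatible replacement for `textwrap.indent` reduces to prepending the prefix to each line. This simplified sketch indents every non-empty line (`indent_sketch` is a hypothetical name, not fyrd's exact code):

```python
def indent_sketch(string, prefix='    '):
    """Prepend `prefix` to every non-empty line of `string`."""
    return '\n'.join(prefix + line if line else line
                     for line in string.split('\n'))

print(indent_sketch('a\n\nb', prefix='> '))  # '> a\n\n> b'
```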
-
fyrd.run.
open_zipped
(infile, mode='r')[source]¶ Open a regular, gzipped, or bz2 file.
If infile is a file handle or text device, it is returned without changes.
Returns: text mode file handle.
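The dispatch described above (pass handles through, otherwise pick an opener by extension and force text mode) can be sketched with the stdlib compression modules. `open_zipped_sketch` is a hypothetical name and this is a simplification of fyrd's version:

```python
import bz2
import gzip

def open_zipped_sketch(infile, mode='r'):
    """Open a regular, gzipped, or bz2 file in text mode.
    File handles are returned unchanged."""
    if hasattr(infile, 'read'):  # already a file-like object
        return infile
    text_mode = mode if 't' in mode else mode + 't'
    if infile.endswith('.gz'):
        return gzip.open(infile, text_mode)
    if infile.endswith('.bz2'):
        return bz2.open(infile, text_mode)
    return open(infile, mode)
```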
fyrd.logme¶
This is a package I wrote myself and keep using because I like it. It provides syslog-style leveled logging (e.g. 'debug'->'info'->'warn'->'error'->'critical') and implements colors and timestamped messages.
The minimum print level can be set module-wide at runtime by changing fyrd.logme.MIN_LEVEL.
-
fyrd.logme.
log
(message, level='info', logfile=None, also_write=None, min_level=None, kind=None)[source]¶ Print a string to logfile.
Parameters: - message – The message to print.
- logfile – Optional file to log to, defaults to STDERR. Can provide a logging object
- level –
'debug'|'info'|'warn'|'error'|'normal'. Will only print if level >= MIN_LEVEL. The prefix flag for each level is:
'verbose': '<timestamp> VERBOSE --> '
'debug': '<timestamp> DEBUG --> '
'info': '<timestamp> INFO --> '
'warn': '<timestamp> WARNING --> '
'error': '<timestamp> ERROR --> '
'critical': '<timestamp> CRITICAL --> '
- also_write – 'stdout': print to STDOUT also. 'stderr': print to STDERR also. These only have an effect if the output is not already set to the same device.
- min_level – Retained for backwards compatibility, min_level should be set using the logme.MIN_LEVEL constant.
- kind – synonym for level, kept to retain backwards compatibility
Logging with timestamps and optional log files.
Print a timestamped message to a logfile, STDERR, or STDOUT.
If STDERR or STDOUT are used, colored flags are added. Colored flags are INFO, WARNING, ERROR, or CRITICAL.
It is possible to write to both logfile and STDOUT/STDERR using the also_write argument.
If level is ‘error’ or ‘critical’, error is written to STDERR unless also_write == -1
MIN_LEVEL can also be provided; logs will only print if level >= MIN_LEVEL. Level order: critical>error>warn>info>debug>verbose
Usage:
import logme as lm
lm.log("Screw up!", <outfile>,
level='debug'|'info'|'warn'|'error'|'normal',
also_write='stderr'|'stdout')
Example:
lm.log('Hi')
Prints: 20160223 11:46:24.969 | INFO --> Hi
lm.log('Hi', level='debug')
Prints nothing
lm.MIN_LEVEL = 'debug'
lm.log('Hi', level='debug')
Prints: 20160223 11:46:24.969 | DEBUG --> Hi
Note: uses terminal colors and STDERR; not compatible with non-Unix systems.
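The level ordering and MIN_LEVEL behavior shown in the examples amount to a numeric comparison. A minimal sketch of that filtering rule (the names `LEVELS` and `should_print` are hypothetical, and the default floor of 'info' is an assumption inferred from the examples above):

```python
# Level order from the docs: critical>error>warn>info>debug>verbose.
LEVELS = {'verbose': 0, 'debug': 1, 'info': 2,
          'warn': 3, 'error': 4, 'critical': 5}

def should_print(level, min_level='info'):
    """A message prints only if its level is at least min_level,
    matching the examples: 'debug' is silent until MIN_LEVEL drops."""
    return LEVELS[level] >= LEVELS[min_level]

print(should_print('debug'))           # False with the default floor
print(should_print('debug', 'debug'))  # True once MIN_LEVEL is lowered
```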