# -*- coding: utf-8 -*-
"""
Monitor the queue for torque or slurm.
Provides a class to monitor the all defined batch queues with identical
syntax.
At its simplest, you can use it like::
q = queue.Queue()
q.jobs
q.running
q.pending
q.complete
All of the above commands return a dictionary of:
{job_no: Queue.QueueJob}
Queue.QueueJob classes include information on job state, owner, queue, nodes,
threads, exitcode, etc.
Queue also defines a wait() method that takes a list of job numbers, job.Job()
objects, or JobQueue.Job objects and blocks until those jobs complete.
The default cluster environment is also defined in this file as MODE, it can be
set directly or with the get_cluster_environment() function defined here.
"""
import sys as _sys # Used to get TB info
import pwd as _pwd # Used to get usernames for queue
import getpass as _getpass # Used to get usernames for queue
import traceback as _tb # Used to mail TB info
from datetime import datetime as _dt
from time import time as _time
from time import sleep as _sleep
from six import reraise as _reraise
from six import text_type as _txt
from six import string_types as _str
from six import integer_types as _int
###############################################################################
# Our functions #
###############################################################################
from . import run as _run
from . import logme as _logme
from . import conf as _conf
from . import ClusterError as _ClusterError
from . import batch_systems as _batch
from . import notify as _notify
# Functions to import if requested
__all__ = ['Queue']
GOOD_STATES = _batch.GOOD_STATES
ACTIVE_STATES = _batch.ACTIVE_STATES
BAD_STATES = _batch.BAD_STATES
UNCERTAIN_STATES = _batch.UNCERTAIN_STATES
ALL_STATES = _batch.ALL_STATES
DONE_STATES = _batch.DONE_STATES
###############################################################################
# The Queue Class #
###############################################################################
class Queue(object):
    """A wrapper for all defined batch systems.

    Attributes
    ----------
    jobs : dict
        A dictionary of all jobs in this queue in the form:
        `{jobid: Queue.QueueJob}`
    finished : dict
        A dictionary of all completed jobs, same format as jobs
    bad : dict
        A dictionary of all jobs with failed or unknown states, same format
        as jobs
    active_job_count : int
        A count of all jobs that are either pending or running in the
        current queue (including array job children)
    max_jobs : int
        The maximum number of jobs allowed in the queue
    can_submit : bool
        True if active_job_count < max_jobs, False otherwise
    users : set
        A set of all users with active jobs
    job_states : list
        A list of the states of all jobs in this queue

    Methods
    -------
    wait(jobs, return_disp=False)
        Block until all jobs in jobs are complete.
    get(jobs)
        Get all results from a bunch of Job objects.
    wait_to_submit(max_jobs=None)
        Block until fewer running/pending jobs in queue than max_jobs.
    update()
        Refresh the list of jobs from the server.
    get_jobs(key)
        Return a dict of jobs where state matches key.
    get_user_jobs(users)
        Return a dict of jobs for all jobs by each user in users.
    """

    def __init__(self, user=None, partition=None, qtype=None):
        """Can filter by user, queue type or partition on initialization.

        Parameters
        ----------
        user : str
            Optional username to filter the queue with. If user='self' or
            'current', the current user will be used.
        partition : str
            Optional partition to filter the queue with.
        qtype : str
            One of the defined batch queues (e.g. 'slurm')
        """
        # Get user ID as an int UID
        if user:
            if user == 'self' or user == 'current':
                self.user = _getpass.getuser()
                self.uid = _pwd.getpwnam(self.user).pw_uid
            elif user == 'ALL':
                # No user filtering at all.
                # BUG FIX: self.uid was previously left unset on this
                # branch, which made the getpwuid() lookup below raise
                # AttributeError.
                self.user = None
                self.uid = None
            else:
                if isinstance(user, int) \
                        or (isinstance(user, (_str, _txt)) and user.isdigit()):
                    self.uid = int(user)
                else:
                    self.uid = _pwd.getpwnam(str(user)).pw_uid
        else:
            self.uid = None
        # Resolve the canonical username from the UID when we have one
        self.user = _pwd.getpwuid(self.uid).pw_name if self.uid else None
        self.partition = partition
        self.max_jobs = int(_conf.get_option('queue', 'max_jobs'))
        # Don't allow max_jobs to drop below 4, otherwise basic split jobs
        # permanently hang (the old comment said 5 but the code enforced 4)
        if self.max_jobs < 4:
            self.max_jobs = 4
        # Support python2, which hates reciprocal import
        from .job import Job
        self._Job = Job
        # Get sleep time and update time
        self.queue_update_time = float(
            _conf.get_option('queue', 'queue_update', 2)
        )
        self.sleep_len = float(_conf.get_option('queue', 'sleep_len', 0.5))
        # Set type
        if qtype:
            _batch.check_queue(qtype)
        else:
            _batch.check_queue()
        self.qtype = qtype if qtype else _batch.MODE
        self.batch_system = _batch.get_batch_system(self.qtype)
        # Allow tracking of updates to prevent too many updates
        self._updating = False
        # Will contain a dict of QueueJob objects indexed by ID
        self.jobs = {}
        self.last_update = None
####################
# Public Methods #
####################
[docs] def check_dependencies(self, dependencies):
"""Check if dependencies are running.
Parameters
----------
dependencies : list
List of job IDs
Returns
-------
str
'active' if dependencies are running or queued, 'good' if
completed, 'bad' if failed, cancelled, or suspended, 'absent'
otherwise.
"""
for dep in _run.listify(dependencies):
dep = str(dep)
if dep not in self.jobs:
return 'absent'
state = self.jobs[dep].state
if state in ACTIVE_STATES:
return 'active'
elif state in GOOD_STATES:
continue
elif state in BAD_STATES or state in UNCERTAIN_STATES:
return 'bad'
else:
raise _ClusterError('Invalid state {0}'.format(state))
return 'good'
    def wait(self, jobs, return_disp=False, notify=True):
        """Block until all jobs in jobs are complete.

        Update time is dependant upon the queue_update parameter in your
        ~/.fyrd/config.txt file.

        Parameters
        ----------
        jobs : list
            List of either fyrd.job.Job, fyrd.queue.QueueJob, job_id
        return_disp : bool, optional
            If a job disappears from the queue, return 'disappeared' instead
            of True
        notify : str, True, or False, optional
            If True, both notification address and wait_time must be set in
            the [notify] section of the config. A notification email will be
            sent if the time exceeds this time. This is the default.
            If a string is passed, notification is forced and the string must
            be the to address.
            False means no notification

        Returns
        -------
        bool or str
            True on success, False or None on failure unless return_disp is
            True and the job disappears, then returns 'disappeared'
        """
        if notify is True:
            # Minimum elapsed time before a mail is sent, and the address
            notify_time = _conf.get_option('notify', 'wait_time', 120)
            notify_addr = _conf.get_option('notify', 'notify_address')
        elif isinstance(notify, str):
            # A string forces notification to that address immediately
            notify_addr = notify
            notify_time = 0
        # NOTE(review): when notify is False, notify_addr/notify_time stay
        # unbound; the `if notify:` guard below keeps that safe.
        start_time = _dt.now()
        self.update()
        _logme.log('Queue waiting.', 'debug')
        # Sanitize arguments
        jobs = _run.listify(jobs)
        for job in jobs:
            if not isinstance(job, (_str, _txt, _int, QueueJob, self._Job)):
                raise _ClusterError('job must be int, string, or Job, ' +
                                    'is {}'.format(type(job)))
        # Build a parallel list of plain job-id strings to poll
        check_jobs = []
        for job in jobs:
            if isinstance(job, (self._Job, QueueJob)):
                job_id = job.id
            else:
                job_id = str(job)
            check_jobs.append(job_id)
        pbar = _run.get_pbar(jobs, name="Waiting for job completion",
                             unit='jobs')
        dispo = 'Unknown'
        msg = None
        try:
            while check_jobs:
                self.update()
                for job in check_jobs:
                    # check_jobs holds strings built above, so this
                    # isinstance test is redundant but harmless
                    if isinstance(job, (self._Job, QueueJob)):
                        job_id = job.id
                    else:
                        job_id = str(job)
                    # NOTE(review): array_id is never used after this call
                    job_id, array_id = self.batch_system.normalize_job_id(
                        job_id)
                    _logme.log('Checking {}'.format(job_id), 'debug')
                    lgd = False
                    # Allow 12 seconds to elapse before job is found in
                    # queue, if it is not in the queue by then, assume
                    # failure.
                    if not self.test_job_in_queue(job_id):
                        if return_disp:
                            dispo = 'disappeared'
                            break
                        else:
                            dispo = False
                            break
                    ## Actually look for job in running/queued queues
                    lgd = False
                    lgd2 = False
                    # NOTE(review): start, res_time and count are re-created
                    # on every pass through this loop, so the uncertain-state
                    # timeout and the count == 5 cutoff below can never
                    # accumulate — they were probably meant to be initialized
                    # once before the while loop; confirm before changing.
                    start = _dt.now()
                    res_time = float(_conf.get_option('queue', 'res_time'))
                    count = 0
                    # Get job state
                    job_state = self.jobs[job_id].state
                    # Check the state
                    if job_state in GOOD_STATES:
                        _logme.log('Queue wait for {} complete'
                                   .format(job_id), 'debug')
                        # Safe despite mutating during iteration: we break
                        # out of the for loop immediately below
                        check_jobs.pop(check_jobs.index(job))
                        pbar.update()
                        dispo = True
                        break
                    elif job_state in ACTIVE_STATES:
                        # NOTE(review): this condition looks inverted — lgd
                        # is always False here, so the debug branch never
                        # runs and lgd is never set True; likely meant
                        # `if not lgd:`.
                        if lgd:
                            _logme.log('{} not complete yet, waiting'
                                       .format(job_id), 'debug')
                            lgd = True
                        else:
                            _logme.log('{} still not complete, waiting'
                                       .format(job_id), 'verbose')
                    elif job_state in BAD_STATES:
                        msg = 'Job {} failed with state {}'.format(
                            job, job_state)
                        _logme.log(msg, 'error')
                        dispo = False
                        break
                    elif job_state in UNCERTAIN_STATES:
                        if not lgd2:
                            _logme.log('Job {} in state {}, waiting {} '
                                       .format(job, job_state, res_time) +
                                       'seconds for resolution', 'warn')
                            lgd2 = True
                        if (_dt.now() - start).seconds > res_time:
                            msg = (
                                'Job {} still in state {} after {}s wait, aborting'
                                .format(job, job_state, res_time)
                            )
                            _logme.log(msg, 'error')
                            dispo = False
                            break
                    else:
                        # Completely unrecognized state: retry a few times
                        # before giving up
                        if count == 5:
                            _logme.log('Job {} in unknown state {} '
                                       .format(job, job_state) +
                                       'cannot continue', 'critical')
                            raise QueueError('Unknown job state {}'
                                             .format(job_state))
                        _logme.log('Job {} in unknown state {} '
                                   .format(job, job_state) +
                                   'trying to resolve', 'debug')
                        count += 1
                if dispo in [False, 'disappeared']:
                    break
                _sleep(self.sleep_len)
            # Update jobs
            for job in jobs:
                if isinstance(job, self._Job):
                    job.update()
        except:
            # NOTE(review): bare except also traps KeyboardInterrupt; the
            # captured exc_info tuple is re-raised after notification below.
            dispo = _sys.exc_info()
        if notify:
            # Only mail when the wait exceeded the configured threshold
            if (_dt.now() - start_time).total_seconds() >= notify_time:
                job_count = len(jobs)
                subject = 'Fyrd Wait for {} Jobs Completed'.format(job_count)
                if dispo is True:
                    message = 'Jobs completed\n'
                    subject += ', Success'
                elif isinstance(dispo, tuple):
                    # An exception was captured above; include its traceback
                    message = 'Jobs {}\nTraceback:\n{}'.format(
                        repr(dispo[1]), _tb.format_tb(dispo[2])[0]
                    )
                    subject += ', {}'.format(repr(dispo[1]))
                else:
                    message = 'Some or all jobs failed\n'
                    if msg:
                        message += 'Error message:\n{}\n\n'.format(msg)
                    subject += ', Some Failed'
                message += 'Total jobs waited for: {}\n'.format(job_count)
                _notify.notify(message, notify_addr, subject)
        if isinstance(dispo, tuple):
            # Re-raise the exception captured in the bare except above
            _reraise(*dispo)
        return dispo
[docs] def get(self, jobs):
"""Get all results from a bunch of Job objects.
Parameters
----------
jobs : list
List of fyrd.Job objects
Returns
-------
job_results : dict
`{job_id: Job}`
Raises
------
fyrd.ClusterError
If any job fails or goes missing.
"""
self.update()
_logme.log('Queue waiting.', 'debug')
singular = True if isinstance(jobs, self._Job) else False
# Force into enumerated list to preserve order
jobs = dict(enumerate(_run.listify(jobs)))
done = {}
# Check that all jobs are valid
for job in jobs.values():
if not isinstance(job, self._Job):
raise _ClusterError('This only works with cluster job '
'objects')
# Loop through all jobs continuously trying to get outputs
pbar = _run.get_pbar(jobs, name="Getting Job Results", unit='jobs')
while jobs:
for i in list(jobs):
job = jobs[i]
job.update()
if not self.test_job_in_queue(job.id):
raise _ClusterError('Job {} not queued'.format(job.id))
if job.state == 'completed':
done[i] = jobs.pop(i).get()
pbar.update()
elif job.state in BAD_STATES:
pbar.close()
raise _ClusterError('Job {} failed, cannot get output'
.format(job.id))
# Block between attempts
_sleep(self.sleep_len)
pbar.write('Done\n')
pbar.close()
# Correct the order, make it the same as the input list
results = []
for i in sorted(done.keys()):
results.append(done[i])
return results[0] if singular else results
[docs] def test_job_in_queue(self, job_id, array_id=None):
"""Check to make sure job is in self.
Tries 12 times with 1 second between each. If found returns True,
else False.
Parameters
----------
job_id : str
array_id : str, optional
Returns
-------
exists : bool
"""
lgd = False
not_found = 0
job_id = str(job_id)
if array_id is not None:
array_id = str(array_id)
if isinstance(job_id, _QueueJob):
job_id = job_id.id
while True:
self._update()
# Allow 12 seconds to elapse before job is found in queue,
# if it is not in the queue by then, assume completion.
if job_id in self.jobs:
job = self.jobs[job_id]
if array_id and array_id not in job.children:
return False
return True
else:
if lgd:
_logme.log('Attempt #{}/12'.format(not_found),
'debug')
else:
_logme.log('{} not in queue, waiting up to 12s '
.format(job_id) +
'for it to appear', 'info')
lgd = True
_sleep(self.sleep_len)
not_found += 1
if not_found == 12:
_logme.log(
'{} not in queue, tried 12 times over 12s'
.format(job_id) + '. Job likely completed, ' +
'assuming completion, stats will be ' +
'unavailable.','warn'
)
return False
continue
[docs] def wait_to_submit(self, max_jobs=None):
"""Block until fewer running/queued jobs in queue than max_jobs.
Parameters
----------
max_jobs : int
Override self.max_jobs for wait
"""
count = 50
written = False
while True:
if self._can_submit(max_jobs):
return
if not written:
_logme.log(('The queue is full, there are {} jobs running and '
'{} jobs queued. Will wait to submit, retrying '
'every {} seconds.')
.format(len(self.running), len(self.queued),
self.sleep_len),
'info')
written = True
if count == 0:
_logme.log('Still waiting to submit.', 'info')
count = 50
count -= 1
_sleep(self.sleep_len)
[docs] def update(self):
"""Refresh the list of jobs from the server, limit queries."""
if not self.last_update:
self._update()
elif int(_time()) - self.last_update > self.queue_update_time:
self._update()
else:
_logme.log('Skipping update as last update too recent', 'debug')
return self
[docs] def get_jobs(self, key):
"""Return a dict of jobs where state matches key."""
retjobs = {}
keys = [k.lower() for k in _run.listify(key)]
for jobid, job in self.jobs.items():
if job.get_state() in keys:
retjobs[jobid] = job
return retjobs
[docs] def get_user_jobs(self, users):
"""Filter jobs by user.
Parameters
----------
users : list
A list of users/owners
Returns
-------
dict
A filtered job dictionary of `{job_id: QueueJob}` for all jobs
owned by the queried users.
"""
users = _run.listify(users)
return {k: v for k, v in self.jobs.items() if v.owner in users}
@property
def users(self):
"""Return a set of users with jobs running."""
return set([job.owner for job in self.jobs.values()])
@property
def job_states(self):
"""Return a list of job states for all jobs in the queue."""
return [job.state for job in self.jobs]
@property
def finished(self):
"""Return a list of jobs that are neither queued nor running."""
return {i: j for i, j in self.jobs.items() \
if j.state not in ACTIVE_STATES}
@property
def bad(self):
"""Return a list of jobs that have bad or uncertain states."""
return {i: j for i, j in self.jobs.items() \
if j.state in BAD_STATES or j.state in UNCERTAIN_STATES}
@property
def active_job_count(self):
"""Return a count of all queued or running jobs, inc. array jobs."""
self.update()
jobcount = 0
for j in self.get_jobs(ACTIVE_STATES):
jobcount += j.jobcount()
return int(jobcount)
@property
def can_submit(self):
"""Return True if R/Q jobs are less than max_jobs."""
return self._can_submit()
def _can_submit(self, max_jobs=None):
""""Return True if R/Q jobs are less than max_jobs."""
self.update()
# Get max jobs
max_jobs = int(max_jobs) if max_jobs else self.max_jobs
if max_jobs < 4:
max_jobs = 4
# Fix self.max_jobs also
if self.max_jobs < 4:
self.max_jobs = 4
# Update running and queued jobs can't use self.active_job_count as
# rapid update breaks the property count and caused jobcount to be None
jobcount = 0
for j in self.get_jobs(ACTIVE_STATES).values():
jobcount += j.jobcount()
return jobcount < max_jobs
    ######################
    # Internal Functions #
    ######################

    def _update(self):
        """Refresh the list of jobs from the server.

        This is the core queue interaction function of this class. Parses
        the batch system's queue listing into QueueJob/QueueChild objects
        stored in self.jobs; any previously-seen job that no longer appears
        in the listing is marked completed.
        """
        # __repr__ sets _updating to block re-entrant updates
        if self._updating:
            return
        _logme.log('Queue updating', 'debug')
        # Set the update time I don't care about microseconds
        self.last_update = int(_time())
        jobs = []  # job ids seen during this parse, as strings
        for [job_id, array_id, job_name, job_user, job_partition,
             job_state, job_nodelist, job_nodecount,
             job_cpus, job_exitcode] in self.batch_system.queue_parser(
                 self.user, self.partition):
            job_id = str(job_id)
            job_state = job_state.lower()
            # Total threads = node count * cpus-per-node, when both known
            if job_nodecount and job_cpus:
                job_threads = int(job_nodecount) * int(job_cpus)
            else:
                job_threads = None
            # Exit codes only make sense for finished jobs
            if job_state == 'completed' or job_state == 'failed':
                job_exitcode = job_exitcode  # no-op, kept for symmetry
            else:
                job_exitcode = None
            # Get/Create the QueueJob object
            if job_id not in self.jobs:
                job = QueueJob()
            else:
                job = self.jobs[job_id]
            job.id = job_id
            job.name = job_name
            job.owner = job_user
            job.queue = job_partition
            job.state = job_state.lower()
            if array_id is not None:
                # Array job row: record it as a child, then derive the
                # parent's aggregate state/nodes/threads/exitcode
                job.array_job = True
                cjob = QueueChild(job)
                cjob.id = array_id
                cjob.name = job_name
                cjob.owner = job_user
                cjob.queue = job_partition
                cjob.state = job_state.lower()
                cjob.nodes = job_nodelist
                cjob.threads = job_threads
                cjob.exitcode = job_exitcode
                job.children[array_id] = cjob
                job.state = job.get_state()
                job.nodes = job.get_nodelist()
                job.threads = job.get_threads()
                job.exitcode = job.get_exitcode()
            else:
                job.nodes = job_nodelist
                job.threads = job_threads
                job.exitcode = job_exitcode
            # Assign the job to self.
            self.jobs[job_id] = job
            jobs.append(str(job_id))
        # We assume that if a job just disappeared it completed
        if self.jobs:
            for qjob in self.jobs.values():
                if str(qjob.id) not in jobs:
                    qjob.state = 'completed'
                    qjob.disappeared = True
def __getattr__(self, key):
"""Make running and queued attributes dynamic."""
key = self.batch_system.normalize_state(key.lower())
if key == 'complete':
key = 'completed'
elif key == 'queued':
key = 'pending'
if key in ALL_STATES:
return self.get_jobs(key)
def __getitem__(self, key):
"""Allow direct accessing of jobs by job id."""
if isinstance(key, self._Job):
key = key.id
key = str(key)
try:
return self.jobs[key]
except KeyError:
return None
def __iter__(self):
"""Allow us to be iterable."""
self.update()
return self
def __next__(self):
"""Loop through jobs."""
for jb in self.jobs.values():
return jb
def next(self):
"""Wrapper for __next__."""
return self.__next__
def __len__(self):
"""Length is the total job count."""
return len(self.jobs)
def __repr__(self):
"""For debugging."""
self.update()
self._updating = True
if self.user:
outstr = 'Queue<jobs:{};completed:{};pending:{};user={}>'.format(
len(self), len(self.complete), len(self.queued), self.user)
else:
outstr = 'Queue<jobs:{};completed:{};pending:{};user=ALL>'.format(
len(self), len(self.complete), len(self.queued))
self._updating = False
return outstr
def __str__(self):
"""A list of keys."""
self.update()
return str(list(self.jobs.keys()))
##############################################
#  A simple class to hold jobs in the queue  #
##############################################


class _QueueJob(object):
    """A very simple class to store info about jobs in the queue.

    Attributes
    ----------
    id : int
        Job ID
    name : str
        Job name
    children : dict
        If array job, dict of child jobs keyed by array index
    owner : str
        User who owns the job
    threads : int
        Number of cores used by the job
    queue : str
        The queue/partition the job is running in
    state : str
        Current state of the job, normalized to slurm states
    nodes : list
        List of nodes job is running on
    exitcode : int
        Exit code of completed job
    disappeared : bool
        Job cannot be found in the queue anymore
    """

    # Class-level defaults, normally overwritten per instance by the queue
    # parser.
    # NOTE(review): `children` is a mutable class attribute — any instance
    # that never assigns its own dict shares this single one; QueueJob's
    # __init__ replaces it per-instance, but verify subclasses do too.
    id = None
    name = None
    owner = None
    threads = None
    queue = None
    state = None
    nodes = None
    exitcode = None
    disappeared = False   # set True once the job vanished from the queue
    array_job = False     # True when this job has array children
    children = {}
    parent = None         # backref, set on child jobs
    _child_job = False    # marks child jobs for __repr__
    _cname = None         # class name shown in __repr__

    def get_state(self):
        """Return the current state of the job.

        For array jobs the state is aggregated over the children with
        precedence running > pending > failed > completed.
        """
        if self.array_job:
            pending_jobs = False
            running_jobs = False
            failed_jobs = False
            for job_info in self.children.values():
                if job_info.state == 'pending':
                    pending_jobs = True
                elif job_info.state in ACTIVE_STATES:
                    running_jobs = True
                elif job_info.state in BAD_STATES:
                    failed_jobs = True
            if running_jobs:
                return 'running'
            if pending_jobs:
                return 'pending'
            if failed_jobs:
                return 'failed'
            return 'completed'
        return self.state

    def get_nodelist(self):
        """Return the list of nodes the job is running on.

        For array jobs, concatenates the node lists of all children;
        returns None when no nodes are recorded.
        """
        if self.array_job:
            nodelist = []
            for job_info in self.children.values():
                if job_info.nodes:
                    nodelist = nodelist + job_info.nodes
            return nodelist if nodelist else None
        return self.nodes

    def get_threads(self, state=None):
        """Return the thread count, optionally filtered by job state.

        NOTE(review): for an array job with no state filter this returns
        len(self.children) rather than a sum of child thread counts — it
        looks copy-pasted from jobcount(); confirm intent before relying
        on it.
        """
        states = [i.lower() for i in _run.listify(state)]
        if self.array_job:
            if state:
                return sum([j.threads for j in self.children.values()
                            if j.state in states])
            return len(self.children)
        if state:
            return self.threads if self.state in states else 0
        return self.threads

    def get_exitcode(self):
        """Return sum of exitcodes for all completed jobs.

        For array jobs, returns None unless at least one child has reached
        a done state; for regular jobs simply returns the recorded code.
        """
        if self.array_job:
            code = 0
            some_done = False
            for child in self.children.values():
                if child.state in DONE_STATES:
                    some_done = True
                    if child.exitcode:
                        code += child.exitcode
            if some_done:
                return code
            return None
        return self.exitcode

    def jobcount(self, state=None):
        """Return a count of jobs, optionally filtered by state.

        Array jobs count one per child; regular jobs count as 1.
        """
        states = [i.lower() for i in _run.listify(state)]
        if self.array_job:
            if state:
                return len([j for j in self.children.values()
                            if j.state in states])
            return len(self.children)
        if state:
            return 1 if self.state in states else 0
        return 1

    def __repr__(self):
        """Show all info."""
        if not self._child_job:
            # Parent/regular job: show the number of children if any
            if self.array_job:
                children = len(self.children) if self.children else None
                child_str = ':children:{0}'.format(children)
            else:
                child_str = ''
        else:
            # Child job: show a backref to the parent instead
            child_str = ':parent:{0}'.format(self.parent.id)
        outstr = ("{cname}<{id}:{state}{child}" +
                  "({name},owner:{owner}," +
                  "queue:{queue},nodes:{nodes},threads:{threads}," +
                  "exitcode:{code})").format(
                      id=self.id, name=self.name, owner=self.owner,
                      queue=self.queue, nodes=self.nodes,
                      code=self.exitcode, threads=self.threads,
                      state=self.state, child=child_str, cname=self._cname
                  )
        if self.disappeared:
            outstr += 'DISAPPEARED>'
        else:
            outstr += '>'
        return outstr

    def __str__(self):
        """Print job ID."""
        return str(self.id)
class QueueJob(_QueueJob):
    """Info about a single top-level job in the queue.

    Only used for torque and slurm queues.

    Attributes
    ----------
    id : int
        Job ID
    name : str
        Job name
    owner : str
        User who owns the job
    threads : int
        Number of cores used by the job
    queue : str
        The queue/partition the job is running in
    state : str
        Current state of the job, normalized to slurm states
    nodes : list
        List of nodes job is running on
    exitcode : int
        Exit code of completed job
    disappeared : bool
        Job cannot be found in the queue anymore
    array_job : bool
        This job is an array job and has children
    children : dict
        If array job, dict of child jobs keyed by array index
    """

    def __init__(self):
        """Initialize with an empty per-instance children dict."""
        self._cname = 'QueueJob'
        self.children = {}

    def __getitem__(self, key):
        """Allow direct accessing of child jobs by job id."""
        if not self.array_job:
            _logme.log('Not an array job', 'error')
            return None
        return self.children[str(key)]
class QueueChild(_QueueJob):
    """A very simple class to store info about child jobs in the queue.

    Only used for torque and slurm queues.

    Attributes
    ----------
    id : int
        Job ID
    name : str
        Job name
    owner : str
        User who owns the job
    threads : int
        Number of cores used by the job
    queue : str
        The queue/partition the job is running in
    state : str
        Current state of the job, normalized to slurm states
    nodes : list
        List of nodes job is running on
    exitcode : int
        Exit code of completed job
    disappeared : bool
        Job cannot be found in the queue anymore
    parent : QueueJob
        Backref to parent job
    """

    def __init__(self, parent):
        """Initialize with a parent.

        Parameters
        ----------
        parent : QueueJob
            The array job this child belongs to.
        """
        self._cname = 'QueueChild'
        self.parent = parent
        # BUG FIX: flag this instance as a child so __repr__ takes its
        # ':parent:' branch — previously nothing ever set _child_job True,
        # leaving that branch unreachable.
        self._child_job = True
        # BUG FIX: give each child its own dict instead of silently sharing
        # the mutable class-level `children` attribute of _QueueJob.
        self.children = {}
################
#  Exceptions  #
################


class QueueError(Exception):
    """Simple Exception wrapper."""

    pass
#########################################
#  A default Queue Object for the User  #
#########################################

# Lazily-built cache of `{qtype: Queue}` for the current user
_default_queues = None


def default_queue(qtype=None):
    """Return a default batch system.

    Creates (and caches) one Queue per qtype, filtered to the current user.
    """
    global _default_queues
    if not _default_queues:
        _default_queues = {}
    if qtype not in _default_queues:
        _default_queues[qtype] = Queue('self', qtype=qtype)
    return _default_queues[qtype]