# -*- coding: utf-8 -*-
"""
Manage job dependency tracking with multiprocessing.

Runs jobs as multiprocessing.Process objects, managing dependencies with an
additional Process that loops through all submitted jobs and checks their
dependencies before running them.

The JobQueue class works as the queue and functions in a similar, but much more
basic, way to Torque or Slurm. It manages jobs by forking an instance of the
job_runner() function and keeping it alive. multiprocessing.Queue objects are
then used to pass job information, contained in the Job class, back and forth
between the JobQueue class running in the main process and the job_runner()
fork running as a separate process.

The actual job management is done by job_runner(), which uses
multiprocessing.Process objects rather than a Pool object. This allows for more
careful management and also allows exit codes to be captured.
"""
import os
import sys
import atexit
import signal
import multiprocessing as mp
from multiprocessing import cpu_count as _cnt
from subprocess import check_output, CalledProcessError
from time import sleep
from . import run
# Get defaults
from . import conf
# Get an Exception object to use
from . import ClusterError
# Get logging function
from . import logme
# A global placeholder for a single JobQueue instance
JQUEUE = None
__all__ = ['JobQueue']
################################
# Normal Mode Multithreading #
################################
THREADS = _cnt()
# Reset broken multithreading
# Some of the numpy C libraries can break multithreading; resetting this
# process's CPU affinity to every CPU fixes the issue.
try:
    if hasattr(os, 'sched_setaffinity'):
        # Where available (Python 3.3+ on Linux) set the affinity directly,
        # without spawning a shell.
        os.sched_setaffinity(0, range(THREADS))
    else:
        # Build a mask with one bit per CPU; a fixed 0xff mask would only
        # allow the first eight CPUs.
        check_output("taskset -p 0x%x %d >/dev/null 2>/dev/null"
                     % ((1 << THREADS) - 1, os.getpid()), shell=True)
except (CalledProcessError, OSError):
    pass  # This doesn't work on Macs or Windows
###############################################################################
# The JobQueue Class to Manage Jobs #
###############################################################################
class JobQueue(object):

    """Monitor and submit multiprocessing jobs with dependencies.

    Jobs are sent to a separate job_runner() Process through one
    multiprocessing.Queue. Snapshots of the runner's job dictionary
    ({job id => Job}) come back through a second Queue and are merged into
    self.jobs by update().
    """

    def __init__(self, cores=None):
        """Spawn a job_runner process to interact with.

        Args:
            cores: Maximum number of cores the runner may use at once.
                   Defaults to THREADS.
        """
        self._jobqueue = mp.Queue()
        self._outputs = mp.Queue()
        self.jobno = int(conf.get_option('jobqueue', 'jobno', '1'))
        self.cores = int(cores) if cores else THREADS
        self.jobs = {}
        self._start_runner()

        def terminate():
            """Kill the queue runner."""
            try:
                self.runner.terminate()
                self._jobqueue.close()
                self._outputs.close()
            except AttributeError:
                pass
            if run.check_pid(self.runner.pid):
                # SIGKILL does not exist on Windows, where os.kill() with
                # any other signal unconditionally terminates the process.
                os.kill(self.runner.pid,
                        getattr(signal, 'SIGKILL', signal.SIGTERM))

        # Call terminate when we exit
        atexit.register(terminate)

    def _start_runner(self):
        """Start a job_runner process that shares this object's queues.

        The runner continues job numbering from self.jobno.
        """
        self.runner = mp.Process(target=job_runner,
                                 args=(self._jobqueue, self._outputs,
                                       self.cores, self.jobno),
                                 name='Runner')
        self.runner.start()
        self.pid = self.runner.pid
        assert self.runner.is_alive()

    def update(self):
        """Get fresh job info from the runner.

        Restarts the runner if it has died, and raises ClusterError if it is
        still not alive afterwards. The highest job id seen is stored in
        self.jobno and saved to the 'jobqueue'/'jobno' config option.
        """
        sleep(0.5)  # This allows the queue time to flush
        if not self.runner.is_alive():
            self.restart(True)
            if not self.runner.is_alive():
                raise ClusterError('JobRunner has crashed')
        # Read every waiting snapshot; later ones are newer, so updating in
        # order leaves the latest info. get_nowait() is used instead of an
        # empty() check followed by a read, because the runner also removes
        # stale snapshots from this queue and could take the item in between.
        while True:
            try:
                snapshot = self._outputs.get_nowait()
            except mp.queues.Empty:
                break
            self.jobs.update(snapshot)
        if self.jobs:
            self.jobno = max(self.jobs)
            conf.set_option('jobqueue', 'jobno', str(self.jobno))

    def add(self, function, args=None, kwargs=None, dependencies=None,
            cores=1):
        """Add function to local job queue.

        Args:
            function: A function object. To run a command, use the run.cmd
                      function here.
            args: A tuple of args to submit to the function.
            kwargs: A dict of keyword arguments to submit to the function.
            dependencies: A list of job IDs that this job will depend on.
            cores: The number of threads required by this job. Requests
                   above self.cores are limited to self.cores.

        Returns:
            int: A job ID

        Raises:
            ClusterError: If the runner does not assign the next job number,
                          even after one restart and resubmission.
        """
        self.update()
        assert self.runner.is_alive()
        oldjob = self.jobno
        cores = int(cores)
        if cores > self.cores:
            logme.log('Job core request exceeds resources, limiting to max: ' +
                      '{}'.format(self.cores), 'warn')
            cores = self.cores
        job = Job(function, args, kwargs, dependencies, cores)
        self._jobqueue.put(job)
        sleep(0.5)
        self.update()
        # Sometimes the queue can freeze; as a workaround, restart the
        # runner and resubmit the job once before giving up.
        if self.jobno != oldjob + 1:
            self.restart(True)
            self._jobqueue.put(job)
            self.update()
            if self.jobno != oldjob + 1:
                raise ClusterError('Job numbers are not updating correctly, the '
                                   'local queue has probably crashed. Please '
                                   'report this issue.')
        return self.jobno

    def wait(self, jobs=None):
        """Block until the given jobs are done.

        Args:
            jobs: A job id, or a list/tuple/set of job ids. Defaults to every
                  job currently known to the queue. The argument itself is
                  never modified.

        Raises:
            ClusterError: If a job id has not been submitted.
        """
        self.update()
        if jobs is None:
            pending = list(self.jobs)
        elif isinstance(jobs, (list, tuple, set)):
            pending = list(jobs)
        else:
            pending = [jobs]
        for job in pending:
            if job not in self.jobs:
                raise ClusterError('Job {} has not been submitted.'.format(job))
        while True:
            pending = [job for job in pending
                       if self.jobs[job].state != 'done']
            if not pending:
                return
            sleep(0.5)
            # Refresh job states, otherwise this loop could never finish.
            self.update()

    def get(self, job):
        """Return the output of a single job.

        Waits for the job to finish, then returns its Job.out value.

        Raises:
            ClusterError: If the job has not been submitted.
        """
        if job not in self.jobs:
            raise ClusterError('Job {} has not been submitted.'.format(job))
        self.wait(job)
        return self.jobs[job].out

    def restart(self, force=False):
        """Kill the job queue and restart it.

        Args:
            force: Restart even if some jobs are not done. Jobs the old
                   runner was tracking are not known to the new runner.
        """
        if not force:
            self.update()
            if len(self.done) != len(self.jobs):
                logme.log('Cannot restart, incomplete jobs', 'error')
                return
        self.runner.terminate()
        # Reap the old runner so it does not linger as a zombie process.
        self.runner.join(1)
        self._start_runner()

    def __getattr__(self, attr):
        """Dynamic dictionary filtering.

        self.done, self.queued, self.waiting and self.running each return a
        {job id => Job} dict of the jobs currently in that state. Any other
        missing attribute raises AttributeError as usual.
        """
        if attr in ('done', 'queued', 'waiting', 'running'):
            return {jobid: job_info for jobid, job_info in self.jobs.items()
                    if job_info.state == attr}
        raise AttributeError("'{}' object has no attribute '{}'".format(
            type(self).__name__, attr))

    def __getitem__(self, key):
        """Allow direct accessing of jobs by job id; None if unknown."""
        self.update()
        key = int(key)
        try:
            return self.jobs[key]
        except KeyError:
            return None

    def __iter__(self):
        """Allow us to be iterable, yielding (job id, Job) pairs."""
        self.update()
        for jobno, job in self.jobs.items():
            yield jobno, job

    def __len__(self):
        """Length is the total job count."""
        self.update()
        return len(self.jobs)

    def __repr__(self):
        """Class information."""
        self.update()
        return ("JobQueue<({})jobs:{};completed:{};running:{};queued:{}>"
                .format(self.cores, len(self.jobs), len(self.done),
                        len(self.running),
                        len(self.waiting) + len(self.queued)))

    def __str__(self):
        """Print jobs."""
        self.update()
        return str(self.jobs)
class Job(object):

    """An object to pass arguments to the runner."""

    def __init__(self, function, args=None, kwargs=None, depends=None,
                 cores=1):
        """Parse and save arguments.

        Args:
            function: The callable to run.
            args: Positional arguments. A value that is not a tuple is
                  wrapped as a single argument.
            kwargs: A dict of keyword arguments.
            depends: A job id, or a list/tuple of job ids, to wait for.
            cores: The number of cores the job needs.

        Raises:
            TypeError: If kwargs is not a dict.
            ValueError: If a dependency cannot be converted to an int id.
        """
        if args and not isinstance(args, tuple):
            args = (args,)
        if kwargs and not isinstance(kwargs, dict):
            raise TypeError('kwargs must be a dict')
        if depends:
            if not isinstance(depends, (tuple, list)):
                depends = [depends]
            try:
                depends = [int(i) for i in depends]
            except (TypeError, ValueError):
                # TypeError covers values such as None that int() rejects.
                raise ValueError('dependencies must be integer job ids')
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.depends = depends
        self.cores = int(cores)
        # Assigned later by the runner
        self.id = None
        self.pid = None
        self.exitcode = None
        self.out = None
        self.state = 'Not Submitted'

    def _function_name(self):
        """Return a printable name for self.function.

        Callables such as functools.partial objects have no __name__, so
        fall back to their repr.
        """
        return getattr(self.function, '__name__', repr(self.function))

    def __repr__(self):
        """Job Info."""
        return ("Job<{} (function:{},args:{}," +
                "kwargs:{};cores:{}) {}>").format(
                    self.id, self._function_name(), self.args,
                    self.kwargs, self.cores, self.state)

    def __str__(self):
        """Print Info and Output."""
        outstr = "Job #{}; Cores: {}\n".format(
            self.id if self.id is not None else 'NA', self.cores)
        outstr += "\tFunction: {}\n\targs: {}\n\tkwargs: {}\n\t".format(
            self._function_name(), self.args, self.kwargs)
        outstr += "State: {}\n\tOutput: {}\n".format(self.state, self.out)
        return outstr
###############################################################################
# The Job Runner that will fork and run all jobs #
###############################################################################
def _run_and_send(function, args, kwargs, conn):
    """Run one job inside its own Process and send back its return value.

    Used as the Process target by job_runner() because Process.join()
    always returns None, so the return value cannot be obtained otherwise.
    Exceptions are deliberately not caught: a failing job still prints its
    traceback, exits with a non-zero exit code, and sends nothing. An
    unpicklable return value makes conn.send() raise in the same way.

    Args:
        function: The job's callable.
        args: A tuple of positional arguments, or None.
        kwargs: A dict of keyword arguments, or None.
        conn: The write end of a multiprocessing.Pipe.
    """
    result = function(*(args or ()), **(kwargs or {}))
    conn.send(result)
    conn.close()


def _start_job(job_info, process_args):
    """Start job_info in a new Process and mark it as running.

    Args:
        job_info: The Job to start.
        process_args: Extra keyword arguments for multiprocessing.Process.

    Returns:
        tuple: (Process, read end of the Pipe its return value arrives on)
    """
    recv_conn, send_conn = mp.Pipe(duplex=False)
    proc = mp.Process(target=_run_and_send,
                      args=(job_info.function, job_info.args,
                            job_info.kwargs, send_conn),
                      name=str(job_info.id), **process_args)
    proc.start()
    # Drop our copy of the write end so that, once the child exits, the read
    # end reports EOF instead of waiting for data that will never come.
    send_conn.close()
    job_info.state = 'running'
    job_info.pid = proc.pid
    return proc, recv_conn


def _receive_result(conn, job_info):
    """Move a job's return value from conn into job_info.out.

    Returns:
        bool: True if conn is finished with (a value was received, or the
        child closed its end without sending one) and has been closed;
        False if nothing is available yet.
    """
    try:
        if not conn.poll():
            return False
        job_info.out = conn.recv()
    except (EOFError, IOError, OSError):
        pass  # The child exited without sending a value, e.g. it raised.
    conn.close()
    return True


def _get_job_nowait(jobqueue):
    """Return the next item from jobqueue without blocking, or None."""
    if jobqueue.empty():
        return None
    try:
        return jobqueue.get_nowait()
    except mp.queues.Empty:
        return None


def job_runner(jobqueue, outputs, cores=None, jobno=None):
    """Run jobs with dependency tracking.

    Must be run as a separate multiprocessing.Process to function correctly.
    Loops forever. Each pass accepts at most one new job from jobqueue,
    queues every job whose dependencies are done, starts at most one queued
    job that fits in the free cores, and collects finished jobs.

    Args:
        jobqueue: A multiprocessing.Queue object into which Job objects must be
                  added. The function continually searches this Queue for new
                  jobs. Note, function must be a function call, it cannot be
                  anything else. function is the only required argument, the
                  rest are optional. tuples are required.
        outputs:  A multiprocessing.Queue object that will take outputs. A
                  dictionary of job objects will be output here with the
                  format:: {job_no => Job}
                  Once a job is done its ``out`` attribute holds the
                  function's return value (None if the function raised).
                  **NOTE**: function return must be picklable, otherwise
                  sending it back raises in the job's process and the job
                  exits with a non-zero exit code.
        cores:    Number of cores to use at once. Defaults to all (THREADS).
        jobno:    Job ids count up from this number (the first job gets
                  jobno + 1). Defaults to the 'jobqueue'/'jobno' config
                  option, or 1.

    Raises:
        ClusterError: If jobqueue or outputs is not a multiprocessing Queue.
    """
    def output(out):
        """Replace any unread snapshot in outputs with out.

        The main process only needs the newest snapshot, so older ones still
        in the queue are discarded first. Non-blocking reads are used because
        the main process reads the same queue: a blocking get() after an
        empty() check could hang forever if the main process took the item
        in between.
        """
        while True:
            try:
                outputs.get_nowait()
            except mp.queues.Empty:
                break
        outputs.put(out)

    # Make sure we have Queue objects
    if not isinstance(jobqueue, mp.queues.Queue) \
            or not isinstance(outputs, mp.queues.Queue):
        raise ClusterError('jobqueue and outputs must be multiprocessing ' +
                           'Queue objects')

    # Initialize job tracking. jobno is only changed when a job is
    # registered; loops below use other names so they cannot clobber it and
    # cause job numbers to be reused.
    jobno = int(jobno) if jobno \
        else int(conf.get_option('jobqueue', 'jobno', str(1)))
    cores = cores if cores else THREADS
    jobs = {}        # job id => Job
    started = set()  # ids of jobs that have been queued (and maybe run)
    queue = []       # ids of jobs ready to start, in the order they became ready
    running = {}     # job id => Process of an actively running job
    pipes = {}       # job id => Connection its return value will arrive on
    done = set()     # ids of completed jobs

    # Python 2 doesn't support the daemon keyword, even though the docs say
    # that it does. We use daemon mode so that child jobs are killed on exit.
    process_args = {} if sys.version_info[0] == 2 else {'daemon': True}

    while True:
        # Accept at most one new job per pass.
        job = _get_job_nowait(jobqueue)
        if job is not None:
            if isinstance(job, Job):
                jobno += 1
                job.state = 'submitted'
                job.id = jobno
                jobs[jobno] = job
                output(jobs)
            else:
                logme.log('job information must be a job object, was {}'
                          .format(type(job)), 'error')

        # Queue every unstarted job whose dependencies have all finished.
        for job_id, job_info in jobs.items():
            if job_id in started:
                continue
            if any(int(dep) not in done for dep in job_info.depends or ()):
                if job_info.state != 'waiting':
                    job_info.state = 'waiting'
                    output(jobs)
                continue
            job_info.state = 'queued'
            started.add(job_id)
            queue.append(job_id)
            output(jobs)

        # Start the first queued job that fits in the free cores.
        used_cores = sum(jobs[job_id].cores for job_id in running)
        for job_id in queue:
            if jobs[job_id].cores + used_cores <= cores:
                queue.remove(job_id)  # safe: we break out right after
                proc, conn = _start_job(jobs[job_id], process_args)
                running[job_id] = proc
                pipes[job_id] = conn
                output(jobs)
                sleep(0.5)  # Give the job a moment to start
                break

        # Collect return values and clean out finished jobs. Iterate over a
        # copy because finished jobs are removed from running.
        for job_id, proc in list(running.items()):
            job_info = jobs[job_id]
            # Read the return value as soon as it arrives: a child sending a
            # value larger than the pipe buffer cannot exit until it is read.
            if job_id in pipes and _receive_result(pipes[job_id], job_info):
                del pipes[job_id]
            if proc.is_alive():
                continue
            proc.join()
            conn = pipes.pop(job_id, None)
            if conn is not None and not _receive_result(conn, job_info):
                conn.close()
            job_info.state = 'done'
            job_info.exitcode = proc.exitcode
            done.add(job_id)
            del running[job_id]
            output(jobs)

        # Wait for half a second before looping again
        sleep(0.5)