# Copyright (C) 2011--2015 Kipp Cannon
# Copyright (C) 2004--2006 Brian Moe
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Machinery for reading, editing, and writing Condor DAG files.
When running DAGs on Condor compute clusters, very often one will wish to
re-run a portion of a DAG. This can be done by marking all jobs except the
ones to be re-run as "DONE". Unfortunately the Condor software suite lacks
an I/O library for reading and writing Condor DAG files, so there is no
easy way to edit DAG files except by playing games with sed, awk, or once-off
Python or Perl scripts. That's where this module comes in. This module
will read a DAG file into an in-ram representation that is easily edited,
and allow the file to be written to disk again.
Example:
>>> from gstlal import dagfile
>>> dag = dagfile.DAG.parse(open("pipeline.dag"))
>>> dag.write(open("pipeline.dag", "w"))
Although it is possible to machine-generate an original DAG file using this
module and write it to disk, this module does not provide the tools
required to do any of the other tasks associated with pipeline
construction. For example there is no facility here to generate or manage
submit files, data files, or any other files that are associated with a
full pipeline. Only the DAG file itself is considered here. For general
pipeline construction see the pipeline module. The focus of this module is
on editing existing DAG files.
Developers should also consider doing any new pipeline development using
DAX files as the fundamental workflow description, instead of DAGs. See
http://pegasus.isi.edu for more information.
A DAG file is loaded using the .parse() class method of the DAG class.
This parses the file-like object passed to it and returns an instance of
the DAG class representing the file's contents. Once loaded, the nodes in
the DAG can all be found in the .nodes dictionary, whose keys are the node
names and whose values are the corresponding node objects. Among each node
object's attributes are sets .children and .parents containing references
to the child and parent nodes (not their names) for each node. Note that
every node must appear listed as a parent of each of its children, and vice
versa. The other attributes of a DAG instance contain information about
the DAG, for example the CONFIG file or the DOT file, and so on. All of
the data for each node in the DAG, for example the node's VARS value, its
initial working directory, and so on, can be found in the attributes of the
nodes themselves. A DAG is written to a file using the .write() method of
the DAG object.
"""
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import copy
import itertools
import re
__all__ = ["DAG", "JOB", "DATA", "SPLICE", "SUBDAG_EXTERNAL"]
#
# =============================================================================
#
# Progress Wrapper
#
# =============================================================================
#
class progress_wrapper(object):
    """
    Wrapper that issues periodic progress reports while a counter is
    advanced.  For internal use only.
    """
    # emit a report roughly every this-many increments
    _interval = 7411

    def __init__(self, f, callback):
        # f is handed back to the callback untouched;  n counts lines
        self.n = 0
        self.f = f
        self.callback = callback

    def __iadd__(self, dn):
        # advance the counter, firing the callback (done = False) at
        # each interval boundary
        self.n += dn
        if self.callback is not None and self.n % self._interval == 0:
            self.callback(self.f, self.n, False)
        return self

    def __del__(self):
        # final report:  done = True with the total count
        if self.callback is not None:
            self.callback(self.f, self.n, True)
class nofile(object):
    """
    Stand-in for a file object whose .write() method silently
    discards its arguments.  For internal use only.
    """
    def write(self, *args):
        # deliberately a no-op
        return None
#
# =============================================================================
#
# The Contents of a Condor DAG File
#
# =============================================================================
#
class JOB(object):
    """
    Representation of a JOB node in a Condor DAG.  JOB objects have the
    following attributes corresponding to information in the DAG file:

    .name
        The name of the node in the DAG.

    .filename
        The name of the submit file for the JOB.

    .directory
        The initial working directory for the JOB.  Set to None to
        omit from DAG (job's working directory will be chosen by
        Condor).

    .done
        Boolean indicating if the JOB is done or not.  See
        DAG.load_rescue() for more information.

    .noop
        Boolean indicating if the JOB is a no-op or not.

    .vars
        A dictionary of the name-->value pairs in the VARS line for
        the JOB.  Leave empty to omit VARS from DAG.

    .retry
        The number of retries for the job.  Set to None to omit
        from DAG.

    .retry_unless_exit_value
        The value of the UNLESS-EXIT suffix of the RETRY line.
        Set to None to omit from DAG.

    .priority
    .category
        The PRIORITY value and CATEGORY name for the node in the
        DAG.  Set to None to omit from the DAG.

    .parents
    .children
        Sets of the parent and child nodes of JOB.  The sets
        contain references to the node objects, not their names.

    .prescript
    .prescriptargs
    .postscript
    .postscriptargs
        The names and lists of arguments of the PRE and POST
        scripts.  Set to None to omit from DAG.

    .abort_dag_on_abortexitvalue
    .abort_dag_on_dagreturnvalue
        The ABORT-DAG-ON abort exit value and DAG return value for
        the JOB.  Set to None to omit from DAG.

    For more information about the function of these parameters, refer
    to the Condor documentation.
    """
    keyword = "JOB"

    def __init__(self, name, filename, directory = None, done = False, noop = False):
        # information from the JOB line in the DAG file
        self.name = name
        self.filename = filename
        self.directory = directory
        self.done = done
        self.noop = noop
        # the VARS line in the DAG file.  orderless name, value
        # pairs
        self.vars = {}
        # the RETRY line in the DAG file
        self.retry = None
        self.retry_unless_exit_value = None
        # the PRIORITY and CATEGORY lines in the DAG file
        self.priority = None
        self.category = None
        # the parents and children of this node.  the sets contain
        # references to the parent and child objects, not their
        # names
        self.parents = set()
        self.children = set()
        # the names and arguments of the PRE and POST scripts, if
        # any
        self.prescript = None
        self.prescriptargs = None
        self.postscript = None
        self.postscriptargs = None
        # the ABORT-DAG-ON abort exit value and dag return value
        # for this job if they are set, or None if not
        self.abort_dag_on_abortexitvalue = None
        self.abort_dag_on_dagreturnvalue = None

    def write(self, f, progress = None):
        """
        Write the lines describing this node to the file-like
        object f.  The object must provide a .write() method.

        If progress is not None, it will be incremented by 1 for
        every line written.
        """
        # JOB ...
        f.write("%s %s %s" % (self.keyword, self.name, self.filename))
        if self.directory is not None:
            f.write(" DIR \"%s\"" % self.directory)
        if self.noop:
            f.write(" NOOP")
        if self.done:
            f.write(" DONE")
        f.write("\n")
        if progress is not None:
            progress += 1
        # PRIORITY ...  NOTE:  tested with "is not None" so that an
        # explicit priority of 0 is written rather than silently
        # dropped (a bare truth test would skip it)
        if self.priority is not None:
            f.write("PRIORITY %s %d\n" % (self.name, self.priority))
            if progress is not None:
                progress += 1
        # CATEGORY ...
        if self.category is not None:
            f.write("CATEGORY %s %s\n" % (self.name, self.category))
            if progress is not None:
                progress += 1
        # RETRY ...  NOTE:  "is not None" so that RETRY 0 is
        # preserved
        if self.retry is not None:
            f.write("RETRY %s %d" % (self.name, self.retry))
            if self.retry_unless_exit_value is not None:
                # int() because the parser may have stored the
                # value as a string;  "%d" requires an integer
                f.write(" UNLESS-EXIT %d" % int(self.retry_unless_exit_value))
            f.write("\n")
            if progress is not None:
                progress += 1
        # VARS ...
        if self.vars:
            f.write("VARS %s" % self.name)
            for name, value in sorted(self.vars.items()):
                # apply escape rules to the value:  backslashes
                # first, then embedded double quotes
                f.write(" %s=\"%s\"" % (name, value.replace("\\", "\\\\").replace("\"", "\\\"")))
            f.write("\n")
            if progress is not None:
                progress += 1
        # SCRIPT PRE ...
        if self.prescript is not None:
            f.write("SCRIPT PRE %s %s" % (self.name, self.prescript))
            if self.prescriptargs:
                f.write(" %s" % " ".join(self.prescriptargs))
            f.write("\n")
            if progress is not None:
                progress += 1
        # SCRIPT POST ...
        if self.postscript is not None:
            f.write("SCRIPT POST %s %s" % (self.name, self.postscript))
            if self.postscriptargs:
                f.write(" %s" % " ".join(self.postscriptargs))
            f.write("\n")
            if progress is not None:
                progress += 1
        # ABORT-DAG-ON ...
        if self.abort_dag_on_abortexitvalue is not None:
            f.write("ABORT-DAG-ON %s %d" % (self.name, self.abort_dag_on_abortexitvalue))
            if self.abort_dag_on_dagreturnvalue is not None:
                f.write(" RETURN %d" % self.abort_dag_on_dagreturnvalue)
            f.write("\n")
            if progress is not None:
                progress += 1

    # state
    @property
    def state(self):
        """
        Get the state of the node.  One of 'wait', 'idle', 'run',
        'abort', 'stop', 'success', 'fail'.

        NOTE:  this feature is not implemented at this time.
        """
        # BUGFIX:  was "raise NotImplemented", which in Python 3
        # raises TypeError because NotImplemented is not an
        # exception class
        raise NotImplementedError
class DATA(JOB):
    """
    A Stork DATA node in a Condor DAG.  Identical to JOB except for
    the keyword written to the DAG file.
    """
    keyword = "DATA"
class SUBDAG_EXTERNAL(JOB):
    """
    A SUBDAG EXTERNAL node in a Condor DAG.  Identical to JOB except
    for the keyword written to the DAG file.
    """
    keyword = "SUBDAG EXTERNAL"
class SPLICE(JOB):
    """
    A SPLICE node in a Condor DAG.
    """
    # NOTE:  although this is a subclass of the JOB class, splices
    # don't support most of the things that can be associated with
    # jobs, like VARS and so on.  Setting attributes that splices
    # don't support will produce a nonsense DAG.  In the future,
    # more error checking might be added to prevent mis-use.
    keyword = "SPLICE"
class DAG(object):
    """
    Representation of the contents of a Condor DAG file.

    BUGS:  the semantics of the "+" special character in category names
    is not understood.  For now, it is an error for a node's category
    to not be found verbatim in a MAXJOBS line.  The "+" character is a
    wildcard-like character used in the assignment of MAXJOBS values to
    job categories in splices;  see the Condor documentation for more
    information.
    """
    #
    # lines in DAG files.  one compiled regular expression per
    # statement type;  named groups capture the statement's
    # parameters.
    #
    dotpat = re.compile(r'^DOT\s+(?P<filename>\S+)(\s+(?P<options>.+))?', re.IGNORECASE)
    jobpat = re.compile(r'^JOB\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
    datapat = re.compile(r'^DATA\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
    subdagpat = re.compile(r'^SUBDAG\s+EXTERNAL\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?(\s+(?P<noop>NOOP))?(\s+(?P<done>DONE))?', re.IGNORECASE)
    splicepat = re.compile(r'^SPLICE\s+(?P<name>\S+)\s+(?P<filename>\S+)(\s+DIR\s+(?P<directory>\S+))?', re.IGNORECASE)
    prioritypat = re.compile(r'^PRIORITY\s+(?P<name>\S+)\s+(?P<value>\S+)', re.IGNORECASE)
    categorypat = re.compile(r'^CATEGORY\s+(?P<name>\S+)\s+(?P<category>\S+)', re.IGNORECASE)
    retrypat = re.compile(r'^RETRY\s+(?P<name>\S+)\s+(?P<retries>\S+)(\s+UNLESS-EXIT\s+(?P<retry_unless_exit_value>\S+))?', re.IGNORECASE)
    varspat = re.compile(r'^VARS\s+(?P<name>\S+)\s+(?P<vars>.+)', re.IGNORECASE)
    # matches one name="value" pair;  the (?<!\\) look-behind stops
    # the value at an unescaped double quote
    varsvaluepat = re.compile(r'(?P<name>\S+)\s*=\s*"(?P<value>.*?)(?<!\\)"', re.IGNORECASE)
    scriptpat = re.compile(r'^SCRIPT\s+(?P<type>(PRE)|(POST))\s(?P<name>\S+)\s+(?P<executable>\S+)(\s+(?P<arguments>.+))?', re.IGNORECASE)
    abortdagonpat = re.compile(r'^ABORT-DAG-ON\s+(?P<name>\S+)\s+(?P<exitvalue>\S+)(\s+RETURN\s+(?P<returnvalue>\S+))?', re.IGNORECASE)
    arcpat = re.compile(r'^PARENT\s+(?P<parents>.+?)\s+CHILD\s+(?P<children>.+)', re.IGNORECASE)
    maxjobspat = re.compile(r'^MAXJOBS\s+(?P<category>\S+)\s+(?P<value>\S+)', re.IGNORECASE)
    configpat = re.compile(r'^CONFIG\s+(?P<filename>\S+)', re.IGNORECASE)
    nodestatuspat = re.compile(r'^NODE_STATUS_FILE\s+(?P<filename>\S+)(\s+(?P<updatetime>\S+))?', re.IGNORECASE)
    jobstatepat = re.compile(r'^JOBSTATE_LOG\s+(?P<filename>\S+)', re.IGNORECASE)
    #
    # lines in rescue DAG files
    #
    donepat = re.compile(r'^DONE\s+(?P<name>\S+)', re.IGNORECASE)
    #
    # methods
    #
def __init__(self, nodes = {}, maxjobs = {}, config = None, dot = None, dotupdate = False, dotoverwrite = True, dotinclude = None, node_status_file = None, node_status_file_updatetime = None, jobstate_log = None):
"""
The meanings of the keyword arguments are:
nodes:
name --> JOB object mapping
maxjobs:
category name --> integer max jobs value mapping. all
categories are listed, that is it is an error for a JOB
in the DAG to claim to be in a category that cannot be
found in this dictionary. categories that don't have a
MAXJOBS set for them use None as their max jobs value
in this dictionary.
config:
filename or None
dot:
filename or None
dotupdate:
dotoverwrite:
booleans, defaults match Condor's
dotinclude:
filename or None
node_status_file:
node_status_file_updatetime:
filename and update time or None for both
jobstate_log:
filename or None
It is also possible to initialize a DAG object from another
DAG (-like) object with.
>> new = DAG(old)
"""
# initialize from keyword arguments, will check to see if
# nodes was a DAG object afteward
self.nodes = nodes
self.maxjobs = maxjobs
self.config = config
self.dot = dot
self.dotupdate = dotupdate
self.dotoverwrite = dotoverwrite
self.dotinclude = dotinclude
self.node_status_file = node_status_file
self.node_status_file_updatetime = node_status_file_updatetime
self.jobstate_log = jobstate_log
try:
# is nodes a DAG object? test for this by trying
# to retrieve the attributes it would need if it
# is
dag = nodes
dag.nodes, dag.maxjobs, dag.config, dag.dot, dag.dotupdate, dag.dotoverwrite, dag.dotinclude, dag.node_status_file, dag.node_status_file_updatetime, dag.jobstate_log
except AttributeError:
# nope, it's not a proper DAG object
pass
else:
# that worked, so reinitialize ourselves from its
# attributes
# FIXME: maybe the JOB class can be taught to
# duplicate itself
self.nodes = dict((name, copy.copy(node)) for name, node in dag.nodes.items())
self.maxjobs = dict(dag.maxjobs)
self.config = dag.config
self.dot = dag.dot
self.dotupdate = dag.dotupdate
self.dotoverwrite = dag.dotoverwrite
self.dotinclude = dag.dotinclude
self.node_status_file = dag.node_status_file
self.node_status_file_updatetime = dag.node_status_file_updatetime
self.jobstate_log = dag.jobstate_log
[docs] def reindex(self):
"""
Rebuild the .nodes index. This is required if the names of
nodes are changed.
"""
# the .nodes object has its contents replaced instead of
# building a new object so that if external code is holding
# a reference to it that code sees the new index as well
nodes = dict((node.name, node) for node in self.nodes.values())
if len(nodes) != len(self.nodes):
raise ValueError("node names are not unique")
self.nodes.clear()
self.nodes.update(nodes)
@classmethod
def parse(cls, f, progress = None):
    """
    Parse the file-like object f as a Condor DAG file.  Return
    a DAG object.  The file object must be iterable, yielding
    one line of text of the DAG file in each iteration.

    If the progress argument is not None, it should be a
    callable object.  This object will be called periodically
    and passed the f argument, the current line number, and a
    boolean indicating if parsing is complete.  The boolean is
    always False until parsing is complete, then the callable
    will be invoked one last time with the final line count and
    the boolean set to True.

    Example:

    >>> def progress(f, n, done):
    ...	print("reading %s: %d lines\\r" % (f.name, n)),
    ...	if done:
    ...		print
    ...
    >>> dag = DAG.parse(open("pipeline.dag"), progress = progress)
    """
    progress = progress_wrapper(f, progress)
    self = cls()
    # graph edges are recorded by node name as encountered, then
    # resolved to node objects after all nodes have been loaded
    arcs = []
    for n, line in enumerate(f, start = 1):
        # progress
        progress += 1
        # skip comments and blank lines
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # JOB ...
        m = self.jobpat.search(line)
        if m is not None:
            if m.group("name") in self.nodes:
                raise ValueError("line %d: duplicate JOB %s" % (n, m.group("name")))
            self.nodes[m.group("name")] = JOB(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
            continue
        # DATA ...
        m = self.datapat.search(line)
        if m is not None:
            if m.group("name") in self.nodes:
                raise ValueError("line %d: duplicate DATA %s" % (n, m.group("name")))
            self.nodes[m.group("name")] = DATA(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
            continue
        # SUBDAG EXTERNAL ...
        m = self.subdagpat.search(line)
        if m is not None:
            if m.group("name") in self.nodes:
                raise ValueError("line %d: duplicate SUBDAG EXTERNAL %s" % (n, m.group("name")))
            self.nodes[m.group("name")] = SUBDAG_EXTERNAL(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""), done = bool(m.group("done")), noop = bool(m.group("noop")))
            continue
        # SPLICE ...
        m = self.splicepat.search(line)
        if m is not None:
            if m.group("name") in self.nodes:
                raise ValueError("line %d: duplicate SPLICE %s" % (n, m.group("name")))
            self.nodes[m.group("name")] = SPLICE(m.group("name"), m.group("filename"), directory = m.group("directory") and m.group("directory").strip("\""))
            continue
        # VARS ...
        m = self.varspat.search(line)
        if m is not None:
            node = self.nodes[m.group("name")]
            # FIXME:  find a way to detect malformed name=value pairs
            for name, value in self.varsvaluepat.findall(m.group("vars")):
                if name in node.vars:
                    raise ValueError("line %d: multiple variable %s for %s %s" % (n, name, node.keyword, node.name))
                # apply unescape rules to the value
                node.vars[name] = value.replace("\\\\", "\\").replace("\\\"", "\"")
            continue
        # PARENT ... CHILD ...
        m = self.arcpat.search(line)
        if m is not None:
            parents = m.group("parents").strip().split()
            children = m.group("children").strip().split()
            arcs.extend((parent, child) for parent in parents for child in children)
            continue
        # RETRY ...
        m = self.retrypat.search(line)
        if m is not None:
            node = self.nodes[m.group("name")]
            node.retry = int(m.group("retries"))
            # BUGFIX:  the UNLESS-EXIT value is an integer when
            # present;  storing the raw string broke
            # JOB.write()'s "%d" format on round-trip
            unless_exit = m.group("retry_unless_exit_value")
            node.retry_unless_exit_value = None if unless_exit is None else int(unless_exit)
            continue
        # SCRIPT ...
        m = self.scriptpat.search(line)
        if m is not None:
            node = self.nodes[m.group("name")]
            if m.group("type").upper() == "PRE":
                if node.prescript is not None:
                    raise ValueError("line %d: multiple SCRIPT PRE for %s %s" % (n, node.keyword, node.name))
                node.prescript = m.group("executable")
                if m.group("arguments") is not None:
                    node.prescriptargs = m.group("arguments").split()
            elif m.group("type").upper() == "POST":
                if node.postscript is not None:
                    raise ValueError("line %d: multiple SCRIPT POST for %s %s" % (n, node.keyword, node.name))
                node.postscript = m.group("executable")
                if m.group("arguments") is not None:
                    node.postscriptargs = m.group("arguments").split()
            else:
                assert False	# impossible to get here
            continue
        # PRIORITY ...
        m = self.prioritypat.search(line)
        if m is not None:
            node = self.nodes[m.group("name")]
            if node.priority is not None:
                raise ValueError("line %d: multiple PRIORITY for %s %s" % (n, node.keyword, node.name))
            node.priority = int(m.group("value"))
            continue
        # CATEGORY ...
        m = self.categorypat.search(line)
        if m is not None:
            self.nodes[m.group("name")].category = m.group("category")
            continue
        # ABORT-DAG-ON ...
        m = self.abortdagonpat.search(line)
        if m is not None:
            node = self.nodes[m.group("name")]
            if node.abort_dag_on_abortexitvalue is not None:
                raise ValueError("line %d: multiple ABORT-DAG-ON for %s %s" % (n, node.keyword, node.name))
            node.abort_dag_on_abortexitvalue = int(m.group("exitvalue"))
            if m.group("returnvalue") is not None:
                node.abort_dag_on_dagreturnvalue = int(m.group("returnvalue"))
            continue
        # MAXJOBS ...
        m = self.maxjobspat.search(line)
        if m is not None:
            if m.group("category") in self.maxjobs:
                raise ValueError("line %d: multiple MAXJOBS for category %s" % (n, m.group("category")))
            self.maxjobs[m.group("category")] = int(m.group("value"))
            continue
        # DOT ...
        m = self.dotpat.search(line)
        if m is not None:
            self.dot = m.group("filename")
            options = (m.group("options") or "").split()
            while options:
                option = options.pop(0).upper()
                if option == "UPDATE":
                    self.dotupdate = True
                elif option == "DONT-UPDATE":
                    self.dotupdate = False
                elif option == "OVERWRITE":
                    self.dotoverwrite = True
                elif option == "DONT-OVERWRITE":
                    self.dotoverwrite = False
                elif option == "INCLUDE":
                    try:
                        self.dotinclude = options.pop(0)
                    except IndexError:
                        raise ValueError("line %d: missing filename for INCLUDE option of DOT" % n)
                else:
                    raise ValueError("unrecognized option %s for DOT" % option)
            continue
        # CONFIG ...
        # BUGFIX:  was searching dotpat here, so CONFIG lines
        # could never match and always raised "invalid line"
        m = self.configpat.search(line)
        if m is not None:
            if self.config is not None:
                raise ValueError("line %d: multiple CONFIG lines in dag file" % n)
            self.config = m.group("filename")
            continue
        # NODE_STATUS_FILE ...
        m = self.nodestatuspat.search(line)
        if m is not None:
            if self.node_status_file is not None:
                raise ValueError("line %d: multiple NODE_STATUS_FILE lines in dag file" % n)
            self.node_status_file = m.group("filename")
            # BUGFIX:  was m.group(updatetime), an undefined
            # bare name (NameError)
            if m.group("updatetime") is not None:
                self.node_status_file_updatetime = int(m.group("updatetime"))
            continue
        # JOBSTATE_LOG ...
        m = self.jobstatepat.search(line)
        if m is not None:
            # dagman allows more than one of these
            # statements, ignoring all but the first
            if self.jobstate_log is None:
                self.jobstate_log = m.group("filename")
            continue
        # error
        raise ValueError("line %d: invalid line in dag file: %s" % (n, line))
    # progress
    del progress
    # populate parent and child sets
    getnode = self.nodes.__getitem__
    for parent, child in arcs:
        parent = getnode(parent)
        child = getnode(child)
        parent.children.add(child)
        child.parents.add(parent)
    # make sure all categories are known
    for node in self.nodes.values():
        if node.category is not None and node.category not in self.maxjobs:
            self.maxjobs[node.category] = None
    # done
    return self
[docs] @classmethod
def select_nodes_by_name(cls, dag, nodenames):
"""
Construct a new DAG object containing only the nodes whose
names are in nodenames.
Example:
>>> names_to_rerun = set(["triggergen"])
>>> dag = DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun))
NOTE: the new DAG object is given references to the node
(JOB, DATA, etc.) objects in the original DAG, not copies
of them. Therefore, editing the node objects, for example
modifying their parent or child sets, will affect both
DAGs. To obtain an independent DAG with its own node
objects, make a deepcopy of the object that is returned
(see the copy module in the Python standard library for
more information).
Example:
>>> import copy
>>> dag = copy.deepcopy(DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun)))
"""
self = cls(dag)
self.nodes = dict((name, node) for name, node in dag.nodes.items() if name in nodenames)
self.maxjobs = dict((category, dag.maxjobs[category]) for category in set(node.category for node in self.nodes.values() if node.category is not None))
return self
[docs] def get_all_parent_names(self, names):
"""
Trace the DAG backward from the parents of the nodes whose
names are given to the head nodes, inclusively, and return
the set of the names of all nodes visited.
Example:
>>> all_parents = dag.get_all_parent_names(["triggergen"])
"""
all_parent_names = set()
nodes_to_scan = set(self.nodes[name] for name in names)
while nodes_to_scan:
node = nodes_to_scan.pop()
nodes_to_scan |= node.parents
all_parent_names |= set(parent.name for parent in node.parents)
return all_parent_names
[docs] def get_all_child_names(self, names):
"""
Trace the DAG forward from the children of the nodes whose
names are given to the leaf nodes, inclusively, and return
the set of the names of all nodes visited.
Example:
>>> all_children = dag.get_all_child_names(["triggergen"])
"""
all_child_names = set()
nodes_to_scan = set(self.nodes[name] for name in names)
while nodes_to_scan:
node = nodes_to_scan.pop()
nodes_to_scan |= node.children
all_child_names |= set(child.name for child in node.children)
return all_child_names
[docs] def check_edges(self):
"""
Check all graph edges for validity. Checks that each of
every node's children lists that node as a parent, and vice
versa, and that all nodes listed in the parent and child
sets of all nodes are contained in this DAG. Raises
ValueError if a problem is found, otherwise returns None.
Example:
>>> try:
... dag.check_edges()
... except ValueError as e:
... print("edges are broken: %s" % str(e))
... else:
... print("all edges are OK")
...
"""
nodes = set(self.nodes.values())
for node in nodes:
for child in node.children:
if node not in child.parents:
raise ValueError("node %s is not a parent of its child %s" % (node.name, child.name))
if child not in nodes:
raise ValueError("node %s has child %s that is not in DAG" % (node.name, child.name))
for parent in node.parents:
if node not in parent.children:
raise ValueError("node %s is not a child of its parent %s" % (node.name, parent.name))
if parent not in nodes:
raise ValueError("node %s has parent %s that is not in DAG" % (node.name, parent.name))
def load_rescue(self, f, progress = None):
    """
    Parse the file-like object f as a rescue DAG, using the
    DONE lines therein to set the job states of this DAG.

    In the past, rescue DAGs were full copies of the original
    DAG with the word DONE added to the JOB lines of completed
    jobs.  In version 7.7.2 of Condor, the default format of
    rescue DAGs was changed to a condensed format consisting of
    only the names of completed jobs and the number of retries
    remaining for incomplete jobs.  Currently Condor still
    supports the original rescue DAG format, but the user must
    set the DAGMAN_WRITE_PARTIAL_RESCUE config variable to
    false to obtain one.  This module does not directly support
    the new format, however this method allows a new-style
    rescue DAG to be parsed to set the states of the jobs in a
    DAG.  This, in effect, converts a new-style rescue DAG to
    an old-style rescue DAG, allowing the result to be
    manipulated as before.

    If the progress argument is not None, it should be a
    callable object.  This object will be called periodically
    and passed the f argument, the current line number, and a
    boolean indicating if parsing is complete.  The boolean is
    always False until parsing is complete, then the callable
    will be invoked one last time with the final line count and
    the boolean set to True.
    """
    # set all jobs to "not done"
    for job in self.nodes.values():
        job.done = False
    # now load rescue DAG, updating done and retries states.
    # lines are counted from 1, matching .parse()
    progress = progress_wrapper(f, progress)
    for n, line in enumerate(f, start = 1):
        # progress
        progress += 1
        # skip comments and blank lines
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # DONE ...
        m = self.donepat.search(line)
        if m is not None:
            self.nodes[m.group("name")].done = True
            continue
        # RETRY ...
        m = self.retrypat.search(line)
        if m is not None:
            node = self.nodes[m.group("name")]
            node.retry = int(m.group("retries"))
            # BUGFIX:  store the UNLESS-EXIT value as an
            # integer when present;  the raw string broke
            # JOB.write()'s "%d" format
            unless_exit = m.group("retry_unless_exit_value")
            node.retry_unless_exit_value = None if unless_exit is None else int(unless_exit)
            continue
        # error
        raise ValueError("line %d: invalid line in rescue file: %s" % (n, line))
    # progress
    del progress
def write(self, f, progress = None, rescue = None):
    """
    Write the DAG to the file-like object f.  The object must
    provide a .write() method.  In the special case that the
    optional rescue argument is not None (see below) then f can
    be set to None and no DAG file will be written (just the
    rescue DAG will be written).

    If the progress argument is not None, it should be a
    callable object.  This object will be called periodically
    and passed the f argument, the current line number, and a
    boolean indicating if writing is complete.  The boolean is
    always False until writing is complete, then the callable
    will be invoked one last time with the final line count and
    the boolean set to True.

    Example:

    >>> def progress(f, n, done):
    ...	print("writing %s: %d lines\\r" % (f.name, n)),
    ...	if done:
    ...		print
    ...
    >>> dag.write(open("pipeline.dag", "w"), progress = progress)

    NOTE:  when writing PARENT/CHILD graph edges, this method
    will silently skip any node names that are not in this
    DAG's graph.  This is a convenience to simplify writing
    DAGs constructed by the .select_nodes_by_name() class
    method.  If one wishes to check for broken parent/child
    links before writing the DAG use the .check_edges() method.

    If the optional rescue argument is not None, it must be a
    file-like object providing a .write() method and the DONE
    state of jobs will be written to this file instead of the
    .dag (in the .dag all jobs will be marked not done).

    Example:

    >>> dag.write(open("pipeline.dag", "w"), rescue = open("pipeline.dag.rescue001", "w"))

    NOTE:  it is left as an exercise for the calling code to
    ensure the name chosen for the rescue file is consistent
    with the naming convention assumed by condor_dagman when it
    starts up.
    """
    # initialize progress report wrapper
    progress = progress_wrapper(f, progress)
    # if needed, create a dummy object to allow .write() method
    # calls
    if f is None and rescue is not None:
        f = nofile()
    # DOT ...
    if self.dot is not None:
        f.write("DOT %s" % self.dot)
        if self.dotupdate:
            f.write(" UPDATE")
        if not self.dotoverwrite:
            f.write(" DONT-OVERWRITE")
        if self.dotinclude is not None:
            f.write(" INCLUDE %s" % self.dotinclude)
        f.write("\n")
        progress += 1
    # CONFIG ...
    if self.config is not None:
        f.write("CONFIG %s\n" % self.config)
        progress += 1
    # NODE_STATUS_FILE ...
    if self.node_status_file is not None:
        f.write("NODE_STATUS_FILE %s" % self.node_status_file)
        if self.node_status_file_updatetime is not None:
            f.write(" %d" % self.node_status_file_updatetime)
        f.write("\n")
        progress += 1
    # JOBSTATE_LOG ...
    if self.jobstate_log is not None:
        f.write("JOBSTATE_LOG %s\n" % self.jobstate_log)
        progress += 1
    # MAXJOBS ...
    # it is an error for a node to be in a category with no
    # MAXJOBS entry (even a None entry);  see the class BUGS note
    if set(node.category for node in self.nodes.values() if node.category is not None) - set(self.maxjobs):
        raise ValueError("no MAXJOBS statement(s) for node category(ies) %s" % ", ".join(sorted(set(node.category for node in self.nodes.values() if node.category is not None) - set(self.maxjobs))))
    for name, value in sorted(self.maxjobs.items()):
        # a None value means the category exists but has no
        # MAXJOBS limit, so no line is written for it
        if value is not None:
            f.write("MAXJOBS %s %d\n" % (name, value))
            progress += 1
    # JOB/DATA/SUBDAG ... (and things that go with them)
    for name, node in sorted(self.nodes.items()):
        if rescue is not None:
            if node.done:
                rescue.write("DONE %s\n" % node.name)
            # save done state, then clear so the .dag itself
            # records every job as not done
            done = node.done
            node.done = False
        node.write(f, progress = progress)
        if rescue is not None:
            # restore done state
            node.done = done
    # PARENT ... CHILD ...
    # group nodes by their (in-DAG) child sets so that each
    # unique set of children produces a single PARENT ... CHILD
    # line;  this keeps the number of graph edges dagman must
    # materialize as small as possible
    names = set(self.nodes)
    parents_of = {}
    for name, node in self.nodes.items():
        parents_of.setdefault(frozenset(child.name for child in node.children) & names, set()).add(node.name)
    for children, parents in parents_of.items():
        if children:
            f.write("PARENT %s CHILD %s\n" % (" ".join(sorted(parents)), " ".join(sorted(children))))
            progress += 1
    # progress
    del progress
[docs] def dot_source(self, title = "DAG", rename = False, colour = "black", bgcolour = "#a3a3a3", statecolours = {'wait': 'yellow', 'idle': 'yellow', 'run': 'lightblue', 'abort': 'red', 'stop': 'red', 'success': 'green', 'fail': 'red'}):
"""
Generator yielding a sequence of strings containing DOT
code to generate a visualization of the DAG graph. See
http://www.graphviz.org for more information.
title provides a title for the graph. If rename is True,
instead of using the names of the nodes for the node names
in the graph, numbers will be used instead. The numbers
are assigned to the nodes in alphabetical order by node
name. This might be required if the nodes have names that
are incompatible with the DOT syntax.
colour and bgcolour set the outline colour of the graph
nodes and the background colour for the graph respectively.
statecolours is a dictionary mapping node state (see the
.state attribute of the JOB class and its derivatives) to a
colour. Set statecolours to None to disable state-based
colouring of graph nodes.
Example:
>>> import sys
>>> sys.stdout.writelines(dag.dot_source(statecolours = None))
BUGS: the JOB class does not implement the ability to
retrieve the job state at this time, therefore it is always
necessary to set statecolours to None. This might change
in the future.
"""
# set up renaming map
if rename:
namemap = dict((name, str(n)) for n, name in enumerate(sorted(self.nodes), start = 1))
else:
namemap = dict((name, name) for name in self.nodes)
# generate dot code
yield 'digraph "%s" {\nnode [color="%s", href="\\N"];\ngraph [bgcolor="%s"];\n' % (title, colour, bgcolour)
for node in self.nodes.values():
if statecolours is not None:
yield '"%s"[color="%s"];\n' % (namemap[node.name], statecolours[node.state])
for child in node.children:
yield '"%s" -> "%s";\n' % (namemap[node.name], namemap[child.name])
yield '}\n'
# done
def optimize(dag):
# validate graph edges
dag.check_edges()
# generate no-op jobs
def noopgen(dag, submit_filename):
used = frozenset(name for name in dag.nodes if name.startswith("NOOP"))
for i in itertools.count():
name = "NOOP%d" % i
if name in used:
continue
noop = JOB(
name = name,
filename = submit_filename,
noop = True
)
dag.nodes[noop.name] = noop
yield noop
noops = iter(noopgen(dag, "noop.submit"))
# visit each node, construct a set of each node's children, and
# construct a look-up table mapping each unique such set to the set
# of parents possessing that set of children. these are the
# many-to-many parent-child relationships that become PARENT ...
# CHILD ... lines in the .dag. internally dagman represents each
# of these as a collection of parents*children objects (graph
# edges), so what is one line of text in the .dag file requires a
# quadratically large amount of ram in dagman.
parents_of = {}
for name, node in dag.nodes.items():
parents_of.setdefault(frozenset(node.children), set()).add(node)
# to work around this scaling problem we insert no-op jobs between
# the parents and children to replace the n*m edges with n+m edges
# plus one new node.
for children, parents in parents_of.items():
if len(parents) < 3 or len(children) < 3 or len(parents) * len(children) < 25:
# below this number of edges we don't bother
continue
noop = noops.next()
noop.parents |= parents
noop.children |= children
for node in parents:
node.children.clear()
node.children.add(noop)
for node in children:
node.parents.clear()
node.parents.add(noop)