dagfile module

Machinery for reading, editing, and writing Condor DAG files.

When running DAGs on Condor compute clusters, one will often wish to re-run a portion of a DAG. This can be done by marking all jobs except the ones to be re-run as "DONE". Unfortunately the Condor software suite lacks an I/O library for reading and writing Condor DAG files, so there is no easy way to edit DAG files except by playing games with sed, awk, or one-off Python or Perl scripts. That's where this module comes in. This module will read a DAG file into an in-memory representation that is easily edited, and allow the file to be written to disk again.

Example:

>>> from gstlal import dagfile
>>> dag = dagfile.DAG.parse(open("pipeline.dag"))
>>> dag.write(open("pipeline.dag", "w"))
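
For example, to re-run the node named "triggergen" and everything downstream of it (the node name is illustrative), every other node can be marked done before writing the DAG back out:

>>> from gstlal import dagfile
>>> dag = dagfile.DAG.parse(open("pipeline.dag"))
>>> names_to_rerun = set(["triggergen"]) | dag.get_all_child_names(["triggergen"])
>>> for name, node in dag.nodes.items():
...     node.done = name not in names_to_rerun
...
>>> dag.write(open("pipeline.dag", "w"))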

Although it is possible to machine-generate an original DAG file using this module and write it to disk, this module does not provide the tools required for the other tasks associated with pipeline construction. For example, there is no facility here to generate or manage submit files, data files, or any other files associated with a full pipeline. Only the DAG file itself is considered here; for general pipeline construction see the pipeline module. The focus of this module is on editing existing DAG files.

Developers should also consider doing any new pipeline development using DAX files as the fundamental workflow description, instead of DAGs. See http://pegasus.isi.edu for more information.

A DAG file is loaded using the .parse() class method of the DAG class. This parses the file-like object passed to it and returns an instance of the DAG class representing the file's contents. Once loaded, the nodes in the DAG can all be found in the .nodes dictionary, whose keys are the node names and whose values are the corresponding node objects. Among each node object's attributes are the sets .children and .parents, containing references to the child and parent node objects (not their names). Note that every node must be listed as a parent of each of its children, and vice versa. The other attributes of a DAG instance carry information about the DAG as a whole, for example the CONFIG file or the DOT file. All of the data for each node, for example the node's VARS value or its initial working directory, can be found in the attributes of the node objects themselves. A DAG is written to a file using the .write() method of the DAG object.
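
For example, assuming the DAG contains a node named "triggergen" (the name is illustrative), its neighbours in the graph can be inspected via these sets:

>>> node = dag.nodes["triggergen"]
>>> parent_names = set(parent.name for parent in node.parents)
>>> child_names = set(child.name for child in node.children)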

class dagfile.DAG(nodes={}, maxjobs={}, config=None, dot=None, dotupdate=False, dotoverwrite=True, dotinclude=None, node_status_file=None, node_status_file_updatetime=None, jobstate_log=None)[source]

Bases: object

Representation of the contents of a Condor DAG file.

BUGS: the semantics of the "+" special character in category names are not understood. For now, it is an error for a node's category not to be found verbatim in a MAXJOBS line. The "+" character is a wildcard-like character used in the assignment of MAXJOBS values to job categories in splices; see the Condor documentation for more information.
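
Assuming the constructor parameters map to attributes of the same name (as .nodes does), the MAXJOBS values can be read and edited through the .maxjobs dictionary, which maps category name to job count. For example (the category name is illustrative):

>>> dag.maxjobs["injections"] = 10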

abortdagonpat = re.compile('^ABORT-DAG-ON\\s+(?P<name>\\S+)\\s+(?P<exitvalue>\\S+)(\\s+RETURN\\s+(?P<returnvalue>\\S+))?', re.IGNORECASE)
arcpat = re.compile('^PARENT\\s+(?P<parents>.+?)\\s+CHILD\\s+(?P<children>.+)', re.IGNORECASE)
categorypat = re.compile('^CATEGORY\\s+(?P<name>\\S+)\\s+(?P<category>\\S+)', re.IGNORECASE)
check_edges()[source]

Check all graph edges for validity. Checks that each of every node’s children lists that node as a parent, and vice versa, and that all nodes listed in the parent and child sets of all nodes are contained in this DAG. Raises ValueError if a problem is found, otherwise returns None.

Example:

>>> try:
...     dag.check_edges()
... except ValueError as e:
...     print("edges are broken: %s" % str(e))
... else:
...     print("all edges are OK")
...
configpat = re.compile('^CONFIG\\s+(?P<filename>\\S+)', re.IGNORECASE)
datapat = re.compile('^DATA\\s+(?P<name>\\S+)\\s+(?P<filename>\\S+)(\\s+DIR\\s+(?P<directory>\\S+))?(\\s+(?P<noop>NOOP))?(\\s+(?P<done>DONE))?', re.IGNORECASE)
donepat = re.compile('^DONE\\s+(?P<name>\\S+)', re.IGNORECASE)
dot_source(title='DAG', rename=False, colour='black', bgcolour='#a3a3a3', statecolours={'abort': 'red', 'fail': 'red', 'idle': 'yellow', 'run': 'lightblue', 'stop': 'red', 'success': 'green', 'wait': 'yellow'})[source]

Generator yielding a sequence of strings containing DOT code to generate a visualization of the DAG graph. See http://www.graphviz.org for more information.

title provides a title for the graph. If rename is True, numbers are used in place of the node names in the graph. The numbers are assigned to the nodes in alphabetical order by node name. This might be required if the nodes have names that are incompatible with the DOT syntax.

colour and bgcolour set the outline colour of the graph nodes and the background colour for the graph respectively. statecolours is a dictionary mapping node state (see the .state attribute of the JOB class and its derivatives) to a colour. Set statecolours to None to disable state-based colouring of graph nodes.

Example:

>>> import sys
>>> sys.stdout.writelines(dag.dot_source(statecolours = None))

BUGS: the JOB class does not implement the ability to retrieve the job state at this time; therefore, it is currently always necessary to set statecolours to None. This might change in the future.

dotpat = re.compile('^DOT\\s+(?P<filename>\\S+)(\\s+(?P<options>.+))?', re.IGNORECASE)
get_all_child_names(names)[source]

Trace the DAG forward from the children of the nodes whose names are given to the leaf nodes, inclusively, and return the set of the names of all nodes visited.

Example:

>>> all_children = dag.get_all_child_names(["triggergen"])
get_all_parent_names(names)[source]

Trace the DAG backward from the parents of the nodes whose names are given to the head nodes, inclusively, and return the set of the names of all nodes visited.

Example:

>>> all_parents = dag.get_all_parent_names(["triggergen"])
jobpat = re.compile('^JOB\\s+(?P<name>\\S+)\\s+(?P<filename>\\S+)(\\s+DIR\\s+(?P<directory>\\S+))?(\\s+(?P<noop>NOOP))?(\\s+(?P<done>DONE))?', re.IGNORECASE)
jobstatepat = re.compile('^JOBSTATE_LOG\\s+(?P<filename>\\S+)', re.IGNORECASE)
load_rescue(f, progress=None)[source]

Parse the file-like object f as a rescue DAG, using the DONE lines therein to set the job states of this DAG.

In the past, rescue DAGs were full copies of the original DAG with the word DONE added to the JOB lines of completed jobs. In version 7.7.2 of Condor, the default format of rescue DAGs was changed to a condensed format consisting of only the names of completed jobs and the number of retries remaining for incomplete jobs. Condor still supports the original rescue DAG format, but the user must set the DAGMAN_WRITE_PARTIAL_RESCUE config variable to false to obtain one. This module does not directly support the new format; however, this method allows a new-style rescue DAG to be parsed to set the states of the jobs in a DAG. This, in effect, converts a new-style rescue DAG to an old-style rescue DAG, allowing the result to be manipulated as before.

If the progress argument is not None, it should be a callable object. This object will be called periodically and passed the f argument, the current line number, and a boolean indicating if parsing is complete. The boolean is always False until parsing is complete, then the callable will be invoked one last time with the final line count and the boolean set to True.
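
Example (the rescue file name is illustrative; see the NOTE in .write() about the naming convention expected by condor_dagman):

>>> dag = DAG.parse(open("pipeline.dag"))
>>> dag.load_rescue(open("pipeline.dag.rescue001"))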

maxjobspat = re.compile('^MAXJOBS\\s+(?P<category>\\S+)\\s+(?P<value>\\S+)', re.IGNORECASE)
nodestatuspat = re.compile('^NODE_STATUS_FILE\\s+(?P<filename>\\S+)(\\s+(?P<updatetime>\\S+))?', re.IGNORECASE)
classmethod parse(f, progress=None)[source]

Parse the file-like object f as a Condor DAG file. Return a DAG object. The file object must be iterable, yielding one line of text of the DAG file in each iteration.

If the progress argument is not None, it should be a callable object. This object will be called periodically and passed the f argument, the current line number, and a boolean indicating if parsing is complete. The boolean is always False until parsing is complete, then the callable will be invoked one last time with the final line count and the boolean set to True.

Example:

>>> def progress(f, n, done):
...     print("reading %s: %d lines" % (f.name, n), end = "\r")
...     if done:
...             print()
...
>>> dag = DAG.parse(open("pipeline.dag"), progress = progress)
prioritypat = re.compile('^PRIORITY\\s+(?P<name>\\S+)\\s+(?P<value>\\S+)', re.IGNORECASE)
reindex()[source]

Rebuild the .nodes index. This is required if the names of nodes are changed.
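
Example (the rename is illustrative):

>>> dag.nodes["triggergen"].name = "triggergen_rerun"
>>> dag.reindex()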

retrypat = re.compile('^RETRY\\s+(?P<name>\\S+)\\s+(?P<retries>\\S+)(\\s+UNLESS-EXIT\\s+(?P<retry_unless_exit_value>\\S+))?', re.IGNORECASE)
scriptpat = re.compile('^SCRIPT\\s+(?P<type>(PRE)|(POST))\\s(?P<name>\\S+)\\s+(?P<executable>\\S+)(\\s+(?P<arguments>.+))?', re.IGNORECASE)
classmethod select_nodes_by_name(dag, nodenames)[source]

Construct a new DAG object containing only the nodes whose names are in nodenames.

Example:

>>> names_to_rerun = set(["triggergen"])
>>> dag = DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun))

NOTE: the new DAG object is given references to the node (JOB, DATA, etc.) objects in the original DAG, not copies of them. Therefore, editing the node objects, for example modifying their parent or child sets, will affect both DAGs. To obtain an independent DAG with its own node objects, make a deepcopy of the object that is returned (see the copy module in the Python standard library for more information).

Example:

>>> import copy
>>> dag = copy.deepcopy(DAG.select_nodes_by_name(dag, names_to_rerun | dag.get_all_parent_names(names_to_rerun)))
splicepat = re.compile('^SPLICE\\s+(?P<name>\\S+)\\s+(?P<filename>\\S+)(\\s+DIR\\s+(?P<directory>\\S+))?', re.IGNORECASE)
subdagpat = re.compile('^SUBDAG\\s+EXTERNAL\\s+(?P<name>\\S+)\\s+(?P<filename>\\S+)(\\s+DIR\\s+(?P<directory>\\S+))?(\\s+(?P<noop>NOOP))?(\\s+(?P<done>DONE))?', re.IGNORECASE)
varspat = re.compile('^VARS\\s+(?P<name>\\S+)\\s+(?P<vars>.+)', re.IGNORECASE)
varsvaluepat = re.compile('(?P<name>\\S+)\\s*=\\s*"(?P<value>.*?)(?<!\\\\)"', re.IGNORECASE)
write(f, progress=None, rescue=None)[source]

Write the DAG to the file-like object f. The object must provide a .write() method. In the special case that the optional rescue argument is not None (see below), f may be set to None, in which case no DAG file is written (only the rescue DAG is written).

If the progress argument is not None, it should be a callable object. This object will be called periodically and passed the f argument, the current line number, and a boolean indicating if writing is complete. The boolean is always False until writing is complete, then the callable will be invoked one last time with the final line count and the boolean set to True.

Example:

>>> def progress(f, n, done):
...     print("writing %s: %d lines" % (f.name, n), end = "\r")
...     if done:
...             print()
...
>>> dag.write(open("pipeline.dag", "w"), progress = progress)

NOTE: when writing PARENT/CHILD graph edges, this method will silently skip any node names that are not in this DAG's graph. This is a convenience to simplify writing DAGs constructed by the .select_nodes_by_name() class method. If one wishes to check for broken parent/child links before writing the DAG, use the .check_edges() method.

If the optional rescue argument is not None, it must be a file-like object providing a .write() method. The DONE state of jobs will be written to this file instead of to the .dag file (in the .dag file itself, all jobs will be marked not done).

Example:

>>> dag.write(open("pipeline.dag", "w"), rescue = open("pipeline.dag.rescue001", "w"))

NOTE: it is left as an exercise for the calling code to ensure the name chosen for the rescue file is consistent with the naming convention assumed by condor_dagman when it starts up.

class dagfile.DATA(name, filename, directory=None, done=False, noop=False)[source]

Bases: JOB

Representation of a Stork DATA node in a Condor DAG.

keyword = 'DATA'
class dagfile.JOB(name, filename, directory=None, done=False, noop=False)[source]

Bases: object

Representation of a JOB node in a Condor DAG. JOB objects have the following attributes corresponding to information in the DAG file:

.name

The name of the node in the DAG.

.filename

The name of the submit file for the JOB.

.directory

The initial working directory for the JOB. Set to None to omit from DAG (job’s working directory will be chosen by Condor).

.done

Boolean indicating if the JOB is done or not. See DAG.load_rescue() for more information.

.noop

Boolean indicating if the JOB is a no-op or not.

.vars

A dictionary of the name → value pairs in the VARS line for the JOB. Leave empty to omit VARS from DAG.

.retry

The number of retries for the job. Set to None to omit from DAG.

.retry_unless_exit_value

The value of the UNLESS-EXIT suffix of the RETRY line. Set to None to omit from DAG.

.priority .category

The PRIORITY value and CATEGORY name for the node in the DAG. Set to None to omit from the DAG.

.parents .children

Sets of the parent and child nodes of JOB. The sets contain references to the node objects, not their names.

.prescript .prescriptargs .postscript .postscriptargs

The names and lists of arguments of the PRE and POST scripts. Set to None to omit from DAG.

.abort_dag_on_abortexitvalue .abort_dag_on_dagreturnvalue

The ABORT-DAG-ON abort exit value and DAG return value for the JOB. Set to None to omit from DAG.

For more information about the function of these parameters, refer to the Condor documentation.
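
As a sketch of how a new node might be wired into an existing DAG by hand (all names are illustrative; note that the parent/child link must be recorded on both nodes, which .check_edges() can verify):

>>> node = JOB("summary", "summary.sub")
>>> node.vars["macrooutput"] = "summary.xml"
>>> parent = dag.nodes["triggergen"]
>>> node.parents.add(parent)
>>> parent.children.add(node)
>>> dag.nodes[node.name] = node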

keyword = 'JOB'
property state

Get the state of the node. One of "wait", "idle", "run", "abort", "stop", "success", "fail".

NOTE: this feature is not implemented at this time.

write(f, progress=None)[source]

Write the lines describing this node to the file-like object f. The object must provide a .write() method.

If progress is not None, it will be incremented by 1 for every line written.

class dagfile.SPLICE(name, filename, directory=None, done=False, noop=False)[source]

Bases: JOB

Representation of a SPLICE node in a Condor DAG.

keyword = 'SPLICE'
class dagfile.SUBDAG_EXTERNAL(name, filename, directory=None, done=False, noop=False)[source]

Bases: JOB

Representation of a SUBDAG EXTERNAL node in a Condor DAG.

keyword = 'SUBDAG EXTERNAL'