pipeline module

This module contains objects that make it simple for the user to create Python scripts that build Condor DAGs to run code on the LSC Data Grid.

This file is part of the Grid LSC User Environment (GLUE)

GLUE is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.

class pipeline.CondorDAG(log)[source]

Bases: object

A CondorDAG is a Condor Directed Acyclic Graph that describes a collection of Condor jobs and the order in which to run them. All Condor jobs in the DAG must write their Condor logs to the same file. NOTE: The log file must not be on an NFS-mounted file system, as the Condor jobs must be able to get an exclusive file lock on the log file.

add_maxjobs_category(categoryName, maxJobsNum)[source]

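Add a category to this DAG called categoryName with a maxjobs of maxJobsNum. The (categoryName, maxJobsNum) tuple is added to CondorDAG.__maxjobs_categories. @param categoryName: name of the category. @param maxJobsNum: maximum number of jobs in this category to run at once.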

add_node(node)[source]

Add a CondorDAGNode to this DAG. The CondorJob that the node uses is also added to the list of Condor jobs in the DAG so that a list of the submit files needed by the DAG can be maintained. Each unique CondorJob will be added once to prevent duplicate submit files being written. @param node: CondorDAGNode to add to the CondorDAG.
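The "each unique CondorJob is added once" behaviour can be pictured with a small stand-in. This is a minimal sketch, not the GLUE implementation: MiniJob, MiniNode, and MiniDAG are hypothetical names, and only the de-duplication of jobs is modelled.

```python
class MiniJob:
    """Hypothetical stand-in for a CondorJob: one job, one submit file."""
    def __init__(self, sub_file):
        self.sub_file = sub_file


class MiniNode:
    """Hypothetical stand-in for a CondorDAGNode that wraps a job."""
    def __init__(self, job):
        self._job = job

    def job(self):
        return self._job


class MiniDAG:
    """Hypothetical stand-in for a CondorDAG."""
    def __init__(self):
        self.nodes = []
        self.jobs = []

    def add_node(self, node):
        self.nodes.append(node)
        # Record the job only the first time it is seen, so that each
        # submit file would be written exactly once.
        if node.job() not in self.jobs:
            self.jobs.append(node.job())


dag = MiniDAG()
job = MiniJob("analysis.sub")
dag.add_node(MiniNode(job))
dag.add_node(MiniNode(job))
```

Two nodes share one job here, so the stand-in DAG holds two nodes but only one job, and therefore only one submit file to write.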

get_dag_file()[source]

Return the path to the DAG file.

get_jobs()[source]

Return a list containing all the jobs in the DAG

get_maxjobs_categories()[source]

Return an array of tuples containing (categoryName,maxJobsNum)

get_nodes()[source]

Return a list containing all the nodes in the DAG

set_dag_file(path)[source]

Set the name of the file into which the DAG is written. @param path: path to DAG file.

set_integer_node_names()[source]

Use integer node names for the DAG

write_concrete_dag()[source]

Write all the nodes in the DAG to the DAG file.

write_dag()[source]

Write a dag.

write_maxjobs(fh, category)[source]

Write the DAG entry for this category’s maxjobs to the DAG file descriptor. @param fh: descriptor of open DAG file. @param category: tuple containing type of jobs to set a maxjobs limit for and the maximum number of jobs of that type to run at once.
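For orientation, DAGMan limits a category with a MAXJOBS line in the DAG file. The function below is a minimal sketch of writing such a line from a (categoryName, maxJobsNum) tuple. It is a stand-alone illustration, not the method's actual body, and the category name "inspiral" is made up.

```python
import io


def write_maxjobs(fh, category):
    """Write one MAXJOBS line for a (categoryName, maxJobsNum) tuple."""
    name, max_jobs = category
    fh.write("MAXJOBS %s %d\n" % (name, max_jobs))


fh = io.StringIO()  # stands in for the open DAG file
write_maxjobs(fh, ("inspiral", 20))
maxjobs_line = fh.getvalue()
```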

write_script()[source]

Write the workflow to a script (.sh instead of .dag).

Assuming that parents were added to the DAG before their children, dependencies should be handled correctly.
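The ordering assumption can be made concrete with a short sketch. A shell script runs commands top to bottom, so dependencies hold only if every parent is emitted before its children. The helper below is hypothetical (to_script and its tuple layout are not part of the module) and checks that assumption while building the script text.

```python
def to_script(commands):
    """Build shell script text from (name, command_line, parent_names)
    tuples given in the order the nodes were added."""
    seen = set()
    lines = ["#!/bin/bash"]
    for name, cmd, parents in commands:
        # A sequential script only respects dependencies if every
        # parent has already been emitted.
        missing = [p for p in parents if p not in seen]
        if missing:
            raise ValueError("%s added before its parents: %s" % (name, missing))
        lines.append(cmd)
        seen.add(name)
    return "\n".join(lines) + "\n"


script = to_script([
    ("A", "make_data --out a.dat", []),
    ("B", "analyse --in a.dat", ["A"]),
])
```

If a child is listed before one of its parents, this sketch raises ValueError rather than silently writing a script that runs in the wrong order.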

write_sub_files()[source]

Write all the submit files used by the dag to disk. Each submit file is written to the file name set in the CondorJob.

exception pipeline.CondorDAGError(args=None)[source]

Bases: CondorError

class pipeline.CondorDAGJob(universe, executable)[source]

Bases: CondorJob

A Condor DAG job never notifies the user on completion and can have variable options that are set for a particular node in the DAG. Inherits methods from a CondorJob.

add_var_arg(arg_index, quote=False)[source]

Add a command to the submit file to allow variable (macro) arguments to be passed to the executable.

add_var_condor_cmd(command)[source]

Add a condor command to the submit file that allows variable (macro) arguments to be passed to the executable.

add_var_opt(opt, short=False)[source]

Add a variable (or macro) option to the condor job. The option is added to the submit file and a different argument to the option can be set for each node in the DAG. @param opt: name of option to add.
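A variable option works by putting a macro reference in the shared submit file and letting each node supply the value through a VARS line in the DAG file. The sketch below illustrates that split. The macro naming convention in macro_name is an assumption made for illustration, and the helper names are hypothetical.

```python
import re


def macro_name(opt):
    # Assumed convention: prefix with "macro" and drop characters that
    # are not letters or digits, so "gps-start-time" -> "macrogpsstarttime".
    return "macro" + re.sub(r"[^a-zA-Z0-9]", "", opt)


def submit_fragment(opt):
    """Argument text as it might appear in the shared submit file."""
    return "--%s $(%s)" % (opt, macro_name(opt))


def vars_line(node_name, opt, value):
    """Per-node DAG line that fills in the macro."""
    return 'VARS %s %s="%s"' % (node_name, macro_name(opt), value)


fragment = submit_fragment("gps-start-time")
line = vars_line("node1", "gps-start-time", 1000000000)
```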

create_node()[source]

Create a condor node from this job. This provides a basic interface to the CondorDAGNode class. Most jobs in a workflow will subclass the CondorDAGNode class and override this to give more details when initializing the node. However, this will work fine for jobs with very simple input/output.

get_grid_site()[source]

Return the grid site for this node

set_grid_site(site)[source]

Set the grid site to run on. If not specified, no hint is given to Pegasus.

exception pipeline.CondorDAGJobError(args=None)[source]

Bases: CondorError

class pipeline.CondorDAGManJob(dag, dir=None)[source]

Bases: object

Condor DAGMan job class. Appropriate for setting up DAGs to run within a DAG.

create_node()[source]

Create a condor node from this job. This provides a basic interface to the CondorDAGManNode class. Most jobs in a workflow will subclass the CondorDAGManNode class and override this to give more details when initializing the node. However, this will work fine for jobs with very simple input/output.

get_dag()[source]

Return the name of any associated dag file

get_dag_directory()[source]

Get the directory where the dag will be run

get_sub_file()[source]

Return the name of the dag as the submit file name for the SUBDAG EXTERNAL command in the uber-dag
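In a DAG file, a nested DAG is declared with SUBDAG EXTERNAL, which takes the inner DAG file where an ordinary JOB line takes a submit file. The following is a minimal sketch of such a line, with the optional DIR clause for the run directory. The helper name and file names are hypothetical.

```python
def subdag_line(node_name, dag_file, directory=None):
    """Return a SUBDAG EXTERNAL line for the outer DAG file."""
    line = "SUBDAG EXTERNAL %s %s" % (node_name, dag_file)
    if directory:
        # DIR tells DAGMan which directory to run the inner DAG from.
        line += " DIR %s" % directory
    return line


plain = subdag_line("inner1", "inner.dag")
with_dir = subdag_line("inner1", "inner.dag", "runs/inner")
```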

set_dag_directory(dir)[source]

Set the directory where the dag will be run. @param dir: the name of the directory where the dag will be run.

set_notification(value)[source]

Set the email address to send notification to. @param value: email address or never for no notification.

write_sub_file()[source]

Do nothing, as there is no need for a sub file with the SUBDAG EXTERNAL command in the uber-dag.

class pipeline.CondorDAGManNode(job)[source]

Bases: CondorDAGNode

Condor DAGMan node class. Appropriate for setting up DAGs to run within a DAG. Adds the user-tag functionality to condor_dagman processes running in the DAG. May also be used to extend dagman-node specific functionality.

add_maxjobs_category(categoryName, maxJobsNum)[source]

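Add a category to this DAG called categoryName with a maxjobs of maxJobsNum. The (categoryName, maxJobsNum) tuple is added to CondorDAG.__maxjobs_categories. @param categoryName: name of the category. @param maxJobsNum: maximum number of jobs in this category to run at once.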

get_cluster_jobs()[source]

Returns the type of job clustering that Pegasus can use to collapse jobs.

get_maxjobs_categories()[source]

Return an array of tuples containing (categoryName,maxJobsNum)

get_user_tag()[source]

Returns the usertag string

set_cluster_jobs(cluster)[source]

Set the type of job clustering Pegasus can use to collapse jobs. @param cluster: clustering type.

set_user_tag(usertag)[source]

Set the user tag that is passed to the analysis code. @param usertag: the user tag to identify the job.

class pipeline.CondorDAGNode(job)[source]

Bases: object

A CondorDAGNode represents a node in the DAG. It corresponds to a particular condor job (and so a particular submit file). If the job has variable (macro) options, they can be set here so each node executes with the correct options.

add_checkpoint_file(filename)[source]

Add filename as a checkpoint file for this DAG node. @param filename: checkpoint filename to add.

add_checkpoint_macro(filename)[source]

add_file_arg(filename)[source]

Add a variable (or macro) file name argument to the condor job. The argument is added to the submit file and a different value of the argument can be set for each node in the DAG. The file name is also added to the list of input files for the DAX. @param filename: file name to add as an argument.

add_file_opt(opt, filename, file_is_output_file=False)[source]

Add a variable (macro) option for this node. If the option specified does not exist in the CondorJob, it is added so the submit file will be correct when written. The file name is also added to the list of input files for the DAX, or to the list of output files if file_is_output_file is set. @param opt: option name. @param filename: value of the option for this node in the DAG. @param file_is_output_file: True if the file is an output file rather than an input file. The default, False, treats it as an input.

add_input_file(filename)[source]

Add filename as a necessary input file for this DAG node.

@param filename: input filename to add

add_input_macro(filename)[source]

Add a variable (macro) for storing the input files associated with this node. @param filename: filename of input file

add_io_macro(io, filename)[source]

Add a variable (macro) for storing the input/output files associated with this node. @param io: macroinput or macrooutput @param filename: filename of input/output file

add_macro(name, value)[source]

Add a variable (macro) for this node. This can be different for each node in the DAG, even if they use the same CondorJob. Within the CondorJob, the value of the macro can be referenced as ‘$(name)’ – for instance, to define a unique output or error file for each node. @param name: macro name. @param value: value of the macro for this node in the DAG
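To illustrate the "unique output file per node" use case, the sketch below imitates how a $(name) reference in a shared submit-file line resolves differently for each node. Condor performs this expansion itself. The expand helper, the macro name tag, and the file names are hypothetical and exist only to show the effect.

```python
import re


def expand(template, macros):
    """Replace each $(name) in template with the node's macro value."""
    return re.sub(r"\$\((\w+)\)", lambda m: str(macros[m.group(1)]), template)


# One submit-file line shared by every node that uses the job.
template = "output = logs/analysis-$(tag).out"

# Per-node macro values, as would be set with add_macro("tag", ...).
node_macros = {"n1": {"tag": "H1"}, "n2": {"tag": "L1"}}

expanded = {node: expand(template, macros) for node, macros in node_macros.items()}
```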

add_output_file(filename)[source]

Add filename as an output file for this DAG node.

@param filename: output filename to add

add_output_macro(filename)[source]

Add a variable (macro) for storing the output files associated with this node. @param filename: filename of output file

add_parent(node)[source]

Add a parent to this node. This node will not be executed until the parent node has run successfully. @param node: CondorDAGNode to add as a parent.
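In the DAG file, each such dependency is expressed as a PARENT ... CHILD line. The following minimal sketch produces one line per parent. The helper and node names are hypothetical, and the real writer may group parents differently.

```python
def parent_lines(child, parents):
    """Return one 'PARENT p CHILD c' line for each parent of a node."""
    return ["PARENT %s CHILD %s" % (parent, child) for parent in parents]


relations = parent_lines("merge", ["split_a", "split_b"])
```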

add_post_script_arg(arg)[source]

Adds an argument to the post script that is executed after the DAG node is run.

add_pre_script_arg(arg)[source]

Adds an argument to the pre script that is executed before the DAG node is run.

add_var_arg(arg, quote=False)[source]

Add a variable (or macro) argument to the condor job. The argument is added to the submit file and a different value of the argument can be set for each node in the DAG. @param arg: argument to add.

add_var_condor_cmd(command, value)[source]

Add a variable (macro) condor command for this node. If the command specified does not exist in the CondorJob, it is added so the submit file will be correct. PLEASE NOTE: As with other add_var commands, the variable must be set for all nodes that use the CondorJob instance. @param command: command name. @param value: value of the command for this node in the DAG/DAX.

add_var_opt(opt, value, short=False)[source]

Add a variable (macro) option for this node. If the option specified does not exist in the CondorJob, it is added so the submit file will be correct when written. @param opt: option name. @param value: value of the option for this node in the DAG.

finalize()[source]

The finalize method of a node is called before the node is finally added to the DAG and can be overridden to do any last minute clean up (such as setting extra command line arguments)

get_args()[source]

Return the arguments for this node. Note that this returns only the arguments for this instance of the node and not those associated with the underlying job template.

get_category()[source]

Get the category for this node in the DAG.

get_checkpoint_files()[source]

Return a list of checkpoint files for this DAG node and its job.

get_cmd_line()[source]

Return the full command line that will be used when this node is run by DAGMan.

get_cmd_tuple_list()[source]

Return a list of tuples containing the command line arguments.

get_input_files()[source]

Return list of input files for this DAG node and its job.

get_name()[source]

Get the name for this node in the DAG.

get_opts()[source]

Return the opts for this node. Note that this returns only the options for this instance of the node and not those associated with the underlying job template.

get_output_files()[source]

Return list of output files for this DAG node and its job.

get_post_script()[source]

Returns the name of the post script that is executed after the DAG node is run.

get_post_script_arg()[source]

Returns an array of arguments to the post script that is executed after the DAG node is run.

get_priority()[source]

Get the priority for this node in the DAG.

get_retry()[source]

Return the number of times that this node in the DAG should retry.

get_vds_group()[source]

Returns the VDS group key for this node

job()[source]

Return the CondorJob that this node is associated with.

set_category(category)[source]

Set the category for this node in the DAG.

set_log_file(log)[source]

Set the Condor log file to be used by this CondorJob. @param log: path of Condor log file.

set_name(name)[source]

Set the name for this node in the DAG.

set_post_script(script)[source]

Sets the name of the post script that is executed after the DAG node is run. @param script: path to script

set_pre_script(script)[source]

Sets the name of the pre script that is executed before the DAG node is run. @param script: path to script
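DAGMan runs a PRE script before submitting the node's job and a POST script after the job finishes. Both are declared with SCRIPT lines in the DAG file. The sketch below shows their shape. The helper and the script names are hypothetical. $RETURN is a DAGMan macro, available to POST scripts, that expands to the job's exit status.

```python
def script_lines(node_name, pre=None, pre_args=(), post=None, post_args=()):
    """Return the SCRIPT PRE / SCRIPT POST lines for one node."""
    lines = []
    if pre:
        lines.append(" ".join(["SCRIPT PRE", node_name, pre, *pre_args]))
    if post:
        lines.append(" ".join(["SCRIPT POST", node_name, post, *post_args]))
    return lines


lines = script_lines(
    "node1",
    pre="stage_in.sh", pre_args=["a.dat"],
    post="check.sh", post_args=["$RETURN"],
)
```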

set_priority(priority)[source]

Set the priority for this node in the DAG.

set_retry(retry)[source]

Set the number of times that this node in the DAG should retry. @param retry: number of times to retry node.

set_vds_group(group)[source]

Set the name of the VDS group key when generating a DAX. @param group: name of group for this node.

write_category(fh)[source]

Write the DAG entry for this node’s category to the DAG file descriptor. @param fh: descriptor of open DAG file.

write_input_files(fh)[source]

Write as a comment into the DAG file the list of input files for this DAG node.

@param fh: descriptor of open DAG file.

write_job(fh)[source]

Write the DAG entry for this node’s job to the DAG file descriptor. @param fh: descriptor of open DAG file.

write_output_files(fh)[source]

Write as a comment into the DAG file the list of output files for this DAG node.

@param fh: descriptor of open DAG file.

write_parents(fh)[source]

Write the parent/child relations for this job to the DAG file descriptor. @param fh: descriptor of open DAG file.

write_post_script(fh)[source]

Write the post script for the job, if there is one. @param fh: descriptor of open DAG file.

write_pre_script(fh)[source]

Write the pre script for the job, if there is one. @param fh: descriptor of open DAG file.

write_priority(fh)[source]

Write the DAG entry for this node’s priority to the DAG file descriptor. @param fh: descriptor of open DAG file.

write_vars(fh)[source]

Write the variable (macro) options and arguments to the DAG file descriptor. @param fh: descriptor of open DAG file.
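Taken together, the write_* methods above produce the per-node portion of a DAG file. As a rough picture of what that portion can look like, here is a minimal sketch using standard DAGMan keywords (JOB, RETRY, CATEGORY, PRIORITY). The helper is hypothetical, and the order in which the module actually writes these lines is not specified here.

```python
def node_entry(name, sub_file, retry=0, category=None, priority=None):
    """Return DAG-file lines describing one node."""
    lines = ["JOB %s %s" % (name, sub_file)]
    if retry:
        lines.append("RETRY %s %d" % (name, retry))
    if category:
        lines.append("CATEGORY %s %s" % (name, category))
    if priority is not None:
        lines.append("PRIORITY %s %d" % (name, priority))
    return lines


entry = node_entry("node1", "analysis.sub", retry=3,
                   category="inspiral", priority=10)
```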

exception pipeline.CondorDAGNodeError(args=None)[source]

Bases: CondorError

exception pipeline.CondorError(args=None)[source]

Bases: Exception

Error thrown by Condor Jobs

class pipeline.CondorJob(universe, executable, queue)[source]

Bases: object

Generic condor job class. Provides methods to set the options in the condor submit file for a particular executable

add_arg(arg)[source]

Add an argument to the executable. Arguments are appended after any options and their order is guaranteed. @param arg: argument to add.

add_checkpoint_file(filename)[source]

Add filename as a checkpoint file for this DAG job.

add_condor_cmd(cmd, value)[source]

Add a Condor command to the submit file (e.g. a ClassAd or environment). @param cmd: Condor command directive. @param value: value for command.

add_file_arg(filename)[source]

Add a file argument to the executable. Arguments are appended after any options and their order is guaranteed. Also adds the file name to the list of required input data for this job. @param filename: file to add as argument.

add_file_opt(opt, filename)[source]

Add a command line option to the executable. The order that the arguments will be appended to the command line is not guaranteed, but they will always be added before any command line arguments. The name of the option is prefixed with a double hyphen and the program is expected to parse it with getopt_long(). @param opt: command line option to add. @param filename: file name to pass to the option.

add_ini_opts(cp, section)[source]

Parse command line options from a given section in an ini file and pass to the executable. @param cp: ConfigParser object pointing to the ini file. @param section: section of the ini file to add to the options.
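The idea is that every key in the chosen ini section becomes a command line option. The sketch below shows that mapping with the Python 3 standard-library configparser module. The section name, keys, and the ini_opts helper are made up for illustration, and the real method passes the options to the job rather than returning a dictionary.

```python
import configparser

cp = configparser.ConfigParser()
cp.read_string("""
[inspiral]
low-frequency-cutoff = 40
enable-output =
""")


def ini_opts(cp, section):
    """Return {option: value} for one section of the ini file."""
    return {opt: cp.get(section, opt) for opt in cp.options(section)}


opts = ini_opts(cp, "inspiral")
```

A key with nothing after the equals sign comes back as an empty string, which is a natural way to represent an option that takes no argument.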

add_input_file(filename)[source]

Add filename as a necessary input file for this DAG node.

@param filename: input filename to add

add_opt(opt, value)[source]

Add a command line option to the executable. The order that the arguments will be appended to the command line is not guaranteed, but they will always be added before any command line arguments. The name of the option is prefixed with double hyphen and the program is expected to parse it with getopt_long(). @param opt: command line option to add. @param value: value to pass to the option (None for no argument).

add_output_file(filename)[source]

Add filename as an output file for this DAG node.

@param filename: output filename to add

add_short_opt(opt, value)[source]

Add a command line option to the executable. The order that the arguments will be appended to the command line is not guaranteed, but they will always be added before any command line arguments. The name of the option is prefixed with single hyphen and the program is expected to parse it with getopt() or getopt_long() (if a single character option), or getopt_long_only() (if multiple characters). Long and (single-character) short options may be mixed if the executable permits this. @param opt: command line option to add. @param value: value to pass to the option (None for no argument).
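The rules described for options and arguments (long options with a double hyphen, short options with a single hyphen, positional arguments last and in order) can be sketched as a small command line builder. This is an illustration only. The helper name and the executable are hypothetical, and the relative order of the options here simply follows dictionary insertion order, whereas the module makes no ordering guarantee for options.

```python
def command_line(executable, opts, short_opts, args):
    """Assemble a command line from long options, short options and
    positional arguments. A value of None means the option is a flag."""
    parts = [executable]
    for name, value in opts.items():
        parts.append("--" + name)
        if value is not None:
            parts.append(str(value))
    for name, value in short_opts.items():
        parts.append("-" + name)
        if value is not None:
            parts.append(str(value))
    # Positional arguments always come last, in the order given.
    parts.extend(args)
    return " ".join(parts)


cmd = command_line(
    "analyse",
    {"verbose": None, "threshold": 5.5},
    {"n": 4},
    ["a.dat", "b.dat"],
)
```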

get_args()[source]

Return the list of arguments that are to be passed to the executable.

get_checkpoint_files()[source]

Return a list of checkpoint files for this DAG node

get_condor_cmds()[source]

Return the dictionary of condor keywords to add to the job

get_executable()[source]

Return the name of the executable for this job.

get_executable_installed()[source]

Return whether or not the executable is installed.

get_grid_scheduler()[source]

Return the grid scheduler.

get_grid_server()[source]

Return the grid server on which the job will run.

get_grid_type()[source]

Return the grid type of the job.

get_input_files()[source]

Return list of input files for this DAG node.

get_opt(opt)[source]

Returns the value associated with the given command line option. Returns None if the option does not exist in the options list. @param opt: command line option

get_opts()[source]

Return the dictionary of opts for the job.

get_output_files()[source]

Return list of output files for this DAG node.

get_short_opts()[source]

Return the dictionary of short options for the job.

get_stderr_file()[source]

Get the file to which Condor directs the stderr of the job.

get_stdin_file()[source]

Get the file from which Condor directs the stdin of the job.

get_stdout_file()[source]

Get the file to which Condor directs the stdout of the job.

get_sub_file()[source]

Get the name of the file which the Condor submit file will be written to when write_sub_file() is called.

get_universe()[source]

Return the condor universe that the job will run in.

set_executable(executable)[source]

Set the name of the executable for this job.

set_executable_installed(installed)[source]

If executable installed is true, then no copying of the executable is done. If it is false, Pegasus stages the executable to the remote site. Default is executable is installed (i.e. True). @param installed: True or False.

set_grid_scheduler(grid_scheduler)[source]

Set the grid scheduler. @param grid_scheduler: grid scheduler on which to run.

set_grid_server(grid_server)[source]

Set the grid server on which to run the job. @param grid_server: grid server on which to run.

set_grid_type(grid_type)[source]

Set the type of grid resource for the job. @param grid_type: type of grid resource.

set_log_file(path)[source]

Set the Condor log file. @param path: path to log file.

set_notification(value)[source]

Set the email address to send notification to. @param value: email address or never for no notification.

set_stderr_file(path)[source]

Set the file to which Condor directs the stderr of the job. @param path: path to stderr file.

set_stdin_file(path)[source]

Set the file from which Condor directs the stdin of the job. @param path: path to stdin file.

set_stdout_file(path)[source]

Set the file to which Condor directs the stdout of the job. @param path: path to stdout file.

set_sub_file(path)[source]

Set the name of the file to write the Condor submit file to when write_sub_file() is called. @param path: path to submit file.

set_universe(universe)[source]

Set the condor universe for the job to run in. @param universe: the condor universe to run the job in.

write_sub_file()[source]

Write a submit file for this Condor job.
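For readers unfamiliar with Condor submit files, the sketch below assembles a small one from the kinds of settings this class manages (universe, executable, arguments, log, stdout, stderr, notification, queue). It uses standard submit-file commands, but it is not the module's writer: the helper name, the file names, and the exact set and order of lines are assumptions.

```python
def submit_text(universe, executable, arguments, log, out, err, queue=1):
    """Return the text of a minimal Condor submit file."""
    lines = [
        "universe = %s" % universe,
        "executable = %s" % executable,
        "arguments = %s" % arguments,
        "log = %s" % log,
        "output = %s" % out,
        "error = %s" % err,
        "notification = never",
        "queue %d" % queue,
    ]
    return "\n".join(lines) + "\n"


text = submit_text(
    "vanilla", "/usr/bin/analyse", "--threshold 5.5 a.dat",
    "logs/dag.log", "logs/analyse.out", "logs/analyse.err",
)
```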

exception pipeline.CondorJobError(args=None)[source]

Bases: CondorError

exception pipeline.CondorSubmitError(args=None)[source]

Bases: CondorError

exception pipeline.SegmentError(args=None)[source]

Bases: Exception