Source code for injection_generation_dag

from optparse import OptionParser
import tempfile
from glue import pipeline

[docs]class DAG(pipeline.CondorDAG): def __init__(self, name, logpath = None): self.basename = name.replace(".dag","") tempfile.tempdir = logpath tempfile.template = self.basename + '.dag.log.' logfile = tempfile.mktemp() fh = open( logfile, "w" ) fh.close() pipeline.CondorDAG.__init__(self,logfile) self.set_dag_file(self.basename) self.jobsDict = {} self.output_cache = []
[docs] def add_node(self, node): node.set_retry(1) node.add_macro("macronodename", node.get_name()) pipeline.CondorDAG.add_node(self, node)
[docs]class InjectionJob(pipeline.CondorDAGJob): def __init__(self, executable, tag_base = None, universe="vanilla", condor_commands = {}, **kwargs): self.__prog__ = tag_base self.__executable = executable self.__universe = universe pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable) self.add_condor_cmd('getenv','True') self.tag_base = tag_base self.set_sub_file(tag_base+'.sub') self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out') self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err') self.number = 1 # make an output directory for files self.output_path = tag_base try: os.mkdir(self.output_path) except: pass for cmd,val in condor_commands.items(): self.add_condor_cmd(cmd, val)
[docs]class InjectionNode(pipeline.CondorDAGNode): def __init__(self, job, dag, p_node=[], opts={}): pipeline.CondorDAGNode.__init__(self, job) for p in p_node: self.add_parent(p) for opt, val in opts.items(): if val is None: continue # not the same as val = '' which is allowed if not hasattr(val, "__iter__") or isinstance(val, str): # catches list like things and strings if opt == "": self.add_var_arg(val) else: self.add_var_opt(opt, val) # Must be an iterable, but not str else: if opt == "": [self.add_var_arg(a) for a in val] else: self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val)) self.set_name("%s_%04X" % (job.tag_base, job.number)) job.number += 1 dag.add_node(self)