from optparse import OptionParser
import tempfile
from glue import pipeline
class DAG(pipeline.CondorDAG):
    """Condor DAG whose DAG file and log file are named after ``name``.

    Every node added through :meth:`add_node` gets a retry count of 1 and a
    ``macronodename`` macro holding the node's name.
    """

    def __init__(self, name, logpath=None):
        """Create the log file and initialise the underlying CondorDAG.

        Parameters
        ----------
        name : str
            DAG name. Every occurrence of ``".dag"`` is removed to form
            ``self.basename``, which is then passed to ``set_dag_file``.
        logpath : str or None
            Directory in which the log file is created. ``None`` selects
            the default temporary directory chosen by :mod:`tempfile`.
        """
        import os  # local import: only the log-file setup below needs it

        self.basename = name.replace(".dag", "")

        # mkstemp creates the file atomically, unlike the deprecated mktemp(),
        # which only picks a name and leaves a window before the file is
        # opened.  Passing prefix/dir explicitly also avoids overwriting the
        # process-wide tempfile.tempdir / tempfile.template globals (and the
        # latter never influenced mktemp's prefix anyway).  Only the final
        # path component of basename is used so that a basename containing
        # directories cannot point the prefix at a non-existent sub-directory.
        log_prefix = os.path.basename(self.basename) + ".dag.log."
        log_fd, logfile = tempfile.mkstemp(prefix=log_prefix, dir=logpath)
        os.close(log_fd)  # only an empty, existing file is needed here

        pipeline.CondorDAG.__init__(self, logfile)
        self.set_dag_file(self.basename)
        self.jobsDict = {}
        self.output_cache = []

    def add_node(self, node):
        """Register ``node`` with the DAG.

        Before delegating to ``pipeline.CondorDAG.add_node`` the node gets
        ``set_retry(1)`` and a ``macronodename`` macro set to its own name.
        """
        node.set_retry(1)
        node.add_macro("macronodename", node.get_name())
        pipeline.CondorDAG.add_node(self, node)
class InjectionJob(pipeline.CondorDAGJob):
    """Condor job whose submit file and output directory derive from ``tag_base``."""

    def __init__(self, executable, tag_base=None, universe="vanilla",
                 condor_commands=None, **kwargs):
        """Configure the job's submit file, log paths and extra Condor commands.

        Parameters
        ----------
        executable : str
            Executable handed to ``pipeline.CondorDAGJob.__init__``.
        tag_base : str
            Base name for the submit file (``tag_base + '.sub'``) and for the
            output directory.  The default of ``None`` is not usable: the
            string concatenation below raises ``TypeError`` for it.
        universe : str
            Condor universe handed to ``pipeline.CondorDAGJob.__init__``.
        condor_commands : mapping or None
            Extra ``command -> value`` pairs, each passed to
            ``add_condor_cmd``.  ``None`` (the default) adds none.
        **kwargs
            Accepted and ignored.
        """
        # Local import so the directory creation below cannot fail with a
        # NameError when the module itself does not import os.
        import os

        self.__prog__ = tag_base
        self.__executable = executable
        self.__universe = universe
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('getenv', 'True')
        self.tag_base = tag_base
        self.set_sub_file(tag_base + '.sub')
        self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
        self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
        # Counter read and incremented by nodes to build unique node names.
        self.number = 1

        # Make an output directory for files.  This stays best-effort (the
        # directory usually exists already on a re-run), but only filesystem
        # errors are ignored instead of every possible exception.
        self.output_path = tag_base
        try:
            os.mkdir(self.output_path)
        except OSError:
            pass

        # A None default replaces the shared mutable {} default argument.
        for cmd, val in (condor_commands or {}).items():
            self.add_condor_cmd(cmd, val)
class InjectionNode(pipeline.CondorDAGNode):
    """DAG node that wires up its parents and arguments and adds itself to a DAG."""

    def __init__(self, job, dag, p_node=[], opts={}):
        """Build the node from ``job`` and register it with ``dag``.

        Parameters
        ----------
        job
            Job this node belongs to; its ``tag_base`` and ``number``
            attributes are used to name the node, and ``number`` is
            incremented afterwards.
        dag
            DAG whose ``add_node`` is called with this node.
        p_node : iterable
            Nodes added as parents of this node.
        opts : mapping
            ``option -> value`` pairs.  ``None`` values are skipped (an empty
            string is a valid value).  The empty-string option name marks
            positional arguments.  Non-string iterables supply several
            values for one option.
        """
        pipeline.CondorDAGNode.__init__(self, job)

        for parent in p_node:
            self.add_parent(parent)

        for opt, val in opts.items():
            if val is None:
                # Not the same as val == '', which is allowed.
                continue

            # Strings are iterable but must be treated as a single value.
            single_value = isinstance(val, str) or not hasattr(val, "__iter__")
            positional = opt == ""

            if positional:
                # One positional argument per value.
                for arg in ([val] if single_value else val):
                    self.add_var_arg(arg)
            elif single_value:
                self.add_var_opt(opt, val)
            else:
                self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))

        node_name = "%s_%04X" % (job.tag_base, job.number)
        self.set_name(node_name)
        job.number += 1
        dag.add_node(self)