# Copyright (C) 2010 Kipp Cannon (kipp.cannon@ligo.org)
# Copyright (C) 2010 Chad Hanna (chad.hanna@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
## @file
## @package dagparts
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
DAG construction tools.
"""
import collections
import doctest
import itertools
import math
import os
import sys
import socket
import subprocess
import tempfile
import warnings
from ligo import segments
from lal.utils import CacheEntry
from gstlal import pipeline
__author__ = "Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>"
__date__ = "$Date$" #FIXME
__version__ = "$Revision$" #FIXME
warnings.warn(
"all functionality within this module has been replaced by gstlal.dags and "
"gstlal.dags.util, and will be removed from gstlal in the future",
DeprecationWarning,
)
#
# =============================================================================
#
# Environment utilities
#
# =============================================================================
#
[docs]def which(prog):
which = subprocess.Popen(['which',prog], stdout=subprocess.PIPE)
out = which.stdout.read().strip().decode('utf-8')
if not out:
raise ValueError("could not find %s in your path, have you built the proper software and sourced the proper environment scripts?" % prog)
return out
[docs]def condor_scratch_space():
"""!
A way to standardize the condor scratch space even if it changes
>>> condor_scratch_space()
'_CONDOR_SCRATCH_DIR'
"""
return "_CONDOR_SCRATCH_DIR"
[docs]def log_path():
"""!
The stupid pet tricks to find log space on the LDG.
Defaults to checking TMPDIR first.
"""
host = socket.getfqdn()
try:
return os.environ['TMPDIR']
except KeyError:
print("\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n")
#FIXME add more hosts as you need them
if 'cit' in host or 'caltech.edu' in host:
tmp = '/usr1/' + os.environ['USER']
print(f"falling back to {tmp}")
return tmp
if 'phys.uwm.edu' in host:
tmp = '/localscratch/' + os.environ['USER']
print(f"falling back to {tmp}")
return tmp
if 'aei.uni-hannover.de' in host:
tmp = '/local/user/' + os.environ['USER']
print(f"falling back to {tmp}")
return tmp
if 'phy.syr.edu' in host:
tmp = '/usr1/' + os.environ['USER']
print(f"falling back to {tmp}")
return tmp
raise KeyError("$TMPDIR is not set and I don't recognize this environment")
#
# =============================================================================
#
# Condor DAG utilities
#
# =============================================================================
#
[docs]class DAG(pipeline.CondorDAG):
"""!
A thin subclass of pipeline.CondorDAG.
Extra features include an add_node() method and a cache writing method.
Also includes some standard setup, e.g., log file paths etc.
"""
def __init__(self, name, logpath = log_path()):
self.basename = name.replace(".dag","")
tempfile.tempdir = logpath
tempfile.template = self.basename + '.dag.log.'
logfile = tempfile.mktemp()
fh = open( logfile, "w" )
fh.close()
pipeline.CondorDAG.__init__(self,logfile)
self.set_dag_file(self.basename)
self.jobsDict = {}
self.output_cache = []
[docs] def add_node(self, node, retry = 3):
node.set_retry(retry)
node.add_macro("macronodename", node.get_name())
pipeline.CondorDAG.add_node(self, node)
[docs] def write_cache(self):
out = self.basename + ".cache"
f = open(out,"w")
for c in self.output_cache:
f.write(str(c)+"\n")
f.close()
[docs]class DAGJob(pipeline.CondorDAGJob):
"""!
A job class that subclasses pipeline.CondorDAGJob and adds some extra
boiler plate items for gstlal jobs which tends to do the "right" thing
when given just an executable name.
"""
def __init__(self, executable, tag_base = None, universe = "vanilla", condor_commands = {}):
self.__executable = which(executable)
self.__universe = universe
if tag_base:
self.tag_base = tag_base
else:
self.tag_base = os.path.split(self.__executable)[1]
self.__prog__ = self.tag_base
pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
self.add_condor_cmd('getenv','True')
self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;")
self.set_sub_file(self.tag_base+'.sub')
self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out')
self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err')
self.number = 1
# make an output directory for files
self.output_path = self.tag_base
try:
os.mkdir(self.output_path)
except:
pass
for cmd, val in condor_commands.items():
self.add_condor_cmd(cmd, val)
[docs]class DAGNode(pipeline.CondorDAGNode):
"""!
A node class that subclasses pipeline.CondorDAGNode that automates
adding the node to the dag, makes sensible names and allows a list of parent
nodes to be provided.
It tends to do the "right" thing when given a job, a dag, parent nodes, dictionary
options relevant to the job, a dictionary of options related to input files and a
dictionary of options related to output files.
NOTE and important and subtle behavior - You can specify an option with
an empty argument by setting it to "". However options set to None are simply
ignored.
"""
def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}, input_cache_file_name = None):
pipeline.CondorDAGNode.__init__(self, job)
for p in parent_nodes:
self.add_parent(p)
self.set_name("%s_%04X" % (job.tag_base, job.number))
job.number += 1
dag.add_node(self)
self.input_files = input_files.copy()
self.input_files.update(input_cache_files)
self.output_files = output_files.copy()
self.output_files.update(output_cache_files)
self.cache_inputs = {}
self.cache_outputs = {}
for opt, val in list(opts.items()) + list(output_files.items()) + list(input_files.items()):
if val is None:
continue # not the same as val = '' which is allowed
if isinstance(val, str) or not isinstance(val, collections.Iterable): # catches list like things but not strings
if opt == "":
self.add_var_arg(val)
else:
self.add_var_opt(opt, val)
# Must be an iterable
else:
if opt == "":
[self.add_var_arg(a) for a in val]
else:
self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val))
# Create cache files for long command line arguments and store them in the job's subdirectory. NOTE the svd-bank string
# is handled by gstlal_inspiral_pipe directly
cache_dir = os.path.join(job.tag_base, 'cache')
for opt, val in input_cache_files.items():
if not os.path.isdir(cache_dir):
os.mkdir(cache_dir)
cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
if input_cache_file_name is None:
cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir)
else:
cache_file_name = os.path.join(cache_dir, input_cache_file_name)
open(cache_file_name, "w").write("\n".join(map(str, cache_entries)))
self.add_var_opt(opt, cache_file_name)
# Keep track of the cache files being created
self.cache_inputs.setdefault(opt, []).append(cache_file_name)
for opt, val in output_cache_files.items():
if not os.path.isdir(cache_dir):
os.mkdir(cache_dir)
cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val]
cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir)
open(cache_file_name, "w").write("\n".join(map(str, cache_entries)))
self.add_var_opt(opt, cache_file_name)
# Keep track of the cache files being created
self.cache_outputs.setdefault(opt, []).append(cache_file_name)
[docs]def condor_command_dict_from_opts(opts, defaultdict = None):
"""!
A function to turn a list of options into a dictionary of condor commands, e.g.,
>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"])
{'+Online_CBC_SVD': 'True', 'TARGET.Online_CBC_SVD ': '?= True'}
>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"somecommand":"somevalue"})
{'somecommand': 'somevalue', '+Online_CBC_SVD': 'True', 'TARGET.Online_CBC_SVD ': '?= True'}
>>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"+Online_CBC_SVD":"False"})
{'+Online_CBC_SVD': 'True', 'TARGET.Online_CBC_SVD ': '?= True'}
"""
if defaultdict is None:
defaultdict = {}
for o in opts:
osplit = o.split("=")
k = osplit[0]
v = "=".join(osplit[1:])
defaultdict.update([(k, v)])
return defaultdict
[docs]def pipeline_dot_py_append_opts_hack(opt, vals):
"""!
A way to work around the dictionary nature of pipeline.py which can
only record options once.
>>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3])
'1 --my-favorite-option 2 --my-favorite-option 3'
"""
out = str(vals[0])
for v in vals[1:]:
out += " --%s %s" % (opt, str(v))
return out
#
# =============================================================================
#
# Segment utilities
#
# =============================================================================
#
[docs]def breakupseg(seg, maxextent, overlap):
if maxextent <= 0:
raise ValueError("maxextent must be positive, not %s" % repr(maxextent))
# Simple case of only one segment
if abs(seg) < maxextent:
return segments.segmentlist([seg])
# adjust maxextent so that segments are divided roughly equally
maxextent = max(int(abs(seg) / (int(abs(seg)) // int(maxextent) + 1)), overlap)
maxextent = int(math.ceil(abs(seg) / math.ceil(abs(seg) / maxextent)))
end = seg[1]
seglist = segments.segmentlist()
while abs(seg):
if (seg[0] + maxextent + overlap) < end:
seglist.append(segments.segment(seg[0], seg[0] + maxextent + overlap))
seg = segments.segment(seglist[-1][1] - overlap, seg[1])
else:
seglist.append(segments.segment(seg[0], end))
break
return seglist
[docs]def breakupsegs(seglist, maxextent, overlap):
newseglist = segments.segmentlist()
for bigseg in seglist:
newseglist.extend(breakupseg(bigseg, maxextent, overlap))
return newseglist
[docs]def breakupseglists(seglists, maxextent, overlap):
for instrument, seglist in seglists.iteritems():
newseglist = segments.segmentlist()
for bigseg in seglist:
newseglist.extend(breakupseg(bigseg, maxextent, overlap))
seglists[instrument] = newseglist
#
# =============================================================================
#
# File utilities
#
# =============================================================================
#
[docs]def cache_to_instruments(cache):
"""!
Given a cache, returns back a string containing all the IFOs that are
contained in each of its cache entries, sorted by IFO name.
"""
observatories = set()
for cache_entry in cache:
observatories.update(groups(cache_entry.observatory, 2))
return ''.join(sorted(list(observatories)))
[docs]def T050017_filename(instruments, description, seg, extension, path = None):
"""!
A function to generate a T050017 filename.
"""
if not isinstance(instruments, str):
instruments = "".join(sorted(instruments))
start, end = seg
start = int(math.floor(start))
try:
duration = int(math.ceil(end)) - start
# FIXME this is not a good way of handling this...
except OverflowError:
duration = 2000000000
extension = extension.strip('.')
if path is not None:
return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension)
else:
return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension)
[docs]def group_T050017_filename_from_T050017_files(cache_entries, extension, path = None):
"""!
A function to return the name of a file created from multiple files following
the T050017 convention. In addition to the T050017 requirements, this assumes
that numbers relevant to organization schemes will be the first entry in the
description, e.g. 0_DIST_STATS, and that all files in a given cache file are
from the same group of ifos and either contain data from the same segment or
from the same background bin. Note, that each file doesn't have to be from
the same IFO, for example the template bank cache could contain template bank
files from H1 and template bank files from L1.
"""
# Check that every file has same observatory.
observatories = cache_to_instruments(cache_entries)
split_description = cache_entries[0].description.split('_')
min_bin = [x for x in split_description[:2] if x.isdigit()]
max_bin = [x for x in cache_entries[-1].description.split('_')[:2] if x.isdigit()]
seg = segments.segmentlist(cache_entry.segment for cache_entry in cache_entries).extent()
if min_bin:
min_bin = min_bin[0]
if max_bin:
max_bin = max_bin[-1]
if min_bin and (min_bin == max_bin or not max_bin):
# All files from same bin, thus segments may be different.
# Note that this assumes that if the last file in the cache
# does not start with a number that every file in the cache is
# from the same bin, an example of this is the cache file
# generated for gstlal_inspiral_calc_likelihood, which contains
# all of the DIST_STATS files from a given background bin and
# then CREATE_PRIOR_DIST_STATS files which are not generated
# for specific bins
return T050017_filename(observatories, cache_entries[0].description, seg, extension, path = path)
elif min_bin and max_bin and min_bin != max_bin:
if split_description[1].isdigit():
description_base = split_description[2:]
else:
description_base = split_description[1:]
# Files from different bins, thus segments must be same
return T050017_filename(observatories, '_'.join([min_bin, max_bin] + description_base), seg, extension, path = path)
else:
print("ERROR: first and last file of cache file do not match known pattern, cannot name group file under T050017 convention. \nFile 1: %s\nFile 2: %s" % (cache_entries[0].path, cache_entries[-1].path), file=sys.stderr)
raise ValueError
#
# =============================================================================
#
# Misc utilities
#
# =============================================================================
#
[docs]def groups(l, n):
"""!
Given a list, returns back sublists with a maximum size n.
"""
for i in range(0, len(l), n):
yield l[i:i+n]
[docs]def flatten(lst):
"""!
Flatten a list by one level of nesting.
"""
return list(itertools.chain.from_iterable(lst))
if __name__ == "__main__":
import doctest
doctest.testmod()