This module contains objects that make it simple for the user to create python scripts that build Condor DAGs to run code on the LSC Data Grid.

This file is part of the Grid LSC User Environment (GLUE).

GLUE is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import git_version
__author__ = 'Duncan Brown <duncan@gravity.phys.uwm.edu>'
__version__ = "git id %s" % git_version.id
__date__ = git_version.date

from collections import OrderedDict
import igwn_segments as segments
from hashlib import md5
class CondorError(Exception):
  """Error thrown by Condor Jobs"""

class CondorSubmitError(CondorError):
Generic condor job class. Provides methods to set the options in the condor submit file for a particular executable.

def __init__(self, universe, executable, queue):
  @param universe: the condor universe to run the job in.
  @param executable: the executable to run.
  @param queue: number of jobs to queue.
Return the name of the executable for this job.

Set the name of the executable for this job.

Return the condor universe that the job will run in.

Set the condor universe for the job to run in.
@param universe: the condor universe to run the job in.

Return the grid type of the job.

Set the type of grid resource for the job.
@param grid_type: type of grid resource.

Return the grid server on which the job will run.

Set the grid server on which to run the job.
@param grid_server: grid server on which to run.

Return the grid scheduler.

Set the grid scheduler.
@param grid_scheduler: grid scheduler on which to run.
If executable installed is true, then no copying of the executable is done. If it is false, pegasus stages the executable to the remote site. The default is that the executable is installed (i.e. True).
@param installed: true or false

Return whether or not the executable is installed.
Add a Condor command to the submit file (e.g. a ClassAd or environment setting).
@param cmd: Condor command directive.
@param value: value for command.

Return the dictionary of condor keywords to add to the job.
Add filename as a necessary input file for this DAG node.
@param filename: input filename to add

Add filename as an output file for this DAG node.
@param filename: output filename to add

Add filename as a checkpoint file for this DAG job.

Return list of input files for this DAG node.

Return list of output files for this DAG node.

Return a list of checkpoint files for this DAG node.
Add an argument to the executable. Arguments are appended after any options and their order is guaranteed.
@param arg: argument to add.

Add a file argument to the executable. Arguments are appended after any options and their order is guaranteed. Also adds the file name to the list of required input data for this job.
@param filename: file to add as argument.

Return the list of arguments that are to be passed to the executable.
Add a command line option to the executable. The order that the arguments will be appended to the command line is not guaranteed, but they will always be added before any command line arguments. The name of the option is prefixed with a double hyphen and the program is expected to parse it with getopt_long().
@param opt: command line option to add.
@param value: value to pass to the option (None for no argument).

Returns the value associated with the given command line option. Returns None if the option does not exist in the options list.
@param opt: command line option
Add a command line option to the executable. The order that the arguments will be appended to the command line is not guaranteed, but they will always be added before any command line arguments. The name of the option is prefixed with a double hyphen and the program is expected to parse it with getopt_long().
@param opt: command line option to add.
@param filename: value to pass to the option (None for no argument).

Return the dictionary of opts for the job.
Add a command line option to the executable. The order that the arguments will be appended to the command line is not guaranteed, but they will always be added before any command line arguments. The name of the option is prefixed with a single hyphen and the program is expected to parse it with getopt() or getopt_long() (if a single character option), or getopt_long_only() (if multiple characters). Long and (single-character) short options may be mixed if the executable permits this.
@param opt: command line option to add.
@param value: value to pass to the option (None for no argument).

Return the dictionary of short options for the job.
Parse command line options from a given section in an ini file and pass them to the executable.
@param cp: ConfigParser object pointing to the ini file.
@param section: section of the ini file to add to the options.

for opt in cp.options(section):
  arg = str(cp.get(section,opt)).strip()
Set the email address to send notification to.
@param value: email address or 'never' for no notification.

Set the Condor log file.
@param path: path to log file.

Set the file from which Condor directs the stdin of the job.
@param path: path to stdin file.

Get the file from which Condor directs the stdin of the job.

Set the file to which Condor directs the stderr of the job.
@param path: path to stderr file.

Get the file to which Condor directs the stderr of the job.

Set the file to which Condor directs the stdout of the job.
@param path: path to stdout file.

Get the file to which Condor directs the stdout of the job.

Set the name of the file to write the Condor submit file to when write_sub_file() is called.
@param path: path to submit file.

Get the name of the file to which the Condor submit file will be written.
Write a submit file for this Condor job.

subfile.write('universe = ' + self.__universe + '\n')
subfile.write('executable = ' + self.__executable + '\n')

subfile.write('grid_resource = %s %s\n' % (self.__grid_type, self.__grid_server))
subfile.write('grid_resource = %s %s %s\n' % (self.__grid_type, self.__grid_server, self.__grid_scheduler))

subfile.write('when_to_transfer_output = ON_EXIT\n')
subfile.write('transfer_output_files = $(macrooutput)\n')
subfile.write('transfer_input_files = $(macroinput)\n')

subfile.write('arguments = "')
subfile.write(' ' + c)
subfile.write(' --' + c + ' ' + self.__options[c])
subfile.write(' --' + c)
subfile.write(' -' + c)
subfile.write(' "\n')

subfile.write(str(cmd) + " = " + str(self.__condor_cmds[cmd]) + '\n')

subfile.write('log = ' + self.__log_file + '\n')
subfile.write('input = ' + self.__in_file + '\n')
subfile.write('error = ' + self.__err_file + '\n')
subfile.write('output = ' + self.__out_file + '\n')
subfile.write('queue ' + str(self.__queue) + '\n')
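As a minimal sketch of how these pieces fit together (assuming the module is importable as glue.pipeline; the executable path, option names and log locations below are illustrative, not part of this module):

from glue import pipeline

job = pipeline.CondorJob('vanilla', '/usr/bin/my_search', 1)
job.add_condor_cmd('accounting_group', 'ligo.dev.o4.test')   # example ClassAd setting
job.add_opt('verbose', None)       # becomes --verbose with no argument
job.add_opt('threshold', '5.5')    # becomes --threshold 5.5
job.set_log_file('logs/my_search.log')
job.set_stderr_file('logs/my_search-$(cluster)-$(process).err')
job.set_stdout_file('logs/my_search-$(cluster)-$(process).out')
job.set_sub_file('my_search.sub')
job.write_sub_file()               # writes my_search.sub using the settings above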
A Condor DAG job never notifies the user on completion and can have variable options that are set for a particular node in the DAG. Inherits methods from a CondorJob.

def __init__(self, universe, executable):
  universe = the condor universe to run the job in.
  executable = the executable to run in the DAG.

super(CondorDAGJob,self).__init__(universe, executable, 1)
CondorJob.set_notification(self, 'never')

Create a condor node from this job. This provides a basic interface to the CondorDAGNode class. Most jobs in a workflow will subclass the CondorDAGNode class and overwrite this to give more details when initializing the node. However, this will work fine for jobs with very simple input/output.

Set the grid site to run on. If not specified, no hint is given to Pegasus.

Return the grid site for this node.
Add a variable (or macro) option to the condor job. The option is added to the submit file and a different argument to the option can be set for each node in the DAG.
@param opt: name of option to add.
@param short: opt is short

self.add_opt(opt, '$(macro' + macro + ')')

Add a condor command to the submit file that allows variable (macro) arguments to be passed to the executable.

Add a command to the submit file to allow variable (macro) arguments to be passed to the executable.

self.__var_args.append("'$(macroargument%s)'" % str(arg_index))
self.__var_args.append('$(macroargument%s)' % str(arg_index))
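A short sketch of the variable-option mechanism (the job name and option are illustrative): the submit file carries a $(macro...) placeholder, and each node supplies its own value.

dagjob = pipeline.CondorDAGJob('vanilla', '/usr/bin/my_search')
dagjob.set_sub_file('my_search.sub')

# create_node() returns a basic CondorDAGNode tied to this job.
node = dagjob.create_node()

# The submit file gains "--gps-start-time $(macrogpsstarttime)" and this
# node sets the macro's value for its own DAG entry.
node.add_var_opt('gps-start-time', 1000000000)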
Condor DAGMan job class. Appropriate for setting up DAGs to run within a DAG.

dag = the name of the condor dag file to run
dir = the directory in which the dag file is located

Create a condor node from this job. This provides a basic interface to the CondorDAGManNode class. Most jobs in a workflow will subclass the CondorDAGManNode class and overwrite this to give more details when initializing the node. However, this will work fine for jobs with very simple input/output.

Set the directory where the dag will be run.
@param dir: the name of the directory where the dag will be run

Get the directory where the dag will be run.

Set the email address to send notification to.
@param value: email address or 'never' for no notification.

Return the name of the dag as the submit file name for the SUBDAG EXTERNAL command in the uber-dag.

Do nothing, as there is no need for a sub file with the SUBDAG EXTERNAL command in the uber-dag.

Return the name of any associated dag file.
A CondorDAGNode represents a node in the DAG. It corresponds to a particular condor job (and so a particular submit file). If the job has variable (macro) options, they can be set here so each node executes with the correct options.

@param job: the CondorJob that this node corresponds to.

if not isinstance(job, CondorDAGJob) and \
    not isinstance(job,CondorDAGManJob):
  raise CondorDAGNodeError(
    "A DAG node must correspond to a Condor DAG job or Condor DAGMan job")

if isinstance(job, CondorDAGJob) and job.get_universe() == 'standard':

t = str( int( time.time() * 1000 ) )
r = str( int( random.random() * 100000000000000000 ) )
a = str( self.__class__ )
self.__name = md5((t + r + a).encode()).hexdigest()
Return the CondorJob that this node is associated with.

Sets the name of the pre script that is executed before the DAG node is run.
@param script: path to script

Adds an argument to the pre script that is executed before the DAG node is run.

Sets the name of the post script that is executed after the DAG node has run.
@param script: path to script

Returns the name of the post script that is executed after the DAG node has run.

Adds an argument to the post script that is executed after the DAG node has run.

Returns an array of arguments to the post script that is executed after the DAG node has run.

Set the name for this node in the DAG.

Get the name for this node in the DAG.

Set the category for this node in the DAG.

Get the category for this node in the DAG.

Set the priority for this node in the DAG.

Get the priority for this node in the DAG.
Add filename as a necessary input file for this DAG node.
@param filename: input filename to add

if not isinstance(self.job(), CondorDAGManJob):
  if self.job().get_universe() == 'grid':

Add filename as an output file for this DAG node.
@param filename: output filename to add

if not isinstance(self.job(), CondorDAGManJob):
  if self.job().get_universe() == 'grid':

Add filename as a checkpoint file for this DAG node.
@param filename: checkpoint filename to add

if not isinstance(self.job(), CondorDAGManJob):
  if self.job().get_universe() == 'grid':

Return list of input files for this DAG node and its job.

if isinstance(self.job(), CondorDAGJob):

Return list of output files for this DAG node and its job.

if isinstance(self.job(), CondorDAGJob):

Return a list of checkpoint files for this DAG node and its job.

if isinstance(self.job(), CondorDAGJob):
return checkpoint_files
Set the name of the VDS group key when generating a DAX.
@param group: name of group for this node

Returns the VDS group key for this node.

Add a variable (macro) for this node. This can be different for each node in the DAG, even if they use the same CondorJob. Within the CondorJob, the value of the macro can be referenced as '$(name)', for instance to define a unique output or error file for each node.
@param name: macro name.
@param value: value of the macro for this node in the DAG

self.__opts[macro] = value

Add a variable (macro) for storing the input/output files associated with this node.
@param io: macroinput or macrooutput
@param filename: filename of input/output file

self.__opts[io] = filename
if filename not in self.__opts[io]:
  self.__opts[io] += ',%s' % filename
Add a variable (macro) for storing the input files associated with this node.
@param filename: filename of input file

Add a variable (macro) for storing the output files associated with this node.
@param filename: filename of output file

Return the opts for this node. Note that this returns only the options for this instance of the node and not those associated with the underlying job template.
Add a variable (macro) condor command for this node. If the command specified does not exist in the CondorJob, it is added so the submit file will be correct when written.
PLEASE NOTE: As with other add_var commands, the variable must be set for all nodes that use the CondorJob instance.
@param command: command name
@param value: value of the command for this node in the DAG/DAX.

self.__macros['macro' + macro] = value
Add a variable (macro) option for this node. If the option specified does not exist in the CondorJob, it is added so the submit file will be correct when written.
@param opt: option name.
@param value: value of the option for this node in the DAG.
@param short: opt is short

self.__opts['macro' + macro] = value
def add_file_opt(self,opt,filename,file_is_output_file=False):

Add a variable (macro) option for this node. If the option specified does not exist in the CondorJob, it is added so the submit file will be correct when written. The value of the option is also added to the list of input files for the DAX.
@param opt: option name.
@param filename: value of the option for this node in the DAG.
@param file_is_output_file: a boolean indicating whether the file will be an output file instead of an input file. The default is to have it be an input.

Add a variable (or macro) argument to the condor job. The argument is added to the submit file and a different value of the argument can be set for each node in the DAG.
@param arg: name of option to add.

Add a variable (or macro) file name argument to the condor job. The argument is added to the submit file and a different value of the argument can be set for each node in the DAG. The file name is also added to the list of input files for the DAX.
@param filename: name of option to add.
Return the arguments for this node. Note that this returns only the arguments for this instance of the node and not those associated with the underlying job template.

Set the number of times that this node in the DAG should retry.
@param retry: number of times to retry node.

Return the number of times that this node in the DAG should retry.
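A brief illustrative sketch (the node name, category and file names are made up) of configuring a node with per-node macros, files and a retry count:

# 'dagjob' is the CondorDAGJob from the earlier sketch.
node = dagjob.create_node()
node.set_name('search_segment_0001')
node.set_category('search')
node.set_retry(2)

# Per-node option values; matching $(macro...) entries are added to the
# job's submit file if they are not already present.
node.add_var_opt('gps-start-time', 1000000000)
node.add_var_opt('gps-end-time', 1000002048)
node.add_file_opt('output', 'results/segment_0001.xml', file_is_output_file=True)
node.add_input_file('frames/segment_0001.lcf')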
Write the DAG entry for this node's job to the DAG file descriptor.
@param fh: descriptor of open DAG file.

if isinstance(self.job(),CondorDAGManJob):
  fh.write(' '.join(
    ['SUBDAG EXTERNAL', self.__name, self.__job.get_sub_file()]))
  if self.job().get_dag_directory():
    fh.write(' DIR ' + self.job().get_dag_directory())
else:
  fh.write('JOB ' + self.__name + ' ' + self.__job.get_sub_file())

fh.write('RETRY ' + self.__name + ' ' + str(self.__retry) + '\n')
Write the DAG entry for this node's category to the DAG file descriptor.
@param fh: descriptor of open DAG file.

Write the DAG entry for this node's priority to the DAG file descriptor.
@param fh: descriptor of open DAG file.

Write the variable (macro) options and arguments to the DAG file descriptor.
@param fh: descriptor of open DAG file.

fh.write('VARS ' + self.__name)
fh.write(' ' + str(k) + '="' + str(self.__macros[k]) + '"')
for k in self.__opts.keys():
  fh.write(' ' + str(k) + '="' + str(self.__opts[k]) + '"')
fh.write(' macroargument' + str(i) + '="' + self.__args[i] + '"')
Write the parent/child relations for this job to the DAG file descriptor.
@param fh: descriptor of open DAG file.

fh.write('PARENT ' + " ".join((str(p) for p in self.__parents)) + ' CHILD ' + str(self) + '\n')

Write the pre script for the job, if there is one.
@param fh: descriptor of open DAG file.

fh.write('SCRIPT PRE ' + str(self) + ' ' + self.__pre_script + ' ' +
  ' '.join(self.__pre_script_args) + '\n')

Write the post script for the job, if there is one.
@param fh: descriptor of open DAG file.

fh.write('SCRIPT POST ' + str(self) + ' ' + self.__post_script + ' ' +
  ' '.join(self.__post_script_args) + '\n')
Write as a comment into the DAG file the list of input files for this DAG node.
@param fh: descriptor of open DAG file.

fh.write("## Job %s requires input file %s\n" % (self.__name, f))

Write as a comment into the DAG file the list of output files for this DAG node.
@param fh: descriptor of open DAG file.

fh.write("## Job %s generates output file %s\n" % (self.__name, f))

Set the Condor log file to be used by this CondorJob.
@param log: path of Condor log file.

Add a parent to this node. This node will not be executed until the parent node has run successfully.
@param node: CondorDAGNode to add as a parent.

if not isinstance(node, (CondorDAGNode,CondorDAGManNode) ):
Return a list of tuples containing the command line arguments.

pat = re.compile(r'\$\((.+)\)')
argpat = re.compile(r'\d+')

arg_index = int(argpat.findall(a)[0])
cmd_list.append(("%s" % macros[arg_index], ""))
cmd_list.append(("%s" % a, ""))

cmd_list.append(("--%s" % k, str(value)))
cmd_list.append(("--%s" % k, str(val)))

options = self.job().get_short_opts()

cmd_list.append(("-%s" % k, str(value)))
cmd_list.append(("-%s" % k, str(val)))

Return the full command line that will be used when this node is run by DAGMan.

for argument in cmd_list:
  cmd += ' '.join(argument) + " "
The finalize method of a node is called before the node is finally added to the DAG and can be overridden to do any last minute clean up (such as setting extra command line arguments).
Condor DAGMan node class. Appropriate for setting up DAGs to run within a DAG. Adds the user-tag functionality to condor_dagman processes running in the DAG. May also be used to extend dagman-node specific functionality.

@param job: a CondorDAGNodeJob

super(CondorDAGManNode,self).__init__(job)

Set the user tag that is passed to the analysis code.
@param usertag: the user tag to identify the job

Returns the usertag string.

Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
@param categoryName: category name
@param maxJobsNum: max jobs num

Return an array of tuples containing (categoryName,maxJobsNum)

Set the type of job clustering pegasus can use to collapse jobs.
@param cluster: clustering type

Returns the job clustering type.
A CondorDAG is a Condor Directed Acyclic Graph that describes a collection of Condor jobs and the order in which to run them. All Condor jobs in the DAG must write their Condor logs to the same file.
NOTE: The log file must not be on an NFS mounted system as the Condor jobs must be able to get an exclusive file lock on the log file.

@param log: path to log file, which must not be on an NFS mounted file system.

Return a list containing all the nodes in the DAG.

Return a list containing all the jobs in the DAG.

Use integer node names for the DAG.

Set the name of the file into which the DAG is written.
@param path: path to DAG file.

Return the path to the DAG file.

Add a CondorDAGNode to this DAG. The CondorJob that the node uses is also added to the list of Condor jobs in the DAG so that a list of the submit files needed by the DAG can be maintained. Each unique CondorJob will be added once to prevent duplicate submit files being written.
@param node: CondorDAGNode to add to the CondorDAG.

if not isinstance(node, CondorDAGNode):
  raise CondorDAGError("Nodes must be class CondorDAGNode or subclass")
if not isinstance(node.job(), CondorDAGManJob):

if node.job() not in self.__jobs:
  self.__jobs.append(node.job())

Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
@param categoryName: category name
@param maxJobsNum: max jobs num

Return an array of tuples containing (categoryName,maxJobsNum)
Write the DAG entry for this category's maxjobs to the DAG file descriptor.
@param fh: descriptor of open DAG file.
@param category: tuple containing the type of jobs to set a maxjobs limit for and the maximum number of jobs of that type to run at once.

fh.write('MAXJOBS ' + str(category[0]) + ' ' + str(category[1]) + '\n')

Write all the submit files used by the dag to disk. Each submit file is written to the file name set in the CondorJob.

job.write_sub_file()

Write all the nodes in the DAG to the DAG file.

node.write_job(dagfile)
node.write_vars(dagfile)
if node.get_category():
  node.write_category(dagfile)
if node.get_priority():
  node.write_priority(dagfile)
node.write_pre_script(dagfile)
node.write_post_script(dagfile)
node.write_input_files(dagfile)
node.write_output_files(dagfile)
node.write_parents(dagfile)
Write the workflow to a script (.sh instead of .dag).

Assuming that parents were added to the DAG before their children, dependencies should be handled correctly.

outfilename = ".".join(dfp.split(".")[:-1]) + ".sh"
outfile = open(outfilename, "w")

outfile.write("# Job %s\n" % node.get_name())
if isinstance(node,CondorDAGManNode):
  outfile.write("condor_submit_dag %s\n\n" % (node.job().get_dag()))
else:
  outfile.write("%s %s\n\n" % (node.job().get_executable(), node.get_cmd_line()))

os.chmod(outfilename, os.stat(outfilename)[0] | stat.S_IEXEC)
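Putting the pieces together, a hedged end-to-end sketch of assembling and writing a small DAG (the log path, file names and executable are illustrative; as noted in the CondorDAG description above, the Condor log must not live on NFS):

dag = pipeline.CondorDAG('/usr1/albert.einstein/search.log')
dag.set_dag_file('search')

dagjob = pipeline.CondorDAGJob('vanilla', '/usr/bin/my_search')
dagjob.set_sub_file('my_search.sub')

parent = dagjob.create_node()
parent.add_var_opt('gps-start-time', 1000000000)

child = dagjob.create_node()
child.add_var_opt('gps-start-time', 1000002048)
child.add_parent(parent)

dag.add_node(parent)
dag.add_node(child)

dag.write_sub_files()   # one submit file per unique CondorJob
dag.write_dag()         # the DAG file itself
dag.write_script()      # optional .sh serialisation of the workflow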
Describes a generic analysis job that filters LIGO data as configured by an ini file.

@param cp: ConfigParser object that contains the configuration for this job.

self.__channel = str(self.__cp.get('input','channel')).strip()

Get the configuration variable in a particular section of this job's ini file.
@param sec: ini file section.
@param opt: option from section sec.

return str(self.__cp.get(sec,opt)).strip()

Set the name of the channel that this job is filtering. This will overwrite the value obtained at initialization.

Returns the name of the channel that this job is filtering. Note that the channel is defined to be IFO independent, so this may be LSC-AS_Q or IOO-MC_F. The IFO is set on a per node basis, not a per job basis.
Contains the methods that allow an object to be built to analyse LIGO data in a Condor DAG.

self.__LHO2k = re.compile(r'H2')
self.__user_tag = self.job().get_opts().get("user-tag", None)

def set_start(self,time,pass_to_command_line=True):

Set the GPS start time of the analysis node by setting a --gps-start-time option to the node when it is executed.
@param time: GPS start time of job.
@param pass_to_command_line: add gps-start-time as variable option.

if pass_to_command_line:
  self.add_var_opt('gps-start-time',time)

Get the GPS start time of the node.

def set_end(self,time,pass_to_command_line=True):

Set the GPS end time of the analysis node by setting a --gps-end-time option to the node when it is executed.
@param time: GPS end time of job.
@param pass_to_command_line: add gps-end-time as variable option.

if pass_to_command_line:
  self.add_var_opt('gps-end-time',time)

Get the GPS end time of the node.
Set the GPS start time of the data needed by this analysis node.
@param time: GPS start time of job.

Get the GPS start time of the data needed by this node.

Set the GPS start time of the data needed by this analysis node.

Get the GPS start time of the data needed by this node.

Set the GPS end time of the data needed by this analysis node.
@param time: GPS end time of job.

Get the GPS end time of the data needed by this node.

Set the trig start time of the analysis node by setting a --trig-start-time option to the node when it is executed.
@param time: trig start time of job.
@param pass_to_command_line: add trig-start-time as a variable option.

if pass_to_command_line:
  self.add_var_opt('trig-start-time',time)

Get the trig start time of the node.

Set the trig end time of the analysis node by setting a --trig-end-time option to the node when it is executed.
@param time: trig end time of job.
@param pass_to_command_line: add trig-end-time as a variable option.

if pass_to_command_line:
  self.add_var_opt('trig-end-time',time)

Get the trig end time of the node.
def set_input(self,filename,pass_to_command_line=True):

Add an input to the node by adding a --input option.
@param filename: option argument to pass as input.
@param pass_to_command_line: add input as a variable option.

if pass_to_command_line:
  self.add_var_opt('input', filename)
self.add_input_file(filename)

Get the file that will be passed as input.

def set_output(self,filename,pass_to_command_line=True):

Add an output to the node by adding a --output option.
@param filename: option argument to pass as output.
@param pass_to_command_line: add output as a variable option.

if pass_to_command_line:
  self.add_var_opt('output', filename)
self.add_output_file(filename)

Get the file that will be passed as output.
Set the ifo name to analyze. If the channel name for the job is defined, then the name of the ifo is prepended to the channel name obtained from the job configuration file and passed with a --channel-name option.
@param ifo: two letter ifo code (e.g. L1, H1 or H2).

if self.job().channel():
  self.add_var_opt('channel-name', ifo + ':' + self.job().channel())

Returns the two letter IFO code for this node.

def set_ifo_tag(self,ifo_tag,pass_to_command_line=True):

Set the ifo tag that is passed to the analysis code.
@param ifo_tag: a string to identify one or more IFOs
@param pass_to_command_line: add ifo-tag as a variable option.

if pass_to_command_line:
  self.add_var_opt('ifo-tag', ifo_tag)

Returns the IFO tag string.

def set_user_tag(self,usertag,pass_to_command_line=True):

Set the user tag that is passed to the analysis code.
@param usertag: the user tag to identify the job
@param pass_to_command_line: add user-tag as a variable option.

if pass_to_command_line:
  self.add_var_opt('user-tag', usertag)

Returns the usertag string.
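Continuing the earlier node sketch, a hedged example of how an analysis node is typically configured (in practice the node class would subclass both CondorDAGNode and AnalysisNode; the times, IFO and file name are illustrative):

node.set_start(1000000000)       # adds --gps-start-time 1000000000
node.set_end(1000002048)         # adds --gps-end-time 1000002048
node.set_ifo('H1')               # prepends H1: to the job's channel name
node.set_user_tag('FULL_DATA')   # adds --user-tag FULL_DATA
node.set_output('H1-SEARCH-1000000000-2048.xml')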
Set the LAL frame cache to use. The frame cache is passed to the job with the --frame-cache argument.
@param filename: frame cache file (or list of LFNs) to use.

if isinstance( filename, str ):
  self.add_var_opt('frame-cache', filename)
  self.add_input_file(filename)
elif isinstance( filename, list ):
  self.add_var_opt('glob-frame-data', ' ')

  if len(filename) == 0:
    raise CondorDAGNodeError(
      "LDR did not return any LFNs for query: check ifo and frame type")
  for lfn in filename:
    a, b, c, d = lfn.split('.')[0].split('-')
    t_end = int(c) + int(d)
    self.add_input_file(lfn)
  self.add_var_opt('frame-type', b)
Determine the path to the correct calibration cache file to use.

cal_path = self.job().get_config('calibration','path')
if self.__ifo == 'H2':
  if int(self.get_start()) < int(
      self.job().get_config('calibration','H2-cal-epoch-boundary')):
    cal_file = self.job().get_config('calibration','H2-1')
  else:
    cal_file = self.job().get_config('calibration','H2-2')
else:
  cal_file = self.job().get_config('calibration',self.__ifo)
cal = os.path.join(cal_path,cal_file)

msg = "IFO and start-time must be set first"

Set the path to the calibration cache file for the given IFO. During S2 the Hanford 2km IFO had two calibration epochs, so if the start time is during S2, we use the correct cache file.

Return the calibration cache file to be used by the DAG.
An AnalysisChunk is the unit of data that a node works with, usually some subset of a ScienceSegment.

def __init__(self, start, end, trig_start=0, trig_end=0):
  @param start: GPS start time of the chunk.
  @param end: GPS end time of the chunk.
  @param trig_start: GPS time at which to start generating triggers
  @param trig_end: GPS time at which to stop generating triggers

return '<AnalysisChunk: start %d, end %d, trig_start %d, trig_end %d>' % (
return '<AnalysisChunk: start %d, end %d, trig_start %d>' % (
return '<AnalysisChunk: start %d, end %d, trig_end %d>' % (
return '<AnalysisChunk: start %d, end %d>' % (self.__start, self.__end)

Returns the length of data for which this AnalysisChunk will produce triggers (in seconds).

Returns the GPS start time of the chunk.

Returns the GPS end time of the chunk.

Returns the length (duration) of the chunk in seconds.

Return the first GPS time at which triggers for this chunk should be generated.

Return the last GPS time at which triggers for this chunk should be generated.

Set the first GPS time at which triggers for this chunk should be generated.

Set the last GPS time at which triggers for this chunk should be generated.
A ScienceSegment is a period of time where the experimenters determine that the interferometer is in a state where the data is suitable for scientific analysis. A science segment can have a list of AnalysisChunks associated with it that break the segment up into (possibly overlapping) smaller time intervals for analysis.

@param segment: a tuple containing the (segment id, gps start time, gps end time, duration) of the segment.

self.__id = segment[0]
self.__start = segment[1]
self.__end = segment[2]
self.__dur = segment[3]

Allows iteration over and direct access to the AnalysisChunks contained in this ScienceSegment.

if i < 0:
  raise IndexError("list index out of range")

Returns the number of AnalysisChunks contained in this ScienceSegment.

return '<ScienceSegment: id %d, start %d, end %d, dur %d, unused %d>' % (

ScienceSegments are compared by the GPS start time of the segment.

return cmp(self.start(),other.start())
def make_chunks(self,length=0,overlap=0,play=0,sl=0,excl_play=0,pad_data=0):

Divides the science segment into chunks of length seconds overlapped by overlap seconds. If the play option is set, only chunks that contain S2 playground data are generated. If the user has a more complicated way of generating chunks, this method should be overridden in a sub-class. Any data at the end of the ScienceSegment that is too short to contain a chunk is ignored. The length of this unused data is stored and can be retrieved with the unused() method.
@param length: length of chunk in seconds.
@param overlap: overlap between chunks in seconds.
@param play: 1 : only generate chunks that overlap with S2 playground data.
             2 : as play = 1 plus compute trig start and end times to coincide with the start/end of the playground
@param sl: slide by sl seconds before determining playground data.
@param excl_play: exclude the first excl_play seconds from the start and end of the chunk when computing if the chunk overlaps with playground.
@param pad_data: exclude the first and last pad_data seconds of the segment when generating chunks

time_left = self.dur() - (2 * pad_data)
start = self.start() + pad_data
increment = length - overlap
while time_left >= length:
  end = start + length
  if (not play) or (play and (((end - sl - excl_play - 729273613) % 6370)
      < (600 + length - 2 * excl_play))):
    play_start = 729273613 + 6370 * \
      math.floor((end - sl - excl_play - 729273613) / 6370)
    play_end = play_start + 600
    if ( (play_end - 6370) > start ):
      print("Two playground segments in this chunk:", end=' ')
      print(" Code to handle this case has not been implemented")
    if play_start > start:
      trig_start = int(play_start)
    trig_end = int(play_end)
  time_left -= increment
self.__unused = time_left - overlap
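The playground test above checks whether a chunk overlaps the S2 playground, which repeats every 6370 s starting at GPS 729273613 and lasts 600 s. A simplified, self-contained illustration of that idea (plain arithmetic; not the exact expression used by make_chunks and not part of the class):

import math

S2_PLAYGROUND_START = 729273613   # GPS start of the first S2 playground segment
PLAYGROUND_SPACING = 6370         # seconds between playground segment starts
PLAYGROUND_LENGTH = 600           # duration of each playground segment

def overlaps_playground(start, end):
    """Return True if [start, end) overlaps an S2 playground segment."""
    # start of the last playground segment beginning before 'end'
    play_start = S2_PLAYGROUND_START + PLAYGROUND_SPACING * math.floor(
        (end - S2_PLAYGROUND_START) / PLAYGROUND_SPACING)
    play_end = play_start + PLAYGROUND_LENGTH
    return play_start < end and play_end > start

print(overlaps_playground(729273000, 729275048))   # True: this chunk contains playground data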
def add_chunk(self, start, end, trig_start=0, trig_end=0):

Add an AnalysisChunk to the list associated with this ScienceSegment.
@param start: GPS start time of chunk.
@param end: GPS end time of chunk.
@param trig_start: GPS start time for triggers from chunk
@param trig_end: trig_end

Returns the length of data in the science segment not used to make chunks.

Set the length of data in the science segment not used to make chunks.

Returns the ID of this ScienceSegment.

Returns the GPS start time of this ScienceSegment.

Returns the GPS end time of this ScienceSegment.

Override the GPS start time (and set the duration) of this ScienceSegment.
@param t: new GPS start time.

Override the GPS end time (and set the duration) of this ScienceSegment.
@param t: new GPS end time.

Returns the length (duration) in seconds of this ScienceSegment.

Set the DataFind node associated with this ScienceSegment to df_node.
@param df_node: the DataFind node for this ScienceSegment.

Returns the DataFind node for this ScienceSegment.
An object that can contain all the science data used in an analysis. Can contain multiple ScienceSegments and has a method to generate these from a text file produced by the LIGOtools segwizard program.

Allows direct access to or iteration over the ScienceSegments associated with the ScienceData.

return '<ScienceData: file %s>' % self.__filename

Returns the number of ScienceSegments associated with the ScienceData.

def read(self,filename,min_length,slide_sec=0,buffer=0):

Parse the science segments from the segwizard output contained in the file.
@param filename: input text file containing a list of science segments generated by segwizard.
@param min_length: only append science segments that are longer than min_length.
@param slide_sec: Slide each ScienceSegment by::

  [s, e] -> [s+delta, e].
  [s, e] -> [s, e-delta].

@param buffer: shrink the ScienceSegment::

  [s, e] -> [s+buffer, e-buffer]

octothorpe = re.compile(r'\A#')
for line in open(filename):
  if not octothorpe.match(line) and int(line.split()[3]) >= min_length:
    (id, st, en, du) = list(map(int, line.split()))
    du -= abs(slide_sec)
    du -= 2 * abs(buffer)

Parse the science segments from a tama list of locked segments contained in a file.
@param filename: input text file containing a list of tama segments.

for line in open(filename):
  columns = line.split()
  id = int(columns[0])
  start = int(math.ceil(float(columns[3])))
  end = int(math.floor(float(columns[4])))

def make_chunks(self, length, overlap=0, play=0, sl=0, excl_play=0, pad_data=0):

Divide each ScienceSegment contained in this object into AnalysisChunks.
@param length: length of chunk in seconds.
@param overlap: overlap between segments.
@param play: if true, only generate chunks that overlap with S2 playground data.
@param sl: slide by sl seconds before determining playground data.
@param excl_play: exclude the first excl_play seconds from the start and end of the chunk when computing if the chunk overlaps with playground.
@param pad_data: exclude the first and last pad_data seconds of the segment when generating chunks

seg.make_chunks(length,overlap,play,sl,excl_play,pad_data)
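A hedged usage sketch (the segwizard file name and the numbers are illustrative): read segments at least 2048 s long and chop each one into overlapping analysis chunks.

data = pipeline.ScienceData()
data.read('segments_H1.txt', 2048)                       # segwizard-format segment list
data.make_chunks(2048, overlap=128, play=0, pad_data=8)

for seg in data:
    for chunk in seg:
        print(chunk.start(), chunk.end(), chunk.dur())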
sl=0,excl_play=0,pad_data=0):

Create an extra chunk that uses up the unused data in the science segment.
@param length: length of chunk in seconds.
@param trig_overlap: length of time to start generating triggers before the start of the unused data.
@param play:
  - 1 : only generate chunks that overlap with S2 playground data.
  - 2 : as 1 plus compute trig start and end times to coincide with the start/end of the playground
@param min_length: the unused data must be greater than min_length to make a chunk.
@param sl: slide by sl seconds before determining playground data.
@param excl_play: exclude the first excl_play seconds from the start and end of the chunk when computing if the chunk overlaps with playground.
@param pad_data: exclude the first and last pad_data seconds of the segment when generating chunks

if seg.unused() > min_length:
  end = seg.end() - pad_data
  start = end - length
  if (not play) or (play and (((end - sl - excl_play - 729273613) % 6370)
      < (600 + length - 2 * excl_play))):
    trig_start = end - seg.unused() - trig_overlap
    play_start = 729273613 + 6370 * \
      math.floor((end - sl - excl_play - 729273613) / 6370)
    play_end = play_start + 600
    if ( (play_end - 6370) > start ):
      print("Two playground segments in this chunk")
      print(" Code to handle this case has not been implemented")
    if play_start > trig_start:
      trig_start = int(play_start)
    if (play_end < end):
      trig_end = int(play_end)
    if (trig_end == 0) or (trig_end > trig_start):
      seg.add_chunk(start, end, trig_start, trig_end)
  else:
    seg.add_chunk(start, end, trig_start)
self,min_length,overlap=0,play=0,sl=0,excl_play=0):

Create a chunk that uses up the unused data in the science segment.
@param min_length: the unused data must be greater than min_length to make a chunk.
@param overlap: overlap between chunks in seconds.
@param play: if true, only generate chunks that overlap with S2 playground data.
@param sl: slide by sl seconds before determining playground data.
@param excl_play: exclude the first excl_play seconds from the start and end of the chunk when computing if the chunk overlaps with playground.

if seg.unused() > min_length:
  start = seg.end() - seg.unused() - overlap
  length = start - end
  if (not play) or (play and (((end - sl - excl_play - 729273613) % 6370)
      < (600 + length - 2 * excl_play))):
    seg.add_chunk(start, end, start)
Splits ScienceSegments up into chunks of a given maximum length. The lengths of the last two chunks are chosen so that the data utilisation is optimised.
@param min_length: minimum chunk length.
@param max_length: maximum chunk length.
@param pad_data: exclude the first and last pad_data seconds of the segment when generating chunks

seg_start = seg.start() + pad_data
seg_end = seg.end() - pad_data

if seg.unused() > max_length:
  N = (seg_end - seg_start) // max_length

  for i in range(N - 1):
    start = seg_start + (i * max_length)
    stop = start + max_length
    seg.add_chunk(start, stop)

  start = seg_start + ((N - 1) * max_length)
  middle = (start + seg_end) // 2
  seg.add_chunk(start, middle)
  seg.add_chunk(middle, seg_end)

elif seg.unused() > min_length:
  seg.add_chunk(seg_start, seg_end)
Replaces the ScienceSegments contained in this instance of ScienceData with the intersection of those in the instance other. Returns the number of segments in the intersection.
@param other: ScienceData to use to generate the intersection

start1 = seg1.start()
while start2 < stop1:
if iseg2 < len(other):
  start2 = seg2.start()

def union(self, other):

Replaces the ScienceSegments contained in this instance of ScienceData with the union of those in the instance other. Returns the number of ScienceSegments in the union.
@param other: ScienceData to use to generate the union

length2 = len(other)

start1 = self[i1].start()
stop1 = self[i1].end()

start2 = other[i2].start()
stop2 = other[i2].end()

if start1 > -1 and ( start2 == -1 or start1 <= start2):
elif ustart <= ostop:

Coalesces any adjacent ScienceSegments. Returns the number of ScienceSegments in the coalesced list.

Inverts the ScienceSegments in the class (i.e. set NOT). Returns the number of ScienceSegments after inversion.

if start < 0 or stop < start or start < ostart:
if ostart < 1999999999:
  x = ScienceSegment(tuple([0, ostart, 1999999999, 1999999999 - ostart]))

Keep only times in ScienceSegments which are in the playground.

begin_s2 = 729273613

play_start = begin_s2 + play_space * ( 1
  + int((start - begin_s2 - play_len) / play_space) )

while play_start < stop:
  if play_start > start:
  play_stop = play_start + play_len
  if play_stop < stop:
  play_start = play_start + play_space

Intersection routine for three inputs. Built out of the intersect, coalesce and play routines.

Intersection routine for four inputs.

def split(self, dt):

Split the segments in the list into subsegments at least as long as dt.

tmpstop = start + dt
elif tmpstop + dt > stop:
  tmpstop = int( (start + stop) / 2 )
self.cache = {'gwf': None, 'sft': None, 'xml': None}
for type in self.cache.keys():
  self.cache[type] = {}

def group(self, lst, n):

Group an iterable into an n-tuples iterable. Incomplete tuples are discarded.

return zip(*[itertools.islice(lst, i, None, n) for i in range(n)])

def parse(self,type_regex=None):

Each line of the frame cache file is like the following:

/frames/E13/LHO/frames/hoftMon_H1/H-H1_DMT_C00_L2-9246,H,H1_DMT_C00_L2,1,16 1240664820 6231 {924600000 924646720 924646784 924647472 924647712 924700000}

The description is as follows:

1.1) Directory path of files
1.4) Number of frames in the files (assumed to be 1)
1.5) Duration of the frame files.

2) UNIX timestamp for directory modification time.

3) Number of files that match the above pattern in the directory.

4) List of time ranges or segments [start, stop)

We store the cache for each site and frameType combination as a dictionary where the keys are (directory, duration) tuples and the values are segment lists.

Since the cache file is already coalesced we do not have to call the coalesce method on the segment lists.

type_filter = re.compile(type_regex)

if type_filter and type_filter.search(line) is None:

header, modTime, fileCount, times = line.strip().split(' ', 3)
dir, site, frameType, frameCount, duration = header.split(',')
duration = int(duration)

times = [ int(s) for s in times[1:-1].split(' ') ]

segs = [ segments.segment(a) for a in self.group(times, 2) ]

if site not in gwfDict:
if frameType not in gwfDict[site]:
  gwfDict[site][frameType] = {}

key = (dir, duration)
if key in gwfDict[site][frameType]:
  msg = "The combination %s is not unique in the frame cache file" \
    % str(key)
  raise RuntimeError(msg)

gwfDict[site][frameType][key] = segments.segmentlist(segs)

cache['gwf'] = gwfDict
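As a small self-contained illustration of the cache-file line format described above (using only the example line quoted in the text), the header and the braced segment list can be unpacked like this:

import igwn_segments as segments

line = ('/frames/E13/LHO/frames/hoftMon_H1/H-H1_DMT_C00_L2-9246,H,H1_DMT_C00_L2,1,16 '
        '1240664820 6231 {924600000 924646720 924646784 924647472 924647712 924700000}')

header, modTime, fileCount, times = line.strip().split(' ', 3)
directory, site, frameType, frameCount, duration = header.split(',')
duration = int(duration)

# the braced list holds 2N numbers describing N [start, stop) ranges
values = [int(s) for s in times[1:-1].split(' ')]
segs = segments.segmentlist(
    segments.segment(a, b) for a, b in zip(values[::2], values[1::2]))

print(site, frameType, duration)   # H H1_DMT_C00_L2 16
print(segs[0])                     # first range: 924600000 to 924646720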
def get_lfns(self, site, frameType, gpsStart, gpsEnd):

if site not in cache['gwf']:
if frameType not in cache['gwf'][site]:

search = segments.segment(gpsStart, gpsEnd)
searchlist = segments.segmentlist([search])

for key,seglist in cache['gwf'][site][frameType].items():
  overlap = seglist.intersects(searchlist)
  if not overlap:
    continue

  if s.intersects(search):
    times = range(t1, t2, dur)
    if search.intersects(segments.segment(t, t + dur)):
      lfn = "%s-%s-%d-%d.gwf" % (site, frameType, t, dur)

lfns = list(lfnDict.keys())
An LSCdataFind job used to locate data. The static options are read from the section [datafind] in the ini file. The stdout from LSCdataFind contains the paths to the frame files and is directed to a file in the cache directory named by site and GPS start and end times. The stderr is directed to the logs directory. The job always runs in the scheduler universe. The path to the executable is determined from the ini file.

def __init__(self,cache_dir,log_dir,config_file,lsync_cache_file=None,lsync_type_regex=None):
  @param cache_dir: the directory to write the output lal cache files to.
  @param log_dir: the directory to write the stderr file to.
  @param config_file: ConfigParser object containing the path to the LSCdataFind executable in the [condor] section and a [datafind] section from which the LSCdataFind options are read.
  @param lsync_cache_file: lsync_cache_file
  @param lsync_type_regex: lsync_type_regex

AnalysisJob.__init__(self,config_file)

if config_file.has_option('condor','accounting_group'):
  self.add_condor_cmd('accounting_group',config_file.get('condor','accounting_group'))
if lsync_cache_file:

opt = str(o).strip()
if opt[:4] != "type":

self.add_opt('url-type','file')

self.set_stderr_file(os.path.join(log_dir,'datafind-$(macroobservatory)-$(macrotype)-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).err'))
self.set_stdout_file(os.path.join(log_dir,'datafind-$(macroobservatory)-$(macrotype)-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).out'))

Returns the directory that the cache files are written to.

Return the configuration file object.
A DataFindNode runs an instance of LSCdataFind in a Condor DAG.

@param job: A CondorDAGJob that can run an instance of LALdataFind.

CondorDAGNode.__init__(self,job)
AnalysisNode.__init__(self)

self.set_type(self.job().get_config_file().get('datafind','type'))

def __set_output(self):

Private method to set the file to write the cache to. Automatically set once the ifo, start and end times have been set.

Set the start time of the datafind query.
@param time: GPS start time of query.

self.add_var_opt('gps-start-time', int(time) - int(pad))

Return the start time of the datafind query.

Set the end time of the datafind query.
@param time: GPS end time of query.

Return the end time of the datafind query.

Set the IFO to retrieve data for. Since the data from both Hanford interferometers is stored in the same frame file, this takes the first letter of the IFO (e.g. L or H) and passes it to the --observatory option.
@param obs: IFO to obtain data for.

Return the observatory of the datafind query.

Sets the frame type that we are querying.

Gets the frame type that we are querying.

Return the output file, i.e. the file containing the frame cache data, or the files themselves as a tuple (for DAX).
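A hedged sketch of creating a datafind job and one query node. The class and setter names used here (LSCDataFindJob, LSCDataFindNode, set_observatory, set_type) follow the usual glue naming and are assumptions, since the extract above does not show them explicitly; 'cp' is the ConfigParser object described above, and the directories, frame type and times are illustrative.

df_job = pipeline.LSCDataFindJob('cache', 'logs', cp)
df_job.set_sub_file('datafind.sub')

df_node = pipeline.LSCDataFindNode(df_job)
df_node.set_observatory('H')        # first letter of the IFO
df_node.set_type('H1_HOFT_C00')     # frame type to query for
df_node.set_start(1000000000)
df_node.set_end(1000002048)
frame_cache = df_node.get_output()  # path of the LAL cache file the query will write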
A ligolw_add job can be used to concatenate several ligo lw files.

cp = ConfigParser object from which options are read.

AnalysisJob.__init__(self,cp)

if cp.has_option('condor','accounting_group'):
  self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))

self.set_stdout_file(os.path.join(log_dir, 'ligolw_add-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(log_dir, 'ligolw_add-$(cluster)-$(process).err'))

Runs an instance of ligolw_add in a Condor DAG.

@param job: A CondorDAGJob that can run an instance of ligolw_add

CondorDAGNode.__init__(self,job)
AnalysisNode.__init__(self)

A ligolw_cut job can be used to remove parts of a ligo lw file.

cp = ConfigParser object from which options are read.

AnalysisJob.__init__(self,cp)

self.set_stdout_file(os.path.join(log_dir, 'ligolw_cut-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(log_dir, 'ligolw_cut-$(cluster)-$(process).err'))

Runs an instance of ligolw_cut in a Condor DAG.

@param job: A CondorDAGJob that can run an instance of ligolw_cut

CondorDAGNode.__init__(self,job)
AnalysisNode.__init__(self)

A Noop Job does nothing.

cp = ConfigParser object from which options are read.

AnalysisJob.__init__(self,cp)

if cp.has_option('condor','accounting_group'):
  self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))

self.set_stdout_file(os.path.join(log_dir, 'noop-$(cluster)-$(process).out'))
self.set_stderr_file(os.path.join(log_dir, 'noop-$(cluster)-$(process).err'))

Run a noop job in a Condor DAG.

@param job: A CondorDAGJob that does nothing.

CondorDAGNode.__init__(self,job)
AnalysisNode.__init__(self)
A cbc sqlite job adds to CondorDAGJob and AnalysisJob features common to jobs which read or write to a sqlite database. Of note, the universe is always set to local regardless of what's in the cp file, the extension is set to None so that it may be set by individual SqliteNodes, log files do not have macrogpsstarttime and endtime in them, and get_env is set to True.

def __init__(self, cp, sections, exec_name):
  @param cp: a ConfigParser object from which options are read
  @param sections: list of sections in cp to get added options
  @param exec_name: the name of the sql executable

executable = cp.get('condor', exec_name)
universe = 'vanilla'
CondorDAGJob.__init__(self, universe, executable)
AnalysisJob.__init__(self, cp)

for sec in sections:
  if cp.has_section(sec):
    self.add_ini_opts(cp, sec)
  else:
    sys.stderr.write("warning: config file is missing section [" + sec + "]\n")

if cp.has_option('condor','accounting_group'):
  self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
self.set_stdout_file('logs/' + exec_name + '-$(cluster)-$(process).out')
self.set_stderr_file('logs/' + exec_name + '-$(cluster)-$(process).err')

Set the exec_name name.

Get the exec_name name.
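A hedged sketch of how this machinery is typically used, via the LigolwSqliteJob and LigolwSqliteNode classes described below. The set_database and set_tmp_space_path calls are the setters named in the node description below; set_input_cache is an assumed setter name for the input_cache parameter; 'cp' and 'dag' come from the earlier sketches, and the file names are illustrative.

sqlite_job = pipeline.LigolwSqliteJob(cp)
sqlite_job.set_sub_file('ligolw_sqlite.sub')

sqlite_node = pipeline.LigolwSqliteNode(sqlite_job)
sqlite_node.set_database('triggers.sqlite')
sqlite_node.set_tmp_space_path('/usr1/albert.einstein')   # should be a local disk
sqlite_node.set_input_cache('triggers.cache')             # assumed setter name
dag.add_node(sqlite_node)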
A cbc sqlite node adds to the standard AnalysisNode features common to nodes which read or write to a sqlite database. Specifically, it adds the set_tmp_space_path and set_database methods.

@param job: an Sqlite job

CondorDAGNode.__init__(self, job)
AnalysisNode.__init__(self)

Sets temp-space path. This should be on a local disk.
@param tmp_space: tmp_space

Gets tmp-space path.

Sets database option.
@param database: database

Gets database option.

A LigolwSqlite job. The static options are read from the section [ligolw_sqlite] in the ini file.

@param cp: ConfigParser object from which options are read.

exec_name = 'ligolw_sqlite'
sections = ['ligolw_sqlite']
super(LigolwSqliteJob,self).__init__(cp, sections, exec_name)

Sets the --replace option. This will cause the job to overwrite existing databases rather than add to them.

A LigolwSqlite node.

@param job: a LigolwSqliteJob

super(LigolwSqliteNode,self).__init__(job)

@param input_cache: input_cache

Sets xml input file instead of cache.
@param xml_file: xml_file

Tell ligolw_sqlite to dump the contents of the database to a file.
@param xml_file: xml_file

raise ValueError("no database specified")

Override standard get_output to return xml-file if xml-file is specified. Otherwise, will return database.

raise ValueError("no output xml file or database specified")
The standard SafeConfigParser no longer supports deepcopy() as of python 2.7 (see http://bugs.python.org/issue16058). This subclass restores that functionality.

from io import StringIO
config_string = StringIO()
self.write(config_string)
config_string.seek(0)
new_config = self.__class__()
new_config.readfp(config_string)
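A brief usage example, assuming the subclass is exposed as pipeline.DeepCopyableConfigParser (the class name is an assumption here; the section and option are made up):

import copy

cp = pipeline.DeepCopyableConfigParser()
cp.add_section('condor')
cp.set('condor', 'universe', 'vanilla')

cp_copy = copy.deepcopy(cp)           # works, unlike a plain SafeConfigParser
cp_copy.set('condor', 'universe', 'local')
print(cp.get('condor', 'universe'))   # still 'vanilla'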
static int cmp(REAL4Sequence *a, REAL4Sequence *b)
An AnalysisChunk is the unit of data that a node works with, usually some subset of a ScienceSegment.
def dur(self)
Returns the length (duration) of the chunk in seconds.
def set_trig_end(self, end)
Set the last GPS time at which triggers for this chunk should be generated.
def end(self)
Returns the GPS end time of the chunk.
def set_trig_start(self, start)
Set the first GPS time at which triggers for this chunk should be generated.
def __len__(self)
Returns the length of data for which this AnalysisChunk will produce triggers (in seconds).
def trig_start(self)
Return the first GPS time at which triggers for this chunk should be generated.
def start(self)
Returns the GPS start time of the chunk.
def __init__(self, start, end, trig_start=0, trig_end=0)
def trig_end(self)
Return the last GPS time at which triggers for this chunk should be generated.
Describes a generic analysis job that filters LIGO data as configured by an ini file.
def set_channel(self, channel)
Set the name of the channel that this job is filtering.
def channel(self)
Returns the name of the channel that this job is filtering.
def get_config(self, sec, opt)
Get the configration variable in a particular section of this jobs ini file.
Contains the methods that allow an object to be built to analyse LIGO data in a Condor DAG.
def set_cache(self, filename)
Set the LAL frame cache to to use.
def set_data_start(self, time)
Set the GPS start time of the data needed by this analysis node.
def set_input(self, filename, pass_to_command_line=True)
Add an input to the node by adding a –input option.
def set_trig_start(self, time, pass_to_command_line=True)
Set the trig start time of the analysis node by setting a –trig-start-time option to the node when it...
def set_user_tag(self, usertag, pass_to_command_line=True)
Set the user tag that is passed to the analysis code.
def get_end(self)
Get the GPS end time of the node.
def get_trig_start(self)
Get the trig start time of the node.
def get_ifo_tag(self)
Returns the IFO tag string.
def get_calibration(self)
Return the calibration cache file to be used by the DAG.
def set_output(self, filename, pass_to_command_line=True)
Add an output to the node by adding a –output option.
def set_start(self, time, pass_to_command_line=True)
Set the GPS start time of the analysis node by setting a –gps-start-time option to the node when it i...
def get_user_tag(self)
Returns the usertag string.
def set_trig_end(self, time, pass_to_command_line=True)
Set the trig end time of the analysis node by setting a –trig-end-time option to the node when it is ...
def set_ifo_tag(self, ifo_tag, pass_to_command_line=True)
Set the ifo tag that is passed to the analysis code.
def calibration_cache_path(self)
Determine the path to the correct calibration cache file to use.
def get_start(self)
Get the GPS start time of the node.
def get_ifo(self)
Returns the two letter IFO code for this node.
def set_pad_data(self, pad)
Set the GPS start time of the data needed by this analysis node.
def set_end(self, time, pass_to_command_line=True)
Set the GPS end time of the analysis node by setting a –gps-end-time option to the node when it is ex...
def set_ifo(self, ifo)
Set the ifo name to analyze.
def calibration(self)
Set the path to the calibration cache file for the given IFO.
def get_input(self)
Get the file that will be passed as input.
def get_data_end(self)
Get the GPS end time of the data needed by this node.
def get_output(self)
Get the file that will be passed as output.
def get_data_start(self)
Get the GPS start time of the data needed by this node.
def set_data_end(self, time)
Set the GPS end time of the data needed by this analysis node.
def get_pad_data(self)
Get the GPS start time of the data needed by this node.
def get_trig_end(self)
Get the trig end time of the node.
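
AnalysisNode is normally used as a mixin: a pipeline defines a node class per executable that combines it with CondorDAGNode (both described elsewhere in this listing). A hedged sketch, with all class names, paths and times invented:

    from glue import pipeline

    class ExampleAnalysisNode(pipeline.AnalysisNode, pipeline.CondorDAGNode):
        # illustrative subclass; real pipelines define one per executable
        def __init__(self, job):
            pipeline.CondorDAGNode.__init__(self, job)
            pipeline.AnalysisNode.__init__(self)

    job = pipeline.CondorDAGJob('vanilla', '/path/to/executable')  # hypothetical
    node = ExampleAnalysisNode(job)
    node.set_start(1000000000)      # passes --gps-start-time on the command line
    node.set_end(1000002048)        # passes --gps-end-time
    node.set_user_tag('EXAMPLE')    # passes --user-tag
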
A CondorDAG is a Condor Directed Acyclic Graph that describes a collection of Condor jobs and the order in which to run them.
def get_nodes(self)
Return a list containing all the nodes in the DAG.
def write_script(self)
Write the workflow to a script (.sh instead of .dag).
def add_maxjobs_category(self, categoryName, maxJobsNum)
Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
def set_integer_node_names(self)
Use integer node names for the DAG.
def write_maxjobs(self, fh, category)
Write the DAG entry for this category's maxjobs to the DAG file descriptor.
def set_dag_file(self, path)
Set the name of the file into which the DAG is written.
def add_node(self, node)
Add a CondorDAGNode to this DAG.
def get_maxjobs_categories(self)
Return an array of tuples containing (categoryName,maxJobsNum)
def write_concrete_dag(self)
Write all the nodes in the DAG to the DAG file.
def write_sub_files(self)
Write all the submit files used by the dag to disk.
def get_dag_file(self)
Return the path to the DAG file.
def get_jobs(self)
Return a list containing all the jobs in the DAG.
def write_dag(self)
Write the DAG to the DAG file.
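
A minimal sketch of the usual build cycle (file names invented; nodes come from the CondorDAGJob and CondorDAGNode classes listed below):

    from glue import pipeline

    dag = pipeline.CondorDAG('example.log')   # Condor log file shared by the jobs
    dag.set_dag_file('example')               # DAG is written to example.dag

    # ... create CondorDAGJob objects, call create_node() on them, configure
    # the nodes and add each one with dag.add_node(node) ...

    dag.write_sub_files()   # one Condor submit file per job
    dag.write_dag()         # the DAG file itself
    dag.write_script()      # optional shell-script version of the workflow
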
A Condor DAG job never notifies the user on completion and can have variable options that are set for a particular node in the DAG.
def get_grid_site(self)
Return the grid site for this node.
def add_var_arg(self, arg_index, quote=False)
Add a command to the submit file to allow variable (macro) arguments to be passed to the executable.
def add_var_condor_cmd(self, command)
Add a condor command to the submit file that allows variable (macro) arguments to be passed to the executable.
def add_var_opt(self, opt, short=False)
Add a variable (or macro) option to the condor job.
def create_node(self)
Create a condor node from this job.
def set_grid_site(self, site)
Set the grid site to run on.
def __init__(self, universe, executable)
universe = the condor universe to run the job in.
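
A hedged sketch of declaring a job and stamping nodes out of it (executable and file names are invented):

    from glue import pipeline

    job = pipeline.CondorDAGJob('vanilla', '/usr/bin/env')     # universe, executable
    job.set_sub_file('env.sub')
    job.add_condor_cmd('getenv', 'True')
    job.set_stdout_file('logs/env-$(cluster)-$(process).out')
    job.set_stderr_file('logs/env-$(cluster)-$(process).err')
    job.add_var_opt('gps-start-time')   # value is supplied per node as a macro

    node = job.create_node()            # a CondorDAGNode that runs this job
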
def set_dag_directory(self, dir)
Set the directory where the dag will be run.
def set_notification(self, value)
Set the email address to send notification to.
def get_sub_file(self)
Return the name of the dag as the submit file name for the SUBDAG EXTERNAL command in the uber-dag.
def create_node(self)
Create a condor node from this job.
def get_dag_directory(self)
Get the directory where the dag will be run.
def get_dag(self)
Return the name of any associated dag file.
def write_sub_file(self)
Do nothing as there is no need for a sub file with the SUBDAG EXTERNAL command in the uber-dag.
def __init__(self, dag, dir=None)
dag = the name of the condor dag file to run. dir = the directory in which the dag file is located.
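
A hedged sketch of embedding an existing DAG in an uber-dag as a SUBDAG EXTERNAL entry (dag, directory and log names are invented):

    from glue import pipeline

    uberdag = pipeline.CondorDAG('uber.log')
    uberdag.set_dag_file('uber')

    inner_job = pipeline.CondorDAGManJob('inner.dag', dir='inner')
    inner_node = inner_job.create_node()
    uberdag.add_node(inner_node)

    uberdag.write_dag()   # uber.dag contains a SUBDAG EXTERNAL line for inner.dag
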
Condor DAGMan node class.
def get_user_tag(self)
Returns the usertag string.
def get_maxjobs_categories(self)
Return an array of tuples containing (categoryName,maxJobsNum)
def get_cluster_jobs(self)
Returns the type of job clustering that Pegasus can use to collapse jobs.
def set_cluster_jobs(self, cluster)
Set the type of job clustering pegasus can use to collapse jobs.
def set_user_tag(self, usertag)
Set the user tag that is passed to the analysis code.
def add_maxjobs_category(self, categoryName, maxJobsNum)
Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
A CondorDAGNode represents a node in the DAG.
def get_post_script_arg(self)
Returns an array of arguments to the post script that is executed after the DAG node is run.
def set_priority(self, priority)
Set the priority for this node in the DAG.
def set_vds_group(self, group)
Set the name of the VDS group key when generating a DAX.
def add_var_arg(self, arg, quote=False)
Add a variable (or macro) argument to the condor job.
def set_pre_script(self, script)
Sets the name of the pre script that is executed before the DAG node is run.
def get_output_files(self)
Return list of output files for this DAG node and its job.
def set_log_file(self, log)
Set the Condor log file to be used by this CondorJob.
def get_input_files(self)
Return list of input files for this DAG node and its job.
def add_output_file(self, filename)
Add filename as a output file for this DAG node.
def add_io_macro(self, io, filename)
Add a variable (macro) for storing the input/output files associated with this node.
def get_checkpoint_files(self)
Return a list of checkpoint files for this DAG node and its job.
def add_input_macro(self, filename)
Add a variable (macro) for storing the input files associated with this node.
def finalize(self)
The finalize method of a node is called before the node is finally added to the DAG and can be overridden to do any last minute processing of the node.
def get_cmd_tuple_list(self)
Return a list of tuples containing the command line arguments.
def get_retry(self)
Return the number of times that this node in the DAG should retry.
def get_category(self)
Get the category for this node in the DAG.
def write_parents(self, fh)
Write the parent/child relations for this job to the DAG file descriptor.
def set_retry(self, retry)
Set the number of times that this node in the DAG should retry.
def write_input_files(self, fh)
Write as a comment into the DAG file the list of input files for this DAG node.
def add_output_macro(self, filename)
Add a variable (macro) for storing the output files associated with this node.
def add_checkpoint_macro(self, filename)
def get_args(self)
Return the arguments for this node.
def add_var_condor_cmd(self, command, value)
Add a variable (macro) condor command for this node.
def job(self)
Return the CondorJob that this node is associated with.
def write_category(self, fh)
Write the DAG entry for this node's category to the DAG file descriptor.
def get_opts(self)
Return the opts for this node.
def write_pre_script(self, fh)
Write the pre script for the job, if there is one.
def get_vds_group(self)
Returns the VDS group key for this node.
def get_post_script(self)
Returns the name of the post script that is executed after the DAG node is run.
def add_file_opt(self, opt, filename, file_is_output_file=False)
Add a variable (macro) option for this node.
def write_priority(self, fh)
Write the DAG entry for this node's priority to the DAG file descriptor.
def add_macro(self, name, value)
Add a variable (macro) for this node.
def add_post_script_arg(self, arg)
Adds an argument to the post script that is executed after the DAG node is run.
def write_vars(self, fh)
Write the variable (macro) options and arguments to the DAG file descriptor.
def set_name(self, name)
Set the name for this node in the DAG.
def get_priority(self)
Get the priority for this node in the DAG.
def write_post_script(self, fh)
Write the post script for the job, if there is one.
def write_job(self, fh)
Write the DAG entry for this node's job to the DAG file descriptor.
def add_input_file(self, filename)
Add filename as a necessary input file for this DAG node.
def set_post_script(self, script)
Sets the name of the post script that is executed after the DAG node is run.
def add_file_arg(self, filename)
Add a variable (or macro) file name argument to the condor job.
def add_checkpoint_file(self, filename)
Add filename as a checkpoint file for this DAG node.
def get_cmd_line(self)
Return the full command line that will be used when this node is run by DAGman.
def write_output_files(self, fh)
Write as a comment into the DAG file the list of output files for this DAG node.
def set_category(self, category)
Set the category for this node in the DAG.
def get_name(self)
Get the name for this node in the DAG.
def add_pre_script_arg(self, arg)
Adds an argument to the pre script that is executed before the DAG node is run.
def add_parent(self, node)
Add a parent to this node.
def add_var_opt(self, opt, value, short=False)
Add a variable (macro) option for this node.
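
A hedged sketch of per-node macros and dependencies (executable, option names and values are invented):

    from glue import pipeline

    job = pipeline.CondorDAGJob('vanilla', '/path/to/executable')   # hypothetical
    job.set_sub_file('example.sub')

    parent = job.create_node()
    child = job.create_node()

    child.add_var_opt('gps-start-time', 1000000000)  # macro substituted in the submit file
    child.add_var_arg('extra-argument')              # positional macro argument
    child.set_retry(2)                               # retry this node twice on failure
    child.set_category('filtering')                  # pair with add_maxjobs_category on the DAG
    child.add_parent(parent)                         # run only after parent succeeds
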
Error thrown by Condor Jobs.
def __init__(self, args=None)
Generic condor job class.
def add_short_opt(self, opt, value)
Add a command line option to the executable.
def get_universe(self)
Return the condor universe that the job will run in.
def get_grid_scheduler(self)
Return the grid scheduler.
def get_input_files(self)
Return list of input files for this DAG node.
def set_stderr_file(self, path)
Set the file to which Condor directs the stderr of the job.
def add_opt(self, opt, value)
Add a command line option to the executable.
def get_stdout_file(self)
Get the file to which Condor directs the stdout of the job.
def set_log_file(self, path)
Set the Condor log file.
def get_executable(self)
Return the name of the executable for this job.
def add_output_file(self, filename)
Add filename as a output file for this DAG node.
def get_opts(self)
Return the dictionary of opts for the job.
def get_condor_cmds(self)
Return the dictionary of condor keywords to add to the job.
def set_notification(self, value)
Set the email address to send notification to.
def get_opt(self, opt)
Returns the value associated with the given command line option.
def set_stdout_file(self, path)
Set the file to which Condor directs the stdout of the job.
def set_executable_installed(self, installed)
If executable installed is true, then no copying of the executable is done.
def get_stderr_file(self)
Get the file to which Condor directs the stderr of the job.
def get_stdin_file(self)
Get the file from which Condor directs the stdin of the job.
def set_universe(self, universe)
Set the condor universe for the job to run in.
def __init__(self, universe, executable, queue)
def set_grid_server(self, grid_server)
Set the grid server on which to run the job.
def add_checkpoint_file(self, filename)
Add filename as a checkpoint file for this DAG job.
def add_condor_cmd(self, cmd, value)
Add a Condor command to the submit file (e.g. a ClassAd or environment setting).
def add_ini_opts(self, cp, section)
Parse command line options from a given section in an ini file and pass to the executable.
def add_file_opt(self, opt, filename)
Add a command line option to the executable.
def get_args(self)
Return the list of arguments that are to be passed to the executable.
def get_short_opts(self)
Return the dictionary of short options for the job.
def get_output_files(self)
Return list of output files for this DAG node.
def set_stdin_file(self, path)
Set the file from which Condor directs the stdin of the job.
def get_checkpoint_files(self)
Return a list of checkpoint files for this DAG node.
def write_sub_file(self)
Write a submit file for this Condor job.
def set_grid_scheduler(self, grid_scheduler)
Set the grid scheduler.
def get_sub_file(self)
Get the name of the file which the Condor submit file will be written to when write_sub_file() is called.
def get_grid_type(self)
Return the grid type of the job.
def set_grid_type(self, grid_type)
Set the type of grid resource for the job.
def get_executable_installed(self)
Return whether or not the executable is installed.
def set_sub_file(self, path)
Set the name of the file to write the Condor submit file to when write_sub_file() is called.
def get_grid_server(self)
Return the grid server on which the job will run.
def add_input_file(self, filename)
Add filename as a necessary input file for this DAG node.
def add_arg(self, arg)
Add an argument to the executable.
def add_file_arg(self, filename)
Add a file argument to the executable.
def set_executable(self, executable)
Set the name of the executable for this job.
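
A hedged sketch of a standalone CondorJob whose options are fixed in the submit file rather than varied per node (paths are invented):

    from glue import pipeline

    job = pipeline.CondorJob('vanilla', '/usr/bin/md5sum', 1)   # universe, executable, queue
    job.set_sub_file('md5sum.sub')
    job.set_stdout_file('md5sum.out')
    job.set_stderr_file('md5sum.err')
    job.set_log_file('md5sum.log')
    job.add_condor_cmd('getenv', 'True')
    job.add_arg('/path/to/input/file')   # hypothetical argument
    job.write_sub_file()                 # writes md5sum.sub
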
The standard SafeConfigParser no longer supports deepcopy() as of python 2.7 (see http://bugs....
def __deepcopy__(self, memo)
An LSCdataFind job used to locate data.
def __init__(self, cache_dir, log_dir, config_file, lsync_cache_file=None, lsync_type_regex=None)
def get_cache_dir(self)
Returns the directory that the cache files are written to.
def get_config_file(self)
Return the configuration file object.
A DataFindNode runs an instance of LSCdataFind in a Condor DAG.
def set_type(self, type)
Sets the frame type that we are querying.
def get_start(self)
Return the start time of the datafind query.
def get_output(self)
Return the output file, i.e. the file containing the frame cache data.
def get_end(self)
Return the end time of the datafind query.
def get_observatory(self)
Return the observatory (IFO) for which data is retrieved.
def set_observatory(self, obs)
Set the IFO to retrieve data for.
def get_output_cache(self)
def get_type(self)
Gets the frame type that we are querying.
def set_start(self, time, pad=None)
Set the start time of the datafind query.
def set_end(self, time)
Set the end time of the datafind query.
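
A hedged fragment showing the node-level calls; it assumes df_job is an LSCDataFindJob already built from your configuration (cache directory, log directory and ConfigParser object, as in the constructor above), and the frame type shown is invented:

    df_node = pipeline.LSCDataFindNode(df_job)
    df_node.set_observatory('H')      # single-letter observatory code
    df_node.set_type('H1_HOFT_C00')   # hypothetical frame type
    df_node.set_start(1000000000)
    df_node.set_end(1000002048)
    frame_cache = df_node.get_output()   # path of the cache file the query writes
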
A ligolw_add job can be used to concatenate several ligo lw files.
def __init__(self, log_dir, cp)
cp = ConfigParser object from which options are read.
Runs an instance of ligolw_add in a Condor DAG.
A ligolw_cut job can be used to remove parts of a ligo lw file.
def __init__(self, log_dir, cp)
cp = ConfigParser object from which options are read.
Runs an instance of ligolw_cut in a Condor DAG.
def set_replace(self)
Sets the --replace option.
def get_output(self)
Override standard get_output to return xml-file if xml-file is specified.
def get_input_cache(self)
Gets input cache.
def set_xml_output(self, xml_file)
Tell ligolw_sqlite to dump the contents of the database to a file.
def set_input_cache(self, input_cache)
Sets input cache.
def set_xml_input(self, xml_file)
Sets xml input file instead of cache.
def group(self, lst, n)
Group an iterable into an n-tuples iterable.
def parse(self, type_regex=None)
Each line of the frame cache file is like the following:
def get_lfns(self, site, frameType, gpsStart, gpsEnd)
def __init__(self, log_dir, cp)
cp = ConfigParser object from which options are read.
Run a noop job in a Condor DAG.
An object that can contain all the science data used in an analysis.
def make_chunks(self, length, overlap=0, play=0, sl=0, excl_play=0, pad_data=0)
Divide each ScienceSegment contained in this object into AnalysisChunks.
def __len__(self)
Returns the number of ScienceSegments associated with the ScienceData.
def make_optimised_chunks(self, min_length, max_length, pad_data=0)
Splits ScienceSegments up into chunks of a given maximum length.
def coalesce(self)
Coalesces any adjacent ScienceSegments.
def intersection(self, other)
Replaces the ScienceSegments contained in this instance of ScienceData with the intersection of those in the instance other.
def read(self, filename, min_length, slide_sec=0, buffer=0)
Parse the science segments from the segwizard output contained in file.
def __getitem__(self, i)
Allows direct access to or iteration over the ScienceSegments associated with the ScienceData.
def split(self, dt)
Split the segments in the list into subsegments at least as long as dt.
def play(self)
Keep only times in ScienceSegments which are in the playground.
def union(self, other)
Replaces the ScienceSegments contained in this instance of ScienceData with the union of those in the instance other.
def intersect_3(self, second, third)
Intersection routine for three inputs.
def make_short_chunks_from_unused(self, min_length, overlap=0, play=0, sl=0, excl_play=0)
Create a chunk that uses up the unused data in the science segment.
def make_chunks_from_unused(self, length, trig_overlap, play=0, min_length=0, sl=0, excl_play=0, pad_data=0)
Create an extra chunk that uses up the unused data in the science segment.
def tama_read(self, filename)
Parse the science segments from a tama list of locked segments contained in file.
def append_from_tuple(self, seg_tuple)
def intersect_4(self, second, third, fourth)
Intersection routine for four inputs.
def invert(self)
Inverts the ScienceSegments in the class (i.e. replaces the segments with their complement).
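
A hedged sketch of the usual read/coalesce/chunk cycle (the segwizard file name and durations are invented):

    from glue import pipeline

    data = pipeline.ScienceData()
    data.read('segments.txt', 2048)       # keep segwizard segments of at least 2048 s
    data.coalesce()                       # merge adjacent segments
    data.make_chunks(2048, overlap=128)   # overlapping AnalysisChunks per segment

    for seg in data:          # ScienceSegments ...
        for chunk in seg:     # ... and the AnalysisChunks within each
            print(chunk.start(), chunk.end())
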
A ScienceSegment is a period of time where the experimenters determine that the interferometer is in a state where the data is suitable for scientific analysis.
def set_df_node(self, df_node)
Set the DataFind node associated with this ScienceSegment to df_node.
def __init__(self, segment)
def set_unused(self, unused)
Set the length of data in the science segment not used to make chunks.
def __cmp__(self, other)
ScienceSegments are compared by the GPS start time of the segment.
def make_chunks(self, length=0, overlap=0, play=0, sl=0, excl_play=0, pad_data=0)
Divides the science segment into chunks of length seconds overlapped by overlap seconds.
def unused(self)
Returns the length of data in the science segment not used to make chunks.
def end(self)
Returns the GPS end time of this ScienceSegment.
def set_start(self, t)
Override the GPS start time (and set the duration) of this ScienceSegment.
def set_end(self, t)
Override the GPS end time (and set the duration) of this ScienceSegment.
def start(self)
Returns the GPS start time of this ScienceSegment.
def id(self)
Returns the ID of this ScienceSegment.
def __len__(self)
Returns the number of AnalysisChunks contained in this ScienceSegment.
def __getitem__(self, i)
Allows iteration over and direct access to the AnalysisChunks contained in this ScienceSegment.
def dur(self)
Returns the length (duration) in seconds of this ScienceSegment.
def add_chunk(self, start, end, trig_start=0, trig_end=0)
Add an AnalysisChunk to the list associated with this ScienceSegment.
def get_df_node(self)
Returns the DataFind node for this ScienceSegment.
def __init__(self, args=None)
A cbc sqlite job adds to CondorDAGJob and AnalysisJob features common to jobs which read or write to a sqlite database.
def get_exec_name(self)
Get the exec_name name.
def __init__(self, cp, sections, exec_name)
def set_exec_name(self, exec_name)
Set the exec_name name.
A cbc sqlite node adds to the standard AnalysisNode features common to nodes which read or write to a sqlite database.
def get_database(self)
Gets database option.
def get_tmp_space(self)
Gets tmp-space path.
def set_database(self, database)
Sets database option.
def set_tmp_space(self, tmp_space)
Sets temp-space path.