LAL 7.7.0.1-5e288d3
pipeline.py
1"""
2This module contains objects that make it simple for the user to
3create python scripts that build Condor DAGs to run code on the LSC
4Data Grid.
5
6This file is part of the Grid LSC User Environment (GLUE)
7
8GLUE is free software: you can redistribute it and/or modify it under the
9terms of the GNU General Public License as published by the Free Software
10Foundation, either version 3 of the License, or (at your option) any later
11version.
12
13This program is distributed in the hope that it will be useful, but WITHOUT
14ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16details.
17
18You should have received a copy of the GNU General Public License along with
19this program. If not, see <http://www.gnu.org/licenses/>.
20"""
21
22from . import git_version
23__author__ = 'Duncan Brown <duncan@gravity.phys.uwm.edu>'
24__version__ = "git id %s" % git_version.id
25__date__ = git_version.date
26
27from collections import OrderedDict
28import configparser
29import igwn_segments as segments
30import itertools
31import math
32import os
33import random
34import re
35import stat
36import sys
37import time
38from hashlib import md5
39
40
41class CondorError(Exception):
42 """Error thrown by Condor Jobs"""
43 def __init__(self, args=None):
44 self.args = args
45
46
47class CondorJobError(CondorError):
48 pass
49
50
51class CondorSubmitError(CondorError):
52 pass
53
54
55class CondorDAGError(CondorError):
56 pass
57
58
59class CondorDAGJobError(CondorError):
60 pass
61
62
63class CondorDAGNodeError(CondorError):
64 pass
65
66
67class SegmentError(Exception):
68 def __init__(self, args=None):
69 self.args = args
70
71
72class CondorJob(object):
73 """
74 Generic condor job class. Provides methods to set the options in the
75 condor submit file for a particular executable
76 """
77 def __init__(self, universe, executable, queue):
78 """
79 @param universe: the condor universe to run the job in.
80 @param executable: the executable to run.
81 @param queue: number of jobs to queue.
82 """
83 self.__universe = universe
84 self.__executable = executable
85 self.__queue = queue
86
87 # These are set by methods in the class
88 self.__options = {}
89 self.__short_options = {}
90 self.__arguments = []
91 self.__condor_cmds = OrderedDict()
92 self.__notification = None
93 self.__log_file = None
94 self.__in_file = None
95 self.__err_file = None
96 self.__out_file = None
97 self.__sub_file_path = None
98 self.__output_files = []
99 self.__input_files = []
100 self.__checkpoint_files = []
101 self.__grid_type = None
102 self.__grid_server = None
103 self.__grid_scheduler = None
104 self.__executable_installed = True
105
106 def get_executable(self):
107 """
108 Return the name of the executable for this job.
109 """
110 return self.__executable
111
112 def set_executable(self, executable):
113 """
114 Set the name of the executable for this job.
115 """
116 self.__executable = executable
117
118 def get_universe(self):
119 """
120 Return the condor universe that the job will run in.
121 """
122 return self.__universe
123
124 def set_universe(self, universe):
125 """
126 Set the condor universe for the job to run in.
127 @param universe: the condor universe to run the job in.
128 """
129 self.__universe = universe
130
131 def get_grid_type(self):
132 """
133 Return the grid type of the job.
134 """
135 return self.__grid_type
136
137 def set_grid_type(self, grid_type):
138 """
139 Set the type of grid resource for the job.
140 @param grid_type: type of grid resource.
141 """
142 self.__grid_type = grid_type
143
144 def get_grid_server(self):
145 """
146 Return the grid server on which the job will run.
147 """
148 return self.__grid_server
149
150 def set_grid_server(self, grid_server):
151 """
152 Set the grid server on which to run the job.
153 @param grid_server: grid server on which to run.
154 """
155 self.__grid_server = grid_server
156
157 def get_grid_scheduler(self):
158 """
159 Return the grid scheduler.
160 """
161 return self.__grid_scheduler
162
163 def set_grid_scheduler(self, grid_scheduler):
164 """
165 Set the grid scheduler.
166 @param grid_scheduler: grid scheduler on which to run.
167 """
168 self.__grid_scheduler = grid_scheduler
169
170 def set_executable_installed(self,installed):
171 """
172 If executable installed is true, then no copying of the executable is
173 done. If it is false, pegasus stages the executable to the remote site.
174 Default is executable is installed (i.e. True).
175 @param installed: True or False
176 """
177 self.__executable_installed = installed
178
179 def get_executable_installed(self):
180 """
181 return whether or not the executable is installed
182 """
183 return self.__executable_installed
184
185 def add_condor_cmd(self, cmd, value):
186 """
187 Add a Condor command to the submit file (e.g. a ClassAd or environment).
188 @param cmd: Condor command directive.
189 @param value: value for command.
190 """
191 self.__condor_cmds[cmd] = value
192
193 def get_condor_cmds(self):
194 """
195 Return the dictionary of condor keywords to add to the job
196 """
197 return self.__condor_cmds
198
199 def add_input_file(self, filename):
200 """
201 Add filename as a necessary input file for this DAG node.
202
203 @param filename: input filename to add
204 """
205 if filename not in self.__input_files:
206 self.__input_files.append(filename)
207
208 def add_output_file(self, filename):
209 """
210 Add filename as an output file for this DAG node.
211
212 @param filename: output filename to add
213 """
214 if filename not in self.__output_files:
215 self.__output_files.append(filename)
216
217 def add_checkpoint_file(self, filename):
218 """
219 Add filename as a checkpoint file for this DAG job.
220 """
221 if filename not in self.__checkpoint_files:
222 self.__checkpoint_files.append(filename)
223
224 def get_input_files(self):
225 """
226 Return list of input files for this DAG node.
227 """
228 return self.__input_files
229
230 def get_output_files(self):
231 """
232 Return list of output files for this DAG node.
233 """
234 return self.__output_files
235
236 def get_checkpoint_files(self):
237 """
238 Return a list of checkpoint files for this DAG node
239 """
240 return self.__checkpoint_files
241
242 def add_arg(self, arg):
243 """
244 Add an argument to the executable. Arguments are appended after any
245 options and their order is guaranteed.
246 @param arg: argument to add.
247 """
248 self.__arguments.append(arg)
249
250 def add_file_arg(self, filename):
251 """
252 Add a file argument to the executable. Arguments are appended after any
253 options and their order is guaranteed. Also adds the file name to the
254 list of required input data for this job.
255 @param filename: file to add as argument.
256 """
257 self.__arguments.append(filename)
258 if filename not in self.__input_files:
259 self.__input_files.append(filename)
260
261 def get_args(self):
262 """
263 Return the list of arguments that are to be passed to the executable.
264 """
265 return self.__arguments
266
267 def add_opt(self, opt, value):
268 """
269 Add a command line option to the executable. The order that the arguments
270 will be appended to the command line is not guaranteed, but they will
271 always be added before any command line arguments. The name of the option
272 is prefixed with double hyphen and the program is expected to parse it
273 with getopt_long().
274 @param opt: command line option to add.
275 @param value: value to pass to the option (None for no argument).
276 """
277 self.__options[opt] = value
278
279 def get_opt( self, opt):
280 """
281 Returns the value associated with the given command line option.
282 Returns None if the option does not exist in the options list.
283 @param opt: command line option
284 """
285 if opt in self.__options:
286 return self.__options[opt]
287 return None
288
289 def add_file_opt(self, opt, filename):
290 """
291 Add a command line option to the executable. The order that the arguments
292 will be appended to the command line is not guaranteed, but they will
293 always be added before any command line arguments. The name of the option
294 is prefixed with double hyphen and the program is expected to parse it
295 with getopt_long().
296 @param opt: command line option to add.
297 @param filename: value to pass to the option (None for no argument).
298 """
299 self.__options[opt] = filename
300 if filename not in self.__input_files:
301 self.__input_files.append(filename)
302
303 def get_opts(self):
304 """
305 Return the dictionary of opts for the job.
306 """
307 return self.__options
308
309 def add_short_opt(self, opt, value):
310 """
311 Add a command line option to the executable. The order that the arguments
312 will be appended to the command line is not guaranteed, but they will
313 always be added before any command line arguments. The name of the option
314 is prefixed with single hyphen and the program is expected to parse it
315 with getopt() or getopt_long() (if a single character option), or
316 getopt_long_only() (if multiple characters). Long and (single-character)
317 short options may be mixed if the executable permits this.
318 @param opt: command line option to add.
319 @param value: value to pass to the option (None for no argument).
320 """
321 self.__short_options[opt] = value
322
323 def get_short_opts(self):
324 """
325 Return the dictionary of short options for the job.
326 """
327 return self.__short_options
328
329 def add_ini_opts(self, cp, section):
330 """
331 Parse command line options from a given section in an ini file and
332 pass to the executable.
333 @param cp: ConfigParser object pointing to the ini file.
334 @param section: section of the ini file to add to the options.
335 """
336 for opt in cp.options(section):
337 arg = str(cp.get(section,opt)).strip()
338 self.__options[opt] = arg
339
340 def set_notification(self, value):
341 """
342 Set the email address to send notification to.
343 @param value: email address or never for no notification.
344 """
345 self.__notification = value
346
347 def set_log_file(self, path):
348 """
349 Set the Condor log file.
350 @param path: path to log file.
351 """
352 self.__log_file = path
353
354 def set_stdin_file(self, path):
355 """
356 Set the file from which Condor directs the stdin of the job.
357 @param path: path to stdin file.
358 """
359 self.__in_file = path
360
361 def get_stdin_file(self):
362 """
363 Get the file from which Condor directs the stdin of the job.
364 """
365 return self.__in_file
366
367 def set_stderr_file(self, path):
368 """
369 Set the file to which Condor directs the stderr of the job.
370 @param path: path to stderr file.
371 """
372 self.__err_file = path
373
374 def get_stderr_file(self):
375 """
376 Get the file to which Condor directs the stderr of the job.
377 """
378 return self.__err_file
379
380 def set_stdout_file(self, path):
381 """
382 Set the file to which Condor directs the stdout of the job.
383 @param path: path to stdout file.
384 """
385 self.__out_file = path
386
387 def get_stdout_file(self):
388 """
389 Get the file to which Condor directs the stdout of the job.
390 """
391 return self.__out_file
392
393 def set_sub_file(self, path):
394 """
395 Set the name of the file to write the Condor submit file to when
396 write_sub_file() is called.
397 @param path: path to submit file.
398 """
399 self.__sub_file_path = path
400
401 def get_sub_file(self):
402 """
403 Get the name of the file which the Condor submit file will be
404 written to when write_sub_file() is called.
405 """
406 return self.__sub_file_path
407
408 def write_sub_file(self):
409 """
410 Write a submit file for this Condor job.
411 """
412 if not self.__log_file:
413 raise CondorSubmitError("Log file not specified.")
414 if not self.__err_file:
415 raise CondorSubmitError("Error file not specified.")
416 if not self.__out_file:
417 raise CondorSubmitError("Output file not specified.")
418
419 if not self.__sub_file_path:
420 raise CondorSubmitError('No path for submit file.')
421 try:
422 subfile = open(self.__sub_file_path, 'w')
423 except:
424 raise CondorSubmitError("Cannot open file " + self.__sub_file_path)
425
426 if self.__universe == 'grid':
427 if self.__grid_type is None:
428 raise CondorSubmitError('No grid type specified.')
429 elif self.__grid_type == 'gt2':
430 if self.__grid_server is None:
431 raise CondorSubmitError('No server specified for grid resource.')
432 elif self.__grid_type == 'gt4':
433 if self.__grid_server is None:
434 raise CondorSubmitError('No server specified for grid resource.')
435 if self.__grid_scheduler is None:
436 raise CondorSubmitError('No scheduler specified for grid resource.')
437 else:
438 raise CondorSubmitError('Unsupported grid resource.')
439
440 subfile.write( 'universe = ' + self.__universe + '\n' )
441 subfile.write( 'executable = ' + self.__executable + '\n' )
442
443 if self.__universe == 'grid':
444 if self.__grid_type == 'gt2':
445 subfile.write('grid_resource = %s %s\n' % (self.__grid_type,
446 self.__grid_server))
447 if self.__grid_type == 'gt4':
448 subfile.write('grid_resource = %s %s %s\n' % (self.__grid_type,
449 self.__grid_server, self.__grid_scheduler))
450
451 if self.__universe == 'grid':
452 subfile.write('when_to_transfer_output = ON_EXIT\n')
453 subfile.write('transfer_output_files = $(macrooutput)\n')
454 subfile.write('transfer_input_files = $(macroinput)\n')
455
456 if list(self.__options.keys()) or list(self.__short_options.keys()) or self.__arguments:
457 subfile.write( 'arguments = "' )
458 for c in self.__arguments:
459 subfile.write( ' ' + c )
460 for c in self.__options.keys():
461 if self.__options[c]:
462 subfile.write( ' --' + c + ' ' + self.__options[c] )
463 else:
464 subfile.write( ' --' + c )
465 for c in self.__short_options.keys():
466 if self.__short_options[c]:
467 subfile.write( ' -' + c + ' ' + self.__short_options[c] )
468 else:
469 subfile.write( ' -' + c )
470 subfile.write( ' "\n' )
471
472 for cmd in self.__condor_cmds.keys():
473 subfile.write( str(cmd) + " = " + str(self.__condor_cmds[cmd]) + '\n' )
474
475 subfile.write( 'log = ' + self.__log_file + '\n' )
476 if self.__in_file is not None:
477 subfile.write( 'input = ' + self.__in_file + '\n' )
478 subfile.write( 'error = ' + self.__err_file + '\n' )
479 subfile.write( 'output = ' + self.__out_file + '\n' )
480 if self.__notification:
481 subfile.write( 'notification = ' + self.__notification + '\n' )
482 subfile.write( 'queue ' + str(self.__queue) + '\n' )
483
484 subfile.close()
485
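# A minimal usage sketch of CondorJob (illustrative only): the executable,
# file names and options below are hypothetical placeholders, not values
# required by the class.
job = CondorJob(universe='vanilla', executable='/usr/bin/true', queue=1)
job.set_log_file('example.log')
job.set_stderr_file('example.err')
job.set_stdout_file('example.out')
job.add_condor_cmd('request_memory', '1024')
job.add_opt('verbose', None)        # written as --verbose
job.add_short_opt('n', '10')        # written as -n 10
job.set_sub_file('example.sub')
job.write_sub_file()                # writes example.sub to disk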
486
488 """
489 A Condor DAG job never notifies the user on completion and can have variable
490 options that are set for a particular node in the DAG. Inherits methods
491 from a CondorJob.
492 """
493 def __init__(self, universe, executable):
494 """
495 universe = the condor universe to run the job in.
496 executable = the executable to run in the DAG.
497 """
498 super(CondorDAGJob,self).__init__(universe, executable, 1)
499 CondorJob.set_notification(self, 'never')
500 self.__var_opts = []
501 self.__arg_index = 0
502 self.__var_args = []
503 self.__var_cmds = []
504 self.__grid_site = None
505 self.__bad_macro_chars = re.compile(r'[_-]')
506
507 def create_node(self):
508 """
509 Create a condor node from this job. This provides a basic interface to
510 the CondorDAGNode class. Most jobs in a workflow will subclass the
511 CondorDAGNode class and overwrite this to give more details when
512 initializing the node. However, this will work fine for jobs with very simple
513 input/output.
514 """
515 return CondorDAGNode(self)
516
517 def set_grid_site(self,site):
518 """
519 Set the grid site to run on. If not specified,
520 will not give hint to Pegasus
521 """
522 self.__grid_site=str(site)
523 if site != 'local':
524 self.set_executable_installed(False)
525
526 def get_grid_site(self):
527 """
528 Return the grid site for this node
529 """
530 return self.__grid_site
531
532 def add_var_opt(self, opt, short=False):
533 """
534 Add a variable (or macro) option to the condor job. The option is added
535 to the submit file and a different argument to the option can be set for
536 each node in the DAG.
537 @param opt: name of option to add.
538 @param short: opt is short
539 """
540 if opt not in self.__var_opts:
541 self.__var_opts.append(opt)
542 macro = self.__bad_macro_chars.sub( r'', opt )
543 if short:
544 self.add_short_opt(opt,'$(macro' + macro + ')')
545 else:
546 self.add_opt(opt,'$(macro' + macro + ')')
547
548 def add_var_condor_cmd(self, command):
549 """
550 Add a condor command to the submit file that allows variable (macro)
551 arguments to be passed to the executable.
552 """
553 if command not in self.__var_cmds:
554 self.__var_cmds.append(command)
555 macro = self.__bad_macro_chars.sub( r'', command )
556 self.add_condor_cmd(command, '$(macro' + macro + ')')
557
558 def add_var_arg(self,arg_index,quote=False):
559 """
560 Add a command to the submit file to allow variable (macro) arguments
561 to be passed to the executable.
562 """
563 try:
564 self.__var_args[arg_index]
565 except IndexError:
566 if arg_index != self.__arg_index:
567 raise CondorDAGJobError("mismatch between job and node var_arg index")
568 if quote:
569 self.__var_args.append("'$(macroargument%s)'" % str(arg_index))
570 else:
571 self.__var_args.append('$(macroargument%s)' % str(arg_index))
572 self.add_arg(self.__var_args[self.__arg_index])
573 self.__arg_index += 1
574
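# A sketch of how a CondorDAGJob is shared between DAG nodes (illustrative
# only): the executable, submit file name and option values are hypothetical.
dagjob = CondorDAGJob('vanilla', '/usr/bin/env')
dagjob.set_sub_file('filter.sub')
node = dagjob.create_node()
node.add_var_opt('gps-start-time', 1000000000)  # submit file gets --gps-start-time $(macrogpsstarttime)
node.add_var_arg('extra-argument')              # submit file gets $(macroargument0)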
575
576class CondorDAGManJob(object):
577 """
578 Condor DAGMan job class. Appropriate for setting up DAGs to run within a
579 DAG.
580 """
581 def __init__(self, dag, dir=None):
582 """
583 dag = the name of the condor dag file to run
584 dir = the directory in which the dag file is located
585 """
586 self.__dag = dag
587 self.__notification = None
588 self.__dag_directory = dir
589
590 def create_node(self):
591 """
592 Create a condor node from this job. This provides a basic interface to
593 the CondorDAGManNode class. Most jobs in a workflow will subclass the
594 CondorDAGManNode class and overwrite this to give more details when
595 initializing the node. However, this will work fine for jobs with very simple
596 input/output.
597 """
598 return CondorDAGManNode(self)
599
600 def set_dag_directory(self, dir):
601 """
602 Set the directory where the dag will be run
603 @param dir: the name of the directory where the dag will be run
604 """
605 self.__dag_directory = dir
606
607 def get_dag_directory(self):
608 """
609 Get the directory where the dag will be run
610 """
611 return self.__dag_directory
612
613 def set_notification(self, value):
614 """
615 Set the email address to send notification to.
616 @param value: email address or never for no notification.
617 """
618 self.__notification = value
619
620 def get_sub_file(self):
621 """
622 Return the name of the dag as the submit file name for the
623 SUBDAG EXTERNAL command in the uber-dag
624 """
625 return self.__dag
626
627 def write_sub_file(self):
628 """
629 Do nothing as there is no need for a sub file with the
630 SUBDAG EXTERNAL command in the uber-dag
631 """
632 pass
633
634 def get_dag(self):
635 """
636 Return the name of any associated dag file
637 """
638 return self.__dag
639
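# A sketch of wrapping an existing DAG so it can run as a SUBDAG EXTERNAL
# inside a larger DAG (illustrative only; 'inner.dag' and 'inner_run' are
# hypothetical names).
subdag_job = CondorDAGManJob('inner.dag', dir='inner_run')
subdag_node = subdag_job.create_node()
subdag_node.set_retry(1)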
640
641class CondorDAGNode(object):
642 """
643 A CondorDAGNode represents a node in the DAG. It corresponds to a particular
644 condor job (and so a particular submit file). If the job has variable
645 (macro) options, they can be set here so each node executes with the
646 correct options.
647 """
648 def __init__(self, job):
649 """
650 @param job: the CondorJob that this node corresponds to.
651 """
652 if not isinstance(job, CondorDAGJob) and \
653 not isinstance(job,CondorDAGManJob):
654 raise CondorDAGNodeError(
655 "A DAG node must correspond to a Condor DAG job or Condor DAGMan job")
656 self.__name = None
657 self.__job = job
658 self.__category = None
659 self.__priority = None
660 self.__pre_script = None
661 self.__pre_script_args = []
662 self.__post_script = None
663 self.__post_script_args = []
664 self.__macros = {}
665 self.__opts = {}
666 self.__args = []
667 self.__arg_index = 0
668 self.__retry = 0
669 self.__parents = []
670 self.__bad_macro_chars = re.compile(r'[_-]')
671 self.__output_files = []
672 self.__input_files = []
673 self.__checkpoint_files = []
674 self.__vds_group = None
675 if isinstance(job, CondorDAGJob) and job.get_universe() == 'standard':
676 self.__grid_start = 'none'
677 else:
678 self.__grid_start = None
679
680 # generate the md5 node name
681 t = str( int( time.time() * 1000 ) )
682 r = str( int( random.random() * 100000000000000000 ) )
683 a = str( self.__class__ )
684 self.__name = md5((t + r + a).encode()).hexdigest()
685 self.__md5name = self.__name
686
687 def __repr__(self):
688 return self.__name
689
690 def job(self):
691 """
692 Return the CondorJob that this node is associated with.
693 """
694 return self.__job
695
696 def set_pre_script(self,script):
697 """
698 Sets the name of the pre script that is executed before the DAG node is
699 run.
700 @param script: path to script
701 """
702 self.__pre_script = script
703
704 def add_pre_script_arg(self,arg):
705 """
706 Adds an argument to the pre script that is executed before the DAG node is
707 run.
708 """
709 self.__pre_script_args.append(arg)
710
711 def set_post_script(self,script):
712 """
713 Sets the name of the post script that is executed after the DAG node is
714 run.
715 @param script: path to script
716 """
717 self.__post_script = script
718
719 def get_post_script(self):
720 """
721 Returns the name of the post script that is executed after the DAG node is
722 run.
723 """
724 return self.__post_script
725
726 def add_post_script_arg(self,arg):
727 """
728 Adds an argument to the post script that is executed after the DAG node is
729 run.
730 """
731 self.__post_script_args.append(arg)
732
733 def get_post_script_arg(self):
734 """
735 Returns an array of arguments to the post script that is executed after
736 the DAG node is run.
737 """
738 return self.__post_script_args
739
740 def set_name(self,name):
741 """
742 Set the name for this node in the DAG.
743 """
744 self.__name = str(name)
745
746 def get_name(self):
747 """
748 Get the name for this node in the DAG.
749 """
750 return self.__name
751
752 def set_category(self,category):
753 """
754 Set the category for this node in the DAG.
755 """
756 self.__category = str(category)
757
758 def get_category(self):
759 """
760 Get the category for this node in the DAG.
761 """
762 return self.__category
763
764 def set_priority(self,priority):
765 """
766 Set the priority for this node in the DAG.
767 """
768 self.__priority = str(priority)
769
770 def get_priority(self):
771 """
772 Get the priority for this node in the DAG.
773 """
774 return self.__priority
775
776 def add_input_file(self, filename):
777 """
778 Add filename as a necessary input file for this DAG node.
779
780 @param filename: input filename to add
781 """
782 if filename not in self.__input_files:
783 self.__input_files.append(filename)
784 if not isinstance(self.job(), CondorDAGManJob):
785 if self.job().get_universe() == 'grid':
786 self.add_input_macro(filename)
787
788 def add_output_file(self, filename):
789 """
790 Add filename as an output file for this DAG node.
791
792 @param filename: output filename to add
793 """
794 if filename not in self.__output_files:
795 self.__output_files.append(filename)
796 if not isinstance(self.job(), CondorDAGManJob):
797 if self.job().get_universe() == 'grid':
798 self.add_output_macro(filename)
799
800 def add_checkpoint_file(self,filename):
801 """
802 Add filename as a checkpoint file for this DAG node
803 @param filename: checkpoint filename to add
804 """
805 if filename not in self.__checkpoint_files:
806 self.__checkpoint_files.append(filename)
807 if not isinstance(self.job(), CondorDAGManJob):
808 if self.job().get_universe() == 'grid':
809 self.add_checkpoint_macro(filename)
810
811 def get_input_files(self):
812 """
813 Return list of input files for this DAG node and its job.
814 """
815 input_files = list(self.__input_files)
816 if isinstance(self.job(), CondorDAGJob):
817 input_files = input_files + self.job().get_input_files()
818 return input_files
819
820 def get_output_files(self):
821 """
822 Return list of output files for this DAG node and its job.
823 """
824 output_files = list(self.__output_files)
825 if isinstance(self.job(), CondorDAGJob):
826 output_files = output_files + self.job().get_output_files()
827 return output_files
828
829 def get_checkpoint_files(self):
830 """
831 Return a list of checkpoint files for this DAG node and its job.
832 """
833 checkpoint_files = list(self.__checkpoint_files)
834 if isinstance(self.job(), CondorDAGJob):
835 checkpoint_files = checkpoint_files + self.job().get_checkpoint_files()
836 return checkpoint_files
837
838 def set_vds_group(self,group):
839 """
840 Set the name of the VDS group key when generating a DAX
841 @param group: name of the group for this node
842 """
843 self.__vds_group = str(group)
844
845 def get_vds_group(self):
846 """
847 Returns the VDS group key for this node
848 """
849 return self.__vds_group
850
851 def add_macro(self,name,value):
852 """
853 Add a variable (macro) for this node. This can be different for
854 each node in the DAG, even if they use the same CondorJob. Within
855 the CondorJob, the value of the macro can be referenced as
856 '$(name)' -- for instance, to define a unique output or error file
857 for each node.
858 @param name: macro name.
859 @param value: value of the macro for this node in the DAG
860 """
861 macro = self.__bad_macro_chars.sub( r'', name )
862 self.__opts[macro] = value
863
864 def add_io_macro(self,io,filename):
865 """
866 Add a variable (macro) for storing the input/output files associated
867 with this node.
868 @param io: macroinput or macrooutput
869 @param filename: filename of input/output file
870 """
871 io = self.__bad_macro_chars.sub( r'', io )
872 if io not in self.__opts:
873 self.__opts[io] = filename
874 else:
875 if filename not in self.__opts[io]:
876 self.__opts[io] += ',%s' % filename
877
878 def add_input_macro(self,filename):
879 """
880 Add a variable (macro) for storing the input files associated with
881 this node.
882 @param filename: filename of input file
883 """
884 self.add_io_macro('macroinput', filename)
885
886 def add_output_macro(self,filename):
887 """
888 Add a variable (macro) for storing the output files associated with
889 this node.
890 @param filename: filename of output file
891 """
892 self.add_io_macro('macrooutput', filename)
893
894 def add_checkpoint_macro(self,filename):
895 self.add_io_macro('macrocheckpoint',filename)
896
897 def get_opts(self):
898 """
899 Return the opts for this node. Note that this returns only
900 the options for this instance of the node and not those
901 associated with the underlying job template.
902 """
903 return self.__opts
904
905 def add_var_condor_cmd(self, command, value):
906 """
907 Add a variable (macro) condor command for this node. If the command
908 specified does not exist in the CondorJob, it is added so the submit file
909 will be correct.
910 PLEASE NOTE: As with other add_var commands, the variable must be set for
911 all nodes that use the CondorJob instance.
912 @param command: command name
913 @param value: Value of the command for this node in the DAG/DAX.
914 """
915 macro = self.__bad_macro_chars.sub( r'', command )
916 self.__macros['macro' + macro] = value
917 self.__job.add_var_condor_cmd(command)
918
919 def add_var_opt(self,opt,value,short=False):
920 """
921 Add a variable (macro) option for this node. If the option
922 specified does not exist in the CondorJob, it is added so the submit
923 file will be correct when written.
924 @param opt: option name.
925 @param value: value of the option for this node in the DAG.
926 @param short: opt is short
927 """
928 macro = self.__bad_macro_chars.sub( r'', opt )
929 self.__opts['macro' + macro] = value
930 self.__job.add_var_opt(opt,short)
931
932 def add_file_opt(self,opt,filename,file_is_output_file=False):
933 """
934 Add a variable (macro) option for this node. If the option
935 specified does not exist in the CondorJob, it is added so the submit
936 file will be correct when written. The value of the option is also
937 added to the list of input files for the DAX.
938 @param opt: option name.
939 @param filename: value of the option for this node in the DAG.
940 @param file_is_output_file: A boolean if the file will be an output file
941 instead of an input file. The default is to have it be an input.
942 """
943 self.add_var_opt(opt,filename)
944 if file_is_output_file: self.add_output_file(filename)
945 else: self.add_input_file(filename)
946
947 def add_var_arg(self, arg,quote=False):
948 """
949 Add a variable (or macro) argument to the condor job. The argument is
950 added to the submit file and a different value of the argument can be set
951 for each node in the DAG.
952 @param arg: name of option to add.
953 @param quote: quote
954 """
955 self.__args.append(arg)
956 self.__job.add_var_arg(self.__arg_index,quote=quote)
957 self.__arg_index += 1
958
959 def add_file_arg(self, filename):
960 """
961 Add a variable (or macro) file name argument to the condor job. The
962 argument is added to the submit file and a different value of the
963 argument can be set for each node in the DAG. The file name is also
964 added to the list of input files for the DAX.
965 @param filename: name of option to add.
966 """
967 self.add_input_file(filename)
968 self.add_var_arg(filename)
969
970 def get_args(self):
971 """
972 Return the arguments for this node. Note that this returns
973 only the arguments for this instance of the node and not those
974 associated with the underlying job template.
975 """
976 return self.__args
977
978 def set_retry(self, retry):
979 """
980 Set the number of times that this node in the DAG should retry.
981 @param retry: number of times to retry node.
982 """
983 self.__retry = retry
984
985 def get_retry(self):
986 """
987 Return the number of times that this node in the DAG should retry.
988 """
989 return self.__retry
990
991 def write_job(self,fh):
992 """
993 Write the DAG entry for this node's job to the DAG file descriptor.
994 @param fh: descriptor of open DAG file.
995 """
996 if isinstance(self.job(),CondorDAGManJob):
997 # create an external subdag from this dag
998 fh.write( ' '.join(
999 ['SUBDAG EXTERNAL', self.__name, self.__job.get_sub_file()]) )
1000 if self.job().get_dag_directory():
1001 fh.write( ' DIR ' + self.job().get_dag_directory() )
1002 else:
1003 # write a regular condor job
1004 fh.write( 'JOB ' + self.__name + ' ' + self.__job.get_sub_file() )
1005 fh.write( '\n')
1006
1007 fh.write( 'RETRY ' + self.__name + ' ' + str(self.__retry) + '\n' )
1008
1009 def write_category(self,fh):
1010 """
1011 Write the DAG entry for this node's category to the DAG file descriptor.
1012 @param fh: descriptor of open DAG file.
1013 """
1014 fh.write('CATEGORY ' + self.__name + ' ' + self.__category + '\n')
1015
1016 def write_priority(self,fh):
1017 """
1018 Write the DAG entry for this node's priority to the DAG file descriptor.
1019 @param fh: descriptor of open DAG file.
1020 """
1021 fh.write('PRIORITY ' + self.__name + ' ' + self.__priority + '\n')
1022
1023 def write_vars(self,fh):
1024 """
1025 Write the variable (macro) options and arguments to the DAG file
1026 descriptor.
1027 @param fh: descriptor of open DAG file.
1028 """
1029 if list(self.__macros.keys()) or list(self.__opts.keys()) or self.__args:
1030 fh.write( 'VARS ' + self.__name )
1031 for k in self.__macros.keys():
1032 fh.write( ' ' + str(k) + '="' + str(self.__macros[k]) + '"' )
1033 for k in self.__opts.keys():
1034 fh.write( ' ' + str(k) + '="' + str(self.__opts[k]) + '"' )
1035 if self.__args:
1036 for i in range(self.__arg_index):
1037 fh.write( ' macroargument' + str(i) + '="' + self.__args[i] + '"' )
1038 fh.write( '\n' )
1039
1040 def write_parents(self,fh):
1041 """
1042 Write the parent/child relations for this job to the DAG file descriptor.
1043 @param fh: descriptor of open DAG file.
1044 """
1045 if len(self.__parents) > 0:
1046 fh.write( 'PARENT ' + " ".join((str(p) for p in self.__parents)) + ' CHILD ' + str(self) + '\n' )
1047
1048 def write_pre_script(self,fh):
1049 """
1050 Write the pre script for the job, if there is one
1051 @param fh: descriptor of open DAG file.
1052 """
1053 if self.__pre_script:
1054 fh.write( 'SCRIPT PRE ' + str(self) + ' ' + self.__pre_script + ' ' +
1055 ' '.join(self.__pre_script_args) + '\n' )
1056
1057 def write_post_script(self,fh):
1058 """
1059 Write the post script for the job, if there is one
1060 @param fh: descriptor of open DAG file.
1061 """
1062 if self.__post_script:
1063 fh.write( 'SCRIPT POST ' + str(self) + ' ' + self.__post_script + ' ' +
1064 ' '.join(self.__post_script_args) + '\n' )
1065
1066 def write_input_files(self, fh):
1067 """
1068 Write as a comment into the DAG file the list of input files
1069 for this DAG node.
1070
1071 @param fh: descriptor of open DAG file.
1072 """
1073 for f in self.__input_files:
1074 fh.write("## Job %s requires input file %s\n" % (self.__name, f))
1075
1076 def write_output_files(self, fh):
1077 """
1078 Write as a comment into the DAG file the list of output files
1079 for this DAG node.
1080
1081 @param fh: descriptor of open DAG file.
1082 """
1083 for f in self.__output_files:
1084 fh.write("## Job %s generates output file %s\n" % (self.__name, f))
1085
1086 def set_log_file(self,log):
1087 """
1088 Set the Condor log file to be used by this CondorJob.
1089 @param log: path of Condor log file.
1090 """
1091 self.__job.set_log_file(log)
1092
1093 def add_parent(self,node):
1094 """
1095 Add a parent to this node. This node will not be executed until the
1096 parent node has run successfully.
1097 @param node: CondorDAGNode to add as a parent.
1098 """
1099 if not isinstance(node, (CondorDAGNode,CondorDAGManNode) ):
1100 raise CondorDAGNodeError("Parent must be a CondorDAGNode or a CondorDAGManNode")
1101 self.__parents.append( node )
1102
1103 def get_cmd_tuple_list(self):
1104 """
1105 Return a list of tuples containing the command line arguments
1106 """
1107
1108 # pattern to find DAGman macros
1109 pat = re.compile(r'\$\((.+)\)')
1110 argpat = re.compile(r'\d+')
1111
1112 # first parse the arguments and replace macros with values
1113 args = self.job().get_args()
1114 macros = self.get_args()
1115
1116 cmd_list = []
1117
1118 for a in args:
1119 m = pat.search(a)
1120 if m:
1121 arg_index = int(argpat.findall(a)[0])
1122 try:
1123 cmd_list.append(("%s" % macros[arg_index], ""))
1124 except IndexError:
1125 cmd_list.append(("", ""))
1126 else:
1127 cmd_list.append(("%s" % a, ""))
1128
1129 # second parse the options and replace macros with values
1130 options = self.job().get_opts()
1131 macros = self.get_opts()
1132
1133 for k in options:
1134 val = options[k]
1135 m = pat.match(val)
1136 if m:
1137 key = m.group(1)
1138 value = macros[key]
1139
1140 cmd_list.append(("--%s" % k, str(value)))
1141 else:
1142 cmd_list.append(("--%s" % k, str(val)))
1143
1144 # lastly parse the short options and replace macros with values
1145 options = self.job().get_short_opts()
1146
1147 for k in options:
1148 val = options[k]
1149 m = pat.match(val)
1150 if m:
1151 key = m.group(1)
1152 value = macros[key]
1153
1154 cmd_list.append(("-%s" % k, str(value)))
1155 else:
1156 cmd_list.append(("-%s" % k, str(val)))
1157
1158 return cmd_list
1159
1160 def get_cmd_line(self):
1161 """
1162 Return the full command line that will be used when this node
1163 is run by DAGman.
1164 """
1165
1166 cmd = ""
1167 cmd_list = self.get_cmd_tuple_list()
1168 for argument in cmd_list:
1169 cmd += ' '.join(argument) + " "
1170
1171 return cmd
1172
1173 def finalize(self):
1174 """
1175 The finalize method of a node is called before the node is
1176 finally added to the DAG and can be overridden to do any last
1177 minute clean up (such as setting extra command line arguments)
1178 """
1179 pass
1180
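# A sketch of chaining two nodes of the same job with a parent/child
# dependency (illustrative only; file names and GPS times are hypothetical).
job = CondorDAGJob('vanilla', '/usr/bin/env')
job.set_sub_file('inspiral.sub')
parent = job.create_node()
parent.add_var_opt('gps-start-time', 1000000000)
child = job.create_node()
child.add_var_opt('gps-start-time', 1000002048)
child.add_parent(parent)                 # PARENT ... CHILD ... line in the DAG
child.set_retry(2)
child.set_post_script('/bin/true')       # POST script run after the node completes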
1181
1183 """
1184 Condor DAGMan node class. Appropriate for setting up DAGs to run within a
1185 DAG. Adds the user-tag functionality to condor_dagman processes running in
1186 the DAG. May also be used to extend dagman-node specific functionality.
1187 """
1188 def __init__(self, job):
1189 """
1190 @param job: a CondorDAGManJob
1191 """
1192 super(CondorDAGManNode,self).__init__(job)
1193 self.__user_tag = None
1194 self.__maxjobs_categories = []
1195 self.__cluster_jobs = None
1196
1197 def set_user_tag(self,usertag):
1198 """
1199 Set the user tag that is passed to the analysis code.
1200 @param usertag: the user tag to identify the job
1201 """
1202 self.__user_tag = str(usertag)
1203
1204 def get_user_tag(self):
1205 """
1206 Returns the usertag string
1207 """
1208 return self.__user_tag
1209
1210 def add_maxjobs_category(self,categoryName,maxJobsNum):
1211 """
1212 Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
1213 @param categoryName: category name
1214 @param maxJobsNum: max jobs num
1215 """
1216 self.__maxjobs_categories.append((str(categoryName),str(maxJobsNum)))
1217
1218 def get_maxjobs_categories(self):
1219 """
1220 Return an array of tuples containing (categoryName,maxJobsNum)
1221 """
1222 return self.__maxjobs_categories
1223
1224 def set_cluster_jobs(self,cluster):
1225 """
1226 Set the type of job clustering pegasus can use to collapse jobs
1227 @param cluster: clustering type
1228 """
1229 self.__cluster_jobs = str(cluster)
1230
1231 def get_cluster_jobs(self):
1232 """
1233 Returns the job clustering type
1234 """
1235 return self.__cluster_jobs
1236
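# A sketch of a DAGMan node that throttles a job category in its sub-DAG
# (illustrative only; the dag name, tag and category are hypothetical).
inner_job = CondorDAGManJob('inner.dag')
manager = CondorDAGManNode(inner_job)
manager.set_user_tag('EXAMPLE')
manager.add_maxjobs_category('datafind', 10)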
1237
1238class CondorDAG(object):
1239 """
1240 A CondorDAG is a Condor Directed Acyclic Graph that describes a collection
1241 of Condor jobs and the order in which to run them. All Condor jobs in the
1242 DAG must write their Condor logs to the same file.
1243 NOTE: The log file must not be on an NFS mounted system as the Condor jobs
1244 must be able to get an exclusive file lock on the log file.
1245 """
1246 def __init__(self,log):
1247 """
1248 @param log: path to log file which must not be on an NFS mounted file system.
1249 """
1250 self.__log_file_path = log
1251 self.__dag_file_path = None
1252 self.__jobs = []
1253 self.__nodes = []
1254 self.__maxjobs_categories = []
1255 self.__integer_node_names = 0
1256 self.__node_count = 0
1257 self.__nodes_finalized = 0
1258
1259 def get_nodes(self):
1260 """
1261 Return a list containing all the nodes in the DAG
1262 """
1263 return self.__nodes
1264
1265 def get_jobs(self):
1266 """
1267 Return a list containing all the jobs in the DAG
1268 """
1269 return self.__jobs
1270
1271 def set_integer_node_names(self):
1272 """
1273 Use integer node names for the DAG
1274 """
1275 self.__integer_node_names = 1
1276
1277 def set_dag_file(self, path):
1278 """
1279 Set the name of the file into which the DAG is written.
1280 @param path: path to DAG file.
1281 """
1282 self.__dag_file_path = path + '.dag'
1283
1284 def get_dag_file(self):
1285 """
1286 Return the path to the DAG file.
1287 """
1288 if not self.__dag_file_path:
1289 raise CondorDAGError("No path for DAG file")
1290 else:
1291 return self.__dag_file_path
1292
1293 def add_node(self,node):
1294 """
1295 Add a CondorDAGNode to this DAG. The CondorJob that the node uses is
1296 also added to the list of Condor jobs in the DAG so that a list of the
1297 submit files needed by the DAG can be maintained. Each unique CondorJob
1298 will be added once to prevent duplicate submit files being written.
1299 @param node: CondorDAGNode to add to the CondorDAG.
1300 """
1301 if not isinstance(node, CondorDAGNode):
1302 raise CondorDAGError("Nodes must be class CondorDAGNode or subclass")
1303 if not isinstance(node.job(), CondorDAGManJob):
1304 node.set_log_file(self.__log_file_path)
1305 self.__nodes.append(node)
1306 if self.__integer_node_names:
1307 node.set_name(str(self.__node_count))
1308 self.__node_count += 1
1309 if node.job() not in self.__jobs:
1310 self.__jobs.append(node.job())
1311
1312 def add_maxjobs_category(self,categoryName,maxJobsNum):
1313 """
1314 Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
1315 @param categoryName: category name
1316 @param maxJobsNum: max jobs num
1317 """
1318 self.__maxjobs_categories.append((str(categoryName),str(maxJobsNum)))
1319
1320 def get_maxjobs_categories(self):
1321 """
1322 Return an array of tuples containing (categoryName,maxJobsNum)
1323 """
1324 return self.__maxjobs_categories
1325
1326 def write_maxjobs(self,fh,category):
1327 """
1328 Write the DAG entry for this category's maxjobs to the DAG file descriptor.
1329 @param fh: descriptor of open DAG file.
1330 @param category: tuple containing type of jobs to set a maxjobs limit for
1331 and the maximum number of jobs of that type to run at once.
1332 """
1333 fh.write('MAXJOBS ' + str(category[0]) + ' ' + str(category[1]) + '\n')
1334
1335 def write_sub_files(self):
1336 """
1337 Write all the submit files used by the dag to disk. Each submit file is
1338 written to the file name set in the CondorJob.
1339 """
1340 if not self.__nodes_finalized:
1341 for node in self.__nodes:
1342 node.finalize()
1343 for job in self.__jobs:
1344 job.write_sub_file()
1345
1346 def write_concrete_dag(self):
1347 """
1348 Write all the nodes in the DAG to the DAG file.
1349 """
1350 if not self.__dag_file_path:
1351 raise CondorDAGError("No path for DAG file")
1352 try:
1353 dagfile = open( self.__dag_file_path, 'w' )
1354 except:
1355 raise CondorDAGError("Cannot open file " + self.__dag_file_path)
1356 for node in self.__nodes:
1357 node.write_job(dagfile)
1358 node.write_vars(dagfile)
1359 if node.get_category():
1360 node.write_category(dagfile)
1361 if node.get_priority():
1362 node.write_priority(dagfile)
1363 node.write_pre_script(dagfile)
1364 node.write_post_script(dagfile)
1365 node.write_input_files(dagfile)
1366 node.write_output_files(dagfile)
1367 for node in self.__nodes:
1368 node.write_parents(dagfile)
1369 for category in self.__maxjobs_categories:
1370 self.write_maxjobs(dagfile, category)
1371 dagfile.close()
1372
1373 def write_dag(self):
1374 """
1375 Write a dag.
1376 """
1377 if not self.__nodes_finalized:
1378 for node in self.__nodes:
1379 node.finalize()
1380 self.write_concrete_dag()
1381
1382 def write_script(self):
1383 """
1384 Write the workflow to a script (.sh instead of .dag).
1385
1386 Assuming that parents were added to the DAG before their children,
1387 dependencies should be handled correctly.
1388 """
1389 if not self.__dag_file_path:
1390 raise CondorDAGError("No path for DAG file")
1391 try:
1392 dfp = self.__dag_file_path
1393 outfilename = ".".join(dfp.split(".")[:-1]) + ".sh"
1394 outfile = open(outfilename, "w")
1395 except:
1396 raise CondorDAGError("Cannot open file " + self.__dag_file_path)
1397
1398 for node in self.__nodes:
1399 outfile.write("# Job %s\n" % node.get_name())
1400 # Check if this is a DAGMAN Node
1401 if isinstance(node,CondorDAGManNode):
1402 outfile.write("condor_submit_dag %s\n\n" % (node.job().get_dag()))
1403 else:
1404 outfile.write("%s %s\n\n" % (node.job().get_executable(),
1405 node.get_cmd_line()))
1406 outfile.close()
1407
1408 os.chmod(outfilename, os.stat(outfilename)[0] | stat.S_IEXEC)
1409
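# A sketch of assembling and writing a complete workflow (illustrative only):
# all paths are hypothetical, and the log should live on a local, non-NFS disk.
dag = CondorDAG('/usr1/example/example.log')
dag.set_dag_file('example')                  # produces example.dag / example.sh
job = CondorDAGJob('vanilla', '/usr/bin/env')
job.set_sub_file('example.sub')
job.set_stderr_file('logs/example-$(cluster).err')
job.set_stdout_file('logs/example-$(cluster).out')
for gps_start in (1000000000, 1000002048):
    node = job.create_node()
    node.add_var_opt('gps-start-time', gps_start)
    dag.add_node(node)
dag.write_sub_files()
dag.write_dag()
dag.write_script()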
1410
1411class AnalysisJob(object):
1412 """
1413 Describes a generic analysis job that filters LIGO data as configured by
1414 an ini file.
1415 """
1416 def __init__(self,cp):
1417 """
1418 @param cp: ConfigParser object that contains the configuration for this job.
1419 """
1420 self.__cp = cp
1421 try:
1422 self.__channel = str(self.__cp.get('input','channel')).strip()
1423 except:
1424 self.__channel = None
1425
1426 def get_config(self,sec,opt):
1427 """
1428 Get the configuration variable in a particular section of this job's ini
1429 file.
1430 @param sec: ini file section.
1431 @param opt: option from section sec.
1432 """
1433 return str(self.__cp.get(sec,opt)).strip()
1434
1435 def set_channel(self,channel):
1436 """
1437 Set the name of the channel that this job is filtering. This will
1438 overwrite the value obtained at initialization.
1439 """
1440 self.__channel = channel
1441
1442 def channel(self):
1443 """
1444 Returns the name of the channel that this job is filtering. Note that
1445 channel is defined to be IFO independent, so this may be LSC-AS_Q or
1446 IOO-MC_F. The IFO is set on a per node basis, not a per job basis.
1447 """
1448 return self.__channel
1449
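# A sketch of configuring an AnalysisJob from a ConfigParser object
# (illustrative only; the section contents are hypothetical).
cp = configparser.ConfigParser()
cp.read_dict({'input': {'channel': 'LSC-STRAIN'}})
ana_job = AnalysisJob(cp)
ana_job.channel()                       # 'LSC-STRAIN'
ana_job.get_config('input', 'channel')  # same value, straight from the ini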
1450
1451class AnalysisNode(object):
1452 """
1453 Contains the methods that allow an object to be built to analyse LIGO
1454 data in a Condor DAG.
1455 """
1456 def __init__(self):
1457 self.__start = 0
1458 self.__end = 0
1459 self.__data_start = 0
1460 self.__pad_data = 0
1461 self.__data_end = 0
1462 self.__trig_start = 0
1463 self.__trig_end = 0
1464 self.__ifo = None
1465 self.__ifo_tag = None
1466 self.__input = None
1467 self.__output = None
1468 self.__calibration = None
1469 self.__calibration_cache = None
1470 self.__LHO2k = re.compile(r'H2')
1471 self.__user_tag = self.job().get_opts().get("user-tag", None)
1472
1473 def set_start(self,time,pass_to_command_line=True):
1474 """
1475 Set the GPS start time of the analysis node by setting a --gps-start-time
1476 option to the node when it is executed.
1477 @param time: GPS start time of job.
1478 @param pass_to_command_line: add gps-start-time as variable option.
1479 """
1480 if pass_to_command_line:
1481 self.add_var_opt('gps-start-time',time)
1482 self.__start = time
1483 self.__data_start = time
1484 #if not self.__calibration and self.__ifo and self.__start > 0:
1485 # self.calibration()
1486
1487 def get_start(self):
1488 """
1489 Get the GPS start time of the node.
1490 """
1491 return self.__start
1492
1493 def set_end(self,time,pass_to_command_line=True):
1494 """
1495 Set the GPS end time of the analysis node by setting a --gps-end-time
1496 option to the node when it is executed.
1497 @param time: GPS end time of job.
1498 @param pass_to_command_line: add gps-end-time as variable option.
1499 """
1500 if pass_to_command_line:
1501 self.add_var_opt('gps-end-time',time)
1502 self.__end = time
1503 self.__data_end = time
1504
1505 def get_end(self):
1506 """
1507 Get the GPS end time of the node.
1508 """
1509 return self.__end
1510
1511 def set_data_start(self,time):
1512 """
1513 Set the GPS start time of the data needed by this analysis node.
1514 @param time: GPS start time of job.
1515 """
1516 self.__data_start = time
1517
1518 def get_data_start(self):
1519 """
1520 Get the GPS start time of the data needed by this node.
1521 """
1522 return self.__data_start
1523
1524 def set_pad_data(self,pad):
1525 """
1526 Set the amount of pad data (in seconds) needed by this analysis node.
1527 @param pad: seconds of padding to add to the data
1528 """
1529 self.__pad_data = pad
1530
1531 def get_pad_data(self):
1532 """
1533 Get the amount of pad data (in seconds) needed by this node.
1534 """
1535 return self.__pad_data
1536
1537 def set_data_end(self,time):
1538 """
1539 Set the GPS end time of the data needed by this analysis node.
1540 @param time: GPS end time of job.
1541 """
1542 self.__data_end = time
1543
1544 def get_data_end(self):
1545 """
1546 Get the GPS end time of the data needed by this node.
1547 """
1548 return self.__data_end
1549
1550 def set_trig_start(self,time,pass_to_command_line=True):
1551 """
1552 Set the trig start time of the analysis node by setting a
1553 --trig-start-time option to the node when it is executed.
1554 @param time: trig start time of job.
1555 @param pass_to_command_line: add trig-start-time as a variable option.
1556 """
1557 if pass_to_command_line:
1558 self.add_var_opt('trig-start-time',time)
1559 self.__trig_start = time
1560
1561 def get_trig_start(self):
1562 """
1563 Get the trig start time of the node.
1564 """
1565 return self.__trig_start
1566
1567 def set_trig_end(self,time,pass_to_command_line=True):
1568 """
1569 Set the trig end time of the analysis node by setting a --trig-end-time
1570 option to the node when it is executed.
1571 @param time: trig end time of job.
1572 @param pass_to_command_line: add trig-end-time as a variable option.
1573 """
1574 if pass_to_command_line:
1575 self.add_var_opt('trig-end-time',time)
1576 self.__trig_end = time
1577
1578 def get_trig_end(self):
1579 """
1580 Get the trig end time of the node.
1581 """
1582 return self.__trig_end
1583
1584 def set_input(self,filename,pass_to_command_line=True):
1585 """
1586 Add an input to the node by adding a --input option.
1587 @param filename: option argument to pass as input.
1588 @param pass_to_command_line: add input as a variable option.
1589 """
1590 self.__input = filename
1591 if pass_to_command_line:
1592 self.add_var_opt('input', filename)
1593 self.add_input_file(filename)
1594
1595 def get_input(self):
1596 """
1597 Get the file that will be passed as input.
1598 """
1599 return self.__input
1600
1601 def set_output(self,filename,pass_to_command_line=True):
1602 """
1603 Add an output to the node by adding a --output option.
1604 @param filename: option argument to pass as output.
1605 @param pass_to_command_line: add output as a variable option.
1606 """
1607 self.__output = filename
1608 if pass_to_command_line:
1609 self.add_var_opt('output', filename)
1610 self.add_output_file(filename)
1611
1612 def get_output(self):
1613 """
1614 Get the file that will be passed as output.
1615 """
1616 return self.__output
1617
1618 def set_ifo(self,ifo):
1619 """
1620 Set the ifo name to analyze. If the channel name for the job is defined,
1621 then the name of the ifo is prepended to the channel name obtained
1622 from the job configuration file and passed with a --channel-name option.
1623 @param ifo: two letter ifo code (e.g. L1, H1 or H2).
1624 """
1625 self.__ifo = ifo
1626 if self.job().channel():
1627 self.add_var_opt('channel-name', ifo + ':' + self.job().channel())
1628
1629 def get_ifo(self):
1630 """
1631 Returns the two letter IFO code for this node.
1632 """
1633 return self.__ifo
1634
1635 def set_ifo_tag(self,ifo_tag,pass_to_command_line=True):
1636 """
1637 Set the ifo tag that is passed to the analysis code.
1638 @param ifo_tag: a string to identify one or more IFOs
1639 @param pass_to_command_line: add ifo-tag as a variable option.
1640 """
1641 self.__ifo_tag = ifo_tag
1642 if pass_to_command_line:
1643 self.add_var_opt('ifo-tag', ifo_tag)
1644
1645 def get_ifo_tag(self):
1646 """
1647 Returns the IFO tag string
1648 """
1649 return self.__ifo_tag
1650
1651 def set_user_tag(self,usertag,pass_to_command_line=True):
1652 """
1653 Set the user tag that is passed to the analysis code.
1654 @param usertag: the user tag to identify the job
1655 @param pass_to_command_line: add user-tag as a variable option.
1656 """
1657 self.__user_tag = usertag
1658 if pass_to_command_line:
1659 self.add_var_opt('user-tag', usertag)
1660
1661 def get_user_tag(self):
1662 """
1663 Returns the usertag string
1664 """
1665 return self.__user_tag
1666
1667 def set_cache(self,filename):
1668 """
1669 Set the LAL frame cache to use. The frame cache is passed to the job
1670 with the --frame-cache argument.
1671 @param filename: frame cache file, or list of LFNs, to use.
1672 """
1673 if isinstance( filename, str ):
1674 # the name of a lal cache file created by a datafind node
1675 self.add_var_opt('frame-cache', filename)
1676 self.add_input_file(filename)
1677 elif isinstance( filename, list ):
1678 # we have an LFN list
1679 self.add_var_opt('glob-frame-data',' ')
1680 # only add the LFNs that actually overlap with this job
1681 # XXX FIXME this is a very slow algorithm
1682 if len(filename) == 0:
1683 raise CondorDAGNodeError(
1684 "LDR did not return any LFNs for query: check ifo and frame type")
1685 for lfn in filename:
1686 a, b, c, d = lfn.split('.')[0].split('-')
1687 t_start = int(c)
1688 t_end = int(c) + int(d)
1689 if (t_start <= (self.get_data_end() + self.get_pad_data() + int(d) + 1)
1690 and t_end >= (self.get_data_start() - self.get_pad_data() - int(d) - 1)):
1691 self.add_input_file(lfn)
1692 # set the frame type based on the LFNs returned by datafind
1693 self.add_var_opt('frame-type', b)
1694 else:
1695 raise CondorDAGNodeError("Unknown LFN cache format")
1696
1697 def calibration_cache_path(self):
1698 """
1699 Determine the path to the correct calibration cache file to use.
1700 """
1701 if self.__ifo and self.__start > 0:
1702 cal_path = self.job().get_config('calibration', 'path')
1703
1704 # check if this is S2: split calibration epochs
1705 if ( self.__LHO2k.match(self.__ifo) and
1706 (self.__start >= 729273613) and (self.__start <= 734367613) ):
1707 if self.__start < int(
1708 self.job().get_config('calibration','H2-cal-epoch-boundary')):
1709 cal_file = self.job().get_config('calibration','H2-1')
1710 else:
1711 cal_file = self.job().get_config('calibration','H2-2')
1712 else:
1713 # if not: just add calibration cache
1714 cal_file = self.job().get_config('calibration',self.__ifo)
1715
1716 cal = os.path.join(cal_path,cal_file)
1717 self.__calibration_cache = cal
1718 else:
1719 msg = "IFO and start-time must be set first"
1720 raise CondorDAGNodeError(msg)
1721
1722 def calibration(self):
1723 """
1724 Set the path to the calibration cache file for the given IFO.
1725 During S2 the Hanford 2km IFO had two calibration epochs, so
1726 if the start time is during S2, we use the correct cache file.
1727 """
1728 # figure out the name of the calibration cache files
1729 # as specified in the ini-file
1730 self.calibration_cache_path()
1731
1732 # old .calibration for DAG's
1733 self.add_var_opt('calibration-cache', self.__calibration_cache)
1734 self.__calibration = self.__calibration_cache
1735 self.add_input_file(self.__calibration)
1736
1737 def get_calibration(self):
1738 """
1739 Return the calibration cache file to be used by the
1740 DAG.
1741 """
1742 return self.__calibration_cache
1743
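# AnalysisNode is a mix-in: pipelines normally combine it with CondorDAGNode,
# and AnalysisJob with CondorDAGJob. A sketch (illustrative only; class names,
# channel and GPS times are hypothetical):
class ExampleJob(CondorDAGJob, AnalysisJob):
    def __init__(self, cp):
        CondorDAGJob.__init__(self, 'vanilla', '/usr/bin/env')
        AnalysisJob.__init__(self, cp)

class ExampleNode(CondorDAGNode, AnalysisNode):
    def __init__(self, job):
        CondorDAGNode.__init__(self, job)
        AnalysisNode.__init__(self)

cp = configparser.ConfigParser()
cp.read_dict({'input': {'channel': 'LSC-STRAIN'}})
node = ExampleNode(ExampleJob(cp))
node.set_start(1000000000)           # adds --gps-start-time
node.set_end(1000002048)             # adds --gps-end-time
node.set_ifo('H1')                   # adds --channel-name H1:LSC-STRAIN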
1744
1745class AnalysisChunk(object):
1746 """
1747 An AnalysisChunk is the unit of data that a node works with, usually some
1748 subset of a ScienceSegment.
1749 """
1750 def __init__(self, start, end, trig_start=0, trig_end=0):
1751 """
1752 @param start: GPS start time of the chunk.
1753 @param end: GPS end time of the chunk.
1754 @param trig_start: GPS time at which to start generating triggers
1755 @param trig_end: GPS time at which to stop generating triggers
1756 """
1757 self.__start = start
1758 self.__end = end
1759 self.__length = end - start
1760 self.__trig_start = trig_start
1761 self.__trig_end = trig_end
1762
1763 def __repr__(self):
1764 if self.__trig_start and self.__trig_end:
1765 return '<AnalysisChunk: start %d, end %d, trig_start %d, trig_end %d>' % (
1766 self.__start, self.__end, self.__trig_start, self.__trig_end)
1767 elif self.__trig_start and not self.__trig_end:
1768 return '<AnalysisChunk: start %d, end %d, trig_start %d>' % (
1769 self.__start, self.__end, self.__trig_start)
1770 elif not self.__trig_start and self.__trig_end:
1771 return '<AnalysisChunk: start %d, end %d, trig_end %d>' % (
1772 self.__start, self.__end, self.__trig_end)
1773 else:
1774 return '<AnalysisChunk: start %d, end %d>' % (self.__start, self.__end)
1775
1776 def __len__(self):
1777 """
1778 Returns the length of data for which this AnalysisChunk will produce
1779 triggers (in seconds).
1780 """
1781 if self.__trig_start and self.__trig_end:
1782 x = self.__trig_end - self.__trig_start
1783 elif self.__trig_start and not self.__trig_end:
1784 x = self.__end - self.__trig_start
1785 elif not self.__trig_start and self.__trig_end:
1786 x = self.__trig_end - self.__start
1787 else:
1788 x = self.__end - self.__start
1789
1790 if x < 0:
1791 raise SegmentError(str(self) + ' has negative length')
1792 else:
1793 return x
1794
1795 def start(self):
1796 """
1797 Returns the GPS start time of the chunk.
1798 """
1799 return self.__start
1800
1801 def end(self):
1802 """
1803 Returns the GPS end time of the chunk.
1804 """
1805 return self.__end
1806
1807 def dur(self):
1808 """
1809 Returns the length (duration) of the chunk in seconds.
1810 """
1811 return self.__length
1812
1813 def trig_start(self):
1814 """
1815 Return the first GPS time at which triggers for this chunk should be
1816 generated.
1817 """
1818 return self.__trig_start
1819
1820 def trig_end(self):
1821 """
1822 Return the last GPS time at which triggers for this chunk should be
1823 generated.
1824 """
1825 return self.__trig_end
1826
1827 def set_trig_start(self,start):
1828 """
1829 Set the first GPS time at which triggers for this chunk should be
1830 generated.
1831 """
1832 self.__trig_start = start
1833
1834 def set_trig_end(self,end):
1835 """
1836 Set the last GPS time at which triggers for this chunk should be
1837 generated.
1838 """
1839 self.__trig_end = end
1840
1841
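To make the trigger-window bookkeeping above concrete, here is a minimal, hypothetical sketch; the GPS times are invented and the import path depends on your installation.

from glue import pipeline   # or: from lal import pipeline, depending on installation

# a 2048 s chunk of data that should only produce triggers in its central 1848 s
chunk = pipeline.AnalysisChunk(1000000000, 1000002048,
                               trig_start=1000000100, trig_end=1000001948)

print(chunk)         # repr shows start, end, trig_start and trig_end
print(chunk.dur())   # 2048: the full span of data the node reads
print(len(chunk))    # 1848: the span for which triggers are produced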
1842class ScienceSegment(object):
1843 """
1844 A ScienceSegment is a period of time where the experimenters determine
1845    that the interferometer is in a state where the data is suitable for
1846    scientific analysis. A science segment can have a list of AnalysisChunks
1847    associated with it that break the segment up into (possibly overlapping)
1848 smaller time intervals for analysis.
1849 """
1850 def __init__(self,segment):
1851 """
1852 @param segment: a tuple containing the (segment id, gps start time, gps end
1853 time, duration) of the segment.
1854 """
1855 self.__id = segment[0]
1856 self.__start = segment[1]
1857 self.__end = segment[2]
1858 self.__dur = segment[3]
1859 self.__chunks = []
1860 self.__unused = self.dur()
1861 self.__ifo = None
1862 self.__df_node = None
1863
1864 def __getitem__(self,i):
1865 """
1866 Allows iteration over and direct access to the AnalysisChunks contained
1867 in this ScienceSegment.
1868 """
1869 if i < 0: raise IndexError("list index out of range")
1870 return self.__chunks[i]
1871
1872 def __len__(self):
1873 """
1874 Returns the number of AnalysisChunks contained in this ScienceSegment.
1875 """
1876 return len(self.__chunks)
1877
1878 def __repr__(self):
1879 return '<ScienceSegment: id %d, start %d, end %d, dur %d, unused %d>' % (
1880 self.id(),self.start(),self.end(),self.dur(),self.__unused)
1881
1882    def __lt__(self, other):
1883        """
1884        ScienceSegments are ordered by the GPS start time of the segment.
1885        """
1886        return self.start() < other.start()
1887
1888 def make_chunks(self,length=0,overlap=0,play=0,sl=0,excl_play=0,pad_data=0):
1889 """
1890 Divides the science segment into chunks of length seconds overlapped by
1891 overlap seconds. If the play option is set, only chunks that contain S2
1892 playground data are generated. If the user has a more complicated way
1893        of generating chunks, this method should be overridden in a sub-class.
1894 Any data at the end of the ScienceSegment that is too short to contain a
1895 chunk is ignored. The length of this unused data is stored and can be
1896 retrieved with the unused() method.
1897 @param length: length of chunk in seconds.
1898 @param overlap: overlap between chunks in seconds.
1899 @param play: 1 : only generate chunks that overlap with S2 playground data.
1900 2 : as play = 1 plus compute trig start and end times to
1901 coincide with the start/end of the playground
1902 @param sl: slide by sl seconds before determining playground data.
1903 @param excl_play: exclude the first excl_play second from the start and end
1904 of the chunk when computing if the chunk overlaps with playground.
1905 @param pad_data: exclude the first and last pad_data seconds of the segment
1906 when generating chunks
1907 """
1908 time_left = self.dur() - (2 * pad_data)
1909 start = self.start() + pad_data
1910 increment = length - overlap
1911 while time_left >= length:
1912 end = start + length
1913 if (not play) or (play and (((end - sl - excl_play - 729273613) % 6370)
1914 < (600 + length - 2 * excl_play))):
1915 if (play == 2):
1916                    # calculate the start of the playground preceding the chunk end
1917 play_start = 729273613 + 6370 * \
1918 math.floor((end - sl - excl_play - 729273613) / 6370)
1919 play_end = play_start + 600
1920 trig_start = 0
1921 trig_end = 0
1922 if ( (play_end - 6370) > start ):
1923 print("Two playground segments in this chunk:", end=' ')
1924 print(" Code to handle this case has not been implemented")
1925 sys.exit(1)
1926 else:
1927 if play_start > start:
1928 trig_start = int(play_start)
1929 if play_end < end:
1930 trig_end = int(play_end)
1931 self.__chunks.append(AnalysisChunk(start, end, trig_start, trig_end))
1932 else:
1933 self.__chunks.append(AnalysisChunk(start, end))
1934 start += increment
1935 time_left -= increment
1936 self.__unused = time_left - overlap
1937
1938 def add_chunk(self, start, end, trig_start=0, trig_end=0):
1939 """
1940 Add an AnalysisChunk to the list associated with this ScienceSegment.
1941 @param start: GPS start time of chunk.
1942 @param end: GPS end time of chunk.
1943 @param trig_start: GPS start time for triggers from chunk
1944 @param trig_end: trig_end
1945 """
1946 self.__chunks.append(AnalysisChunk(start, end, trig_start, trig_end))
1947
1948 def unused(self):
1949 """
1950 Returns the length of data in the science segment not used to make chunks.
1951 """
1952 return self.__unused
1953
1954 def set_unused(self,unused):
1955 """
1956 Set the length of data in the science segment not used to make chunks.
1957 """
1958 self.__unused = unused
1959
1960 def id(self):
1961 """
1962 Returns the ID of this ScienceSegment.
1963 """
1964 return self.__id
1965
1966 def start(self):
1967 """
1968 Returns the GPS start time of this ScienceSegment.
1969 """
1970 return self.__start
1971
1972 def end(self):
1973 """
1974 Returns the GPS end time of this ScienceSegment.
1975 """
1976 return self.__end
1977
1978 def set_start(self,t):
1979 """
1980 Override the GPS start time (and set the duration) of this ScienceSegment.
1981 @param t: new GPS start time.
1982 """
1983 self.__dur += self.__start - t
1984 self.__start = t
1985
1986 def set_end(self,t):
1987 """
1988 Override the GPS end time (and set the duration) of this ScienceSegment.
1989 @param t: new GPS end time.
1990 """
1991 self.__dur -= self.__end - t
1992 self.__end = t
1993
1994 def dur(self):
1995 """
1996 Returns the length (duration) in seconds of this ScienceSegment.
1997 """
1998 return self.__dur
1999
2000 def set_df_node(self,df_node):
2001 """
2002 Set the DataFind node associated with this ScienceSegment to df_node.
2003 @param df_node: the DataFind node for this ScienceSegment.
2004 """
2005 self.__df_node = df_node
2006
2007 def get_df_node(self):
2008 """
2009 Returns the DataFind node for this ScienceSegment.
2010 """
2011 return self.__df_node
2012
2013
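As a quick illustration of the chunking logic above, this hypothetical sketch builds one ScienceSegment and divides it into overlapping chunks; the segment id and GPS times are invented.

from glue import pipeline   # or: from lal import pipeline, depending on installation

# (segment id, gps start, gps end, duration)
seg = pipeline.ScienceSegment((1, 1000000000, 1000010000, 10000))
seg.make_chunks(length=2048, overlap=128)

for chunk in seg:
    print(chunk.start(), chunk.end())
print("unused data at the end of the segment:", seg.unused())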
2014class ScienceData(object):
2015 """
2016 An object that can contain all the science data used in an analysis. Can
2017 contain multiple ScienceSegments and has a method to generate these from
2018    a text file produced by the LIGOtools segwizard program.
2019 """
2020 def __init__(self):
2021 self.__sci_segs = []
2022 self.__filename = None
2023
2024 def __getitem__(self,i):
2025 """
2026 Allows direct access to or iteration over the ScienceSegments associated
2027 with the ScienceData.
2028 """
2029 return self.__sci_segs[i]
2030
2031 def __repr__(self):
2032 return '<ScienceData: file %s>' % self.__filename
2033
2034 def __len__(self):
2035 """
2036 Returns the number of ScienceSegments associated with the ScienceData.
2037 """
2038 return len(self.__sci_segs)
2039
2040 def read(self,filename,min_length,slide_sec=0,buffer=0):
2041 """
2042 Parse the science segments from the segwizard output contained in file.
2043 @param filename: input text file containing a list of science segments generated by
2044 segwizard.
2045 @param min_length: only append science segments that are longer than min_length.
2046 @param slide_sec: Slide each ScienceSegment by::
2047
2048 delta > 0:
2049 [s, e] -> [s+delta, e].
2050 delta < 0:
2051            [s, e] -> [s, e+delta].
2052
2053 @param buffer: shrink the ScienceSegment::
2054
2055 [s, e] -> [s+buffer, e-buffer]
2056 """
2057 self.__filename = filename
2058 octothorpe = re.compile(r'\A#')
2059 for line in open(filename):
2060 if not octothorpe.match(line) and int(line.split()[3]) >= min_length:
2061 (id, st, en, du) = list(map(int, line.split()))
2062
2063 # slide the data if doing a background estimation
2064 if slide_sec > 0:
2065 st += slide_sec
2066 elif slide_sec < 0:
2067 en += slide_sec
2068 du -= abs(slide_sec)
2069
2070 # add a buffer
2071 if buffer > 0:
2072 st += buffer
2073 en -= buffer
2074 du -= 2 * abs(buffer)
2075
2076 x = ScienceSegment(tuple([id, st, en, du]))
2077 self.__sci_segs.append(x)
2078
2079 def append_from_tuple(self, seg_tuple):
2080 x = ScienceSegment(seg_tuple)
2081 self.__sci_segs.append(x)
2082
2083 def tama_read(self, filename):
2084 """
2085 Parse the science segments from a tama list of locked segments contained in
2086 file.
2087 @param filename: input text file containing a list of tama segments.
2088 """
2089 self.__filename = filename
2090 for line in open(filename):
2091 columns = line.split()
2092 id = int(columns[0])
2093 start = int(math.ceil(float(columns[3])))
2094 end = int(math.floor(float(columns[4])))
2095 dur = end - start
2096
2097 x = ScienceSegment(tuple([id, start, end, dur]))
2098 self.__sci_segs.append(x)
2099
2100 def make_chunks(self, length, overlap=0, play=0, sl=0, excl_play=0, pad_data=0):
2101 """
2102 Divide each ScienceSegment contained in this object into AnalysisChunks.
2103 @param length: length of chunk in seconds.
2104        @param overlap: overlap between chunks in seconds.
2105 @param play: if true, only generate chunks that overlap with S2 playground
2106 data.
2107 @param sl: slide by sl seconds before determining playground data.
2108 @param excl_play: exclude the first excl_play second from the start and end
2109 of the chunk when computing if the chunk overlaps with playground.
2110 @param pad_data: exclude the first and last pad_data seconds of the segment
2111 when generating chunks
2112 """
2113 for seg in self.__sci_segs:
2114 seg.make_chunks(length,overlap,play,sl,excl_play,pad_data)
2115
2116 def make_chunks_from_unused(self,length,trig_overlap,play=0,min_length=0,
2117 sl=0,excl_play=0,pad_data=0):
2118 """
2119 Create an extra chunk that uses up the unused data in the science segment.
2120 @param length: length of chunk in seconds.
2121        @param trig_overlap: length of time to start generating triggers before the
2122 start of the unused data.
2123 @param play:
2124 - 1 : only generate chunks that overlap with S2 playground data.
2125 - 2 : as 1 plus compute trig start and end times to coincide
2126 with the start/end of the playground
2127 @param min_length: the unused data must be greater than min_length to make a
2128 chunk.
2129 @param sl: slide by sl seconds before determining playground data.
2130 @param excl_play: exclude the first excl_play second from the start and end
2131 of the chunk when computing if the chunk overlaps with playground.
2132 @param pad_data: exclude the first and last pad_data seconds of the segment
2133 when generating chunks
2135 """
2136 for seg in self.__sci_segs:
2137 # if there is unused data longer than the minimum chunk length
2138 if seg.unused() > min_length:
2139 end = seg.end() - pad_data
2140 start = end - length
2141 if (not play) or (play and (((end - sl - excl_play - 729273613) % 6370)
2142 < (600 + length - 2 * excl_play))):
2143 trig_start = end - seg.unused() - trig_overlap
2144 if (play == 2):
2145 # calculate the start of the playground preceeding the chunk end
2146 play_start = 729273613 + 6370 * \
2147 math.floor((end - sl - excl_play - 729273613) / 6370)
2148 play_end = play_start + 600
2149 trig_end = 0
2150 if ( (play_end - 6370) > start ):
2151 print("Two playground segments in this chunk")
2152 print(" Code to handle this case has not been implemented")
2153 sys.exit(1)
2154 else:
2155 if play_start > trig_start:
2156 trig_start = int(play_start)
2157 if (play_end < end):
2158 trig_end = int(play_end)
2159 if (trig_end == 0) or (trig_end > trig_start):
2160 seg.add_chunk(start, end, trig_start, trig_end)
2161 else:
2162 seg.add_chunk(start, end, trig_start)
2163 seg.set_unused(0)
2164
2165    def make_short_chunks_from_unused(
2166            self,min_length,overlap=0,play=0,sl=0,excl_play=0):
2167 """
2168 Create a chunk that uses up the unused data in the science segment
2169 @param min_length: the unused data must be greater than min_length to make a
2170 chunk.
2171 @param overlap: overlap between chunks in seconds.
2172 @param play: if true, only generate chunks that overlap with S2 playground data.
2173 @param sl: slide by sl seconds before determining playground data.
2174 @param excl_play: exclude the first excl_play second from the start and end
2175 of the chunk when computing if the chunk overlaps with playground.
2176 """
2177 for seg in self.__sci_segs:
2178 if seg.unused() > min_length:
2179 start = seg.end() - seg.unused() - overlap
2180 end = seg.end()
2181                length = end - start
2182 if (not play) or (play and (((end - sl - excl_play - 729273613) % 6370)
2183 < (600 + length - 2 * excl_play))):
2184 seg.add_chunk(start, end, start)
2185 seg.set_unused(0)
2186
2187 def make_optimised_chunks(self, min_length, max_length, pad_data=0):
2188 """
2189 Splits ScienceSegments up into chunks, of a given maximum length.
2190 The length of the last two chunks are chosen so that the data
2191 utilisation is optimised.
2192 @param min_length: minimum chunk length.
2193 @param max_length: maximum chunk length.
2194 @param pad_data: exclude the first and last pad_data seconds of the
2195 segment when generating chunks
2196 """
2197 for seg in self.__sci_segs:
2198 # pad data if requested
2199 seg_start = seg.start() + pad_data
2200 seg_end = seg.end() - pad_data
2201
2202 if seg.unused() > max_length:
2203 # get number of max_length chunks
2204                N = (seg_end - seg_start) // max_length
2205
2206 # split into chunks of max_length
2207 for i in range(N - 1):
2208 start = seg_start + (i * max_length)
2209 stop = start + max_length
2210 seg.add_chunk(start, stop)
2211
2212 # optimise data usage for last 2 chunks
2213 start = seg_start + ((N - 1) * max_length)
2214                middle = (start + seg_end) // 2
2215 seg.add_chunk(start, middle)
2216 seg.add_chunk(middle, seg_end)
2217 seg.set_unused(0)
2218 elif seg.unused() > min_length:
2219 # utilise as single chunk
2220 seg.add_chunk(seg_start, seg_end)
2221 else:
2222 # no chunk of usable length
2223 seg.set_unused(0)
2224
2225 def intersection(self, other):
2226 """
2227 Replaces the ScienceSegments contained in this instance of ScienceData
2228 with the intersection of those in the instance other. Returns the number
2229 of segments in the intersection.
2230 @param other: ScienceData to use to generate the intersection
2231 """
2232
2233 # initialize list of output segments
2234 ostart = -1
2235 outlist = []
2236 iseg2 = -1
2237 start2 = -1
2238 stop2 = -1
2239
2240 for seg1 in self:
2241 start1 = seg1.start()
2242 stop1 = seg1.end()
2243 id = seg1.id()
2244
2245 # loop over segments from the second list which overlap this segment
2246 while start2 < stop1:
2247 if stop2 > start1:
2248 # these overlap
2249
2250 # find the overlapping range
2251 if start1 < start2:
2252 ostart = start2
2253 else:
2254 ostart = start1
2255 if stop1 > stop2:
2256 ostop = stop2
2257 else:
2258 ostop = stop1
2259
2260 x = ScienceSegment(tuple([id, ostart, ostop, ostop - ostart]))
2261 outlist.append(x)
2262
2263 if stop2 > stop1:
2264 break
2265
2266 # step forward
2267 iseg2 += 1
2268 if iseg2 < len(other):
2269 seg2 = other[iseg2]
2270 start2 = seg2.start()
2271 stop2 = seg2.end()
2272 else:
2273 # pseudo-segment in the far future
2274 start2 = 2000000000
2275 stop2 = 2000000000
2276
2277 # save the intersection and return the length
2278 self.__sci_segs = outlist
2279 return len(self)
2280
2281 def union(self, other):
2282 """
2283 Replaces the ScienceSegments contained in this instance of ScienceData
2284 with the union of those in the instance other. Returns the number of
2285 ScienceSegments in the union.
2286 @param other: ScienceData to use to generate the intersection
2287 """
2288
2289 # we only deal with the case of two lists here
2290 length1 = len(self)
2291 length2 = len(other)
2292
2293 # initialize list of output segments
2294 ostart = -1
2295 seglist = []
2296
2297 i1 = -1
2298 i2 = -1
2299 start1 = -1
2300 start2 = -1
2301 id = -1
2302
2303 while 1:
2304 # if necessary, get a segment from list 1
2305 if start1 == -1:
2306 i1 += 1
2307 if i1 < length1:
2308 start1 = self[i1].start()
2309 stop1 = self[i1].end()
2310 id = self[i1].id()
2311 elif i2 == length2:
2312 break
2313
2314 # if necessary, get a segment from list 2
2315 if start2 == -1:
2316 i2 += 1
2317 if i2 < length2:
2318 start2 = other[i2].start()
2319 stop2 = other[i2].end()
2320 elif i1 == length1:
2321 break
2322
2323 # pick the earlier segment from the two lists
2324 if start1 > -1 and ( start2 == -1 or start1 <= start2):
2325 ustart = start1
2326 ustop = stop1
2327                # mark this segment as having been consumed
2328 start1 = -1
2329 elif start2 > -1:
2330 ustart = start2
2331 ustop = stop2
2332                # mark this segment as having been consumed
2333 start2 = -1
2334 else:
2335 break
2336
2337 # if the output segment is blank, initialize it; otherwise, see
2338 # whether the new segment extends it or is disjoint
2339 if ostart == -1:
2340 ostart = ustart
2341 ostop = ustop
2342 elif ustart <= ostop:
2343 if ustop > ostop:
2344 # this extends the output segment
2345 ostop = ustop
2346 else:
2347 # This lies entirely within the current output segment
2348 pass
2349 else:
2350 # flush the current output segment, and replace it with the
2351 # new segment
2352 x = ScienceSegment(tuple([id, ostart, ostop, ostop - ostart]))
2353 seglist.append(x)
2354 ostart = ustart
2355 ostop = ustop
2356
2357 # flush out the final output segment (if any)
2358 if ostart != -1:
2359 x = ScienceSegment(tuple([id, ostart, ostop, ostop - ostart]))
2360 seglist.append(x)
2361
2362 self.__sci_segs = seglist
2363 return len(self)
2364
2365 def coalesce(self):
2366 """
2367 Coalesces any adjacent ScienceSegments. Returns the number of
2368 ScienceSegments in the coalesced list.
2369 """
2370
2371 # check for an empty list
2372 if len(self) == 0:
2373 return 0
2374
2375 # sort the list of science segments
2376 self.__sci_segs.sort()
2377
2378 # coalesce the list, checking each segment for validity as we go
2379 outlist = []
2380 ostop = -1
2381
2382 for seg in self:
2383 start = seg.start()
2384 stop = seg.end()
2385 id = seg.id()
2386 if start > ostop:
2387 # disconnected, so flush out the existing segment (if any)
2388 if ostop >= 0:
2389 # the following line produces a flake8 issue with ostart; see https://git.ligo.org/lscsoft/glue/-/issues/37
2390 x = ScienceSegment(tuple([id, ostart, ostop, ostop - ostart])) # noqa: F821
2391 outlist.append(x)
2392 ostart = start # noqa: F841
2393 ostop = stop # noqa: F841
2394 elif stop > ostop:
2395 # extend the current segment
2396 ostop = stop
2397
2398 # flush out the final segment (if any)
2399 if ostop >= 0:
2400 x = ScienceSegment(tuple([id, ostart, ostop, ostop - ostart]))
2401 outlist.append(x)
2402
2403 self.__sci_segs = outlist
2404 return len(self)
2405
2406 def invert(self):
2407 """
2408 Inverts the ScienceSegments in the class (i.e. set NOT). Returns the
2409 number of ScienceSegments after inversion.
2410 """
2411
2412 # check for an empty list
2413 if len(self) == 0:
2414 # return a segment representing all time
2415            self.__sci_segs = [ScienceSegment(tuple([0, 0, 1999999999, 1999999999]))]
2416            return len(self)
2417 # go through the list checking for validity as we go
2418 outlist = []
2419 ostart = 0
2420 for seg in self:
2421 start = seg.start()
2422 stop = seg.end()
2423 if start < 0 or stop < start or start < ostart:
2424 raise SegmentError("Invalid list")
2425 if start > 0:
2426 x = ScienceSegment(tuple([0, ostart, start, start - ostart]))
2427 outlist.append(x)
2428 ostart = stop
2429
2430 if ostart < 1999999999:
2431 x = ScienceSegment(tuple([0, ostart, 1999999999, 1999999999 - ostart]))
2432 outlist.append(x)
2433
2434 self.__sci_segs = outlist
2435 return len(self)
2436
2437 def play(self):
2438 """
2439 Keep only times in ScienceSegments which are in the playground
2440 """
2441
2442 # initialize list of output segments
2443 ostart = -1
2444 outlist = []
2445 begin_s2 = 729273613
2446 play_space = 6370
2447 play_len = 600
2448
2449 for seg in self:
2450 start = seg.start()
2451 stop = seg.end()
2452 id = seg.id()
2453
2454 # select first playground segment which ends after start of seg
2455 play_start = begin_s2 + play_space * ( 1
2456 + int((start - begin_s2 - play_len) / play_space) )
2457
2458 while play_start < stop:
2459 if play_start > start:
2460 ostart = play_start
2461 else:
2462 ostart = start
2463
2464 play_stop = play_start + play_len
2465
2466 if play_stop < stop:
2467 ostop = play_stop
2468 else:
2469 ostop = stop
2470
2471 x = ScienceSegment(tuple([id, ostart, ostop, ostop - ostart]))
2472 outlist.append(x)
2473
2474 # step forward
2475 play_start = play_start + play_space
2476
2477 # save the playground segs and return the length
2478 self.__sci_segs = outlist
2479 return len(self)
2480
2481 def intersect_3(self, second, third):
2482 """
2483 Intersection routine for three inputs. Built out of the intersect,
2484 coalesce and play routines
2485 """
2486 self.intersection(second)
2487 self.intersection(third)
2488 self.coalesce()
2489 return len(self)
2490
2491 def intersect_4(self, second, third, fourth):
2492 """
2493 Intersection routine for four inputs.
2494 """
2495 self.intersection(second)
2496 self.intersection(third)
2497 self.intersection(fourth)
2498 self.coalesce()
2499 return len(self)
2500
2501 def split(self, dt):
2502 """
2503        Split the segments in the list into subsegments at least as long as dt
2504 """
2505 outlist = []
2506 for seg in self:
2507 start = seg.start()
2508 stop = seg.end()
2509 id = seg.id()
2510
2511 while start < stop:
2512 tmpstop = start + dt
2513 if tmpstop > stop:
2514 tmpstop = stop
2515 elif tmpstop + dt > stop:
2516 tmpstop = int( (start + stop) / 2 )
2517 x = ScienceSegment(tuple([id, start, tmpstop, tmpstop - start]))
2518 outlist.append(x)
2519 start = tmpstop
2520
2521 # save the split list and return length
2522 self.__sci_segs = outlist
2523 return len(self)
2524
2525
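The sketch below shows the typical ScienceData workflow implied by the methods above: read segwizard files, keep the long segments, intersect the two detectors, and chunk the result. The file names and thresholds are hypothetical, and the import path depends on your installation.

from glue import pipeline   # or: from lal import pipeline, depending on installation

# segwizard format: one "<id> <gps start> <gps end> <duration>" line per segment
h1_data = pipeline.ScienceData()
h1_data.read('h1_segments.txt', min_length=2048)

l1_data = pipeline.ScienceData()
l1_data.read('l1_segments.txt', min_length=2048)

# keep only the times when both detectors were taking science data
h1_data.intersection(l1_data)
h1_data.coalesce()

# divide the surviving segments into overlapping analysis chunks
h1_data.make_chunks(length=2048, overlap=128)
h1_data.make_chunks_from_unused(length=2048, trig_overlap=64, min_length=256)

for seg in h1_data:
    for chunk in seg:
        print(seg.id(), chunk.start(), chunk.end())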
2526class LsyncCache(object):
2527 def __init__(self,path):
2528 # location of the cache file
2529 self.__path = path
2530
2531 # dictionary where the keys are data types like 'gwf', 'sft', 'xml'
2532 # and the values are dictionaries
2533 self.cache = {'gwf': None, 'sft': None, 'xml': None}
2534
2535 # for each type create a dictionary where keys are sites and values
2536 # are dictionaries
2537 for type in self.cache.keys():
2538 self.cache[type] = {}
2539
2540 def group(self, lst, n):
2541 """
2542 Group an iterable into an n-tuples iterable. Incomplete
2543 tuples are discarded
2544 """
2545        return zip(*[itertools.islice(lst, i, None, n) for i in range(n)])
2546
2547 def parse(self,type_regex=None):
2548 """
2549 Each line of the frame cache file is like the following:
2550
2551 /frames/E13/LHO/frames/hoftMon_H1/H-H1_DMT_C00_L2-9246,H,H1_DMT_C00_L2,1,16 1240664820 6231 {924600000 924646720 924646784 924647472 924647712 924700000}
2552
2553 The description is as follows:
2554
2555 1.1) Directory path of files
2556 1.2) Site
2557 1.3) Type
2558 1.4) Number of frames in the files (assumed to be 1)
2559 1.5) Duration of the frame files.
2560
2561 2) UNIX timestamp for directory modification time.
2562
2563        3) Number of files that match the above pattern in the directory.
2564
2565        4) List of time ranges or segments [start, stop)
2566
2567 We store the cache for each site and frameType combination
2568 as a dictionary where the keys are (directory, duration)
2569 tuples and the values are segment lists.
2570
2571 Since the cache file is already coalesced we do not
2572 have to call the coalesce method on the segment lists.
2573 """
2574 path = self.__path
2575 cache = self.cache
2576 if type_regex:
2577 type_filter = re.compile(type_regex)
2578 else:
2579 type_filter = None
2580
2581 f = open(path, 'r')
2582
2583 # holds this iteration of the cache
2584 gwfDict = {}
2585
2586 # parse each line in the cache file
2587 for line in f:
2588 # ignore lines that don't match the regex
2589 if type_filter and type_filter.search(line) is None:
2590 continue
2591
2592 # split on spaces and then comma to get the parts
2593 header, modTime, fileCount, times = line.strip().split(' ', 3)
2594 dir, site, frameType, frameCount, duration = header.split(',')
2595 duration = int(duration)
2596
2597 # times string has form { t1 t2 t3 t4 t5 t6 ... tN t(N+1) }
2598 # where the (ti, t(i+1)) represent segments
2599 #
2600 # first turn the times string into a list of integers
2601 times = [ int(s) for s in times[1:-1].split(' ') ]
2602
2603 # group the integers by two and turn those tuples into segments
2604 segs = [ segments.segment(a) for a in self.group(times, 2) ]
2605
2606 # initialize if necessary for this site
2607 if site not in gwfDict:
2608 gwfDict[site] = {}
2609
2610 # initialize if necessary for this frame type
2611 if frameType not in gwfDict[site]:
2612 gwfDict[site][frameType] = {}
2613
2614 # record segment list as value indexed by the (directory, duration) tuple
2615 key = (dir, duration)
2616 if key in gwfDict[site][frameType]:
2617 msg = "The combination %s is not unique in the frame cache file" \
2618 % str(key)
2619 raise RuntimeError(msg)
2620
2621 gwfDict[site][frameType][key] = segments.segmentlist(segs)
2622 f.close()
2623
2624 cache['gwf'] = gwfDict
2625
2626 def get_lfns(self, site, frameType, gpsStart, gpsEnd):
2627        """Return the sorted list of frame file LFNs for the given site and
2628        frame type that overlap the GPS interval [gpsStart, gpsEnd)."""
2629 # get the cache from the manager
2630 cache = self.cache
2631
2632 # if the cache does not contain any mappings for this site type return empty list
2633 if site not in cache['gwf']:
2634 return []
2635
2636        # if the cache does not contain any mappings for this frame type return empty list
2637 if frameType not in cache['gwf'][site]:
2638 return []
2639
2640 # segment representing the search interval
2641 search = segments.segment(gpsStart, gpsEnd)
2642
2643 # segment list representing the search interval
2644 searchlist = segments.segmentlist([search])
2645
2646 # dict of LFNs returned that match the metadata query
2647 lfnDict = {}
2648
2649 for key,seglist in cache['gwf'][site][frameType].items():
2650 dir, dur = key
2651
2652 # see if the seglist overlaps with our search
2653 overlap = seglist.intersects(searchlist)
2654
2655 if not overlap: continue
2656
2657 # the seglist does overlap with search so build file paths
2658 # but reject those outside of the search segment
2659
2660 for s in seglist:
2661 if s.intersects(search):
2662 t1, t2 = s
2663 times = range(t1, t2, dur)
2664
2665 # loop through the times and create paths
2666 for t in times:
2667 if search.intersects(segments.segment(t, t + dur)):
2668 lfn = "%s-%s-%d-%d.gwf" % (site, frameType, t, dur)
2669 lfnDict[lfn] = None
2670
2671 # sort the LFNs to deliver URLs in GPS order
2672 lfns = list(lfnDict.keys())
2673 lfns.sort()
2674
2675 return lfns
2676
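For orientation, here is a hypothetical use of the cache parser and LFN lookup described above; the cache path, site, frame type and GPS interval are all invented.

from glue import pipeline   # or: from lal import pipeline, depending on installation

cache = pipeline.LsyncCache('/var/cache/lsync/frame_cache.txt')
cache.parse(type_regex='H1_DMT_C00_L2')   # only load lines matching this frame type

# LFNs of the form H-H1_DMT_C00_L2-<gps>-<dur>.gwf overlapping [gpsStart, gpsEnd)
for lfn in cache.get_lfns('H', 'H1_DMT_C00_L2', 924600000, 924700000):
    print(lfn)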
2677
2678class LSCDataFindJob(CondorDAGJob, AnalysisJob):
2679    """
2680 An LSCdataFind job used to locate data. The static options are
2681 read from the section [datafind] in the ini file. The stdout from
2682 LSCdataFind contains the paths to the frame files and is directed to a file
2683 in the cache directory named by site and GPS start and end times. The stderr
2684    is directed to the logs directory. The job always runs in the local
2685 universe. The path to the executable is determined from the ini file.
2686 """
2687 def __init__(self,cache_dir,log_dir,config_file,lsync_cache_file=None,lsync_type_regex=None):
2688 """
2689 @param cache_dir: the directory to write the output lal cache files to.
2690 @param log_dir: the directory to write the stderr file to.
2691 @param config_file: ConfigParser object containing the path to the LSCdataFind executable in the [condor] section and a [datafind] section from which the LSCdataFind options are read.
2692 @param lsync_cache_file: lsync_cache_file
2693 @param lsync_type_regex: lsync_type_regex
2694 """
2695        self.__executable = config_file.get('condor','datafind')
2696        self.__universe = 'local'
2697        CondorDAGJob.__init__(self,self.__universe,self.__executable)
2698 AnalysisJob.__init__(self,config_file)
2699 self.__cache_dir = cache_dir
2700 self.__config_file = config_file
2701 self.__lsync_cache = None
2702 if config_file.has_option('condor','accounting_group'):
2703 self.add_condor_cmd('accounting_group',config_file.get('condor','accounting_group'))
2704 if lsync_cache_file:
2705 self.__lsync_cache = LsyncCache(lsync_cache_file)
2706 self.__lsync_cache.parse(lsync_type_regex)
2707
2708 # we have to do this manually for backwards compatibility with type
2709 for o in self.__config_file.options('datafind'):
2710 opt = str(o).strip()
2711 if opt[:4] != "type":
2712 arg = str(self.__config_file.get('datafind',opt)).strip()
2713 self.add_opt(opt,arg)
2714
2715 # we need a lal cache for file PFNs
2716 self.add_opt('lal-cache','')
2717 self.add_opt('url-type','file')
2718
2719 self.add_condor_cmd('getenv','True')
2720
2721 self.set_stderr_file(os.path.join(log_dir, 'datafind-$(macroobservatory)-$(macrotype)-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).err'))
2722 self.set_stdout_file(os.path.join(log_dir, 'datafind-$(macroobservatory)-$(macrotype)-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).out'))
2723 self.set_sub_file('datafind.sub')
2724
2725 def get_cache_dir(self):
2726 """
2727        Returns the directory that the cache files are written to.
2728 """
2729 return self.__cache_dir
2730
2731 def get_config_file(self):
2732 """
2733 return the configuration file object
2734 """
2735 return self.__config_file
2736
2737 def lsync_cache(self):
2738 return self.__lsync_cache
2739
2740
2741class LSCDataFindNode(CondorDAGNode, AnalysisNode):
2742    """
2743    An LSCDataFindNode runs an instance of LSCdataFind in a Condor DAG.
2744 """
2745 def __init__(self,job):
2746 """
2747        @param job: A CondorDAGJob that can run an instance of LSCdataFind.
2748 """
2749 CondorDAGNode.__init__(self,job)
2750 AnalysisNode.__init__(self)
2751        self.__start = 0
2752        self.__end = 0
2753        self.__observatory = None
2754        self.__output = None
2755        self.__job = job
2756 self.__lfn_list = None
2757
2758 # try and get a type from the ini file and default to type None
2759 try:
2760 self.set_type(self.job().get_config_file().get('datafind','type'))
2761 except:
2762 self.__type = None
2763
2764 def __set_output(self):
2765 """
2766        Private method to set the file to write the cache to. Automatically set
2767        once the ifo, start and end times have been set.
2768        """
2769        if self.__start and self.__end and self.__observatory and self.__type:
2770            self.__output = os.path.join(self.__job.get_cache_dir(), self.__observatory + '-' + self.__type + '_CACHE' + '-' + str(self.__start) + '-' + str(self.__end - self.__start) + '.lcf')
2771            self.set_output(self.__output)
2772
2773 def set_start(self, time, pad=None):
2774 """
2775 Set the start time of the datafind query.
2776 @param time: GPS start time of query.
2777 @param pad: pad
2778 """
2779 if pad:
2780 self.add_var_opt('gps-start-time', int(time) - int(pad))
2781 else:
2782 self.add_var_opt('gps-start-time', int(time))
2783        self.__start = time
2784 self.__set_output()
2785
2786 def get_start(self):
2787 """
2788 Return the start time of the datafind query
2789 """
2790        return self.__start
2791
2792 def set_end(self,time):
2793 """
2794 Set the end time of the datafind query.
2795 @param time: GPS end time of query.
2796 """
2797 self.add_var_opt('gps-end-time', time)
2798        self.__end = time
2799 self.__set_output()
2800
2801 def get_end(self):
2802 """
2803        Return the end time of the datafind query
2804        """
2805        return self.__end
2806
2807 def set_observatory(self,obs):
2808 """
2809 Set the IFO to retrieve data for. Since the data from both Hanford
2810 interferometers is stored in the same frame file, this takes the first
2811 letter of the IFO (e.g. L or H) and passes it to the --observatory option
2812 of LSCdataFind.
2813 @param obs: IFO to obtain data for.
2814 """
2815 self.add_var_opt('observatory',obs)
2816 self.__observatory = str(obs)
2817 self.__set_output()
2818
2819 def get_observatory(self):
2820 """
2821        Return the observatory for the datafind query
2822 """
2823 return self.__observatory
2824
2825 def set_type(self,type):
2826 """
2827 sets the frame type that we are querying
2828 """
2829 self.add_var_opt('type',str(type))
2830 self.__type = str(type)
2831 self.__set_output()
2832
2833 def get_type(self):
2834 """
2835 gets the frame type that we are querying
2836 """
2837 return self.__type
2838
2839    def get_output_cache(self):
2840        return self.__output
2841
2842 def get_output(self):
2843 """
2844        Return the output file, i.e. the file containing the frame cache data,
2845        or the file itself as a tuple (for DAX).
2846        """
2847        return self.__output
2848
2849
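Putting the datafind job and node together, a hypothetical snippet might look like the following; it assumes an ini file with the [condor] and [datafind] sections described above, and existing cache/ and logs/ directories. DeepCopyableConfigParser is defined later in this module.

from glue import pipeline   # or: from lal import pipeline, depending on installation

cp = pipeline.DeepCopyableConfigParser()
cp.read('pipeline.ini')                    # hypothetical ini file

df_job = pipeline.LSCDataFindJob('cache', 'logs', cp)
df_node = pipeline.LSCDataFindNode(df_job)
df_node.set_observatory('H')
df_node.set_type('H1_DMT_C00_L2')
df_node.set_start(1000000000)
df_node.set_end(1000002048)

# once observatory, type, start and end are known the .lcf file name is fixed
print(df_node.get_output())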
2850class LigolwAddJob(CondorDAGJob, AnalysisJob):
2851    """
2852    A ligolw_add job can be used to concatenate several LIGO light-weight XML files
2853 """
2854 def __init__(self,log_dir,cp):
2855 """
2856 cp = ConfigParser object from which options are read.
2857 """
2858        self.__executable = cp.get('condor','ligolw_add')
2859        self.__universe = 'vanilla'
2860        CondorDAGJob.__init__(self,self.__universe,self.__executable)
2861 AnalysisJob.__init__(self,cp)
2862 self.add_ini_opts(cp, "ligolw_add")
2863
2864 self.add_condor_cmd('getenv','True')
2865 if cp.has_option('condor','accounting_group'):
2866 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
2867
2868 self.set_stdout_file(os.path.join( log_dir, 'ligolw_add-$(cluster)-$(process).out') )
2869 self.set_stderr_file(os.path.join( log_dir, 'ligolw_add-$(cluster)-$(process).err') )
2870 self.set_sub_file('ligolw_add.sub')
2871
2872
2873class LigolwAddNode(CondorDAGNode, AnalysisNode):
2874    """
2875 Runs an instance of ligolw_add in a Condor DAG.
2876 """
2877 def __init__(self,job):
2878 """
2879 @param job: A CondorDAGJob that can run an instance of ligolw_add
2880 """
2881 CondorDAGNode.__init__(self,job)
2882 AnalysisNode.__init__(self)
2883
2884
2885class LigolwCutJob(CondorDAGJob, AnalysisJob):
2886    """
2887    A ligolw_cut job can be used to remove parts of a LIGO light-weight XML file
2888 """
2889 def __init__(self,log_dir,cp):
2890 """
2891 cp = ConfigParser object from which options are read.
2892 """
2893        self.__executable = cp.get('condor','ligolw_cut')
2894        self.__universe = 'vanilla'
2895        CondorDAGJob.__init__(self,self.__universe,self.__executable)
2896 AnalysisJob.__init__(self,cp)
2897
2898 self.add_condor_cmd('getenv','True')
2899
2900 self.set_stdout_file(os.path.join( log_dir, 'ligolw_cut-$(cluster)-$(process).out') )
2901 self.set_stderr_file(os.path.join( log_dir, 'ligolw_cut-$(cluster)-$(process).err') )
2902 self.set_sub_file('ligolw_cut.sub')
2903
2904
2905class LigolwCutNode(CondorDAGNode, AnalysisNode):
2906    """
2907 Runs an instance of ligolw_cut in a Condor DAG.
2908 """
2909 def __init__(self,job):
2910 """
2911 @param job: A CondorDAGJob that can run an instance of ligolw_cut
2912 """
2913 CondorDAGNode.__init__(self,job)
2914 AnalysisNode.__init__(self)
2915
2916
2917class NoopJob(CondorDAGJob, AnalysisJob):
2918    """
2919 A Noop Job does nothing.
2920 """
2921 def __init__(self,log_dir,cp):
2922 """
2923 cp = ConfigParser object from which options are read.
2924 """
2925        self.__executable = 'true'
2926        self.__universe = 'local'
2927        CondorDAGJob.__init__(self,self.__universe,self.__executable)
2928 AnalysisJob.__init__(self,cp)
2929
2930 self.add_condor_cmd('getenv','True')
2931 self.add_condor_cmd('noop_job','True')
2932 if cp.has_option('condor','accounting_group'):
2933 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
2934
2935 self.set_stdout_file(os.path.join( log_dir, 'noop-$(cluster)-$(process).out') )
2936 self.set_stderr_file(os.path.join( log_dir, 'noop-$(cluster)-$(process).err') )
2937 self.set_sub_file('noop.sub')
2938
2939
2940class NoopNode(CondorDAGNode, AnalysisNode):
2941    """
2942    Run a noop job in a Condor DAG.
2943 """
2944 def __init__(self,job):
2945 """
2946 @param job: A CondorDAGJob that does nothing.
2947 """
2948 CondorDAGNode.__init__(self,job)
2949 AnalysisNode.__init__(self)
2950 self.__server = None
2951 self.__identity = None
2952 self.__insert = None
2953 self.__pfn = None
2954 self.__query = None
2955
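A NoopNode is mainly useful as a synchronisation point in a DAG: downstream nodes can depend on it instead of depending on every upstream node individually. A hypothetical sketch (the ini, log and DAG file names are invented):

from glue import pipeline   # or: from lal import pipeline, depending on installation

cp = pipeline.DeepCopyableConfigParser()
cp.read('pipeline.ini')

dag = pipeline.CondorDAG('pipeline.log')
dag.set_dag_file('pipeline')

noop_job = pipeline.NoopJob('logs', cp)
barrier = pipeline.NoopNode(noop_job)
dag.add_node(barrier)
# later nodes call node.add_parent(barrier) to wait for everything before it

dag.write_sub_files()
dag.write_dag()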
2956
2957class SqliteJob(CondorDAGJob, AnalysisJob):
2958    """
2959 A cbc sqlite job adds to CondorDAGJob and AnalysisJob features common to jobs
2960    which read or write to a sqlite database. Of note, the universe is always set to
2961    vanilla regardless of what's in the cp file, the extension is set
2962    to None so that it may be set by individual SqliteNodes, log files do not
2963    have macrogpsstarttime and endtime in them, and getenv is set to True.
2964 """
2965 def __init__(self, cp, sections, exec_name):
2966 """
2967 @param cp: a ConfigParser object from which options are read
2968 @param sections: list of sections in cp to get added options
2969 @param exec_name: the name of the sql executable
2970 """
2971 self.__exec_name = exec_name
2972 executable = cp.get('condor', exec_name)
2973 universe = 'vanilla'
2974 CondorDAGJob.__init__(self, universe, executable)
2975 AnalysisJob.__init__(self, cp)
2976
2977 for sec in sections:
2978 if cp.has_section(sec):
2979 self.add_ini_opts(cp, sec)
2980 else:
2981 sys.stderr.write("warning: config file is missing section [" + sec + "]\n")
2982
2983 self.add_condor_cmd('getenv', 'True')
2984 if cp.has_option('condor','accounting_group'):
2985 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
2986 self.set_stdout_file('logs/' + exec_name + '-$(cluster)-$(process).out')
2987 self.set_stderr_file('logs/' + exec_name + '-$(cluster)-$(process).err')
2988
2989 def set_exec_name(self, exec_name):
2990 """
2991 Set the exec_name name
2992 """
2993 self.__exec_name = exec_name
2994
2995 def get_exec_name(self):
2996 """
2997 Get the exec_name name
2998 """
2999 return self.__exec_name
3000
3001
3002class SqliteNode(CondorDAGNode, AnalysisNode):
3003    """
3004 A cbc sqlite node adds to the standard AnalysisNode features common to nodes
3005 which read or write to a sqlite database. Specifically, it adds the set_tmp_space_path
3006 and set_database methods.
3007 """
3008 def __init__(self, job):
3009 """
3010 @param job: an Sqlite job
3011 """
3012 CondorDAGNode.__init__(self, job)
3013 AnalysisNode.__init__(self)
3014 self.__tmp_space = None
3015 self.__database = None
3016
3017 def set_tmp_space(self, tmp_space):
3018 """
3019 Sets temp-space path. This should be on a local disk.
3020 @param tmp_space: tmp_space
3021 """
3022 self.add_var_opt('tmp-space', tmp_space)
3023 self.__tmp_space = tmp_space
3024
3025 def get_tmp_space(self):
3026 """
3027 Gets tmp-space path.
3028 """
3029 return self.__tmp_space
3030
3031 def set_database(self, database):
3032 """
3033 Sets database option.
3034 @param database: database
3035 """
3036 self.add_file_opt('database', database)
3037 self.__database = database
3038
3039 def get_database(self):
3040 """
3041 Gets database option.
3042 """
3043 return self.__database
3044
3045
3046class LigolwSqliteJob(SqliteJob):
3047    """
3048 A LigolwSqlite job. The static options are read from the
3049 section [ligolw_sqlite] in the ini file.
3050 """
3051 def __init__(self, cp):
3052 """
3053 @param cp: ConfigParser object from which options are read.
3054 """
3055 exec_name = 'ligolw_sqlite'
3056 sections = ['ligolw_sqlite']
3057 super(LigolwSqliteJob,self).__init__(cp, sections, exec_name)
3058
3059 def set_replace(self):
3060 """
3061 Sets the --replace option. This will cause the job
3062 to overwrite existing databases rather than add to them.
3063 """
3064 self.add_opt('replace','')
3065
3066
3067class LigolwSqliteNode(SqliteNode):
3068    """
3069 A LigolwSqlite node.
3070 """
3071 def __init__(self, job):
3072 """
3073 @param job: a LigolwSqliteJob
3074 """
3075 super(LigolwSqliteNode,self).__init__(job)
3076 self.__input_cache = None
3077 self.__xml_output = None
3078 self.__xml_input = None
3079
3080 def set_input_cache(self, input_cache):
3081 """
3082 Sets input cache.
3083 @param input_cache: input_cache
3084 """
3085 self.add_file_opt('input-cache', input_cache)
3086 self.__input_cache = input_cache
3087
3088 def get_input_cache(self):
3089 """
3090 Gets input cache.
3091 """
3092 return self.__input_cache
3093
3094 def set_xml_input(self, xml_file):
3095 """
3096 Sets xml input file instead of cache
3097 @param xml_file: xml_file
3098 """
3099 self.add_var_arg(xml_file)
3100
3101 def set_xml_output(self, xml_file):
3102 """
3103 Tell ligolw_sqlite to dump the contents of the database to a file.
3104 @param xml_file: xml_file
3105 """
3106 if self.get_database() is None:
3107 raise ValueError("no database specified")
3108 self.add_file_opt('extract', xml_file)
3109 self.__xml_output = xml_file
3110
3111 def get_output(self):
3112 """
3113 Override standard get_output to return xml-file if xml-file is specified.
3114 Otherwise, will return database.
3115 """
3116 if self.__xml_output:
3117 return self.__xml_output
3118 elif self.get_database():
3119 return self.get_database()
3120 else:
3121 raise ValueError("no output xml file or database specified")
3122
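As a concrete illustration of the sqlite job/node pattern above, a hypothetical snippet loads a cache of LIGO_LW XML files into an SQLite database and extracts the result back to XML. It assumes an ini file with [condor] and [ligolw_sqlite] sections; the file names are invented.

from glue import pipeline   # or: from lal import pipeline, depending on installation

cp = pipeline.DeepCopyableConfigParser()
cp.read('pipeline.ini')

sql_job = pipeline.LigolwSqliteJob(cp)
sql_job.set_sub_file('ligolw_sqlite.sub')

node = pipeline.LigolwSqliteNode(sql_job)
node.set_tmp_space('/tmp')              # scratch space on a local disk
node.set_database('triggers.sqlite')    # the database to create or update
node.set_input_cache('triggers.cache')  # cache of input LIGO_LW files
node.set_xml_output('triggers.xml')     # dump the database back to XML
print(node.get_output())                # the xml file takes precedence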
3123
3124class DeepCopyableConfigParser(configparser.ConfigParser):
3125 """
3126 The standard SafeConfigParser no longer supports deepcopy() as of python
3127 2.7 (see http://bugs.python.org/issue16058). This subclass restores that
3128 functionality.
3129 """
3130 def __deepcopy__(self, memo):
3131 # http://stackoverflow.com/questions/23416370
3132 # /manually-building-a-deep-copy-of-a-configparser-in-python-2-7
3133 from io import StringIO
3134 config_string = StringIO()
3135 self.write(config_string)
3136 config_string.seek(0)
3137 new_config = self.__class__()
3138        new_config.read_file(config_string)
3139 return new_config
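The point of this subclass is that a shared configuration can be copied and tweaked per node without mutating the original. A small hypothetical example (the ini file and its [datafind] section are assumptions):

import copy

from glue import pipeline   # or: from lal import pipeline, depending on installation

cp = pipeline.DeepCopyableConfigParser()
cp.read('pipeline.ini')          # hypothetical ini with a [datafind] section

cp_variant = copy.deepcopy(cp)   # round-trips through an in-memory ini dump
cp_variant.set('datafind', 'type', 'H1_HOFT_C00')   # only the copy is modified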
static int cmp(REAL4Sequence *a, REAL4Sequence *b)
Definition: SequenceTest.c:62
An AnalysisChunk is the unit of data that a node works with, usually some subset of a ScienceSegment.
Definition: pipeline.py:1749
def dur(self)
Returns the length (duration) of the chunk in seconds.
Definition: pipeline.py:1810
def set_trig_end(self, end)
Set the last GPS time at which triggers for this chunk should be generated.
Definition: pipeline.py:1838
def end(self)
Returns the GPS end time of the chunk.
Definition: pipeline.py:1804
def set_trig_start(self, start)
Set the first GPS time at which triggers for this chunk should be generated.
Definition: pipeline.py:1831
def __len__(self)
Returns the length of data for which this AnalysisChunk will produce triggers (in seconds).
Definition: pipeline.py:1780
def trig_start(self)
Return the first GPS time at which triggers for this chunk should be generated.
Definition: pipeline.py:1817
def start(self)
Returns the GPS start time of the chunk.
Definition: pipeline.py:1798
def __init__(self, start, end, trig_start=0, trig_end=0)
Definition: pipeline.py:1756
def trig_end(self)
Return the last GPS time at which triggers for this chunk should be generated.
Definition: pipeline.py:1824
Describes a generic analysis job that filters LIGO data as configured by an ini file.
Definition: pipeline.py:1415
def set_channel(self, channel)
Set the name of the channel that this job is filtering.
Definition: pipeline.py:1439
def channel(self)
Returns the name of the channel that this job is filtering.
Definition: pipeline.py:1447
def __init__(self, cp)
Definition: pipeline.py:1419
def get_config(self, sec, opt)
Get the configration variable in a particular section of this jobs ini file.
Definition: pipeline.py:1432
Contains the methods that allow an object to be built to analyse LIGO data in a Condor DAG.
Definition: pipeline.py:1455
def set_cache(self, filename)
Set the LAL frame cache to to use.
Definition: pipeline.py:1672
def set_data_start(self, time)
Set the GPS start time of the data needed by this analysis node.
Definition: pipeline.py:1515
def set_input(self, filename, pass_to_command_line=True)
Add an input to the node by adding a –input option.
Definition: pipeline.py:1589
def set_trig_start(self, time, pass_to_command_line=True)
Set the trig start time of the analysis node by setting a –trig-start-time option to the node when it...
Definition: pipeline.py:1556
def set_user_tag(self, usertag, pass_to_command_line=True)
Set the user tag that is passed to the analysis code.
Definition: pipeline.py:1656
def get_end(self)
Get the GPS end time of the node.
Definition: pipeline.py:1508
def get_trig_start(self)
Get the trig start time of the node.
Definition: pipeline.py:1564
def get_ifo_tag(self)
Returns the IFO tag string.
Definition: pipeline.py:1648
def get_calibration(self)
Return the calibration cache file to be used by the DAG.
Definition: pipeline.py:1741
def set_output(self, filename, pass_to_command_line=True)
Add an output to the node by adding a –output option.
Definition: pipeline.py:1606
def set_start(self, time, pass_to_command_line=True)
Set the GPS start time of the analysis node by setting a –gps-start-time option to the node when it i...
Definition: pipeline.py:1479
def get_user_tag(self)
Returns the usertag string.
Definition: pipeline.py:1664
def set_trig_end(self, time, pass_to_command_line=True)
Set the trig end time of the analysis node by setting a –trig-end-time option to the node when it is ...
Definition: pipeline.py:1573
def set_ifo_tag(self, ifo_tag, pass_to_command_line=True)
Set the ifo tag that is passed to the analysis code.
Definition: pipeline.py:1640
def calibration_cache_path(self)
Determine the path to the correct calibration cache file to use.
Definition: pipeline.py:1700
def get_start(self)
Get the GPS start time of the node.
Definition: pipeline.py:1490
def get_ifo(self)
Returns the two letter IFO code for this node.
Definition: pipeline.py:1632
def set_pad_data(self, pad)
Set the GPS start time of the data needed by this analysis node.
Definition: pipeline.py:1528
def set_end(self, time, pass_to_command_line=True)
Set the GPS end time of the analysis node by setting a –gps-end-time option to the node when it is ex...
Definition: pipeline.py:1499
def set_ifo(self, ifo)
Set the ifo name to analyze.
Definition: pipeline.py:1624
def calibration(self)
Set the path to the calibration cache file for the given IFO.
Definition: pipeline.py:1727
def get_input(self)
Get the file that will be passed as input.
Definition: pipeline.py:1598
def get_data_end(self)
Get the GPS end time of the data needed by this node.
Definition: pipeline.py:1547
def get_output(self)
Get the file that will be passed as output.
Definition: pipeline.py:1615
def get_data_start(self)
Get the GPS start time of the data needed by this node.
Definition: pipeline.py:1521
def set_data_end(self, time)
Set the GPS end time of the data needed by this analysis node.
Definition: pipeline.py:1541
def get_pad_data(self)
Get the GPS start time of the data needed by this node.
Definition: pipeline.py:1534
def get_trig_end(self)
Get the trig end time of the node.
Definition: pipeline.py:1581
A CondorDAG is a Condor Directed Acyclic Graph that describes a collection of Condor jobs and the ord...
Definition: pipeline.py:1245
def get_nodes(self)
Return a list containing all the nodes in the DAG.
Definition: pipeline.py:1262
def write_script(self)
Write the workflow to a script (.sh instead of .dag).
Definition: pipeline.py:1388
def add_maxjobs_category(self, categoryName, maxJobsNum)
Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
Definition: pipeline.py:1317
def set_integer_node_names(self)
Use integer node names for the DAG.
Definition: pipeline.py:1274
def write_maxjobs(self, fh, category)
Write the DAG entry for this category's maxjobs to the DAG file descriptor.
Definition: pipeline.py:1332
def set_dag_file(self, path)
Set the name of the file into which the DAG is written.
Definition: pipeline.py:1281
def add_node(self, node)
Add a CondorDAGNode to this DAG.
Definition: pipeline.py:1300
def get_maxjobs_categories(self)
Return an array of tuples containing (categoryName,maxJobsNum)
Definition: pipeline.py:1323
def __init__(self, log)
Definition: pipeline.py:1249
def write_concrete_dag(self)
Write all the nodes in the DAG to the DAG file.
Definition: pipeline.py:1349
def write_sub_files(self)
Write all the submit files used by the dag to disk.
Definition: pipeline.py:1339
def get_dag_file(self)
Return the path to the DAG file.
Definition: pipeline.py:1287
def get_jobs(self)
Return a list containing all the jobs in the DAG.
Definition: pipeline.py:1268
def write_dag(self)
Write a dag.
Definition: pipeline.py:1376
A Condor DAG job never notifies the user on completion and can have variable options that are set for...
Definition: pipeline.py:492
def get_grid_site(self)
Return the grid site for this node.
Definition: pipeline.py:529
def add_var_arg(self, arg_index, quote=False)
Add a command to the submit file to allow variable (macro) arguments to be passed to the executable.
Definition: pipeline.py:562
def add_var_condor_cmd(self, command)
Add a condor command to the submit file that allows variable (macro) arguments to be passes to the ex...
Definition: pipeline.py:552
def add_var_opt(self, opt, short=False)
Add a variable (or macro) option to the condor job.
Definition: pipeline.py:539
def create_node(self)
Create a condor node from this job.
Definition: pipeline.py:514
def set_grid_site(self, site)
Set the grid site to run on.
Definition: pipeline.py:521
def __init__(self, universe, executable)
universe = the condor universe to run the job in.
Definition: pipeline.py:497
Condor DAGMan job class.
Definition: pipeline.py:580
def set_dag_directory(self, dir)
Set the directory where the dag will be run.
Definition: pipeline.py:604
def set_notification(self, value)
Set the email address to send notification to.
Definition: pipeline.py:617
def get_sub_file(self)
Return the name of the dag as the submit file name for the SUBDAG EXTERNAL command in the uber-dag.
Definition: pipeline.py:624
def create_node(self)
Create a condor node from this job.
Definition: pipeline.py:597
def get_dag_directory(self)
Get the directory where the dag will be run.
Definition: pipeline.py:610
def get_dag(self)
Return the name of any associated dag file.
Definition: pipeline.py:637
def write_sub_file(self)
Do nothing as there is not need for a sub file with the SUBDAG EXTERNAL command in the uber-dag.
Definition: pipeline.py:631
def __init__(self, dag, dir=None)
dag = the name of the condor dag file to run dir = the diretory in which the dag file is located
Definition: pipeline.py:585
Condor DAGMan node class.
Definition: pipeline.py:1187
def get_user_tag(self)
Returns the usertag string.
Definition: pipeline.py:1207
def get_maxjobs_categories(self)
Return an array of tuples containing (categoryName,maxJobsNum)
Definition: pipeline.py:1221
def get_cluster_jobs(self)
Returns the usertag string.
Definition: pipeline.py:1234
def set_cluster_jobs(self, cluster)
Set the type of job clustering pegasus can use to collapse jobs.
Definition: pipeline.py:1228
def set_user_tag(self, usertag)
Set the user tag that is passed to the analysis code.
Definition: pipeline.py:1201
def add_maxjobs_category(self, categoryName, maxJobsNum)
Add a category to this DAG called categoryName with a maxjobs of maxJobsNum.
Definition: pipeline.py:1215
A CondorDAGNode represents a node in the DAG.
Definition: pipeline.py:647
def get_post_script_arg(self)
Returns and array of arguments to the post script that is executed before the DAG node is run.
Definition: pipeline.py:737
def set_priority(self, priority)
Set the priority for this node in the DAG.
Definition: pipeline.py:767
def set_vds_group(self, group)
Set the name of the VDS group key when generating a DAX.
Definition: pipeline.py:842
def add_var_arg(self, arg, quote=False)
Add a variable (or macro) argument to the condor job.
Definition: pipeline.py:954
def set_pre_script(self, script)
Sets the name of the pre script that is executed before the DAG node is run.
Definition: pipeline.py:701
def get_output_files(self)
Return list of output files for this DAG node and its job.
Definition: pipeline.py:823
def set_log_file(self, log)
Set the Condor log file to be used by this CondorJob.
Definition: pipeline.py:1090
def get_input_files(self)
Return list of input files for this DAG node and its job.
Definition: pipeline.py:814
def add_output_file(self, filename)
Add filename as a output file for this DAG node.
Definition: pipeline.py:793
def add_io_macro(self, io, filename)
Add a variable (macro) for storing the input/output files associated with this node.
Definition: pipeline.py:870
def __init__(self, job)
Definition: pipeline.py:651
def get_checkpoint_files(self)
Return a list of checkpoint files for this DAG node and its job.
Definition: pipeline.py:832
def add_input_macro(self, filename)
Add a variable (macro) for storing the input files associated with this node.
Definition: pipeline.py:883
def finalize(self)
The finalize method of a node is called before the node is finally added to the DAG and can be overri...
Definition: pipeline.py:1178
def get_cmd_tuple_list(self)
Return a list of tuples containg the command line arguments.
Definition: pipeline.py:1106
def get_retry(self)
Return the number of times that this node in the DAG should retry.
Definition: pipeline.py:988
def get_category(self)
Get the category for this node in the DAG.
Definition: pipeline.py:761
def write_parents(self, fh)
Write the parent/child relations for this job to the DAG file descriptor.
Definition: pipeline.py:1044
def set_retry(self, retry)
Set the number of times that this node in the DAG should retry.
Definition: pipeline.py:982
def write_input_files(self, fh)
Write as a comment into the DAG file the list of input files for this DAG node.
Definition: pipeline.py:1072
def add_output_macro(self, filename)
Add a variable (macro) for storing the output files associated with this node.
Definition: pipeline.py:891
def add_checkpoint_macro(self, filename)
Definition: pipeline.py:894
def get_args(self)
Return the arguments for this node.
Definition: pipeline.py:975
def add_var_condor_cmd(self, command, value)
Add a variable (macro) condor command for this node.
Definition: pipeline.py:914
def job(self)
Return the CondorJob that this node is associated with.
Definition: pipeline.py:693
def write_category(self, fh)
Write the DAG entry for this node's category to the DAG file descriptor.
Definition: pipeline.py:1013
def get_opts(self)
Return the opts for this node.
Definition: pipeline.py:902
def write_pre_script(self, fh)
Write the pre script for the job, if there is one.
Definition: pipeline.py:1052
def get_vds_group(self)
Returns the VDS group key for this node.
Definition: pipeline.py:848
def get_post_script(self)
returns the name of the post script that is executed before the DAG node is run.
Definition: pipeline.py:723
def add_file_opt(self, opt, filename, file_is_output_file=False)
Add a variable (macro) option for this node.
Definition: pipeline.py:942
def write_priority(self, fh)
Write the DAG entry for this node's priority to the DAG file descriptor.
Definition: pipeline.py:1020
def add_macro(self, name, value)
Add a variable (macro) for this node.
Definition: pipeline.py:860
def add_post_script_arg(self, arg)
Adds an argument to the post script that is executed before the DAG node is run.
Definition: pipeline.py:730
def write_vars(self, fh)
Write the variable (macro) options and arguments to the DAG file descriptor.
Definition: pipeline.py:1028
def set_name(self, name)
Set the name for this node in the DAG.
Definition: pipeline.py:743
def get_priority(self)
Get the priority for this node in the DAG.
Definition: pipeline.py:773
def write_post_script(self, fh)
Write the post script for the job, if there is one.
Definition: pipeline.py:1061
def write_job(self, fh)
Write the DAG entry for this node's job to the DAG file descriptor.
Definition: pipeline.py:995
def add_input_file(self, filename)
Add filename as a necessary input file for this DAG node.
Definition: pipeline.py:781
def set_post_script(self, script)
Sets the name of the post script that is executed before the DAG node is run.
Definition: pipeline.py:716
def add_file_arg(self, filename)
Add a variable (or macro) file name argument to the condor job.
Definition: pipeline.py:966
def add_checkpoint_file(self, filename)
Add filename as a checkpoint file for this DAG node.
Definition: pipeline.py:804
def get_cmd_line(self)
Return the full command line that will be used when this node is run by DAGman.
Definition: pipeline.py:1164
def write_output_files(self, fh)
Write as a comment into the DAG file the list of output files for this DAG node.
Definition: pipeline.py:1082
def set_category(self, category)
Set the category for this node in the DAG.
Definition: pipeline.py:755
def get_name(self)
Get the name for this node in the DAG.
Definition: pipeline.py:749
def add_pre_script_arg(self, arg)
Adds an argument to the pre script that is executed before the DAG node is run.
Definition: pipeline.py:708
def add_parent(self, node)
Add a parent to this node.
Definition: pipeline.py:1098
def add_var_opt(self, opt, value, short=False)
Add a variable (macro) option for this node.
Definition: pipeline.py:927
Error thrown by Condor Jobs.
Definition: pipeline.py:42
def __init__(self, args=None)
Definition: pipeline.py:43
Generic condor job class.
Definition: pipeline.py:76
def add_short_opt(self, opt, value)
Add a command line option to the executable.
Definition: pipeline.py:320
def get_universe(self)
Return the condor universe that the job will run in.
Definition: pipeline.py:121
def get_grid_scheduler(self)
Return the grid scheduler.
Definition: pipeline.py:160
def get_input_files(self)
Return list of input files for this DAG node.
Definition: pipeline.py:227
def set_stderr_file(self, path)
Set the file to which Condor directs the stderr of the job.
Definition: pipeline.py:371
def add_opt(self, opt, value)
Add a command line option to the executable.
Definition: pipeline.py:276
def get_stdout_file(self)
Get the file to which Condor directs the stdout of the job.
Definition: pipeline.py:390
def set_log_file(self, path)
Set the Condor log file.
Definition: pipeline.py:351
def get_executable(self)
Return the name of the executable for this job.
Definition: pipeline.py:109
def add_output_file(self, filename)
Add filename as an output file for this DAG node.
Definition: pipeline.py:213
def get_opts(self)
Return the dictionary of opts for the job.
Definition: pipeline.py:306
def get_condor_cmds(self)
Return the dictionary of condor keywords to add to the job.
Definition: pipeline.py:196
def set_notification(self, value)
Set the email address to send notification to.
Definition: pipeline.py:344
def get_opt(self, opt)
Returns the value associated with the given command line option.
Definition: pipeline.py:284
def set_stdout_file(self, path)
Set the file to which Condor directs the stdout of the job.
Definition: pipeline.py:384
def set_executable_installed(self, installed)
If installed is true, then no copying of the executable is done.
Definition: pipeline.py:176
def get_stderr_file(self)
Get the file to which Condor directs the stderr of the job.
Definition: pipeline.py:377
def get_stdin_file(self)
Get the file from which Condor directs the stdin of the job.
Definition: pipeline.py:364
def set_universe(self, universe)
Set the condor universe for the job to run in.
Definition: pipeline.py:128
def __init__(self, universe, executable, queue)
Definition: pipeline.py:82
def set_grid_server(self, grid_server)
Set the grid server on which to run the job.
Definition: pipeline.py:154
def add_checkpoint_file(self, filename)
Add filename as a checkpoint file for this DAG job.
Definition: pipeline.py:220
def add_condor_cmd(self, cmd, value)
Add a Condor command to the submit file (e.g.
Definition: pipeline.py:190
def add_ini_opts(self, cp, section)
Parse command line options from a given section in an ini file and pass to the executable.
Definition: pipeline.py:335
def add_file_opt(self, opt, filename)
Add a command line option to the executable.
Definition: pipeline.py:298
def get_args(self)
Return the list of arguments that are to be passed to the executable.
Definition: pipeline.py:264
def get_short_opts(self)
Return the dictionary of short options for the job.
Definition: pipeline.py:326
def get_output_files(self)
Return list of output files for this DAG node.
Definition: pipeline.py:233
def set_stdin_file(self, path)
Set the file from which Condor directs the stdin of the job.
Definition: pipeline.py:358
def get_checkpoint_files(self)
Return a list of checkpoint files for this DAG node.
Definition: pipeline.py:239
def write_sub_file(self)
Write a submit file for this Condor job.
Definition: pipeline.py:411
def set_grid_scheduler(self, grid_scheduler)
Set the grid scheduler.
Definition: pipeline.py:167
def get_sub_file(self)
Get the name of the file which the Condor submit file will be written to when write_sub_file() is called.
Definition: pipeline.py:405
def get_grid_type(self)
Return the grid type of the job.
Definition: pipeline.py:134
def set_grid_type(self, grid_type)
Set the type of grid resource for the job.
Definition: pipeline.py:141
def get_executable_installed(self)
Return whether or not the executable is installed.
Definition: pipeline.py:182
def set_sub_file(self, path)
Set the name of the file to write the Condor submit file to when write_sub_file() is called.
Definition: pipeline.py:398
def get_grid_server(self)
Return the grid server on which the job will run.
Definition: pipeline.py:147
def add_input_file(self, filename)
Add filename as a necessary input file for this DAG node.
Definition: pipeline.py:204
def add_arg(self, arg)
Add an argument to the executable.
Definition: pipeline.py:247
def add_file_arg(self, filename)
Add a file argument to the executable.
Definition: pipeline.py:256
def set_executable(self, executable)
Set the name of the executable for this job.
Definition: pipeline.py:115
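
The job-level methods above configure the Condor submit file itself rather than a single node. A minimal sketch, again assuming the lal.pipeline import path and using placeholder paths and accounting information:

    # Illustrative submit-file setup using the job-level methods indexed above.
    from lal import pipeline

    job = pipeline.CondorJob("vanilla", "/usr/bin/my_analysis", 1)

    # Options and Condor commands shared by every node that uses this job.
    job.add_opt("verbose", "")                                     # --verbose with no value
    job.add_condor_cmd("accounting_group", "ligo.dev.o4.example")  # placeholder group
    job.set_log_file("logs/my_analysis.log")
    job.set_stdout_file("logs/my_analysis-$(cluster)-$(process).out")
    job.set_stderr_file("logs/my_analysis-$(cluster)-$(process).err")

    job.set_sub_file("my_analysis.sub")
    job.write_sub_file()               # writes the submit file named above
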
The standard SafeConfigParser no longer supports deepcopy() as of Python 2.7 (see http://bugs....
Definition: pipeline.py:3129
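
The entry above points at a ConfigParser subclass that restores deepcopy() support. Purely as an illustration of the idea (not the class's actual implementation), a parser can be duplicated by round-tripping it through an in-memory buffer:

    # Illustration only: duplicate a ConfigParser by serialising it through a
    # string buffer.  This is not the pipeline.py implementation.
    import configparser
    import io

    def copy_config(cp):
        buf = io.StringIO()
        cp.write(buf)
        buf.seek(0)
        new_cp = configparser.ConfigParser()
        new_cp.read_file(buf)
        return new_cp

    cp = configparser.ConfigParser()
    cp["datafind"] = {"server": "datafind.example.org"}   # hypothetical section
    copy = copy_config(cp)
    print(copy.get("datafind", "server"))                 # datafind.example.org
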
An LSCdataFind job used to locate data.
Definition: pipeline.py:2686
def __init__(self, cache_dir, log_dir, config_file, lsync_cache_file=None, lsync_type_regex=None)
Definition: pipeline.py:2694
def get_cache_dir(self)
Returns the directory that the cache files are written to.
Definition: pipeline.py:2728
def get_config_file(self)
Return the configuration file object.
Definition: pipeline.py:2734
A DataFindNode runs an instance of LSCdataFind in a Condor DAG.
Definition: pipeline.py:2744
def set_type(self, type)
Sets the frame type that we are querying.
Definition: pipeline.py:2828
def get_start(self)
Return the start time of the datafind query.
Definition: pipeline.py:2789
def get_output(self)
Return the output file, i.e. the frame cache file produced by the datafind query.
Definition: pipeline.py:2846
def get_end(self)
Return the end time of the datafind query.
Definition: pipeline.py:2804
def get_observatory(self)
Return the IFO to retrieve data for.
Definition: pipeline.py:2822
def set_observatory(self, obs)
Set the IFO to retrieve data for.
Definition: pipeline.py:2814
def get_type(self)
Gets the frame type that we are querying.
Definition: pipeline.py:2836
def set_start(self, time, pad=None)
Set the start time of the datafind query.
Definition: pipeline.py:2778
def set_end(self, time)
Set the end time of the datafind query.
Definition: pipeline.py:2796
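
A hedged sketch of the datafind job/node pair indexed above, assuming the lal.pipeline import path, that LSCDataFindNode takes the job as its constructor argument, and that the ini file provides the sections the job reads; the observatory letter, frame type and paths are placeholders.

    # Illustrative datafind query for one observatory and GPS range.
    import configparser
    from lal import pipeline

    cp = configparser.ConfigParser()
    cp.read("pipeline.ini")               # hypothetical configuration file

    df_job = pipeline.LSCDataFindJob("cache", "logs", cp)
    print(df_job.get_cache_dir())         # the directory cache files go to

    df_node = pipeline.LSCDataFindNode(df_job)
    df_node.set_observatory("H")          # site letter for LIGO Hanford
    df_node.set_type("H1_HOFT_C00")       # hypothetical frame type
    df_node.set_start(1000000000)
    df_node.set_end(1000002048)

    print(df_node.get_output())           # cache file this node will produce
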
A ligolw_add job can be used to concatenate several ligo lw files.
Definition: pipeline.py:2853
def __init__(self, log_dir, cp)
cp = ConfigParser object from which options are read.
Definition: pipeline.py:2857
Runs an instance of ligolw_add in a Condor DAG.
Definition: pipeline.py:2876
def __init__(self, job)
Definition: pipeline.py:2880
A ligolw_cut job can be used to remove parts of a ligo lw file.
Definition: pipeline.py:2888
def __init__(self, log_dir, cp)
cp = ConfigParser object from which options are read.
Definition: pipeline.py:2892
Runs an instance of ligolw_cut in a Condor DAG.
Definition: pipeline.py:2908
def __init__(self, job)
Definition: pipeline.py:2912
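
Both wrappers above follow the same job/node pattern. A hedged sketch, assuming the classes are named LigolwAddJob, LigolwAddNode, LigolwCutJob and LigolwCutNode as in released versions of this module, that the ini file defines the options they read, and that input files are passed through the generic node methods listed earlier; file names are placeholders.

    # Illustrative ligolw_add followed by ligolw_cut on the combined file.
    import configparser
    from lal import pipeline

    cp = configparser.ConfigParser()
    cp.read("pipeline.ini")                          # hypothetical ini file

    add_job = pipeline.LigolwAddJob("logs", cp)
    add_node = pipeline.LigolwAddNode(add_job)
    add_node.add_file_arg("triggers_0001.xml")
    add_node.add_file_arg("triggers_0002.xml")
    add_node.add_file_opt("output", "triggers_all.xml", file_is_output_file=True)

    cut_job = pipeline.LigolwCutJob("logs", cp)
    cut_node = pipeline.LigolwCutNode(cut_job)
    cut_node.add_file_arg("triggers_all.xml")
    cut_node.add_parent(add_node)                    # cut runs after add finishes
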
A LigolwSqlite job.
Definition: pipeline.py:3050
def set_replace(self)
Sets the --replace option.
Definition: pipeline.py:3063
A LigolwSqlite node.
Definition: pipeline.py:3070
def get_output(self)
Override standard get_output to return xml-file if xml-file is specified.
Definition: pipeline.py:3115
def get_input_cache(self)
Gets input cache.
Definition: pipeline.py:3091
def set_xml_output(self, xml_file)
Tell ligolw_sqlite to dump the contents of the database to a file.
Definition: pipeline.py:3105
def set_input_cache(self, input_cache)
Sets input cache.
Definition: pipeline.py:3084
def set_xml_input(self, xml_file)
Sets xml input file instead of cache.
Definition: pipeline.py:3098
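
A hedged sketch of the ligolw_sqlite wrapper: the job constructor is assumed to take only the ConfigParser, and the database/tmp-space setters are assumed to be inherited from the cbc sqlite node class listed further below; file names are placeholders and the ini file is assumed to define the options the job reads.

    # Illustrative ligolw_sqlite load-and-dump node.
    import configparser
    from lal import pipeline

    cp = configparser.ConfigParser()
    cp.read("pipeline.ini")                      # hypothetical ini file

    sqlite_job = pipeline.LigolwSqliteJob(cp)    # assumed constructor signature
    sqlite_job.set_replace()                     # pass --replace to ligolw_sqlite

    sqlite_node = pipeline.LigolwSqliteNode(sqlite_job)
    sqlite_node.set_database("triggers.sqlite")  # inherited setter (assumed)
    sqlite_node.set_input_cache("triggers.cache")
    sqlite_node.set_xml_output("triggers_merged.xml")

    print(sqlite_node.get_output())              # the XML file, since one was set
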
def group(self, lst, n)
Group an iterable into an n-tuples iterable.
Definition: pipeline.py:2544
def __init__(self, path)
Definition: pipeline.py:2527
def parse(self, type_regex=None)
Parse the frame cache file; each line describes a single cache entry.
Definition: pipeline.py:2573
def get_lfns(self, site, frameType, gpsStart, gpsEnd)
Definition: pipeline.py:2628
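
The group() helper above turns a flat iterable into n-tuples. An equivalent plain-Python illustration (not the module's own implementation) is:

    # Illustration of grouping an iterable into n-tuples; any incomplete final
    # tuple is dropped.  Not the module's own implementation.
    def group(lst, n):
        return zip(*[iter(lst)] * n)

    print(list(group(range(7), 3)))   # [(0, 1, 2), (3, 4, 5)]
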
A Noop Job does nothing.
Definition: pipeline.py:2920
def __init__(self, log_dir, cp)
cp = ConfigParser object from which options are read.
Definition: pipeline.py:2924
Run a noop job in a Condor DAG.
Definition: pipeline.py:2943
def __init__(self, job)
Definition: pipeline.py:2947
An object that can contain all the science data used in an analysis.
Definition: pipeline.py:2019
def make_chunks(self, length, overlap=0, play=0, sl=0, excl_play=0, pad_data=0)
Divide each ScienceSegment contained in this object into AnalysisChunks.
Definition: pipeline.py:2112
def __len__(self)
Returns the number of ScienceSegments associated with the ScienceData.
Definition: pipeline.py:2037
def make_optimised_chunks(self, min_length, max_length, pad_data=0)
Splits ScienceSegments up into chunks, of a given maximum length.
Definition: pipeline.py:2196
def coalesce(self)
Coalesces any adjacent ScienceSegments.
Definition: pipeline.py:2369
def intersection(self, other)
Replaces the ScienceSegments contained in this instance of ScienceData with the intersection of those in the other instance.
Definition: pipeline.py:2231
def read(self, filename, min_length, slide_sec=0, buffer=0)
Parse the science segments from the segwizard output contained in file.
Definition: pipeline.py:2056
def __getitem__(self, i)
Allows direct access to or iteration over the ScienceSegments associated with the ScienceData.
Definition: pipeline.py:2028
def split(self, dt)
Split the segments in the list into subsegments at least as long as dt.
Definition: pipeline.py:2504
def play(self)
Keep only times in ScienceSegments which are in the playground.
Definition: pipeline.py:2440
def union(self, other)
Replaces the ScienceSegments contained in this instance of ScienceData with the union of those in the other instance.
Definition: pipeline.py:2287
def intersect_3(self, second, third)
Intersection routine for three inputs.
Definition: pipeline.py:2485
def make_short_chunks_from_unused(self, min_length, overlap=0, play=0, sl=0, excl_play=0)
Create a chunk that uses up the unused data in the science segment.
Definition: pipeline.py:2176
def make_chunks_from_unused(self, length, trig_overlap, play=0, min_length=0, sl=0, excl_play=0, pad_data=0)
Create an extra chunk that uses up the unused data in the science segment.
Definition: pipeline.py:2135
def tama_read(self, filename)
Parse the science segments from a tama list of locked segments contained in file.
Definition: pipeline.py:2088
def append_from_tuple(self, seg_tuple)
Definition: pipeline.py:2079
def intersect_4(self, second, third, fourth)
Intersection routine for four inputs.
Definition: pipeline.py:2494
def invert(self)
Inverts the ScienceSegments in the class (i.e. takes the set complement of the segments).
Definition: pipeline.py:2410
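
ScienceData is typically filled from a segwizard file, coalesced, and then divided into analysis chunks. A hedged sketch, with a placeholder segment file and the lal.pipeline import path assumed:

    # Illustrative use of ScienceData: read segwizard segments, merge them,
    # and split each segment into 2048 s chunks overlapping by 256 s.
    from lal import pipeline

    science = pipeline.ScienceData()
    science.read("H1_science_segments.txt", 2048)   # placeholder segwizard file
    science.coalesce()                              # merge adjacent segments
    science.make_chunks(2048, overlap=256)

    print(len(science), "science segments")
    for seg in science:                             # iteration via __getitem__
        print(seg.start(), seg.end(), len(seg), "chunks")
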
A ScienceSegment is a period of time where the experimenters determine that the interferometer is in a state suitable for scientific analysis.
Definition: pipeline.py:1849
def set_df_node(self, df_node)
Set the DataFind node associated with this ScienceSegment to df_node.
Definition: pipeline.py:2004
def __init__(self, segment)
Definition: pipeline.py:1854
def set_unused(self, unused)
Set the length of data in the science segment not used to make chunks.
Definition: pipeline.py:1957
def __cmp__(self, other)
ScienceSegments are compared by the GPS start time of the segment.
Definition: pipeline.py:1885
def make_chunks(self, length=0, overlap=0, play=0, sl=0, excl_play=0, pad_data=0)
Divides the science segment into chunks of length seconds overlapped by overlap seconds.
Definition: pipeline.py:1907
def unused(self)
Returns the length of data in the science segment not used to make chunks.
Definition: pipeline.py:1951
def end(self)
Returns the GPS end time of this ScienceSegment.
Definition: pipeline.py:1975
def set_start(self, t)
Override the GPS start time (and set the duration) of this ScienceSegment.
Definition: pipeline.py:1982
def set_end(self, t)
Override the GPS end time (and set the duration) of this ScienceSegment.
Definition: pipeline.py:1990
def start(self)
Returns the GPS start time of this ScienceSegment.
Definition: pipeline.py:1969
def id(self)
Returns the ID of this ScienceSegment.
Definition: pipeline.py:1963
def __len__(self)
Returns the number of AnalysisChunks contained in this ScienceSegment.
Definition: pipeline.py:1875
def __getitem__(self, i)
Allows iteration over and direct access to the AnalysisChunks contained in this ScienceSegment.
Definition: pipeline.py:1868
def dur(self)
Returns the length (duration) in seconds of this ScienceSegment.
Definition: pipeline.py:1997
def add_chunk(self, start, end, trig_start=0, trig_end=0)
Add an AnalysisChunk to the list associated with this ScienceSegment.
Definition: pipeline.py:1945
def get_df_node(self)
Returns the DataFind node for this ScienceSegment.
Definition: pipeline.py:2010
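
A single ScienceSegment can also be built and chopped up by hand. The sketch below assumes the constructor's segment argument is the (id, gps_start, gps_end, duration) tuple that ScienceData.read() produces, as in released versions of this module:

    # Illustrative standalone ScienceSegment with hand-made chunks.
    from lal import pipeline

    seg = pipeline.ScienceSegment((1, 1000000000, 1000008192, 8192))
    seg.make_chunks(length=2048, overlap=256)

    print(seg.id(), seg.start(), seg.end(), seg.dur())
    print(len(seg), "chunks,", seg.unused(), "s unused at the end of the segment")
    for chunk in seg:                  # iteration via __getitem__
        print(chunk)
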
def __init__(self, args=None)
Definition: pipeline.py:68
A cbc sqlite job adds to CondorDAGJob and AnalysisJob features common to jobs which read or write to a sqlite database.
Definition: pipeline.py:2964
def get_exec_name(self)
Get the exec_name name.
Definition: pipeline.py:2998
def __init__(self, cp, sections, exec_name)
Definition: pipeline.py:2970
def set_exec_name(self, exec_name)
Set the exec_name name.
Definition: pipeline.py:2992
A cbc sqlite node adds to the standard AnalysisNode features common to nodes which read or write to a sqlite database.
Definition: pipeline.py:3007
def __init__(self, job)
Definition: pipeline.py:3011
def get_database(self)
Gets database option.
Definition: pipeline.py:3042
def get_tmp_space(self)
Gets tmp-space path.
Definition: pipeline.py:3028
def set_database(self, database)
Sets database option.
Definition: pipeline.py:3035
def set_tmp_space(self, tmp_space)
Sets tmp-space path.
Definition: pipeline.py:3021
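
Finally, a hedged sketch of the generic cbc sqlite job/node pair indexed above, assuming the classes are exposed as SqliteJob and SqliteNode as in released versions of this module, and that the ini file defines the options they read; the executable name, ini sections and paths are placeholders.

    # Illustrative generic sqlite job and node.
    import configparser
    from lal import pipeline

    cp = configparser.ConfigParser()
    cp.read("pipeline.ini")                             # hypothetical ini file

    job = pipeline.SqliteJob(cp, ["my_sqlite_tool"], "my_sqlite_tool")
    print(job.get_exec_name())                          # my_sqlite_tool

    node = pipeline.SqliteNode(job)
    node.set_database("results.sqlite")
    node.set_tmp_space("/tmp")                          # scratch space for sqlite

    print(node.get_database(), node.get_tmp_space())
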