LALPulsar  6.1.0.1-c9a8ef6
lalpulsar_MakeSFTDAG.py
# Copyright (C) 2013, 2014, 2020--2022 Evan Goetz
# Copyright (C) 2011, 2021, 2022 Karl Wette
# Copyright (C) 2005, 2007 Gregory Mendell
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA

"""Creates DAGs to run jobs that generate SFTs"""

import math
import argparse
import os
import re

from lalpulsar import git_version

__author__ = "Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
__version__ = git_version.id
__date__ = git_version.date


# REVISIONS:
# 12/02/05 gam; generate datafind.sub and MakeSFTs.sub as well as dag file in
#               PWD, with log files based on subLogPath and dag filename.
# 12/28/05 gam; Add option --make-gps-dirs, -D <num>, to make directory based
#               on this many GPS digits.
# 12/28/05 gam; Add option --misc-desc, -X <string> giving misc. part of the
#               SFT description field in the filename.
# 12/28/05 gam; Add --start-freq -F and --band -B options to enter these.
# 12/28/05 gam; Add in --window-type, -w options; 0 = no window, 1 = default =
#               Matlab style Tukey window; 2 = make_sfts.c Tukey window; 3 =
#               Hann window.
# 12/28/05 gam; Add option --overlap-fraction -P (for use with windows; e.g.,
#               use -P 0.5 with -w 3 Hann windows; default is 0.0)
# 12/28/05 gam; Add --sft-version, -v option to select output SFT version (1 =
#               default is version 1 SFTs; 2 = version 2 SFTs).
# 12/28/05 gam; Add --comment-field, -c option, for comment for version 2 SFTs.
# 12/28/05 gam; Remove sample rate option
# 01/09/06 gam; Add -Z option; write SFT to .*.tmp file, then move to final
#               file name.
# 01/14/07 gam; Add -u option to specify frame struct and type; add -i option
#               to specify IFO name.
# 07/24/07 gam; Add in -q option to read in list of nodes on which to output
#               SFTs, -Q option to give node path, and -R option for number of
#               jobs per node.
# 04/XX/13 eag; Add -y option to synchronize the start times of SFTs.
# 07/24/14 eag; Change default to version 2 SFTs
# 12/2020 eag; Update script to conform to modern python3 and pep8
# 10/2020 kww; Pass args directly to writeToDag(), use Python f-strings
# 10/2022 kww; Deprecate options that have been removed from MakeSFTs
# 10/2022 kww; Parse window type as a string, parameter separated by colon
# 10/2022 kww; Merge -O and -o log path options to free up -O option
# 10/2022 kww; Implement public SFT file naming convention
# 11/2022 kww; -R command line option now used for --observing-revision
#              instead of --output-jobs-per-node, which now uses -r
# 11/2022 kww; --datafind-path and --makesfts-path accept executable names


#
# FUNCTION THAT WRITES ONE JOB TO THE DAG FILE
#
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args):
    datafind = f"datafind_{nodeCount}"
    MakeSFTs = f"MakeSFTs_{nodeCount}"
    startTimeDatafind = startTimeThisNode - args.extra_datafind_time
    endTimeDatafind = endTimeThisNode + args.extra_datafind_time
    tagStringOut = f"{args.tag_string}_{nodeCount}"
    if args.cache_file:
        cacheFile = args.cache_file
    else:
        cacheFile = (
            f"{args.cache_path}/{site}-{startTimeDatafind}-{endTimeDatafind}.cache"
        )
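    # For example, with site "H" and --extra-datafind-time 100, this resolves
    # to something like "cache/H-999999900-1000001900.cache" (GPS times
    # illustrative)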

    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    elif args.misc_desc:
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    argList.append(f"-p {','.join(args.output_sft_path)}")
    argList.append(f"-C {cacheFile}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if args.window_type:
        if ":" in args.window_type:
            window_type, window_param = args.window_type.split(":")
            argList.append(f"-w {window_type} -r {window_param}")
        else:
            argList.append(f"-w {args.window_type}")
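    # For example, --window-type tukey:0.5 is forwarded to lalpulsar_MakeSFTs
    # as "-w tukey -r 0.5", while a bare name such as "hann" is passed
    # through unchanged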
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)

    # gw_data_find job
    if not args.cache_file:
        dagFID.write(
            f"JOB {datafind} {os.path.join(os.path.dirname(dagFID.name), 'datafind.sub')}\n"
        )
        dagFID.write(f"RETRY {datafind} 1\n")
        dagFID.write(
            f'VARS {datafind} gpsstarttime="{startTimeDatafind}" gpsendtime="{endTimeDatafind}" observatory="{site}" inputdatatype="{args.input_data_type}" tagstring="{tagStringOut}"\n'
        )

    # MakeSFTs job
    dagFID.write(
        f"JOB {MakeSFTs} {os.path.join(os.path.dirname(dagFID.name), 'MakeSFTs.sub')}\n"
    )
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" tagstring="{tagStringOut}"\n')
    if not args.cache_file:
        dagFID.write(f"PARENT {datafind} CHILD {MakeSFTs}\n")

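# For one node, writeToDag() emits DAG entries of the following shape
# (GPS times, tag, and argList contents illustrative; submit-file paths are
# relative to the DAG file's directory):
#
#   JOB datafind_1 datafind.sub
#   RETRY datafind_1 1
#   VARS datafind_1 gpsstarttime="999999900" gpsendtime="1000001900" observatory="H" inputdatatype="H1_HOFT_C00" tagstring="O4_H1_SFTs_1"
#   JOB MakeSFTs_1 MakeSFTs.sub
#   RETRY MakeSFTs_1 1
#   VARS MakeSFTs_1 argList="-O 4 -K RUN -R 1 ..." tagstring="O4_H1_SFTs_1"
#   PARENT datafind_1 CHILD MakeSFTs_1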

#
# MAIN CODE STARTS HERE
#

parser = argparse.ArgumentParser(
    description="This script creates datafind.sub, MakeSFTs.sub, and a dag \
        file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
)
parser.add_argument(
    "-O",
    "--observing-run",
    required=True,
    type=int,
    help="For public SFTs, observing run data the SFTs are generated from, or \
        (in the case of mock data challenge data) the observing \
        run on which the data is most closely based",
)
parser.add_argument(
    "-K",
    "--observing-kind",
    type=str,
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
        "AUX" for SFTs of non-h(t) channels; \
        "SIM" for mock data challenge or other simulated data; or \
        "DEV" for development/testing purposes',
)
parser.add_argument(
    "-R",
    "--observing-revision",
    type=int,
    help="For public SFTs: revision number starts at 1, and should be incremented once \
        SFTs have been widely distributed across clusters, advertised \
        as being ready for use, etc. For example, if mistakes are found \
        in the initial SFT production run after they have been published, \
        regenerated SFTs should have a revision number of at least 2",
)
parser.add_argument(
    "-X",
    "--misc-desc",
    type=str,
    help="For private SFTs, miscellaneous part of the SFT \
        description field in the filename",
)
parser.add_argument(
    "-a",
    "--analysis-start-time",
    type=int,
    help="GPS start time of data from which to generate \
        SFTs (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-b",
    "--analysis-end-time",
    type=int,
    help="GPS end time of data from which to generate SFTs \
        (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-f",
    "--dag-file",
    required=True,
    type=str,
    help="filename for .dag file (should end in .dag)",
)
parser.add_argument(
    "-G",
    "--tag-string",
    required=True,
    type=str,
    help="tag string used in names of various files unique to \
        jobs that will run under the DAG",
)
parser.add_argument(
    "-d",
    "--input-data-type",
    required=True,
    type=str,
    help="input data type for use with the gw_data_find --type \
        option",
)
parser.add_argument(
    "-x",
    "--extra-datafind-time",
    type=int,
    default=0,
    help="extra time to subtract/add from/to start/end time \
        arguments of gw_data_find",
)
parser.add_argument(
    "-M",
    "--datafind-match",
    type=str,
    help="string to use with the gw_data_find --match option",
)
parser.add_argument(
    "-y",
    "--synchronize-start",
    action="store_true",
    help="synchronize the start times of the SFTs, so that SFT \
        start times remain on a fixed grid even when there are \
        gaps in the data",
)
parser.add_argument(
    "-k",
    "--filter-knee-freq",
    required=True,
    type=int,
    help="high pass filter knee frequency used on time domain \
        data before generating SFTs",
)
parser.add_argument(
    "-T",
    "--time-baseline",
    required=True,
    type=int,
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
)
parser.add_argument(
    "-p", "--output-sft-path", nargs="+", type=str, help="path to output SFTs"
)
parser.add_argument(
    "-C",
    "--cache-path",
    type=str,
    default="cache",
    help="path to cache files that will be produced by \
        gw_data_find (default is $PWD/cache; this directory is \
        created if it does not exist and must agree with that \
        given in .sub files)",
)
parser.add_argument(
    "-e",
    "--cache-file",
    type=str,
    help="path and filename to frame cache file to use instead \
        of gw_data_find",
)
parser.add_argument(
    "-o",
    "--log-path",
    type=str,
    default="logs",
    help="path to log, output, and error files (default \
        is $PWD/logs; this directory is created if it does not \
        exist and usually should be under a local file system)",
)
parser.add_argument(
    "-N",
    "--channel-name",
    nargs="+",
    type=str,
    help="name of input time-domain channel to read from \
        frames",
)
parser.add_argument(
    "--allow-skipping",
    action="store_true",
    help="allow channels to be skipped if they are not in frames \
        or if their sampling frequency is too low",
)
parser.add_argument("-c", "--comment-field", type=str, help="comment for SFT header")
parser.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
)
parser.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
)
parser.add_argument(
    "-w",
    "--window-type",
    type=str,
    help='type of windowing to apply to the time-domain data \
        before generating SFTs, e.g. "rectangular", \
        "hann", "tukey:<beta in [0,1], required>"; \
        if unspecified, the lalpulsar_MakeSFTs defaults are used',
)
parser.add_argument(
    "-P",
    "--overlap-fraction",
    type=float,
    default=0,
    help="overlap fraction (for use with windows; e.g., use \
        --overlap-fraction 0.5 with --window-type hann windows)",
)
parser.add_argument(
    "-m",
    "--max-num-per-node",
    type=int,
    default=1,
    help="maximum number of SFTs to generate on one node",
)
parser.add_argument(
    "-L",
    "--max-length-all-jobs",
    type=int,
    help="maximum total amount of data to process, in seconds \
        (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-g",
    "--segment-file",
    type=str,
    help="alternative file with segments to use, rather than \
        the input times",
)
parser.add_argument(
    "-l",
    "--min-seg-length",
    type=int,
    default=0,
    help="minimum length segments to process in seconds (used \
        only if a segment file is given)",
)
parser.add_argument(
    "-q",
    "--list-of-nodes",
    type=str,
    help="file with list of nodes on which to output SFTs",
)
parser.add_argument(
    "-Q",
    "--node-path",
    type=str,
    help="path to nodes to output SFTs; the node name is \
        appended to this path, followed by the path given by the \
        -p option; for example, if -q points to a file listing \
        node1 node2 ... and the -Q /data/ -p /frames/S5/sfts/LHO \
        options are given, the first output file will go into \
        /data/node1/frames/S5/sfts/LHO; the next node in the list \
        is used in constructing the path when the number of jobs \
        given by the -r option is reached, and so on",
)
parser.add_argument(
    "-r",
    "--output-jobs-per-node",
    type=int,
    default=0,
    help="number of jobs to output per node in the list of \
        nodes given with the -q option",
)
parser.add_argument(
    "-j",
    "--datafind-path",
    type=str,
    help="string specifying the gw_data_find executable, \
        or a path to it; if not set, will use \
        LSC_DATAFIND_PATH env variable or system default (in \
        that order)",
)
parser.add_argument(
    "-J",
    "--makesfts-path",
    type=str,
    help="string specifying the lalpulsar_MakeSFTs executable, \
        or a path to it; if not set, will use \
        MAKESFTS_PATH env variable or system default (in that \
        order)",
)
parser.add_argument(
    "-Y",
    "--request-memory",
    type=int,
    default=2048,
    help="memory allocation in MB to request from condor for \
        lalpulsar_MakeSFTs step",
)
parser.add_argument(
    "-s",
    "--request-disk",
    type=int,
    default=1024,
    help="disk space allocation in MB to request from condor \
        for lalpulsar_MakeSFTs step",
)
parser.add_argument(
    "-A",
    "--accounting-group",
    required=True,
    type=str,
    help="Condor tag for the production of SFTs",
)
parser.add_argument(
    "-U",
    "--accounting-group-user",
    required=True,
    type=str,
    help="albert.einstein username (do not add @LIGO.ORG)",
)


##### DEPRECATED OPTIONS #####
class DeprecateAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        parser.error(
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
        )


parser.add_argument(
    "-u",
    "--frame-struct-type",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the frame channel type is determined automatically",
)
parser.add_argument(
    "-H",
    "--use-hot",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the frame channel type is determined automatically",
)
parser.add_argument(
    "-i",
    "--ifo",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the detector prefix is deduced from the channel name",
)
parser.add_argument(
    "-D",
    "--make-gps-dirs",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
parser.add_argument(
    "-Z",
    "--make-tmp-file",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. Default behaviour",
)
parser.add_argument(
    "-v",
    "--sft-version",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
parser.add_argument(
    "-S",
    "--use-single",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)

args = parser.parse_args()

# Some basic argument value checking.
# Note: parser.error() prints a usage message and exits, so no raise is needed
if args.observing_run < 0:
    parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    parser.error("--observing-revision must be > 0")

if args.observing_run > 0 and args.misc_desc:
    parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    parser.error("--max-num-per-node must be > 0")

if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )
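
# For example, "-N H1:CHAN_A H1:CHAN_B -p /sfts/A /sfts/B" pairs each channel
# with its own output path, while a single "-p /sfts" is shared by all
# channels (channel and path names illustrative)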

# Set the data find executable and lalpulsar_MakeSFTs executable
dataFindExe = "gw_data_find"
if args.datafind_path:
    if os.path.isfile(args.datafind_path):
        dataFindExe = args.datafind_path
    else:
        dataFindExe = os.path.join(args.datafind_path, dataFindExe)
elif "LSC_DATAFIND_PATH" in os.environ:
    dataFindExe = os.path.join("$ENV(LSC_DATAFIND_PATH)", dataFindExe)
else:
    dataFindExe = os.path.join("/usr/bin", dataFindExe)

makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if os.path.isfile(args.makesfts_path):
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = os.path.join(args.makesfts_path, makeSFTsExe)
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = os.path.join("$ENV(MAKESFTS_PATH)", makeSFTsExe)
else:
    makeSFTsExe = os.path.join("@LALSUITE_BINDIR@", makeSFTsExe)
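
# For example, --datafind-path /usr/local/bin/gw_data_find is used as given,
# while --datafind-path /usr/local/bin has "gw_data_find" appended; the same
# rule applies to --makesfts-path and lalpulsar_MakeSFTs (paths illustrative)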

# try and make a directory to store the cache files and job logs
try:
    os.mkdir(args.log_path)
except OSError:
    pass
if not args.cache_file:
    try:
        os.mkdir(args.cache_path)
    except OSError:
        pass

# Check if list of nodes is given, on which to output SFTs.
nodeList = []
useNodeList = False
savedOutputSFTPath = None
if args.list_of_nodes is not None:
    if args.node_path is None:
        parser.error("Node file list given, but no node path specified")

    if args.output_jobs_per_node < 1:
        parser.error(
            "Node file list given, but invalid output jobs per node specified"
        )

    with open(args.list_of_nodes) as fp_nodelist:
        for idx, line in enumerate(fp_nodelist):
            splitLine = line.split()
            nodeList.append(splitLine[0])
    if len(nodeList) < 1:
        raise ValueError(
            "No nodes found in node list file: {}".format(args.list_of_nodes)
        )

    # Set flag to use list of nodes in constructing output files
    useNodeList = True
    savedOutputSFTPath = args.output_sft_path
# END if (args.list_of_nodes != None)

# Check if segment file was given, else set up one segment from the command line
segList = []
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        parser.error("--min-seg-length must be >= 0")

    # the next flag causes extra time that cannot be processed to be trimmed
    # from the start and end of a segment
    adjustSegExtraTime = True

    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = []
            oneSeg.append(int(splitLine[0]))
            oneSeg.append(int(splitLine[1]))
            if (oneSeg[1] - oneSeg[0]) >= args.min_seg_length:
                segList.append(oneSeg)

    if len(segList) < 1:
        raise ValueError(
            "No segments found in segment file: {}".format(args.segment_file)
        )
else:
    if args.analysis_start_time is None:
        parser.error(
            "--analysis-start-time must be specified if no segment file is given"
        )

    if args.analysis_end_time is None:
        parser.error(
            "--analysis-end-time must be specified if no segment file is given"
        )

    if args.max_length_all_jobs is None:
        parser.error(
            "--max-length-all-jobs must be specified if no segment file is given"
        )

    # Make sure not to exceed the maximum allowed analysis length
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = []
    oneSeg.append(args.analysis_start_time)
    oneSeg.append(args.analysis_end_time)
    segList.append(oneSeg)
# END if (args.segment_file != None)

# Get the IFO site, which is the first letter of the channel name.
site = args.channel_name[0][0]

# initialize count of nodes
nodeCount = 0

# Create .sub files
path_to_dag_file = os.path.dirname(args.dag_file)
dag_filename = os.path.basename(args.dag_file)
datafind_sub = os.path.join(path_to_dag_file, "datafind.sub")
makesfts_sub = os.path.join(path_to_dag_file, "MakeSFTs.sub")

# create datafind.sub
if not args.cache_file:
    with open(datafind_sub, "w") as datafindFID:
        datafindLogFile = f"{args.log_path}/datafind_{dag_filename}.log"
        datafindFID.write("universe = vanilla\n")
        datafindFID.write(f"executable = {dataFindExe}\n")
        datafindFID.write("arguments = ")
        datafindFID.write("--observatory $(observatory) --url-type file ")
        datafindFID.write("--gps-start-time $(gpsstarttime) ")
        datafindFID.write("--gps-end-time $(gpsendtime) --lal-cache --gaps ")
        datafindFID.write("--type $(inputdatatype)")
        if args.datafind_match:
            datafindFID.write(f" --match {args.datafind_match}\n")
        else:
            datafindFID.write("\n")
        datafindFID.write(
            "getenv = *DATAFIND*, KRB5*, X509*, BEARER_TOKEN*, SCITOKEN*\n"
        )
        datafindFID.write("request_disk = 5MB\n")
        datafindFID.write("request_memory = 2000MB\n")
        datafindFID.write(f"accounting_group = {args.accounting_group}\n")
        datafindFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
        datafindFID.write(f"log = {datafindLogFile}\n")
        datafindFID.write(f"error = {args.log_path}/datafind_$(tagstring).err\n")
        datafindFID.write(f"output = {args.cache_path}/")
        datafindFID.write("$(observatory)-$(gpsstarttime)-$(gpsendtime).cache\n")
        datafindFID.write("notification = never\n")
        datafindFID.write("queue 1\n")

# create MakeSFTs.sub
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("queue 1\n")
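
# With the defaults above, the generated MakeSFTs.sub contains lines such as
# (executable path illustrative):
#
#   universe = vanilla
#   executable = /usr/bin/lalpulsar_MakeSFTs
#   arguments = $(argList)
#   request_memory = 2048MB
#   request_disk = 1024MB
#   queue 1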

# create the DAG file with the jobs to run
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0  # need this for the synchronized start option
    nodeListIndex = 0

    # Loop over the segment list to generate the SFTs for each segment
    for seg in segList:
        # Each segment in the segList runs on one or more nodes;
        # initialize the number of SFTs produced by the current node:
        numThisNode = 0
        numThisSeg = 0

        # Case 1: a segment file was given but the SFTs do not need their
        # start times to be synchronized
        if adjustSegExtraTime and not args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # First we figure out how much extra time is in the segment so that
            # SFTs are fit within the segment:
            # |..<SFT><SFT><SFT>..|
            # where the .. represent the extra time in the segment
            # The amount of extra time in a segment is given as the remainder
            # of (total segment time) / (SFT time baseline)
            segExtraTime = (segEndTime - segStartTime) % args.time_baseline

            # If there is overlap of SFTs requested, then we compute the extra
            # time as:
            # the remainder of (end - start - Tsft) / (non-overlap time)
            # provided there was at least one SFT that is in the segment
            if args.overlap_fraction != 0.0:
                if (segEndTime - segStartTime) > args.time_baseline:
                    segExtraTime = (
                        segEndTime - segStartTime - args.time_baseline
                    ) % int((1.0 - args.overlap_fraction) * args.time_baseline)

            # We'll add half the extra time to the start of the SFTs to be
            # created in this segment and half at the end
            segExtraStart = int(segExtraTime / 2)
            segExtraEnd = segExtraTime - segExtraStart
            args.analysis_start_time = segStartTime + segExtraStart

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # shifting the end time by the other portion of the extra time
            # amount ...
            args.analysis_end_time = segEndTime - segExtraEnd

            # Again, this shift could have pushed the end time before the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime
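
            # Worked example (illustrative numbers): for a 5000 s segment
            # [1000000000, 1000005000) with --time-baseline 1800 and no
            # overlap, segExtraTime = 5000 % 1800 = 1400, so 700 s is trimmed
            # from each end, leaving exactly two SFTs spanning
            # [1000000700, 1000004300)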

        # Case 2: SFTs need a synchronized start. This is a special case for
        # methods like TwoSpect, where signal periodicity spacing must be
        # maintained
        elif args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # If we haven't set the first SFT start time, then set it equal to
            # the start time of the first segment
            if firstSFTstartTime == 0:
                firstSFTstartTime = segStartTime

            # This is a tricky bit of math to set the start time based on the
            # first SFT start time across all the segments
            args.analysis_start_time = (
                int(
                    round(
                        math.ceil(
                            (segStartTime - firstSFTstartTime)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + firstSFTstartTime
            )

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # This is a tricky bit of math to set the end time based on the
            # first SFT start time across all the segments
            args.analysis_end_time = (
                int(
                    round(
                        math.floor(
                            (segEndTime - args.analysis_start_time - args.time_baseline)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + args.time_baseline
                + args.analysis_start_time
            )

            # Again, this shift could have pushed the end time before the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime
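
            # Worked example (illustrative numbers): with --time-baseline
            # 1800, no overlap, firstSFTstartTime = 1000000000, and a later
            # segment starting at 1000000500, the start time snaps forward to
            # ceil(500 / 1800) * 1800 + 1000000000 = 1000001800, keeping
            # every SFT start time on the same 1800 s grid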

        # If no segment file given and no synchronized starts, just set the
        # start time and end time to the segment start and end
        else:
            args.analysis_start_time = seg[0]
            args.analysis_end_time = seg[1]

        # Loop through the analysis time; make sure no more than
        # args.max_num_per_node SFTs are produced by any one node
        startTimeThisNode = args.analysis_start_time
        endTimeThisNode = args.analysis_start_time
        endTimeAllNodes = args.analysis_start_time
        while endTimeAllNodes < args.analysis_end_time:
            # increment endTimeAllNodes by the args.time_baseline until we get
            # past the args.analysis_end_time
            if args.overlap_fraction != 0.0:
                # handle overlap
                if numThisSeg == 0:
                    endTimeAllNodes = endTimeAllNodes + args.time_baseline
                else:
                    endTimeAllNodes = endTimeAllNodes + int(
                        (1.0 - args.overlap_fraction) * args.time_baseline
                    )
            else:
                # default case, no overlap
                endTimeAllNodes = endTimeAllNodes + args.time_baseline
            if endTimeAllNodes <= args.analysis_end_time:
                # increment the number of SFTs output from this node, and
                # update the end time this node.
                numThisNode = numThisNode + 1
                numThisSeg = numThisSeg + 1
                endTimeThisNode = endTimeAllNodes
                if numThisNode < args.max_num_per_node:
                    continue
                else:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if useNodeList:
                        args.output_sft_path = (
                            args.node_path
                            + nodeList[nodeListIndex]
                            + savedOutputSFTPath
                        )
                        if (nodeCount % args.output_jobs_per_node) == 0:
                            nodeListIndex = nodeListIndex + 1
                        # END if ((nodeCount % args.output_jobs_per_node) == 0)
                    # END if (useNodeList)

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID,
                        nodeCount,
                        startTimeThisNode,
                        endTimeThisNode,
                        site,
                        args,
                    )
                    # Update for next node
                    numThisNode = 0
                    if args.overlap_fraction != 0.0:
                        # handle overlap
                        startTimeThisNode = endTimeThisNode - int(
                            (args.overlap_fraction) * args.time_baseline
                        )
                    else:
                        # default case, no overlap
                        startTimeThisNode = endTimeThisNode
            else:
                # we are at or past the args.analysis_end_time; output job for
                # the last node if needed.
                if numThisNode > 0:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if useNodeList:
                        args.output_sft_path = (
                            args.node_path + nodeList[nodeListIndex] + savedOutputSFTPath
                        )
                        if (nodeCount % args.output_jobs_per_node) == 0:
                            nodeListIndex = nodeListIndex + 1
                        # END if ((nodeCount % args.output_jobs_per_node) == 0)
                    # END if (useNodeList)

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args
                    )
        # END while (endTimeAllNodes < args.analysis_end_time)
    # END for seg in segList
# Close the DAG file

# Update actual end time of the last job and print out the times all jobs will run on:
endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("startTimeAllNodes is None; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

print(startTimeAllNodes, endTimeAllNodes)