LALPulsar 7.1.1.1-3a66518
lalpulsar_MakeSFTDAG.py
##python
# Copyright (C) 2013, 2014, 2020--2024 Evan Goetz
# Copyright (C) 2011, 2021, 2022 Karl Wette
# Copyright (C) 2005, 2007 Gregory Mendell
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA

## \file
## \ingroup lalpulsar_bin_SFTTools
"""Creates DAGs to run jobs that generate SFTs"""

import math
import argparse
import os
import re
from pathlib import Path

from lalpulsar import (
    git_version,
    SFTFilenameSpec,
    FillSFTFilenameSpecStrings,
    BuildSFTFilenameFromSpec,
)

__author__ = "Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
__version__ = git_version.id
__date__ = git_version.date


# REVISIONS:
# 12/02/05 gam; generate datafind.sub and MakeSFTs.sub as well as dag file in
#               PWD, with log files based on subLogPath and dag filename.
# 12/28/05 gam; Add option --make-gps-dirs, -D <num>, to make directory based
#               on this many GPS digits.
# 12/28/05 gam; Add option --misc-desc, -X <string> giving misc. part of the
#               SFT description field in the filename.
# 12/28/05 gam; Add --start-freq -F and --band -B options to enter these.
# 12/28/05 gam; Add in --window-type, -w options; 0 = no window, 1 = default =
#               Matlab style Tukey window; 2 = make_sfts.c Tukey window; 3 =
#               Hann window.
# 12/28/05 gam; Add option --overlap-fraction -P (for use with windows; e.g.,
#               use -P 0.5 with -w 3 Hann windows; default is 0.0)
# 12/28/05 gam; Add --sft-version, -v option to select output SFT version (1 =
#               default is version 1 SFTs; 2 = version 2 SFTs).
# 12/28/05 gam; Add --comment-field, -c option, for comment for version 2 SFTs.
# 12/28/05 gam; Remove sample rate option
# 01/09/06 gam; Add -Z option; write SFT to .*.tmp file, then move to final
#               file name.
# 01/14/07 gam; Add -u option to specify frame struct and type; add -i option
#               to specify IFO name.
# 07/24/07 gam; Add in -q option to read in list of nodes on which to output
#               SFTs, -Q option to give node path, and -R option for number of
#               jobs per node.
# 04/XX/13 eag; Add -y option to synchronize the start times of SFTs.
# 07/24/14 eag; Change default to version 2 SFTs
# 12/2020  eag; Update script to conform to modern python3 and pep8
# 10/2020  kww; Pass args directly to writeToDag(), use Python f-strings
# 10/2022  kww; Deprecate options that have been removed from MakeSFTs
# 10/2022  kww; Parse window type as a string, parameter separated by colon
# 10/2022  kww; Merge -O and -o log path options to free up -O option
# 10/2022  kww; Implement public SFT file naming convention
# 11/2022  kww; -R command line option now used for --observing-revision
#               instead of --output-jobs-per-node, which now uses -r
# 11/2022  kww; --datafind-path and --makesfts-path accept executable names
# 03/2023  eag; Allow user to pass a frame cache file --cache-file
# 04/2023  kww; Improve documentation of --window-type argument
# 05/2023  eag; Add the --gaps flag to gw_data_find
# 08/2023  eag; Allow for multiple channel names to be provided
# 09/2023  eag; Modify use of environment variables
# 01/2024  eag; Allow skipping of channels if not in frames or too low data
#               rate
# 10/2024  eag; Modify workflow for version 3 SFTs and HTCondor file transfer
#               workflow
# 12/2024  eag; Modify workflow to use lalpulsar_MoveSFTs script instead of
#               remapping files


def sft_name_from_vars(
    obs,
    gpsstart,
    Tsft,
    channel=None,
    kind=None,
    rev=None,
    window="unknown",
    par=None,
    miscstr=None,
):
    """Create SFT file name from specification"""

    spec = SFTFilenameSpec()

    FillSFTFilenameSpecStrings(
        spec=spec,
        path=None,
        extn=None,
        detector=channel[:2],
        window_type=window,
        privMisc=miscstr,
        pubObsKind=kind,
        pubChannel=channel,
    )
    spec.pubObsRun = obs or 0
    spec.pubRevision = rev or 0
    spec.window_param = par or 0
    spec.numSFTs = 1  # MakeSFTDAG will only ever generate 1 SFT per file
    spec.SFTtimebase = Tsft
    spec.gpsStart = gpsstart
    spec.SFTspan = Tsft  # MakeSFTDAG will only ever generate 1 SFT per file

    return BuildSFTFilenameFromSpec(spec)

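# Illustrative example (not executed by this script; the GPS time and channel
# are placeholders): for public O4 SFTs of a channel "H1:GDS-CALIB_STRAIN"
# with a 1800 s baseline, a call like
#   sft_name_from_vars(4, 1368975618, 1800, channel="H1:GDS-CALIB_STRAIN",
#                      kind="RUN", rev=1, window="tukey", par=0.001)
# should return a public SFT filename of roughly the form
# H-1_H1_1800SFT_O4RUN+R1+CGDSCALIBSTRAIN+WTUKEY0P001-1368975618-1800.sft;
# the exact spelling is determined by BuildSFTFilenameFromSpec.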

#
# FUNCTION THAT WRITES ONE JOB TO DAG FILE
#
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args):
    datafind = f"datafind_{nodeCount}"
    MakeSFTs = f"MakeSFTs_{nodeCount}"
    startTimeDatafind = startTimeThisNode - args.extra_datafind_time
    endTimeDatafind = endTimeThisNode + args.extra_datafind_time
    tagStringOut = f"{args.tag_string}_{nodeCount}"
    if args.cache_file:
        cacheFile = args.cache_file
    else:
        cacheFile = (
            args.cache_path / f"{site}-{startTimeDatafind}-{endTimeDatafind}.cache"
        )

    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    elif args.misc_desc:
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    # To work with the condor file transfer protocol, we save everything to the
    # scratch directory because the files will have unique names. Since
    # lalpulsar_MakeSFTs wants to have a path to save the file to, we provide .
    argList.append(f"-p {','.join(['.' for p in args.output_sft_path])}")
    # To work with the condor file transfer protocol, the cache file is saved
    # to the scratch directory on transfer so we just need the name, not the
    # full path
    argList.append(f"-C {cacheFile.name}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if ":" in args.window_type:
        window_type, window_param = args.window_type.split(":")
        window_param = float(window_param)
        argList.append(f"-w {window_type} -r {window_param}")
    else:
        window_type = args.window_type
        window_param = None
        argList.append(f"-w {window_type}")
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)

    # The files are going to go to specific directories, so we need to map
    # the files to their output directories
    outputfiles = []
    remap = []
    sft_start = startTimeThisNode
    sft_end = sft_start + args.time_baseline
    # loop over start times
    while sft_end <= endTimeThisNode:
        # loop over channels
        for idx, c in enumerate(args.channel_name):
            filename = sft_name_from_vars(
                args.observing_run,
                sft_start,
                args.time_baseline,
                c,
                kind=args.observing_kind,
                rev=args.observing_revision,
                window=window_type,
                par=window_param,
                miscstr=args.misc_desc,
            )
            outputfiles.append(filename)
            remap.append(f"{filename}={args.output_sft_path[idx]/filename}")

        # update start and end times
        if args.overlap_fraction:
            sft_start += int(round((1 - args.overlap_fraction) * args.time_baseline))
        else:
            sft_start += args.time_baseline
        sft_end = sft_start + args.time_baseline

    # gw_data_find job
    if not args.cache_file:
        dagFID.write(f"JOB {datafind} {Path(dagFID.name).parent / 'datafind.sub'}\n")
        dagFID.write(f"RETRY {datafind} 1\n")
        dagFID.write(
            f'VARS {datafind} gpsstarttime="{startTimeDatafind}" '
            f'gpsendtime="{endTimeDatafind}" observatory="{site}" '
            f'inputdatatype="{args.input_data_type}" tagstring="{tagStringOut}"\n'
        )

    # MakeSFT job
    dagFID.write(f"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(
        f'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" '
        f'tagstring="{tagStringOut}"\n'
    )
    if not args.cache_file:
        dagFID.write(f"PARENT {datafind} CHILD {MakeSFTs}\n")


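# For orientation, each writeToDag() call above appends a stanza of this shape
# to the DAG file (values shown here are placeholders, not real output):
#   JOB datafind_1 /path/to/datafind.sub
#   RETRY datafind_1 1
#   VARS datafind_1 gpsstarttime="..." gpsendtime="..." observatory="H" ...
#   JOB MakeSFTs_1 /path/to/MakeSFTs.sub
#   RETRY MakeSFTs_1 1
#   VARS MakeSFTs_1 argList="..." cachefile="..." tagstring="..."
#   PARENT datafind_1 CHILD MakeSFTs_1
# The datafind job and the PARENT/CHILD edge are omitted when --cache-file is
# given.
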
#
# MAIN CODE STARTS HERE
#

parser = argparse.ArgumentParser(
    description="This script creates datafind.sub, MakeSFTs.sub, and a dag \
        file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
)
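# A hypothetical invocation (values are placeholders; only a subset of options
# is shown):
#   lalpulsar_MakeSFTDAG -O 4 -K RUN -R 1 -f O4_H1.dag -G O4H1SFTs \
#       -d H1_HOFT_C00 -k 14 -T 1800 -p /data/sfts/H1 \
#       -N H1:GDS-CALIB_STRAIN -A accounting.group.tag -U albert.einstein
# Because fromfile_prefix_chars="@", the options may also be read from a file:
#   lalpulsar_MakeSFTDAG @myargs.txt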
parser.add_argument(
    "-O",
    "--observing-run",
    required=True,
    type=int,
    help="For public SFTs, observing run data the SFTs are generated from, or \
        (in the case of mock data challenge data) the observing \
        run on which the data is most closely based",
)
parser.add_argument(
    "-K",
    "--observing-kind",
    type=str,
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
        "AUX" for SFTs of non-h(t) channels; \
        "SIM" for mock data challenge or other simulated data; or \
        "DEV" for development/testing purposes',
)
parser.add_argument(
    "-R",
    "--observing-revision",
    type=int,
    help="For public SFTs: revision number starts at 1, and should be incremented once \
        SFTs have been widely distributed across clusters, advertised \
        as being ready for use, etc. For example, if mistakes are found \
        in the initial SFT production run after they have been published, \
        regenerated SFTs should have a revision number of at least 2",
)
parser.add_argument(
    "-X",
    "--misc-desc",
    type=str,
    help="For private SFTs, miscellaneous part of the SFT \
        description field in the filename",
)
parser.add_argument(
    "-a",
    "--analysis-start-time",
    type=int,
    help="GPS start time of data from which to generate \
        SFTs (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-b",
    "--analysis-end-time",
    type=int,
    help="GPS end time of data from which to generate SFTs \
        (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-f",
    "--dag-file",
    required=True,
    type=Path,
    help="filename for .dag file (should end in .dag)",
)
parser.add_argument(
    "-G",
    "--tag-string",
    required=True,
    type=str,
    help="tag string used in names of various files unique to \
        jobs that will run under the DAG",
)
parser.add_argument(
    "-d",
    "--input-data-type",
    required=True,
    type=str,
    help="input data type for use with the gw_data_find --type \
        option",
)
parser.add_argument(
    "-x",
    "--extra-datafind-time",
    type=int,
    default=0,
    help="extra time to subtract/add from/to start/end time \
        arguments of gw_data_find",
)
parser.add_argument(
    "-M",
    "--datafind-match",
    type=str,
    help="string to use with the gw_data_find --match option",
)
parser.add_argument(
    "-y",
    "--synchronize-start",
    action="store_true",
    help="synchronize the start times of the SFTs so that the \
        start times are synchronized when there are gaps in the \
        data",
)
parser.add_argument(
    "-k",
    "--filter-knee-freq",
    required=True,
    type=float,
    help="high pass filter knee frequency used on time domain \
        data before generating SFTs",
)
parser.add_argument(
    "-T",
    "--time-baseline",
    required=True,
    type=int,
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
)
parser.add_argument(
    "-p",
    "--output-sft-path",
    nargs="+",
    type=Path,
    help="Path where to save the SFT files. Multiple paths may be given: \
        either one path per channel (the first listed channel writes its \
        SFTs to the first listed path, and so on), or a single path for \
        all channels. If one path is given for more than one channel, \
        then --observing-run must be >= 1 and --observing-kind and \
        --observing-revision must be set",
)
parser.add_argument(
    "-C",
    "--cache-path",
    type=Path,
    default="cache",
    help="path to cache files that will be produced by \
        gw_data_find (default is $PWD/cache; this directory is \
        created if it does not exist and must agree with that \
        given in .sub files)",
)
parser.add_argument(
    "-e",
    "--cache-file",
    type=Path,
    help="path and filename to frame cache file to use instead \
        of gw_data_find",
)
parser.add_argument(
    "-o",
    "--log-path",
    type=Path,
    default="logs",
    help="path to log, output, and error files (default \
        is $PWD/logs; this directory is created if it does not \
        exist and usually should be under a local file system)",
)
parser.add_argument(
    "-N",
    "--channel-name",
    nargs="+",
    type=str,
    help="Name of input time-domain channel to read from frames. \
        Can specify multiple options. The number of channels must be \
        equal to the number of output-sft-path options given. The \
        first listed channel will have the SFTs go to the first listed \
        output-sft-path. Can only specify one channel when generating \
        private SFTs (--observing-run=0)",
)
parser.add_argument(
    "--allow-skipping",
    action="store_true",
    help="allow channels to be skipped if not in frames or too low sampling \
        frequency",
)
parser.add_argument("-c", "--comment-field", type=str, help="comment for SFT header")
parser.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
)
parser.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
)
parser.add_argument(
    "-w",
    "--window-type",
    type=str,
    default="tukey:0.001",
    help='type of windowing of time-domain to do \
        before generating SFTs, e.g. "rectangular", \
        "hann", "tukey:<beta in [0,1], required>"; \
        (default is "tukey:0.001", standard choice for LVK production SFTs)',
)
parser.add_argument(
    "-P",
    "--overlap-fraction",
    type=float,
    default=0,
    help="overlap fraction (for use with windows; e.g., use \
        --overlap-fraction 0.5 with --window-type hann windows)",
)
parser.add_argument(
    "-m",
    "--max-num-per-node",
    type=int,
    default=1,
    help="maximum number of SFTs to generate on one node",
)
parser.add_argument(
    "-L",
    "--max-length-all-jobs",
    type=int,
    help="maximum total amount of data to process, in seconds \
        (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-g",
    "--segment-file",
    type=Path,
    help="alternative file with segments to use, rather than \
        the input times",
)
parser.add_argument(
    "-l",
    "--min-seg-length",
    type=int,
    default=0,
    help="minimum length segments to process in seconds (used \
        only if a segment file is given)",
)
parser.add_argument(
    "-q",
    "--list-of-nodes",
    type=str,
    help="file with list of nodes on which to output SFTs",
)
parser.add_argument(
    "-Q",
    "--node-path",
    type=Path,
    help="path to nodes to output SFTs; the node name is \
        appended to this path, followed by the path given by the -p \
        option; for example, if -q points to a file listing \
        node1 node2 ... and the -Q /data/ -p /frames/S5/sfts/LHO \
        options are given, the first output file will go into \
        /data/node1/frames/S5/sfts/LHO; the next node in the list \
        is used in constructing the path once the number of jobs \
        given by the -r option is reached, and so on",
)
parser.add_argument(
    "-r",
    "--output-jobs-per-node",
    type=int,
    default=0,
    help="number of jobs to output per node in the list of \
        nodes given with the -q option",
)
parser.add_argument(
    "-j",
    "--datafind-path",
    type=Path,
    help="string specifying the gw_data_find executable, \
        or a path to it; if not set, will use \
        LSC_DATAFIND_PATH env variable or system default (in \
        that order)",
)
parser.add_argument(
    "-J",
    "--makesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MakeSFTs executable, \
        or a path to it; if not set, will use \
        MAKESFTS_PATH env variable or system default (in that \
        order)",
)
parser.add_argument(
    "--movesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MoveSFTs executable, \
        or a path to it; if not set, will use \
        MOVESFTS_PATH env variable or system default (in that \
        order)",
)
parser.add_argument(
    "--no-validate",
    dest="validate",
    action="store_false",
    help="do not validate created SFTs",
)
parser.add_argument(
    "-Y",
    "--request-memory",
    type=int,
    default=2048,
    help="memory allocation in MB to request from condor for \
        lalpulsar_MakeSFTs step",
)
parser.add_argument(
    "-s",
    "--request-disk",
    type=int,
    default=1024,
    help="disk space allocation in MB to request from condor \
        for lalpulsar_MakeSFTs step",
)
parser.add_argument(
    "-A",
    "--accounting-group",
    required=True,
    type=str,
    help="Condor tag for the production of SFTs",
)
parser.add_argument(
    "-U",
    "--accounting-group-user",
    required=True,
    type=str,
    help="albert.einstein username (do not add @LIGO.ORG)",
)


##### DEPRECATED OPTIONS #####
class DeprecateAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        parser.error(
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
        )


parser.add_argument(
    "-u",
    "--frame-struct-type",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the frame channel type is determined automatically",
)
parser.add_argument(
    "-H",
    "--use-hot",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the frame channel type is determined automatically",
)
parser.add_argument(
    "-i",
    "--ifo",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the detector prefix is deduced from the channel name",
)
parser.add_argument(
    "-D",
    "--make-gps-dirs",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
parser.add_argument(
    "-Z",
    "--make-tmp-file",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. Default behaviour",
)
parser.add_argument(
    "-v",
    "--sft-version",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
parser.add_argument(
    "-S",
    "--use-single",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)

args = parser.parse_args()

# Some basic argument value checking
if args.observing_run < 0:
    raise parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    raise parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    raise parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    raise parser.error("--observing-revision must be > 0")

if args.observing_run > 0 and args.misc_desc:
    raise parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    raise parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    raise parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    raise parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    raise parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    raise parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    raise parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    raise parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    raise parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    raise parser.error("--max-num-per-node must be > 0")

if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    raise parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )

if len(args.channel_name) > 1 and args.observing_run == 0:
    raise parser.error(
        "When creating SFTs from multiple channels, public SFT naming "
        "convention must be used: --observing-run > 0 and set "
        "--observing-kind and --observing-revision"
    )

# Set executables for gw_data_find, lalpulsar_MakeSFTs, and lalpulsar_MoveSFTs
dataFindExe = "gw_data_find"
if args.datafind_path:
    if args.datafind_path.is_file():
        dataFindExe = args.datafind_path
    else:
        dataFindExe = args.datafind_path / dataFindExe
elif "LSC_DATAFIND_PATH" in os.environ:
    dataFindExe = Path("$ENV(LSC_DATAFIND_PATH)") / dataFindExe
else:
    dataFindExe = Path("/usr/bin") / dataFindExe

makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if args.makesfts_path.is_file():
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = args.makesfts_path / makeSFTsExe
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = Path("$ENV(MAKESFTS_PATH)") / makeSFTsExe
else:
    makeSFTsExe = Path("@LALSUITE_BINDIR@") / makeSFTsExe

moveSFTsExe = "lalpulsar_MoveSFTs"
if args.movesfts_path:
    if args.movesfts_path.is_file():
        moveSFTsExe = args.movesfts_path
    else:
        moveSFTsExe = args.movesfts_path / moveSFTsExe
elif "MOVESFTS_PATH" in os.environ:
    moveSFTsExe = Path("$ENV(MOVESFTS_PATH)") / moveSFTsExe
else:
    moveSFTsExe = Path("@LALSUITE_BINDIR@") / moveSFTsExe

# try to make the directories to store the cache files and job logs
try:
    args.log_path.mkdir()
except:
    pass
if not args.cache_file:
    try:
        args.cache_path.mkdir()
    except:
        pass

# Check if a list of nodes on which to output SFTs was given
nodeList = []
useNodeList = False
savedOutputSFTPath = None
if args.list_of_nodes is not None:
    if args.node_path is None:
        raise parser.error("Node file list given, but no node path specified")

    if args.output_jobs_per_node < 1:
        raise parser.error(
            "Node file list given, but invalid output jobs per node specified"
        )

    with open(args.list_of_nodes) as fp_nodelist:
        for idx, line in enumerate(fp_nodelist):
            splitLine = line.split()
            nodeList.append(splitLine[0])
    if len(nodeList) < 1:
        raise ValueError(
            "No nodes found in node list file: {}".format(args.list_of_nodes)
        )

    # Set flag to use list of nodes in constructing output files
    useNodeList = True
    savedOutputSFTPath = args.output_sft_path
# END if (args.list_of_nodes != None)

# Check if segment file was given, else set up one segment from the command line
segList = []
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        raise parser.error("--min-seg-length must be >= 0")

    # the next flag causes extra time that cannot be processed to be trimmed
    # from the start and end of a segment
    adjustSegExtraTime = True

    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = []
            oneSeg.append(int(splitLine[0]))
            oneSeg.append(int(splitLine[1]))
            if (oneSeg[1] - oneSeg[0]) >= args.min_seg_length:
                segList.append(oneSeg)

    if len(segList) < 1:
        raise ValueError(
            "No segments found in segment file: {}".format(args.segment_file)
        )
else:
    if args.analysis_start_time is None:
        raise parser.error(
            "--analysis-start-time must be specified if no segment file is \
            given"
        )

    if args.analysis_end_time is None:
        raise parser.error(
            "--analysis-end-time must be specified if no segment file is \
            given"
        )

    if args.max_length_all_jobs is None:
        raise parser.error(
            "--max-length-all-jobs must be specified if no segment file is \
            given"
        )

    # Make sure not to exceed the maximum allowed analysis length
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = []
    oneSeg.append(args.analysis_start_time)
    oneSeg.append(args.analysis_end_time)
    segList.append(oneSeg)
# END if (args.segment_file != None)

# Get the IFO site, which is the first letter of the channel name.
site = args.channel_name[0][0]

# initialize count of nodes
nodeCount = 0

# Create .sub files
path_to_dag_file = args.dag_file.parent
dag_filename = args.dag_file.name
datafind_sub = path_to_dag_file / "datafind.sub"
makesfts_sub = path_to_dag_file / "MakeSFTs.sub"
movesfts_sub = path_to_dag_file / "MoveSFTs.sub"

# create datafind.sub
if not args.cache_file:
    with open(datafind_sub, "w") as datafindFID:
        datafindLogFile = f"{args.log_path}/datafind_{dag_filename}.log"
        datafindFID.write("universe = vanilla\n")
        datafindFID.write(f"executable = {dataFindExe}\n")
        datafindFID.write("arguments = ")
        datafindFID.write("--observatory $(observatory) --url-type file ")
        datafindFID.write("--gps-start-time $(gpsstarttime) ")
        datafindFID.write("--gps-end-time $(gpsendtime) --lal-cache --gaps ")
        datafindFID.write("--type $(inputdatatype)")
        if args.datafind_match:
            datafindFID.write(f" --match {args.datafind_match}\n")
        else:
            datafindFID.write("\n")
        datafindFID.write("getenv = *DATAFIND*\n")
        datafindFID.write("request_disk = 5MB\n")
        datafindFID.write("request_memory = 2000MB\n")
        datafindFID.write(f"accounting_group = {args.accounting_group}\n")
        datafindFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
        datafindFID.write(f"log = {datafindLogFile}\n")
        datafindFID.write(f"error = {args.log_path}/datafind_$(tagstring).err\n")
        datafindFID.write(f"output = {args.cache_path}/")
        datafindFID.write("$(observatory)-$(gpsstarttime)-$(gpsendtime).cache\n")
        datafindFID.write("should_transfer_files = yes\n")
        datafindFID.write("notification = never\n")
        datafindFID.write("queue 1\n")

# create MakeSFTs.sub
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("should_transfer_files = yes\n")
    MakeSFTsFID.write("transfer_input_files = $(cachefile)\n")
    if "MAKESFTS_PATH" in os.environ and not args.makesfts_path:
        MakeSFTsFID.write("getenv = MAKESFTS_PATH\n")
    MakeSFTsFID.write("queue 1\n")

# create MoveSFTs.sub
with open(movesfts_sub, "w") as MoveSFTsFID:
    MoveSFTsLogFile = f"{args.log_path}/MoveSFTs_{dag_filename}.log"
    MoveSFTsFID.write("universe = local\n")
    MoveSFTsFID.write(f"executable = {moveSFTsExe}\n")
    MoveSFTsFID.write("arguments = ")
    if not args.validate:
        MoveSFTsFID.write("$(opts) ")
    MoveSFTsFID.write("-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
    MoveSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MoveSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MoveSFTsFID.write(f"log = {MoveSFTsLogFile}\n")
    MoveSFTsFID.write(f"error = {args.log_path}/MoveSFTs.err\n")
    MoveSFTsFID.write(f"output = {args.log_path}/MoveSFTs.out\n")
    MoveSFTsFID.write("notification = never\n")
    MoveSFTsFID.write("request_memory = 1GB\n")
    MoveSFTsFID.write("request_disk = 10MB\n")
    MoveSFTsFID.write("RequestCpus = 1\n")
    if "MOVESFTS_PATH" in os.environ and not args.movesfts_path:
        MoveSFTsFID.write("getenv = MOVESFTS_PATH\n")
    MoveSFTsFID.write("queue 1\n")

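# Sketch of the DAG shape produced below (one datafind and one MakeSFTs job
# per DAG node when gw_data_find is used; with --cache-file the datafind jobs
# and their PARENT/CHILD edges are omitted):
#   datafind_1 -> MakeSFTs_1 --+
#   datafind_2 -> MakeSFTs_2 --+--> MoveSFTs
#   ...                        |
#   datafind_N -> MakeSFTs_N --+
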
# create the DAG file with the jobs to run
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0  # need this for the synchronized start option
    nodeListIndex = 0

    # Loop over the segment list to generate the SFTs for each segment
    for seg in segList:
        # Each segment in the segList runs on one or more nodes;
        # initialize the number of SFTs produced by the current node:
        numThisNode = 0
        numThisSeg = 0

        # Case 1: a segment file was given but the SFTs do not need their
        # start times to be synchronized
        if adjustSegExtraTime and not args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # First we figure out how much extra time is in the segment so that
            # SFTs are fit within the segment:
            # |..<SFT><SFT><SFT>..|
            # where the .. represent the extra time in the segment
            # The amount of extra time in a segment is given as the remainder
            # of (total segment time) / (SFT time baseline)
            segExtraTime = (segEndTime - segStartTime) % args.time_baseline

            # If there is overlap of SFTs requested, then we compute the extra
            # time as:
            # the remainder of (end - start - Tsft) / (non-overlap time)
            # provided there was at least one SFT that is in the segment
            if args.overlap_fraction != 0.0:
                if (segEndTime - segStartTime) > args.time_baseline:
                    segExtraTime = (
                        segEndTime - segStartTime - args.time_baseline
                    ) % int((1.0 - args.overlap_fraction) * args.time_baseline)

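            # Worked example (illustrative numbers): for a segment of length
            # 5000 s with args.time_baseline = 1800 and no overlap,
            # segExtraTime = 5000 % 1800 = 1400, so 700 s is trimmed from each
            # end and two SFTs fit in the middle. With
            # args.overlap_fraction = 0.5, segExtraTime =
            # (5000 - 1800) % 900 = 500, leaving 4500 s that hold four
            # half-overlapping SFTs.
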
            # We'll add half the extra time to the start of the SFTs to be
            # created in this segment and half at the end
            segExtraStart = int(segExtraTime / 2)
            segExtraEnd = segExtraTime - segExtraStart
            args.analysis_start_time = segStartTime + segExtraStart

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # Shift the end time by the other portion of the extra time
            args.analysis_end_time = segEndTime - segExtraEnd

            # Again, this shift could have pushed the end time before the
            # start of the segment; in that case, fix the end time to the
            # segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # Case 2: SFTs need a synchronized start. This is a special case for
        # methods like TwoSpect, where signal periodicity spacing must be
        # maintained
        elif args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # If we haven't set the first SFT start time, then set it equal to
            # the start time of the first segment
            if firstSFTstartTime == 0:
                firstSFTstartTime = segStartTime

            # This is a tricky bit of math: round the start time up to the
            # next point on the grid of SFT start times defined by the first
            # SFT start time across all segments
            args.analysis_start_time = (
                int(
                    round(
                        math.ceil(
                            (segStartTime - firstSFTstartTime)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + firstSFTstartTime
            )

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # This is a tricky bit of math: set the end time so that the last
            # SFT also starts on the grid defined by the first SFT start time
            args.analysis_end_time = (
                int(
                    round(
                        math.floor(
                            (segEndTime - args.analysis_start_time - args.time_baseline)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + args.time_baseline
                + args.analysis_start_time
            )

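            # Worked example (illustrative numbers): with
            # firstSFTstartTime = 1000000000, args.time_baseline = 1800, no
            # overlap, and a segment [1000003000, 1000008000), the start is
            # rounded up to the grid point 1000003600 = firstSFTstartTime +
            # 2 * 1800 and the end becomes 1000007200, so both SFTs in this
            # segment start on the firstSFTstartTime + k * 1800 grid.
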
            # Again, this shift could have pushed the end time before the
            # start of the segment; in that case, fix the end time to the
            # segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # If no segment file given and no synchronized starts, just set the
        # start time and end time to the segment start and end
        else:
            args.analysis_start_time = seg[0]
            args.analysis_end_time = seg[1]

        # Loop through the analysis time; make sure no more than
        # args.max_num_per_node SFTs are produced by any one node
        startTimeThisNode = args.analysis_start_time
        endTimeThisNode = args.analysis_start_time
        endTimeAllNodes = args.analysis_start_time
        while endTimeAllNodes < args.analysis_end_time:
            # increment endTimeAllNodes by the args.time_baseline until we get
            # past the args.analysis_end_time
            if args.overlap_fraction != 0.0:
                # handle overlap
                if numThisSeg == 0:
                    endTimeAllNodes = endTimeAllNodes + args.time_baseline
                else:
                    endTimeAllNodes = endTimeAllNodes + int(
                        (1.0 - args.overlap_fraction) * args.time_baseline
                    )
            else:
                # default case, no overlap
                endTimeAllNodes = endTimeAllNodes + args.time_baseline
            if endTimeAllNodes <= args.analysis_end_time:
                # increment the number of SFTs output from this node, and
                # update the end time this node.
                numThisNode = numThisNode + 1
                numThisSeg = numThisSeg + 1
                endTimeThisNode = endTimeAllNodes
                if numThisNode < args.max_num_per_node:
                    continue
                else:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if useNodeList:
                        args.output_sft_path = (
                            args.node_path
                            + nodeList[nodeListIndex]
                            + savedOutputSFTPath
                        )
                        if (nodeCount % args.output_jobs_per_node) == 0:
                            nodeListIndex = nodeListIndex + 1
                        # END if ((nodeCount % args.output_jobs_per_node) == 0L)
                    # END if (useNodeList)

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID,
                        nodeCount,
                        startTimeThisNode,
                        endTimeThisNode,
                        site,
                        args,
                    )
                    # Update for next node
                    numThisNode = 0
                    if args.overlap_fraction != 0.0:
                        # handle overlap
                        startTimeThisNode = endTimeThisNode - int(
                            (args.overlap_fraction) * args.time_baseline
                        )
                    else:
                        # default case, no overlap
                        startTimeThisNode = endTimeThisNode
            else:
                # we are at or past the args.analysis_end_time; output job for
                # last node if needed.
                if numThisNode > 0:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if useNodeList:
                        args.output_sft_path = (
                            args.node_path + nodeList[nodeListIndex] + savedOutputSFTPath
                        )
                        if (nodeCount % args.output_jobs_per_node) == 0:
                            nodeListIndex = nodeListIndex + 1
                        # END if ((nodeCount % args.output_jobs_per_node) == 0L)
                    # END if (useNodeList)

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args
                    )
        # END while (endTimeAllNodes < args.analysis_end_time)
    # END for seg in segList

    # Write the move SFTs job to the DAG
    dagFID.write(f"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
    dagFID.write("RETRY MoveSFTs 1\n")
    dagFID.write("VARS MoveSFTs ")
    if not args.validate:
        dagFID.write('opts="--no-validate" ')
    dagFID.write(
        f'sourcedirectory="." '
        f"channels=\"{' '.join(args.channel_name)}\" "
        f"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
    )
    dagFID.write(
        f"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
    )

# The DAG file is closed automatically at the end of the "with" block

# Update actual end time of the last job and print out the times all jobs will run on:
endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("startTimeAllNodes is None; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

print(startTimeAllNodes, endTimeAllNodes)