lalpulsar_MakeSFTDAG.py
##python
# Copyright (C) 2013, 2014, 2020--2024 Evan Goetz
# Copyright (C) 2011, 2021, 2022 Karl Wette
# Copyright (C) 2005, 2007 Gregory Mendell
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA

## \file
## \ingroup lalpulsar_bin_SFTTools
"""Creates DAGs to run jobs that generate SFTs"""

import math
import argparse
import os
import re
from pathlib import Path
from urllib.parse import urlparse

from gwdatafind import find_urls
from gwdatafind.utils import filename_metadata, file_segment

from gwpy.segments import Segment, SegmentList

from lalpulsar import (
    git_version,
    SFTFilenameSpec,
    FillSFTFilenameSpecStrings,
    BuildSFTFilenameFromSpec,
)

__author__ = "Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
__version__ = git_version.id
__date__ = git_version.date

cache_re = re.compile(r"^([A-Z])(\s+)(\w+)(\s+)(\d+)(\s+)(\d+)(\s+)(.+gwf)")
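# Matches LAL-format cache lines: observatory, frame type, GPS start time,
# duration, and the frame file path/URL, e.g. (illustrative):
#   H H1_HOFT_C00 1239123456 4096 /data/frames/H-H1_HOFT_C00-1239123456-4096.gwf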


# REVISIONS:
# 12/02/05 gam; generate datafind.sub and MakeSFTs.sub as well as dag file in
#               PWD, with log files based on subLogPath and the dag filename.
# 12/28/05 gam; Add option --make-gps-dirs, -D <num>, to make directory based
#               on this many GPS digits.
# 12/28/05 gam; Add option --misc-desc, -X <string> giving misc. part of the
#               SFT description field in the filename.
# 12/28/05 gam; Add options --start-freq -F and --band -B to enter these.
# 12/28/05 gam; Add in --window-type, -w options; 0 = no window, 1 = default =
#               Matlab style Tukey window; 2 = make_sfts.c Tukey window; 3 =
#               Hann window.
# 12/28/05 gam; Add option --overlap-fraction -P (for use with windows; e.g.,
#               use -P 0.5 with -w 3 Hann windows; default is 0.0)
# 12/28/05 gam; Add --sft-version, -v option to select output SFT version (1 =
#               default = version 1 SFTs; 2 = version 2 SFTs).
# 12/28/05 gam; Add --comment-field, -c option, for comment for version 2 SFTs.
# 12/28/05 gam; Remove sample rate option
# 01/09/06 gam; Add -Z option; write SFT to .*.tmp file, then move to final
#               file name.
# 01/14/07 gam; Add -u option to specify frame struct and type; add -i option
#               to specify IFO name.
# 07/24/07 gam; Add in -q option to read in list of nodes on which to output
#               SFTs, -Q option to give node path, and -R option for number of
#               jobs per node.
# 04/XX/13 eag; Add -y option to synchronize the start times of SFTs.
# 07/24/14 eag; Change default to version 2 SFTs
# 12/2020 eag; Update script to conform to modern python3 and pep8
# 10/2020 kww; Pass args directly to writeToDag(), use Python f-strings
# 10/2022 kww; Deprecate options that have been removed from MakeSFTs
# 10/2022 kww; Parse window type as a string, parameter separated by colon
# 10/2022 kww; Merge -O and -o log path options to free up -O option
# 10/2022 kww; Implement public SFT file naming convention
# 11/2022 kww; -R command line option now used for --observing-revision
#              instead of --output-jobs-per-node, which now uses -r
# 11/2022 kww; --datafind-path and --makesfts-path accept executable names
# 03/2023 eag; Allow user to pass a frame cache file --cache-file
# 04/2023 kww; Improve documentation of --window-type argument
# 05/2023 eag; Add the --gaps flag to gw_data_find
# 08/2023 eag; Allow for multiple channel names to be provided
# 09/2023 eag; Modify use of environment variables
# 01/2024 eag; Allow skipping of channels if not in frames or too low data
#              rate
# 10/2024 eag; Modify workflow for version 3 SFTs and HTCondor file transfer
#              workflow
# 12/2024 eag; Modify workflow to use lalpulsar_MoveSFTs script instead of
#              remapping files
# 03/2025 eag; Rewrite to generate cache files when running
#              lalpulsar_MakeSFTDAG, enabling frame files on /home or OSDF


def sft_name_from_vars(
    obs,
    gpsstart,
    Tsft,
    channel=None,
    kind=None,
    rev=None,
    window="unknown",
    par=None,
    miscstr=None,
):
    """Create SFT file name from specification"""

    spec = SFTFilenameSpec()

    FillSFTFilenameSpecStrings(
        spec=spec,
        path=None,
        extn=None,
        detector=channel[:2],
        window_type=window,
        privMisc=miscstr,
        pubObsKind=kind,
        pubChannel=channel,
    )
    spec.pubObsRun = obs or 0
    spec.pubRevision = rev or 0
    spec.window_param = par or 0
    spec.numSFTs = 1  # MakeSFTDAG will only ever generate 1 SFT per file
    spec.SFTtimebase = Tsft
    spec.gpsStart = gpsstart
    spec.SFTspan = Tsft  # MakeSFTDAG will only ever generate 1 SFT per file

    return BuildSFTFilenameFromSpec(spec)


def get_urls(args):
    """Get frame file URL list from gwdatafind or cache file"""

    if not args.cache_file:
        urls = find_urls(
            site,
            args.input_data_type,
            segList[0][0],
            segList[-1][-1],
            match=args.datafind_match,
            urltype=args.datafind_urltype,
        )
    else:
        urls = []
        with open(args.cache_file, "r") as f:
            for line in f:
                m = cache_re.match(line)
                if m:
                    framefile = m.group(9)
                    urls.append(framefile)

    return urls


def make_cache(
    urls,
    job_seg,
):
    """Make a frame list and cache list from a list of URLs"""

    cache = []  # list of lines for the cache file
    frames = []  # list of frame filenames used in the job
    for idx, url in enumerate(urls):
        obs, desc, dataseg = filename_metadata(url)
        dataseg = Segment(dataseg)
        if dataseg.disjoint(job_seg) < 0:
            continue
        if dataseg.disjoint(job_seg) > 0:
            break
        if dataseg.intersects(job_seg):
            framefileurl = urlparse(url)
            framefilepath = Path(framefileurl.path)

            # list in cache file if files not visible on execute node
            # otherwise use file url
            if "/home" in str(framefilepath.parent) or "osdf" in framefileurl.scheme:
                newcache = f"{obs}\t{desc}\t{dataseg.start}\t{abs(dataseg)}\t{framefilepath.name}"
            else:
                newcache = f"{obs}\t{desc}\t{dataseg.start}\t{abs(dataseg)}\t{url}"
            cache.append(newcache)

            if "/home" in str(framefilepath.parent):
                frames.append(framefilepath)
            else:
                frames.append(url)

    return frames, cache


def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args):
    """Write one job to DAG file"""

    MakeSFTs = f"MakeSFTs_{nodeCount}"
    tagStringOut = f"{args.tag_string}_{nodeCount}"

    job_segment = Segment(
        startTimeThisNode - args.extra_datafind_time,
        endTimeThisNode + args.extra_datafind_time,
    )

    frames, cache = make_cache(urls, job_segment)

    obs, desc, dataseg = filename_metadata(urls[0])
    cacheFile = args.cache_path / f"{obs}-{job_segment.start}-{job_segment.end}.cache"
    with open(cacheFile, "w") as f:
        for line in cache:
            f.write(f"{line}\n")

    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    elif args.misc_desc:
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    # To work with the condor file transfer protocol, we save everything to the
    # scratch directory because the files will have unique names. Since
    # lalpulsar_MakeSFTs wants to have a path to save the file to, we provide .
    argList.append(f"-p {','.join(['.' for p in args.output_sft_path])}")
    # To work with the condor file transfer protocol, the cache file is saved
    # to the scratch directory on transfer so we just need the name, not the
    # full path
    argList.append(f"-C {cacheFile.name}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if ":" in args.window_type:
        window_type, window_param = args.window_type.split(":")
        window_param = float(window_param)
        argList.append(f"-w {window_type} -r {window_param}")
    else:
        window_type = args.window_type
        window_param = None
        argList.append(f"-w {window_type}")
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)
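    # argStr now holds the complete lalpulsar_MakeSFTs command line, e.g.
    # (illustrative values):
    #   -O 4 -K RUN -R 1 -f 7.0 -t 1800 -p . -C H-1256655618-1256657418.cache
    #   -s 1256655618 -e 1256657418 -N H1:GDS-CALIB_STRAIN -F 10 -B 1990 -w tukey -r 0.001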

    # The files are going to go to specific directories, so we need to map
    # the files to their output directories
    outputfiles = []
    remap = []
    sft_start = startTimeThisNode
    sft_end = sft_start + args.time_baseline
    # loop over start times
    while sft_end <= endTimeThisNode:
        # loop over channels
        for idx, c in enumerate(args.channel_name):
            filename = sft_name_from_vars(
                args.observing_run,
                sft_start,
                args.time_baseline,
                c,
                kind=args.observing_kind,
                rev=args.observing_revision,
                window=window_type,
                par=window_param,
                miscstr=args.misc_desc,
            )
            outputfiles.append(filename)
            remap.append(f"{filename}={args.output_sft_path[idx]/filename}")

        # update start and end times
        if args.overlap_fraction:
            sft_start += int(round((1 - args.overlap_fraction) * args.time_baseline))
        else:
            sft_start += args.time_baseline
        sft_end = sft_start + args.time_baseline

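    # The writes below produce one DAG node entry of the form (illustrative):
    #   JOB MakeSFTs_1 /path/to/MakeSFTs.sub
    #   RETRY MakeSFTs_1 1
    #   VARS MakeSFTs_1 argList="..." cachefile="cache/H-...cache" tagstring="mytag_1"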
    # MakeSFT job
    dagFID.write(f"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" ')
    if args.transfer_frame_files:
        framefiles = ",".join([str(fr) for fr in frames])
        dagFID.write(f'framefiles="{framefiles}" ')
    dagFID.write(f'tagstring="{tagStringOut}"\n')


#
# MAIN CODE STARTS HERE
#

parser = argparse.ArgumentParser(
    description="This script creates MakeSFTs.sub, MoveSFTs.sub, and a dag \
        file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
)

dag_group = parser.add_argument_group(
    "DAG organization", "Options for workflow control"
)
datafind_group = parser.add_argument_group(
    "Datafind", "Options for locating frame files"
)
makesfts_group = parser.add_argument_group(
    "SFT creation", "Options for SFT creation and output"
)
deprecated_group = parser.add_argument_group("DEPRECATED")

dag_group.add_argument(
    "-f",
    "--dag-file",
    required=True,
    type=Path,
    help="filename for .dag file (should end in .dag)",
)
dag_group.add_argument(
    "-G",
    "--tag-string",
    required=True,
    type=str,
    help="tag string used in names of various files unique to \
        jobs that will run under the DAG",
)
dag_group.add_argument(
    "-a",
    "--analysis-start-time",
    type=int,
    help="GPS start time of data from which to generate \
        SFTs (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-b",
    "--analysis-end-time",
    type=int,
    help="GPS end time of data from which to generate SFTs \
        (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-L",
    "--max-length-all-jobs",
    type=int,
    help="maximum total amount of data to process, in seconds \
        (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-g",
    "--segment-file",
    type=Path,
    help="alternative file with segments to use, rather than \
        the input times",
)
dag_group.add_argument(
    "-l",
    "--min-seg-length",
    type=int,
    default=0,
    help="minimum length segments to process in seconds (used \
        only if a segment file is given)",
)
dag_group.add_argument(
    "-y",
    "--synchronize-start",
    action="store_true",
    help="synchronize the start times of the SFTs so that they \
        stay on a common grid even when there are gaps in the \
        data",
)
dag_group.add_argument(
    "-o",
    "--log-path",
    type=Path,
    default="logs",
    help="path to log, output, and error files (default \
        is $PWD/logs; this directory is created if it does not \
        exist and usually should be under a local file system)",
)
dag_group.add_argument(
    "-m",
    "--max-num-per-node",
    type=int,
    default=1,
    help="maximum number of SFTs to generate on one node",
)
dag_group.add_argument(
    "-J",
    "--makesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MakeSFTs executable, \
        or a path to it; if not set, will use \
        MAKESFTS_PATH env variable or system default (in that \
        order)",
)
dag_group.add_argument(
    "--movesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MoveSFTs executable, \
        or a path to it; if not set, will use \
        MOVESFTS_PATH env variable or system default (in that \
        order)",
)
dag_group.add_argument(
    "-Y",
    "--request-memory",
    type=int,
    default=4096,
    help="memory allocation in MB to request from condor for \
        lalpulsar_MakeSFTs step",
)
dag_group.add_argument(
    "-s",
    "--request-disk",
    type=int,
    default=4096,
    help="disk space allocation in MB to request from condor \
        for lalpulsar_MakeSFTs step",
)
dag_group.add_argument(
    "-A",
    "--accounting-group",
    required=True,
    type=str,
    help="Condor tag for the production of SFTs",
)
dag_group.add_argument(
    "-U",
    "--accounting-group-user",
    required=True,
    type=str,
    help="albert.einstein username (do not add @LIGO.ORG)",
)
dag_group.add_argument(
    "-t",
    "--transfer-frame-files",
    action="store_true",
    help="Transfer frame files via the HTCondor file transfer system. \
        This should be specified if frames are not visible to the \
        compute node file system, e.g., if frames are on /home or \
        when running on the open science grid. \
        Frame files are usually visible on the CIT, LHO, and LLO \
        clusters, so this does not need to be specified in that case.",
)

datafind_group.add_argument(
    "-d",
    "--input-data-type",
    required=True,
    type=str,
    help="input data type for use with the gw_data_find --type \
        option",
)
datafind_group.add_argument(
    "-x",
    "--extra-datafind-time",
    type=int,
    default=0,
    help="extra time to subtract/add from/to start/end time \
        arguments of gw_data_find",
)
datafind_group.add_argument(
    "-M",
    "--datafind-match",
    type=str,
    help="string to use with the gw_data_find --match option",
)
datafind_group.add_argument(
    "--datafind-urltype",
    type=str,
    default="file",
    choices=["file", "osdf"],
    help="String for the gw_data_find --urltype option. \
        Use 'file' if creating SFTs on a local LDG cluster. \
        Use 'osdf' if creating SFTs on the open science grid",
)
datafind_group.add_argument(
    "-e",
    "--cache-file",
    type=Path,
    help="path and filename to frame cache file to use instead \
        of gw_data_find",
)

makesfts_group.add_argument(
    "-O",
    "--observing-run",
    required=True,
    type=int,
    help="For public SFTs, the observing run the SFTs are generated from, or \
        (in the case of mock data challenge data) the observing \
        run on which the data is most closely based",
)
makesfts_group.add_argument(
    "-K",
    "--observing-kind",
    type=str,
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
        "AUX" for SFTs of non-h(t) channels; \
        "SIM" for mock data challenge or other simulated data; or \
        "DEV" for development/testing purposes',
)
makesfts_group.add_argument(
    "-R",
    "--observing-revision",
    type=int,
    help="For public SFTs: the revision number starts at 1 and should be incremented once \
        SFTs have been widely distributed across clusters, advertised \
        as being ready for use, etc. For example, if mistakes are found \
        in the initial SFT production run after they have been published, \
        regenerated SFTs should have a revision number of at least 2",
)
makesfts_group.add_argument(
    "-X",
    "--misc-desc",
    type=str,
    help="For private SFTs, miscellaneous part of the SFT \
        description field in the filename",
)
makesfts_group.add_argument(
    "-k",
    "--filter-knee-freq",
    required=True,
    type=float,
    help="high pass filter knee frequency used on time domain \
        data before generating SFTs",
)
makesfts_group.add_argument(
    "-T",
    "--time-baseline",
    required=True,
    type=int,
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
)
makesfts_group.add_argument(
    "-p",
    "--output-sft-path",
    nargs="+",
    type=Path,
    help="Path where to save the SFT files. Multiple paths can be \
        specified; if so, the same number of output-sft-path options \
        as channel-name options must be given, and the first listed \
        channel will have its SFTs go into the first listed \
        output-sft-path, and so on. Otherwise specify only one output \
        path. If one path is specified and more than 1 channel is \
        specified then --observing-run must be >= 1 and \
        --observing-kind and --observing-revision must be set",
)
makesfts_group.add_argument(
    "-C",
    "--cache-path",
    type=Path,
    default="cache",
    help="path to cache files that will be produced by \
        gw_data_find (default is $PWD/cache; this directory is \
        created if it does not exist and must agree with that \
        given in .sub files)",
)
makesfts_group.add_argument(
    "-N",
    "--channel-name",
    nargs="+",
    type=str,
    help="Name of input time-domain channel to read from frames. \
        Can specify multiple options. The number of channels must be \
        equal to the number of output-sft-path options given. The \
        first listed channel will have the SFTs go to the first listed \
        output-sft-path. Can only specify one channel when generating \
        private SFTs (--observing-run=0)",
)
makesfts_group.add_argument(
    "-c", "--comment-field", type=str, help="comment for SFT header"
)
makesfts_group.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
)
makesfts_group.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
)
makesfts_group.add_argument(
    "-w",
    "--window-type",
    type=str,
    default="tukey:0.001",
    help='type of windowing of time-domain data to do \
        before generating SFTs, e.g. "rectangular", \
        "hann", "tukey:<beta in [0,1], required>"; \
        (default is "tukey:0.001", standard choice for LVK production SFTs)',
)
makesfts_group.add_argument(
    "-P",
    "--overlap-fraction",
    type=float,
    default=0,
    help="overlap fraction (for use with windows; e.g., use \
        --overlap-fraction 0.5 with --window-type hann windows)",
)
makesfts_group.add_argument(
    "--allow-skipping",
    action="store_true",
    help="allow channels to be skipped if not in frames or too low sampling \
        frequency",
)
makesfts_group.add_argument(
    "--no-validate",
    dest="validate",
    action="store_false",
    help="do not validate created SFTs",
)


##### DEPRECATED OPTIONS #####
class DeprecateAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        parser.error(
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
        )


deprecated_group.add_argument(
    "-u",
    "--frame-struct-type",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the frame channel type is determined automatically",
)
deprecated_group.add_argument(
    "-H",
    "--use-hoft",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the frame channel type is determined automatically",
)
deprecated_group.add_argument(
    "-i",
    "--ifo",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
        the detector prefix is deduced from the channel name",
)
deprecated_group.add_argument(
    "-D",
    "--make-gps-dirs",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-Z",
    "--make-tmp-file",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. Default behaviour",
)
deprecated_group.add_argument(
    "-v",
    "--sft-version",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-S",
    "--use-single",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-q",
    "--list-of-nodes",
    type=str,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-Q",
    "--node-path",
    type=Path,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-r",
    "--output-jobs-per-node",
    type=int,
    default=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)

args = parser.parse_args()
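
# Illustrative invocation (hypothetical values):
#   lalpulsar_MakeSFTDAG -f makesfts.dag -G O4_H1_T1800 -d H1_HOFT_C00 \
#       -O 4 -K RUN -R 1 -k 7 -T 1800 -p /data/sfts -N H1:GDS-CALIB_STRAIN \
#       -A ligo.prod.o4.cw.explore.test -U albert.einstein -g segments.txt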

# Some basic argument value checking
if args.observing_run < 0:
    raise parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    raise parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    raise parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    raise parser.error("--observing-revision must be > 0")

if args.observing_run > 0 and args.misc_desc:
    raise parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    raise parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    raise parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    raise parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    raise parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    raise parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    raise parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    raise parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    raise parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    raise parser.error("--max-num-per-node must be > 0")

if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    raise parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )

if len(args.channel_name) > 1 and args.observing_run == 0:
    raise parser.error(
        "When creating SFTs from multiple channels, the public SFT naming "
        "convention must be used: --observing-run > 0 and set "
        "--observing-kind and --observing-revision"
    )

if args.datafind_urltype == "osdf" and not args.transfer_frame_files:
    raise parser.error(
        "--transfer-frame-files must be specified when --datafind-urltype=osdf"
    )

# Set executables for lalpulsar_MakeSFTs and lalpulsar_MoveSFTs
makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if args.makesfts_path.is_file():
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = args.makesfts_path / makeSFTsExe
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = Path("$ENV(MAKESFTS_PATH)") / makeSFTsExe
else:
    makeSFTsExe = Path("@LALSUITE_BINDIR@") / makeSFTsExe

moveSFTsExe = "lalpulsar_MoveSFTs"
if args.movesfts_path:
    if args.movesfts_path.is_file():
        moveSFTsExe = args.movesfts_path
    else:
        moveSFTsExe = args.movesfts_path / moveSFTsExe
elif "MOVESFTS_PATH" in os.environ:
    moveSFTsExe = Path("$ENV(MOVESFTS_PATH)") / moveSFTsExe
else:
    moveSFTsExe = Path("@LALSUITE_BINDIR@") / moveSFTsExe

# make directories to store the cache files, job logs, and SFTs
args.log_path.mkdir(exist_ok=True)
args.cache_path.mkdir(exist_ok=True)
for p in args.output_sft_path:
    p.mkdir(exist_ok=True)

# Check if a segment file was given, else set up one segment from the command line
segList = SegmentList()
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        raise parser.error("--min-seg-length must be >= 0")

    # the next flag causes extra time that cannot be processed to be trimmed
    # from the start and end of a segment
    adjustSegExtraTime = True

    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = Segment(int(splitLine[0]), int(splitLine[1]))
            if abs(oneSeg) >= args.min_seg_length:
                segList.append(oneSeg)

    if len(segList) < 1:
        raise ValueError(f"No segments found in segment file: {args.segment_file}")
else:
    if args.analysis_start_time is None:
        raise parser.error(
            "--analysis-start-time must be specified if no segment file is given"
        )

    if args.analysis_end_time is None:
        raise parser.error(
            "--analysis-end-time must be specified if no segment file is given"
        )

    if args.max_length_all_jobs is None:
        raise parser.error(
            "--max-length-all-jobs must be specified if no segment file is given"
        )

    # Make sure not to exceed the maximum allowed analysis length
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = Segment(args.analysis_start_time, args.analysis_end_time)
    segList.append(oneSeg)
# END if (args.segment_file != None)
segList.coalesce()

# Get the IFO site, which is the first letter of the channel name.
site = args.channel_name[0][0]

# Get the frame file URL list
urls = get_urls(args)

# Basic check that the frame file URLs are visible on the execute points
if not args.transfer_frame_files:
    for f in urls:
        if "/home" in f:
            raise parser.error(
                "--transfer-frame-files must be specified when frame files are in /home"
            )

# data segments created from the list of frame URLs
dataSegs = SegmentList()
for url in urls:
    dataSegs.append(file_segment(url))
dataSegs.coalesce()

# intersection of segList with dataSegs
segList &= dataSegs
segList.coalesce()  # just in case

# initialize count of nodes
nodeCount = 0

# Create .sub files
path_to_dag_file = args.dag_file.parent
dag_filename = args.dag_file.name
makesfts_sub = path_to_dag_file / "MakeSFTs.sub"
movesfts_sub = path_to_dag_file / "MoveSFTs.sub"

# create MakeSFTs.sub
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("should_transfer_files = yes\n")
    if args.transfer_frame_files:
        MakeSFTsFID.write("transfer_input_files = $(cachefile),$(framefiles)\n")
    else:
        MakeSFTsFID.write("transfer_input_files = $(cachefile)\n")
    if "MAKESFTS_PATH" in os.environ and not args.makesfts_path:
        MakeSFTsFID.write("getenv = MAKESFTS_PATH\n")
    if args.datafind_urltype == "osdf":
        MakeSFTsFID.write("use_oauth_services = scitokens\n")
        MakeSFTsFID.write(
            "environment = BEARER_TOKEN_FILE=$$(CondorScratchDir)/.condor_creds/scitokens.use\n"
        )
    MakeSFTsFID.write("queue 1\n")
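
# Each DAG node substitutes its own $(argList), $(cachefile), and $(tagstring)
# (plus $(framefiles) when --transfer-frame-files is given) into MakeSFTs.sub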

# create MoveSFTs.sub
with open(movesfts_sub, "w") as MoveSFTsFID:
    MoveSFTsLogFile = f"{args.log_path}/MoveSFTs_{dag_filename}.log"
    MoveSFTsFID.write("universe = local\n")
    MoveSFTsFID.write(f"executable = {moveSFTsExe}\n")
    MoveSFTsFID.write("arguments = ")
    if not args.validate:
        MoveSFTsFID.write("$(opts) ")
    MoveSFTsFID.write("-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
    MoveSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MoveSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MoveSFTsFID.write(f"log = {MoveSFTsLogFile}\n")
    MoveSFTsFID.write(f"error = {args.log_path}/MoveSFTs.err\n")
    MoveSFTsFID.write(f"output = {args.log_path}/MoveSFTs.out\n")
    MoveSFTsFID.write("notification = never\n")
    MoveSFTsFID.write("request_memory = 1GB\n")
    MoveSFTsFID.write("request_disk = 10MB\n")
    MoveSFTsFID.write("RequestCpus = 1\n")
    if "MOVESFTS_PATH" in os.environ and not args.movesfts_path:
        MoveSFTsFID.write("getenv = MOVESFTS_PATH\n")
    MoveSFTsFID.write("queue 1\n")

# create the DAG file with the jobs to run
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0  # need this for the synchronized start option

    # Loop over the segment list to generate the SFTs for each segment
    for seg in segList:
        # Each segment in the segList runs on one or more nodes;
        # initialize the number of SFTs produced by the current node:
        numThisNode = 0
        numThisSeg = 0

        # Case 1: a segment file was given but the SFTs do not need their
        # start times to be synchronized
        if adjustSegExtraTime and not args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # First we figure out how much extra time is in the segment so that
            # SFTs fit within the segment:
            #   |..<SFT><SFT><SFT>..|
            # where the .. represent the extra time in the segment.
            # The amount of extra time in a segment is given as the remainder
            # of (total segment time) / (SFT time baseline)
            segExtraTime = (segEndTime - segStartTime) % args.time_baseline

            # If overlapping SFTs are requested, then we compute the extra
            # time as:
            #   the remainder of (end - start - Tsft) / (non-overlap time)
            # provided there is at least one SFT in the segment
            if args.overlap_fraction != 0.0:
                if (segEndTime - segStartTime) > args.time_baseline:
                    segExtraTime = (
                        segEndTime - segStartTime - args.time_baseline
                    ) % int((1.0 - args.overlap_fraction) * args.time_baseline)
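
            # Worked example (no overlap): a [0, 4000) segment with
            # Tsft = 1800 gives segExtraTime = 4000 % 1800 = 400, so 200 s
            # is trimmed from each end, leaving room for exactly two SFTs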

            # We'll add half the extra time to the start of the SFTs to be
            # created in this segment and half at the end
            segExtraStart = int(segExtraTime / 2)
            segExtraEnd = segExtraTime - segExtraStart
            args.analysis_start_time = segStartTime + segExtraStart

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # shift the end time by the other portion of the extra time amount
            args.analysis_end_time = segEndTime - segExtraEnd

            # Again, this shift could have pushed the end time before the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # Case 2: SFTs need a synchronized start. This is a special case for
        # methods like TwoSpect, where signal periodicity spacing must be
        # maintained
        elif args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # If we haven't set the first SFT start time, then set it equal to
            # the start time of the first segment
            if firstSFTstartTime == 0:
                firstSFTstartTime = segStartTime

            # This is a tricky bit of math to set the start time based on the
            # first SFT start time across all segments
            args.analysis_start_time = (
                int(
                    round(
                        math.ceil(
                            (segStartTime - firstSFTstartTime)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + firstSFTstartTime
            )
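
            # Worked example: with Tsft = 1800, no overlap, and
            # firstSFTstartTime = 1000000000, a segment starting at GPS
            # 1000000500 gives ceil(500 / 1800) * 1800 + 1000000000
            # = 1000001800, the next start time on the synchronized grid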

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # This is a tricky bit of math to set the end time based on the
            # first SFT start time across all segments
            args.analysis_end_time = (
                int(
                    round(
                        math.floor(
                            (segEndTime - args.analysis_start_time - args.time_baseline)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + args.time_baseline
                + args.analysis_start_time
            )
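
            # Worked example (continuing from above): a segment ending at GPS
            # 1000005000 gives floor((5000 - 1800 - 1800) / 1800) * 1800
            # + 1800 + 1000001800 = 1000003600, so the last synchronized SFT
            # ends 1400 s before the segment does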

            # Again, this shift could have pushed the end time before the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # If no segment file given and no synchronized starts, just set the
        # start time and end time to the segment start and end
        else:
            args.analysis_start_time = seg[0]
            args.analysis_end_time = seg[1]

        # Loop through the analysis time; make sure no more than
        # args.max_num_per_node SFTs are produced by any one node
        startTimeThisNode = args.analysis_start_time
        endTimeThisNode = args.analysis_start_time
        endTimeAllNodes = args.analysis_start_time
        while endTimeAllNodes < args.analysis_end_time:
            # increment endTimeAllNodes by the args.time_baseline until we get
            # past the args.analysis_end_time
            if args.overlap_fraction != 0.0:
                # handle overlap
                if numThisSeg == 0:
                    endTimeAllNodes = endTimeAllNodes + args.time_baseline
                else:
                    endTimeAllNodes = endTimeAllNodes + int(
                        (1.0 - args.overlap_fraction) * args.time_baseline
                    )
            else:
                # default case, no overlap
                endTimeAllNodes = endTimeAllNodes + args.time_baseline
            if endTimeAllNodes <= args.analysis_end_time:
                # increment the number of SFTs output from this node, and
                # update the end time of this node.
                numThisNode = numThisNode + 1
                numThisSeg = numThisSeg + 1
                endTimeThisNode = endTimeAllNodes
                if numThisNode < args.max_num_per_node:
                    continue
                else:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID,
                        nodeCount,
                        startTimeThisNode,
                        endTimeThisNode,
                        urls,
                        args,
                    )
                    # Update for next node
                    numThisNode = 0
                    if args.overlap_fraction != 0.0:
                        # handle overlap
                        startTimeThisNode = endTimeThisNode - int(
                            (args.overlap_fraction) * args.time_baseline
                        )
                    else:
                        # default case, no overlap
                        startTimeThisNode = endTimeThisNode
            else:
                # we are at or past the args.analysis_end_time; output job for
                # the last node if needed.
                if numThisNode > 0:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args
                    )
        # END while (endTimeAllNodes < args.analysis_end_time)
    # END for seg in segList

    # Write the MoveSFTs job to the DAG
    dagFID.write(f"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
    dagFID.write("RETRY MoveSFTs 1\n")
    dagFID.write("VARS MoveSFTs ")
    if not args.validate:
        dagFID.write('opts="--no-validate" ')
    dagFID.write(
        'sourcedirectory="." '
        f"channels=\"{' '.join(args.channel_name)}\" "
        f"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
    )
    dagFID.write(
        f"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
    )

# The DAG file is closed on exiting the with block above

# Update the actual end time of the last job and print out the times all jobs will run on:
endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("startTimeAllNodes is None; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

print(startTimeAllNodes, endTimeAllNodes)