LALPulsar 7.1.1.1-5c3393d
lalpulsar_MakeSFTDAG.py
##python
# Copyright (C) 2013, 2014, 2020--2024 Evan Goetz
# Copyright (C) 2011, 2021, 2022 Karl Wette
# Copyright (C) 2005, 2007 Gregory Mendell
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING. If not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA

## \file
## \ingroup lalpulsar_bin_SFTTools
"""Creates DAGs to run jobs that generate SFTs"""

import math
import argparse
import os
import re
from pathlib import Path
from urllib.parse import urlparse

from gwdatafind import find_urls
from gwdatafind.utils import filename_metadata, file_segment

from igwn_segments import segment, segmentlist

from lalpulsar import (
    git_version,
    SFTFilenameSpec,
    FillSFTFilenameSpecStrings,
    BuildSFTFilenameFromSpec,
)

__author__ = "Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
__version__ = git_version.id
__date__ = git_version.date

cache_re = re.compile(r"^([A-Z])(\s+)(\w+)(\s+)(\d+)(\s+)(\d+)(\s+)(.+gwf)")
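# Each line of a LAL-format frame cache file matched by cache_re has the form
#   <observatory> <frame type> <GPS start> <duration> <path or URL to .gwf>
# e.g., with illustrative values only:
#   H H1_HOFT_C00 1238166018 4096 /data/frames/H-H1_HOFT_C00-1238166018-4096.gwf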


# REVISIONS:
# 12/02/05 gam; generate datafind.sub and MakeSFTs.sub as well as dag file in
#               PWD, with log files based subLogPath and dag filename.
# 12/28/05 gam; Add option --make-gps-dirs, -D <num>, to make directory based
#               on this many GPS digits.
# 12/28/05 gam; Add option --misc-desc, -X <string> giving misc. part of the
#               SFT description field in the filename.
# 12/28/05 gam; Add options --start-freq -F and --band -B options to enter
#               these.
# 12/28/05 gam; Add in --window-type, -w options; 0 = no window, 1 = default =
#               Matlab style Tukey window; 2 = make_sfts.c Tukey window; 3 =
#               Hann window.
# 12/28/05 gam; Add option --overlap-fraction -P (for use with windows; e.g.,
#               use -P 0.5 with -w 3 Hann windows; default is 0.0)
# 12/28/05 gam; Add --sft-version, -v option to select output SFT version (1 =
#               default is version 1 SFTs; 2 = version 2 SFTs).
# 12/28/05 gam; Add --comment-field, -c option, for comment for version 2 SFTs.
# 12/28/05 gam; Remove sample rate option
# 01/09/06 gam; Add -Z option; write SFT to .*.tmp file, then move to final
#               file name.
# 01/14/07 gam; Add -u option to specify frame struct and type; add -i option
#               to specify IFO name.
# 07/24/07 gam; Add in -q option to read in list of nodes on which to output
#               SFTs, -Q option to give node path, and -R option for number of
#               jobs per node.
# 04/XX/13 eag; Add -y option to synchronize the start times of SFTs.
# 07/24/14 eag; Change default to version 2 SFTs
# 12/2020  eag; Update script to conform to modern python3 and pep8
# 10/2020  kww; Pass args directly to writeToDag(), use Python f-strings
# 10/2022  kww; Deprecate options that have been removed from MakeSFTs
# 10/2022  kww; Parse window type as a string, parameter separated by colon
# 10/2022  kww; Merge -O and -o log path options to free up -O option
# 10/2022  kww; Implement public SFT file naming convention
# 11/2022  kww; -R command line option now used for --observing-revision
#               instead of --output-jobs-per-node, which now uses -r
# 11/2022  kww; --datafind-path and --makesfts-path accept executable names
# 03/2023  eag; Allow user to pass a frame cache file --cache-file
# 04/2023  kww; Improve documentation of --window-type argument
# 05/2023  eag; Add the --gaps flag to gw_data_find
# 08/2023  eag; Allow for multiple channel names to be provided
# 09/2023  eag; Modify use of environment variables
# 01/2024  eag; Allow skipping of channels if not in frames or too low data
#               rate
# 10/2024  eag; Modify workflow for version 3 SFTs and HTCondor file transfer
#               workflow
# 12/2024  eag; Modify workflow to use lalpulsar_MoveSFTs script instead of
#               remapping files
# 03/2025  eag; Rewrite to generate cache files when running
#               lalpulsar_MakeSFTDAG, enabling frame files on /home or OSDF

def sft_name_from_vars(
    obs,
    gpsstart,
    Tsft,
    channel=None,
    kind=None,
    rev=None,
    window="unknown",
    par=None,
    miscstr=None,
):
    """Create SFT file name from specification"""

    spec = SFTFilenameSpec()

    FillSFTFilenameSpecStrings(
        spec=spec,
        path=None,
        extn=None,
        detector=channel[:2],
        window_type=window,
        privMisc=miscstr,
        pubObsKind=kind,
        pubChannel=channel,
    )
    spec.pubObsRun = obs or 0
    spec.pubRevision = rev or 0
    spec.window_param = par or 0
    spec.numSFTs = 1  # MakeSFTDAG will only ever generate 1 SFT per file
    spec.SFTtimebase = Tsft
    spec.gpsStart = gpsstart
    spec.SFTspan = Tsft  # MakeSFTDAG will only ever generate 1 SFT per file

    return BuildSFTFilenameFromSpec(spec)

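# Illustrative use of sft_name_from_vars(); the channel and GPS values below
# are hypothetical, and the returned name follows the SFT filename convention
# implemented by BuildSFTFilenameFromSpec():
#   sft_name_from_vars(
#       4, 1368975618, 1800, channel="H1:GDS-CALIB_STRAIN",
#       kind="RUN", rev=1, window="tukey", par=0.001,
#   )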

def get_urls(args):
    """Get frame file URL list from gwdatafind or cache file"""

    if not args.cache_file:
        urls = find_urls(
            site,
            args.input_data_type,
            segList[0][0],
            segList[-1][-1],
            match=args.datafind_match,
            urltype=args.datafind_urltype,
        )
    else:
        urls = []
        with open(args.cache_file, "r") as f:
            for line in f:
                m = cache_re.match(line)
                if m:
                    framefile = m.group(9)
                    urls.append(framefile)

    return urls


def make_cache(
    urls,
    job_seg,
):
    """Make a frame list and cache list from a list of URLs"""

    cache = []  # list of lines for the cache file
    frames = []  # list of frame filenames used in the job
    for idx, url in enumerate(urls):
        obs, desc, dataseg = filename_metadata(url)
        dataseg = segment(dataseg)
        if dataseg.disjoint(job_seg) < 0:
            continue
        if dataseg.disjoint(job_seg) > 0:
            break
        if dataseg.intersects(job_seg):
            framefileurl = urlparse(url)
            framefilepath = Path(framefileurl.path)

            # list in cache file if files are not visible on the execute node,
            # otherwise use the file URL
            if "/home" in str(framefilepath.parent) or "osdf" in framefileurl.scheme:
                newcache = (
                    f"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{framefilepath.name}"
                )
            else:
                newcache = f"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{url}"
            cache.append(newcache)

            if "/home" in str(framefilepath.parent):
                frames.append(framefilepath)
            else:
                frames.append(url)

    return frames, cache

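# Sketch of make_cache() behaviour, with hypothetical values: for
#   frames, cache = make_cache(urls, segment(1238170114, 1238172014))
# only URLs whose frame file spans intersect the job segment are kept; frames
# entirely before it are skipped ("continue") and the scan stops at the first
# frame entirely past it ("break"). Frames under /home or behind an OSDF URL
# are listed by bare filename (they will be transferred by HTCondor); all
# others are listed by their original URL.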

def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args):
    """Write one job to DAG file"""

    MakeSFTs = f"MakeSFTs_{nodeCount}"
    tagStringOut = f"{args.tag_string}_{nodeCount}"

    job_segment = segment(
        startTimeThisNode - args.extra_datafind_time,
        endTimeThisNode + args.extra_datafind_time,
    )

    frames, cache = make_cache(urls, job_segment)

    obs, desc, dataseg = filename_metadata(urls[0])
    cacheFile = args.cache_path / f"{obs}-{job_segment[0]}-{job_segment[1]}.cache"
    with open(cacheFile, "w") as f:
        for l in cache:
            f.write(f"{l}\n")

    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    elif args.misc_desc:
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    # To work with the condor file transfer protocol, we save everything to the
    # scratch directory because the files will have unique names. Since
    # lalpulsar_MakeSFTs wants to have a path to save the file to, we provide .
    argList.append(f"-p {','.join(['.' for p in args.output_sft_path])}")
    # To work with the condor file transfer protocol, the cache file is saved
    # to the scratch directory on transfer so we just need the name, not the
    # full path
    argList.append(f"-C {cacheFile.name}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if ":" in args.window_type:
        window_type, window_param = args.window_type.split(":")
        window_param = float(window_param)
        argList.append(f"-w {window_type} -r {window_param}")
    else:
        window_type = args.window_type
        window_param = None
        argList.append(f"-w {window_type}")
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)
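
    # For illustration, with hypothetical settings argStr may look like:
    #   -O 4 -K RUN -R 1 -f 7 -t 1800 -p . -C H-1238170114-1238172014.cache
    #   -s 1238170214 -e 1238171914 -N H1:GDS-CALIB_STRAIN -F 10 -B 1990
    #   -w tukey -r 0.001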

    # The files are going to go to specific directories, so we need to map
    # the files to their output directories
    outputfiles = []
    remap = []
    sft_start = startTimeThisNode
    sft_end = sft_start + args.time_baseline
    # loop over start times
    while sft_end <= endTimeThisNode:
        # loop over channels
        for idx, c in enumerate(args.channel_name):
            filename = sft_name_from_vars(
                args.observing_run,
                sft_start,
                args.time_baseline,
                c,
                kind=args.observing_kind,
                rev=args.observing_revision,
                window=window_type,
                par=window_param,
                miscstr=args.misc_desc,
            )
            outputfiles.append(filename)
            remap.append(f"{filename}={args.output_sft_path[idx]/filename}")

        # update start and end times
        if args.overlap_fraction:
            sft_start += int(round((1 - args.overlap_fraction) * args.time_baseline))
        else:
            sft_start += args.time_baseline
        sft_end = sft_start + args.time_baseline

    # MakeSFT job
    dagFID.write(f"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" ')
    if args.transfer_frame_files:
        framefiles = ",".join([str(fr) for fr in frames])
        dagFID.write(f'framefiles="{framefiles}" ')
    dagFID.write(f'tagstring="{tagStringOut}"\n')

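# A single job written by writeToDag() looks like the following in the DAG
# file (paths and values are illustrative):
#   JOB MakeSFTs_1 /path/to/MakeSFTs.sub
#   RETRY MakeSFTs_1 1
#   VARS MakeSFTs_1 argList="..." cachefile="cache/H-1238170114-1238172014.cache" tagstring="TAG_1"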

#
# MAIN CODE STARTS HERE
#

parser = argparse.ArgumentParser(
    description="This script creates MakeSFTs.sub, MoveSFTs.sub, and a dag \
    file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
)

dag_group = parser.add_argument_group(
    "DAG organization", "Options for workflow control"
)
datafind_group = parser.add_argument_group(
    "Datafind", "Options for locating frame files"
)
makesfts_group = parser.add_argument_group(
    "SFT creation", "Options for SFT creation and output"
)
deprecated_group = parser.add_argument_group("DEPRECATED")

dag_group.add_argument(
    "-f",
    "--dag-file",
    required=True,
    type=Path,
    help="filename for .dag file (should end in .dag)",
)
dag_group.add_argument(
    "-G",
    "--tag-string",
    required=True,
    type=str,
    help="tag string used in names of various files unique to \
    jobs that will run under the DAG",
)
dag_group.add_argument(
    "-a",
    "--analysis-start-time",
    type=int,
    help="GPS start time of data from which to generate \
    SFTs (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-b",
    "--analysis-end-time",
    type=int,
    help="GPS end time of data from which to generate SFTs \
    (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-L",
    "--max-length-all-jobs",
    type=int,
    help="maximum total amount of data to process, in seconds \
    (optional and unused if a segment file is given)",
)
dag_group.add_argument(
    "-g",
    "--segment-file",
    type=Path,
    help="alternative file with segments to use, rather than \
    the input times",
)
dag_group.add_argument(
    "-l",
    "--min-seg-length",
    type=int,
    default=0,
    help="minimum length segments to process in seconds (used \
    only if a segment file is given)",
)
dag_group.add_argument(
    "-y",
    "--synchronize-start",
    action="store_true",
    help="synchronize the start times of the SFTs so that the \
    start times are synchronized when there are gaps in the \
    data",
)
dag_group.add_argument(
    "-o",
    "--log-path",
    type=Path,
    default="logs",
    help="path to log, output, and error files (default \
    is $PWD/logs; this directory is created if it does not \
    exist and usually should be under a local file system)",
)
dag_group.add_argument(
    "-m",
    "--max-num-per-node",
    type=int,
    default=1,
    help="maximum number of SFTs to generate on one node",
)
dag_group.add_argument(
    "-J",
    "--makesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MakeSFTs executable, \
    or a path to it; if not set, will use \
    MAKESFTS_PATH env variable or system default (in that \
    order)",
)
dag_group.add_argument(
    "--movesfts-path",
    type=Path,
    help="string specifying the lalpulsar_MoveSFTs executable, \
    or a path to it; if not set, will use \
    MOVESFTS_PATH env variable or system default (in that \
    order)",
)
dag_group.add_argument(
    "-Y",
    "--request-memory",
    type=int,
    default=4096,
    help="memory allocation in MB to request from condor for \
    lalpulsar_MakeSFTs step",
)
dag_group.add_argument(
    "-s",
    "--request-disk",
    type=int,
    default=4096,
    help="disk space allocation in MB to request from condor \
    for lalpulsar_MakeSFTs step",
)
dag_group.add_argument(
    "-A",
    "--accounting-group",
    required=True,
    type=str,
    help="Condor tag for the production of SFTs",
)
dag_group.add_argument(
    "-U",
    "--accounting-group-user",
    required=True,
    type=str,
    help="albert.einstein username (do not add @LIGO.ORG)",
)
dag_group.add_argument(
    "-t",
    "--transfer-frame-files",
    action="store_true",
    help="Transfer frame files via the HTCondor file transfer system. \
    This should be specified if frames are not visible to the \
    compute node file system, e.g., if frames are on /home or \
    jobs are running on the open science grid. \
    Usually frame files are visible on the CIT, LHO, and LLO clusters, \
    so this does not need to be specified in that case.",
)

datafind_group.add_argument(
    "-d",
    "--input-data-type",
    required=True,
    type=str,
    help="input data type for use with the gw_data_find --type \
    option",
)
datafind_group.add_argument(
    "-x",
    "--extra-datafind-time",
    type=int,
    default=0,
    help="extra time to subtract/add from/to start/end time \
    arguments of gw_data_find",
)
datafind_group.add_argument(
    "-M",
    "--datafind-match",
    type=str,
    help="string to use with the gw_data_find --match option",
)
datafind_group.add_argument(
    "--datafind-urltype",
    type=str,
    default="file",
    choices=["file", "osdf"],
    help="String for the gw_data_find --urltype option. \
    Use 'file' if creating SFTs on a local LDG cluster. \
    Use 'osdf' if creating SFTs on the open science grid",
)
datafind_group.add_argument(
    "-e",
    "--cache-file",
    type=Path,
    help="path and filename to frame cache file to use instead \
    of gw_data_find",
)

makesfts_group.add_argument(
    "-O",
    "--observing-run",
    required=True,
    type=int,
    help="For public SFTs, observing run data the SFTs are generated from, or \
    (in the case of mock data challenge data) the observing \
    run on which the data is most closely based",
)
makesfts_group.add_argument(
    "-K",
    "--observing-kind",
    type=str,
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
    "AUX" for SFTs of non-h(t) channels; \
    "SIM" for mock data challenge or other simulated data; or \
    "DEV" for development/testing purposes',
)
makesfts_group.add_argument(
    "-R",
    "--observing-revision",
    type=int,
    help="For public SFTs: revision number starts at 1, and should be incremented once \
    SFTs have been widely distributed across clusters, advertised \
    as being ready for use, etc. For example, if mistakes are found \
    in the initial SFT production run after they have been published, \
    regenerated SFTs should have a revision number of at least 2",
)
makesfts_group.add_argument(
    "-X",
    "--misc-desc",
    type=str,
    help="For private SFTs, miscellaneous part of the SFT \
    description field in the filename",
)
makesfts_group.add_argument(
    "-k",
    "--filter-knee-freq",
    required=True,
    type=float,
    help="high pass filter knee frequency used on time domain \
    data before generating SFTs",
)
makesfts_group.add_argument(
    "-T",
    "--time-baseline",
    required=True,
    type=int,
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
)
makesfts_group.add_argument(
    "-p",
    "--output-sft-path",
    nargs="+",
    type=Path,
    help="Path where to save the SFT files. Can specify multiple options. \
    If specifying multiple options then it is required to specify the \
    same number of output-sft-path options as the number of channels. \
    The first listed channel will have the SFTs go into the first \
    listed output-sft-path. Otherwise specify only one output path. \
    If one path is specified and more than 1 channel is specified \
    then --observing-run must be >= 1 and --observing-kind and \
    --observing-revision must be set",
)
makesfts_group.add_argument(
    "-C",
    "--cache-path",
    type=Path,
    default="cache",
    help="path to cache files that will be produced by \
    gw_data_find (default is $PWD/cache; this directory is \
    created if it does not exist and must agree with that \
    given in .sub files)",
)
makesfts_group.add_argument(
    "-N",
    "--channel-name",
    nargs="+",
    type=str,
    help="Name of input time-domain channel to read from frames. \
    Can specify multiple options. The number of channels must be \
    equal to the number of output-sft-path options given. The \
    first listed channel will have the SFTs go to the first listed \
    output-sft-path. Can only specify one channel when generating \
    private SFTs (--observing-run=0)",
)
makesfts_group.add_argument(
    "-c", "--comment-field", type=str, help="comment for SFT header"
)
makesfts_group.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
)
makesfts_group.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
)
makesfts_group.add_argument(
    "-w",
    "--window-type",
    type=str,
    default="tukey:0.001",
    help='type of windowing to apply to the time-domain data \
    before generating SFTs, e.g. "rectangular", \
    "hann", "tukey:<beta in [0,1], required>"; \
    (default is "tukey:0.001", standard choice for LVK production SFTs)',
)
makesfts_group.add_argument(
    "-P",
    "--overlap-fraction",
    type=float,
    default=0,
    help="overlap fraction (for use with windows; e.g., use \
    --overlap-fraction 0.5 with --window-type hann windows)",
)
makesfts_group.add_argument(
    "--allow-skipping",
    action="store_true",
    help="allow channels to be skipped if not in frames or too low sampling \
    frequency",
)
makesfts_group.add_argument(
    "--no-validate",
    dest="validate",
    action="store_false",
    help="do not validate created SFTs",
)

##### DEPRECATED OPTIONS #####
class DeprecateAction(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        parser.error(
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
        )


deprecated_group.add_argument(
    "-u",
    "--frame-struct-type",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
    the frame channel type is determined automatically",
)
deprecated_group.add_argument(
    "-H",
    "--use-hoft",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
    the frame channel type is determined automatically",
)
deprecated_group.add_argument(
    "-i",
    "--ifo",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer required; \
    the detector prefix is deduced from the channel name",
)
deprecated_group.add_argument(
    "-D",
    "--make-gps-dirs",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-Z",
    "--make-tmp-file",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. Default behaviour",
)
deprecated_group.add_argument(
    "-v",
    "--sft-version",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-S",
    "--use-single",
    nargs=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-q",
    "--list-of-nodes",
    type=str,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-Q",
    "--node-path",
    type=Path,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)
deprecated_group.add_argument(
    "-r",
    "--output-jobs-per-node",
    type=int,
    default=0,
    action=DeprecateAction,
    help="DEPRECATED. No longer supported",
)

args = parser.parse_args()

# Some basic argument value checking
if args.observing_run < 0:
    raise parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    raise parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    raise parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    raise parser.error("--observing-revision must be > 0")

if args.observing_run > 0 and args.misc_desc:
    raise parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    raise parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    raise parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    raise parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    raise parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    raise parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    raise parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    raise parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    raise parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    raise parser.error("--max-num-per-node must be > 0")

if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    raise parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )

if len(args.channel_name) > 1 and args.observing_run == 0:
    raise parser.error(
        "When creating SFTs from multiple channels, public SFT naming "
        "convention must be used: --observing-run > 0 and set "
        "--observing-kind and --observing-revision"
    )

if args.datafind_urltype == "osdf" and not args.transfer_frame_files:
    raise parser.error(
        "--transfer-frame-files must be specified when --datafind-urltype=osdf"
    )

# Set executables for lalpulsar_MakeSFTs and lalpulsar_MoveSFTs
makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if args.makesfts_path.is_file():
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = args.makesfts_path / makeSFTsExe
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = Path("$ENV(MAKESFTS_PATH)") / makeSFTsExe
else:
    makeSFTsExe = Path("@LALSUITE_BINDIR@") / makeSFTsExe

moveSFTsExe = "lalpulsar_MoveSFTs"
if args.movesfts_path:
    if args.movesfts_path.is_file():
        moveSFTsExe = args.movesfts_path
    else:
        moveSFTsExe = args.movesfts_path / moveSFTsExe
elif "MOVESFTS_PATH" in os.environ:
    moveSFTsExe = Path("$ENV(MOVESFTS_PATH)") / moveSFTsExe
else:
    moveSFTsExe = Path("@LALSUITE_BINDIR@") / moveSFTsExe

# make directories to store the cache files, job logs, and SFTs
args.log_path.mkdir(exist_ok=True)
args.cache_path.mkdir(exist_ok=True)
for p in args.output_sft_path:
    p.mkdir(exist_ok=True)

# Check if a segment file was given, else set up one segment from the command line
segList = segmentlist()
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        raise parser.error("--min-seg-length must be >= 0")

    # the next flag causes extra time that cannot be processed to be trimmed
    # from the start and end of a segment
    adjustSegExtraTime = True

    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = segment(int(splitLine[0]), int(splitLine[1]))
            if abs(oneSeg) >= args.min_seg_length:
                segList.append(oneSeg)

    if len(segList) < 1:
        raise ValueError(f"No segments found in segment file: {args.segment_file}")
else:
    if args.analysis_start_time is None:
        raise parser.error(
            "--analysis-start-time must be specified if no segment file is given"
        )

    if args.analysis_end_time is None:
        raise parser.error(
            "--analysis-end-time must be specified if no segment file is given"
        )

    if args.max_length_all_jobs is None:
        raise parser.error(
            "--max-length-all-jobs must be specified if no segment file is given"
        )

    # Make sure not to exceed the maximum allowed analysis length
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = segment(args.analysis_start_time, args.analysis_end_time)
    segList.append(oneSeg)
# END if (args.segment_file != None)
segList.coalesce()

# Get the IFO site, which is the first letter of the channel name.
site = args.channel_name[0][0]

# Get the frame file URL list
urls = get_urls(args)

# Basic check that the frame file URLs are visible on the execute points
if not args.transfer_frame_files:
    for f in urls:
        if "/home" in f:
            raise parser.error(
                "--transfer-frame-files must be specified when frame files are in /home"
            )

# data segments created from the list of frame URLs
dataSegs = segmentlist()
for url in urls:
    dataSegs.append(file_segment(url))
dataSegs.coalesce()

# intersection of segList with dataSegs
segList &= dataSegs
segList.coalesce()  # just in case

# initialize count of nodes
nodeCount = 0

# Create .sub files
path_to_dag_file = args.dag_file.parent
dag_filename = args.dag_file.name
makesfts_sub = path_to_dag_file / "MakeSFTs.sub"
movesfts_sub = path_to_dag_file / "MoveSFTs.sub"

# create MakeSFTs.sub
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("should_transfer_files = yes\n")
    if args.transfer_frame_files:
        MakeSFTsFID.write("transfer_input_files = $(cachefile),$(framefiles)\n")
    else:
        MakeSFTsFID.write("transfer_input_files = $(cachefile)\n")
    if "MAKESFTS_PATH" in os.environ and not args.makesfts_path:
        MakeSFTsFID.write("getenv = MAKESFTS_PATH\n")
    if args.datafind_urltype == "osdf":
        MakeSFTsFID.write("use_oauth_services = scitokens\n")
        MakeSFTsFID.write(
            "environment = BEARER_TOKEN_FILE=$$(CondorScratchDir)/.condor_creds/scitokens.use\n"
        )
    MakeSFTsFID.write("queue 1\n")

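# For reference, the generated MakeSFTs.sub is a vanilla-universe HTCondor
# submit file; abbreviated example with illustrative paths:
#   universe = vanilla
#   executable = /path/to/lalpulsar_MakeSFTs
#   arguments = $(argList)
#   transfer_input_files = $(cachefile)
#   queue 1
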
# create MoveSFTs.sub
with open(movesfts_sub, "w") as MoveSFTsFID:
    MoveSFTsLogFile = f"{args.log_path}/MoveSFTs_{dag_filename}.log"
    MoveSFTsFID.write("universe = local\n")
    MoveSFTsFID.write(f"executable = {moveSFTsExe}\n")
    MoveSFTsFID.write("arguments = ")
    if not args.validate:
        MoveSFTsFID.write("$(opts) ")
    MoveSFTsFID.write("-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
    MoveSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MoveSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MoveSFTsFID.write(f"log = {MoveSFTsLogFile}\n")
    MoveSFTsFID.write(f"error = {args.log_path}/MoveSFTs.err\n")
    MoveSFTsFID.write(f"output = {args.log_path}/MoveSFTs.out\n")
    MoveSFTsFID.write("notification = never\n")
    MoveSFTsFID.write("request_memory = 1GB\n")
    MoveSFTsFID.write("request_disk = 10MB\n")
    MoveSFTsFID.write("RequestCpus = 1\n")
    if "MOVESFTS_PATH" in os.environ and not args.movesfts_path:
        MoveSFTsFID.write("getenv = MOVESFTS_PATH\n")
    MoveSFTsFID.write("queue 1\n")

# create the DAG file with the jobs to run
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0  # need this for the synchronized start option

    # Loop over the segment list to generate the SFTs for each segment
    for seg in segList:
        # Each segment in the segList runs on one or more nodes;
        # initialize the number of SFTs produced by the current node:
        numThisNode = 0
        numThisSeg = 0

        # Case 1: a segment file was given but the SFTs do not need their
        # start times to be synchronized
        if adjustSegExtraTime and not args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # First we figure out how much extra time is in the segment so that
            # SFTs are fit within the segment:
            #     |..<SFT><SFT><SFT>..|
            # where the .. represent the extra time in the segment
            # The amount of extra time in a segment is given as the remainder
            # of (total segment time) / (SFT time baseline)
            segExtraTime = (segEndTime - segStartTime) % args.time_baseline

            # If overlapping SFTs are requested, then we compute the extra
            # time as:
            #     the remainder of (end - start - Tsft) / (non-overlap time)
            # provided there was at least one SFT that fits in the segment
            if args.overlap_fraction != 0.0:
                if (segEndTime - segStartTime) > args.time_baseline:
                    segExtraTime = (
                        segEndTime - segStartTime - args.time_baseline
                    ) % int((1.0 - args.overlap_fraction) * args.time_baseline)
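
            # Worked example with illustrative numbers: a 4000 s segment with
            # args.time_baseline = 1800 s and no overlap leaves
            # 4000 % 1800 = 400 s of extra time, so 200 s is trimmed from the
            # start of the segment and 200 s from the end below.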

            # We'll add half the extra time to the start of the SFTs to be
            # created in this segment and half at the end
            segExtraStart = int(segExtraTime / 2)
            segExtraEnd = segExtraTime - segExtraStart
            args.analysis_start_time = segStartTime + segExtraStart

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # shift the end time by the other portion of the extra time
            # amount ...
            args.analysis_end_time = segEndTime - segExtraEnd

            # Again, this shift could have pushed the end time before the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # Case 2: SFTs need a synchronized start. This is a special case for
        # methods like TwoSpect, where signal periodicity spacing must be
        # maintained
        elif args.synchronize_start:
            segStartTime = seg[0]
            segEndTime = seg[1]

            # If we haven't set the first SFT start time, then set it equal to
            # the start time of the first segment
            if firstSFTstartTime == 0:
                firstSFTstartTime = segStartTime

            # This is a tricky bit of math to set the start time based on the
            # first SFT start time of all the segments
            args.analysis_start_time = (
                int(
                    round(
                        math.ceil(
                            (segStartTime - firstSFTstartTime)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + firstSFTstartTime
            )
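
            # Worked example with illustrative numbers: if
            # firstSFTstartTime = 1000000000, segStartTime = 1000000500,
            # args.time_baseline = 1800 and args.overlap_fraction = 0, then
            # ceil(500 / 1800) = 1, so the start time snaps forward to
            # 1000000000 + 1 * 1800 = 1000001800.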

            # This shift may have pushed past the end time of the segment. In
            # that case, just fix the start time to the end time of the segment
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            # This is a tricky bit of math to set the end time based on the
            # first SFT start time of all the segments
            args.analysis_end_time = (
                int(
                    round(
                        math.floor(
                            (segEndTime - args.analysis_start_time - args.time_baseline)
                            / ((1.0 - args.overlap_fraction) * args.time_baseline)
                        )
                        * (1.0 - args.overlap_fraction)
                        * args.time_baseline
                    )
                )
                + args.time_baseline
                + args.analysis_start_time
            )
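
            # Continuing the example above with illustrative numbers: if
            # segEndTime = 1000004000, then
            # floor((1000004000 - 1000001800 - 1800) / 1800) = 0 and
            # args.analysis_end_time = 0 * 1800 + 1800 + 1000001800
            # = 1000003600, leaving room for exactly one 1800 s SFT.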

            # Again, this shift could have pushed the end time before the start
            # of the segment, so just fix the end time to the segment start
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        # If no segment file was given and no synchronized starts, just set the
        # start time and end time to the segment start and end
        else:
            args.analysis_start_time = seg[0]
            args.analysis_end_time = seg[1]

        # Loop through the analysis time; make sure no more than
        # args.max_num_per_node SFTs are produced by any one node
        startTimeThisNode = args.analysis_start_time
        endTimeThisNode = args.analysis_start_time
        endTimeAllNodes = args.analysis_start_time
        while endTimeAllNodes < args.analysis_end_time:
            # increment endTimeAllNodes by the args.time_baseline until we get
            # past the args.analysis_end_time
            if args.overlap_fraction != 0.0:
                # handle overlap
                if numThisSeg == 0:
                    endTimeAllNodes = endTimeAllNodes + args.time_baseline
                else:
                    endTimeAllNodes = endTimeAllNodes + int(
                        (1.0 - args.overlap_fraction) * args.time_baseline
                    )
            else:
                # default case, no overlap
                endTimeAllNodes = endTimeAllNodes + args.time_baseline
            if endTimeAllNodes <= args.analysis_end_time:
                # increment the number of SFTs output from this node, and
                # update the end time of this node.
                numThisNode = numThisNode + 1
                numThisSeg = numThisSeg + 1
                endTimeThisNode = endTimeAllNodes
                if numThisNode < args.max_num_per_node:
                    continue
                else:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID,
                        nodeCount,
                        startTimeThisNode,
                        endTimeThisNode,
                        urls,
                        args,
                    )
                    # Update for next node
                    numThisNode = 0
                    if args.overlap_fraction != 0.0:
                        # handle overlap
                        startTimeThisNode = endTimeThisNode - int(
                            (args.overlap_fraction) * args.time_baseline
                        )
                    else:
                        # default case, no overlap
                        startTimeThisNode = endTimeThisNode
            else:
                # we are at or past the args.analysis_end_time; output job for
                # the last node if needed.
                if numThisNode > 0:
                    # write jobs to dag for this node
                    nodeCount = nodeCount + 1

                    if nodeCount == 1:
                        startTimeAllNodes = startTimeThisNode
                    writeToDag(
                        dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args
                    )
        # END while (endTimeAllNodes < args.analysis_end_time)
    # END for seg in segList

    # Write the MoveSFTs job to the DAG
    dagFID.write(f"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
    dagFID.write("RETRY MoveSFTs 1\n")
    dagFID.write("VARS MoveSFTs ")
    if not args.validate:
        dagFID.write('opts="--no-validate" ')
    dagFID.write(
        f'sourcedirectory="." '
        f"channels=\"{' '.join(args.channel_name)}\" "
        f"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
    )
    dagFID.write(
        f"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
    )

# Close the DAG file

# Update the actual end time of the last job and print out the times all jobs will run on:
endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("The startTimeAllNodes == None; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "The endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

print(startTimeAllNodes, endTimeAllNodes)