23"""Creates DAGs to run jobs that generates SFTs"""
29from pathlib
import Path
30from urllib.parse
import urlparse
32from gwdatafind
import find_urls
33from gwdatafind.utils
import filename_metadata, file_segment
35from gwpy.segments
import Segment, SegmentList
37from lalpulsar
import (
40 FillSFTFilenameSpecStrings,
41 BuildSFTFilenameFromSpec,
44__author__ =
"Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
45__version__ = git_version.id
46__date__ = git_version.date
48cache_re = re.compile(
r"^([A-Z])(\s+)(\w+)(\s+)(\d+)(\s+)(\d+)(\s+)(.+gwf)")
113 """Create SFT file name from specification"""
115 spec = SFTFilenameSpec()
117 FillSFTFilenameSpecStrings(
121 detector=channel[:2],
127 spec.pubObsRun = obs
or 0
128 spec.pubRevision = rev
or 0
129 spec.window_param = par
or 0
131 spec.SFTtimebase = Tsft
132 spec.gpsStart = gpsstart
135 return BuildSFTFilenameFromSpec(spec)
139 """Get frame file URL list from gwdatafind or cache file"""
141 if not args.cache_file:
144 args.input_data_type,
147 match=args.datafind_match,
148 urltype=args.datafind_urltype,
152 with open(args.cache_file,
"r")
as f:
154 m = cache_re.match(line)
156 framefile = m.group(9)
157 urls.append(framefile)
166 """Make a frame list and cache list from a list of URLs"""
170 for idx, url
in enumerate(urls):
171 obs, desc, dataseg = filename_metadata(url)
172 dataseg = Segment(dataseg)
173 if dataseg.disjoint(job_seg) < 0:
175 if dataseg.disjoint(job_seg) > 0:
177 if dataseg.intersects(job_seg):
178 framefileurl = urlparse(url)
179 framefilepath = Path(framefileurl.path)
183 if "/home" in str(framefilepath.parent)
or "osdf" in framefileurl.scheme:
184 newcache = f
"{obs}\t{desc}\t{dataseg.start}\t{abs(dataseg)}\t{framefilepath.name}"
186 newcache = f
"{obs}\t{desc}\t{dataseg.start}\t{abs(dataseg)}\t{url}"
187 cache.append(newcache)
189 if "/home" in str(framefilepath.parent):
190 frames.append(framefilepath)
197def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args):
198 """Write one job to DAG file"""
200 MakeSFTs = f
"MakeSFTs_{nodeCount}"
201 tagStringOut = f
"{args.tag_string}_{nodeCount}"
203 job_segment = Segment(
204 startTimeThisNode - args.extra_datafind_time,
205 endTimeThisNode + args.extra_datafind_time,
210 obs, desc, dataseg = filename_metadata(urls[0])
211 cacheFile = args.cache_path / f
"{obs}-{job_segment.start}-{job_segment.end}.cache"
212 with open(cacheFile,
"w")
as f:
217 argList.append(f
"-O {args.observing_run}")
218 if args.observing_run > 0:
219 argList.append(f
"-K {args.observing_kind}")
220 argList.append(f
"-R {args.observing_revision}")
222 argList.append(f
"-X {args.misc_desc}")
223 argList.append(f
"-f {args.filter_knee_freq}")
224 argList.append(f
"-t {args.time_baseline}")
228 argList.append(f
"-p {','.join(['.' for p in args.output_sft_path])}")
232 argList.append(f
"-C {cacheFile.name}")
233 argList.append(f
"-s {startTimeThisNode}")
234 argList.append(f
"-e {endTimeThisNode}")
235 argList.append(f
"-N {','.join(args.channel_name)}")
236 argList.append(f
"-F {args.start_freq}")
237 argList.append(f
"-B {args.band}")
238 if args.comment_field:
239 argList.append(f
"-c {args.comment_field}")
240 if ":" in args.window_type:
241 window_type, window_param = args.window_type.split(
":")
242 window_param =
float(window_param)
243 argList.append(f
"-w {window_type} -r {window_param}")
245 window_type = args.window_type
247 argList.append(f
"-w {window_type}")
248 if args.overlap_fraction:
249 argList.append(f
"-P {args.overlap_fraction}")
250 if args.allow_skipping:
251 argList.append(
"--allow-skipping TRUE")
252 argStr =
" ".join(argList)
258 sft_start = startTimeThisNode
259 sft_end = sft_start + args.time_baseline
261 while sft_end <= endTimeThisNode:
263 for idx, c
in enumerate(args.channel_name):
269 kind=args.observing_kind,
270 rev=args.observing_revision,
273 miscstr=args.misc_desc,
275 outputfiles.append(filename)
276 remap.append(f
"{filename}={args.output_sft_path[idx]/filename}")
279 if args.overlap_fraction:
280 sft_start +=
int(round((1 - args.overlap_fraction) * args.time_baseline))
282 sft_start += args.time_baseline
283 sft_end = sft_start + args.time_baseline
286 dagFID.write(f
"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
287 dagFID.write(f
"RETRY {MakeSFTs} 1\n")
288 dagFID.write(f
'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" ')
289 if args.transfer_frame_files:
290 framefiles =
",".join([
str(fr)
for fr
in frames])
291 dagFID.write(f
'framefiles="{framefiles}" ')
292 dagFID.write(f
'tagstring="{tagStringOut}"\n')
299parser = argparse.ArgumentParser(
300 description=
"This script creates MakeSFTs.sub, MoveSFTs.sub, and a dag \
301 file that generates SFTs based on the options given.",
302 fromfile_prefix_chars=
"@",
305dag_group = parser.add_argument_group(
306 "DAG organization",
"Options for workflow control"
308datafind_group = parser.add_argument_group(
309 "Datafind",
"Options for locating frame files"
311makesfts_group = parser.add_argument_group(
312 "SFT creation",
"Options for SFT creation and output"
314deprecated_group = parser.add_argument_group(
"DEPRECATED")
316dag_group.add_argument(
321 help=
"filename for .dag file (should end in .dag)",
323dag_group.add_argument(
328 help=
"tag string used in names of various files unique to \
329 jobs that will run under the DAG",
331dag_group.add_argument(
333 "--analysis-start-time",
335 help=
"GPS start time of data from which to generate \
336 SFTs (optional and unused if a segment file is given)",
338dag_group.add_argument(
340 "--analysis-end-time",
342 help=
"GPS end time of data from which to generate SFTs \
343 (optional and unused if a segment file is given)",
345dag_group.add_argument(
347 "--max-length-all-jobs",
349 help=
"maximum total amount of data to process, in seconds \
350 (optional and unused if a segment file is given)",
352dag_group.add_argument(
356 help=
"alternative file with segments to use, rather than \
359dag_group.add_argument(
364 help=
"minimum length segments to process in seconds (used \
365 only if a segment file is given)",
367dag_group.add_argument(
369 "--synchronize-start",
371 help=
"synchronize the start times of the SFTs so that the \
372 start times are synchronized when there are gaps in the \
375dag_group.add_argument(
380 help=
"path to log, output, and error files (default \
381 is $PWD/logs; this directory is created if it does not \
382 exist and usually should be under a local file system)",
384dag_group.add_argument(
386 "--max-num-per-node",
389 help=
"maximum number of SFTs to generate on one node",
391dag_group.add_argument(
395 help=
"string specifying the lalpulsar_MakeSFTs executable, \
396 or a path to it; if not set, will use \
397 MAKESFTS_PATH env variable or system default (in that \
400dag_group.add_argument(
403 help=
"string specifying the lalpulsar_MoveSFTs executable, \
404 or a path to it; if not set, will use \
405 MOVESFTS_PATH env variable or system default (in that \
408dag_group.add_argument(
413 help=
"memory allocation in MB to request from condor for \
414 lalpulsar_MakeSFTs step",
416dag_group.add_argument(
421 help=
"disk space allocation in MB to request from condor \
422 for lalpulsar_MakeSFTs step",
424dag_group.add_argument(
426 "--accounting-group",
429 help=
"Condor tag for the production of SFTs",
431dag_group.add_argument(
433 "--accounting-group-user",
436 help=
"albert.einstein username (do not add @LIGO.ORG)",
438dag_group.add_argument(
440 "--transfer-frame-files",
442 help=
"Transfer frame files via HTCondor file transfer system. \
443 This should be specified if frames are not visible to the \
444 compute node file system. Ex. this should be specified if \
445 frames are on /home or running on the open science grid. \
446 Usually frame files are visible on CIT, LHO, LLO clusters \
447 so that this does not need to be specified in that case.",
450datafind_group.add_argument(
455 help=
"input data type for use with the gw_data_find --type \
458datafind_group.add_argument(
460 "--extra-datafind-time",
463 help=
"extra time to subtract/add from/to start/end time \
464 arguments of gw_data_find",
466datafind_group.add_argument(
470 help=
"string to use with the gw_data_find --match option",
472datafind_group.add_argument(
473 "--datafind-urltype",
476 choices=[
"file",
"osdf"],
477 help=
"String for the gw_data_find --urltype option. \
478 Use 'file' if creating SFTs on a local LDG cluster. \
479 Use 'osdf' if creating SFTs on the open science grid",
481datafind_group.add_argument(
485 help=
"path and filename to frame cache file to use instead \
489makesfts_group.add_argument(
494 help=
"For public SFTs, observing run data the SFTs are generated from, or \
495 (in the case of mock data challenge data) the observing \
496 run on which the data is most closely based",
498makesfts_group.add_argument(
502 choices=[
"RUN",
"AUX",
"SIM",
"DEV"],
503 help=
'For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
504 "AUX" for SFTs of non-h(t) channels; \
505 "SIM" for mock data challenge or other simulated data; or \
506 "DEV" for development/testing purposes',
508makesfts_group.add_argument(
510 "--observing-revision",
512 help=
"For public SFTs: revision number starts at 1, and should be incremented once \
513 SFTs have been widely distributed across clusters, advertised \
514 as being ready for use, etc. For example, if mistakes are found \
515 in the initial SFT production run after they have been published, \
516 regenerated SFTs should have a revision number of at least 2",
518makesfts_group.add_argument(
522 help=
"For private SFTs, miscellaneous part of the SFT \
523 description field in the filename",
525makesfts_group.add_argument(
527 "--filter-knee-freq",
530 help=
"high pass filter knee frequency used on time domain \
531 data before generating SFTs",
533makesfts_group.add_argument(
538 help=
"time baseline of SFTs (e.g., 60 or 1800 seconds)",
540makesfts_group.add_argument(
545 help=
"Path where to save the SFT files. Can specify multiple options, \
546 If specifying multiple options then it is required to specify the \
547 same number of output-sft-path options as the number of channels. \
548 The first listed channel will have the SFTs go into the first \
549 listed output-sft-path. Otherwise specify only one output path. \
550 If one path is specified and more than 1 channels are specified \
551 then --observing-run must be >= 1 and --observing-kind and \
552 --observing-revision must be set",
554makesfts_group.add_argument(
559 help=
"path to cache files that will be produced by \
560 gw_data_find (default is $PWD/cache; this directory is \
561 created if it does not exist and must agree with that \
562 given in .sub files)",
564makesfts_group.add_argument(
569 help=
"Name of input time-domain channel to read from frames. \
570 Can specify multiple options. The number of channels must be \
571 equal to the number of output-sft-path options given. The \
572 first listed channel will have the SFTs go to the first listed \
573 output-sft-path. Can only specify one channel when generating \
574 private SFTs (--observing-run=0)",
576makesfts_group.add_argument(
577 "-c",
"--comment-field", type=str, help=
"comment for SFT header"
579makesfts_group.add_argument(
580 "-F",
"--start-freq", type=int, default=10, help=
"start frequency of the SFTs"
582makesfts_group.add_argument(
583 "-B",
"--band", type=int, default=1990, help=
"frequency band of the SFTs"
585makesfts_group.add_argument(
589 default=
"tukey:0.001",
590 help=
'type of windowing of time-domain to do \
591 before generating SFTs, e.g. "rectangular", \
592 "hann", "tukey:<beta in [0,1], required>"; \
593 (default is "tukey:0.001", standard choice for LVK production SFTs)',
595makesfts_group.add_argument(
597 "--overlap-fraction",
600 help=
"overlap fraction (for use with windows; e.g., use \
601 --overlap-fraction 0.5 with --window-type hann windows)",
603makesfts_group.add_argument(
606 help=
"allow channels to be skipped if not in frames or too low sampling \
609makesfts_group.add_argument(
612 action=
"store_false",
613 help=
"do not validate created SFTs",
618 def __call__(self, parser, namespace, values, option_string=None):
620 f
"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
624deprecated_group.add_argument(
626 "--frame-struct-type",
628 action=DeprecateAction,
629 help=
"DEPRECATED. No longer required; \
630 the frame channel type is determined automatically",
632deprecated_group.add_argument(
636 action=DeprecateAction,
637 help=
"DEPRECATED. No longer required; \
638 the frame channel type is determined automatically",
640deprecated_group.add_argument(
644 action=DeprecateAction,
645 help=
"DEPRECATED. No longer required; \
646 the detector prefix is deduced from the channel name",
648deprecated_group.add_argument(
652 action=DeprecateAction,
653 help=
"DEPRECATED. No longer supported",
655deprecated_group.add_argument(
659 action=DeprecateAction,
660 help=
"DEPRECATED. Default behaviour",
662deprecated_group.add_argument(
666 action=DeprecateAction,
667 help=
"DEPRECATED. No longer supported",
669deprecated_group.add_argument(
673 action=DeprecateAction,
674 help=
"DEPRECATED. No longer supported",
676deprecated_group.add_argument(
680 action=DeprecateAction,
681 help=
"DEPCRECATED. No longer supported",
683deprecated_group.add_argument(
687 action=DeprecateAction,
688 help=
"DEPCRECATED. No longer supported",
690deprecated_group.add_argument(
692 "--output-jobs-per-node",
695 action=DeprecateAction,
696 help=
"DEPRECATED. No longer supported",
699args = parser.parse_args()
702if args.observing_run < 0:
703 raise parser.error(
"--observing-run must be >= 0")
705if args.observing_run > 0
and not args.observing_kind:
706 raise parser.error(
"--observing-run requires --observing-kind")
708if args.observing_run > 0
and not args.observing_revision:
709 raise parser.error(
"--observing-run requires --observing-revision")
711if args.observing_revision
and args.observing_revision <= 0:
712 raise parser.error(
"--observing-revision must be > 0")
714if args.observing_run > 0
and args.misc_desc:
716 f
"--observing-run={args.observing_run} incompatible with --misc-desc"
719if args.misc_desc
and not re.compile(
r"^[A-Za-z0-9]+$").match(args.misc_desc):
720 raise parser.error(
"--misc-desc may only contain A-Z, a-z, 0-9 characters")
722if args.extra_datafind_time < 0:
723 raise parser.error(
"--extra-datafind-time must be >= 0")
725if args.filter_knee_freq < 0:
726 raise parser.error(
"--filter-knee-freq must be >= 0")
728if args.time_baseline <= 0:
729 raise parser.error(
"--time-baseline must be > 0")
731if args.overlap_fraction < 0.0
or args.overlap_fraction >= 1.0:
732 raise parser.error(
"--overlap-fraction must be in the range [0,1)")
734if args.start_freq < 0.0
or args.start_freq >= 7192.0:
735 raise parser.error(
"--start-freq must be in the range [0,7192)")
737if args.band <= 0
or args.band >= 8192.0:
738 raise parser.error(
"--band must be in the range (0,8192)")
740if args.start_freq + args.band >= 8192.0:
741 raise parser.error(
"--start-freq + --band must be < 8192")
743if args.max_num_per_node <= 0:
744 raise parser.error(
"--max-num-per-node must be > 0")
747 len(args.channel_name) != len(args.output_sft_path)
748 and len(args.output_sft_path) != 1
751 "--channel-name and --output-sft-path must be the "
752 "same length or --output-sft-path must be length of 1"
755if len(args.channel_name) > 1
and args.observing_run == 0:
757 "When creating SFTs from multiple channels, public SFT naming "
758 "convention must be used: --observing-run > 0 and set "
759 "--observing-kind and --observing-revision"
762if args.datafind_urltype ==
"osdf" and not args.transfer_frame_files:
764 "--transfer-frame-files must be specified when --datafind-urltype=osdf"
768makeSFTsExe =
"lalpulsar_MakeSFTs"
769if args.makesfts_path:
770 if args.makesfts_path.is_file():
771 makeSFTsExe = args.makesfts_path
773 makeSFTsExe = args.makesfts_path / makeSFTsExe
774elif "MAKESFTS_PATH" in os.environ:
775 makeSFTsExe = Path(
"$ENV(MAKESFTS_PATH)") / makeSFTsExe
777 makeSFTsExe = Path(
"@LALSUITE_BINDIR@") / makeSFTsExe
779moveSFTsExe =
"lalpulsar_MoveSFTs"
780if args.movesfts_path:
781 if args.movesfts_path.is_file():
782 moveSFTsExe = args.movesfts_path
784 moveSFTsExe = args.movesfts_path / moveSFTsExe
785elif "MOVESFTS_PATH" in os.environ:
786 moveSFTsExe = Path(
"$ENV(MOVESFTS_PATH)") / moveSFTsExe
788 moveSFTsExe = Path(
"@LALSUITE_BINDIR@") / moveSFTsExe
791args.log_path.mkdir(exist_ok=
True)
792args.cache_path.mkdir(exist_ok=
True)
793for p
in args.output_sft_path:
794 p.mkdir(exist_ok=
True)
797segList = SegmentList()
798adjustSegExtraTime =
False
799if args.segment_file
is not None:
800 if args.min_seg_length < 0:
801 raise parser.error(
"--min-seg-length must be >= 0")
805 adjustSegExtraTime =
True
807 with open(args.segment_file)
as fp_segfile:
808 for idx, line
in enumerate(fp_segfile):
809 splitLine = line.split()
810 oneSeg = Segment(
int(splitLine[0]),
int(splitLine[1]))
811 if abs(oneSeg) >= args.min_seg_length:
812 segList.append(oneSeg)
815 raise ValueError(f
"No segments found in segment file: {args.segment_file}")
817 if args.analysis_start_time
is None:
819 "--analysis-start-time must be specified if no segment file is " "given"
822 if args.analysis_end_time
is None:
824 "--analysis-start-time must be specified if no segment file is " "given"
827 if args.max_length_all_jobs
is None:
829 "--max-length-all-jobs must be specified if no segment file is " "given"
833 if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
834 args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs
836 oneSeg = Segment(args.analysis_start_time, args.analysis_end_time)
837 segList.append(oneSeg)
842site = args.channel_name[0][0]
848if not args.transfer_frame_files:
852 "--transfer-frame-files must be specified when frame files are in /home"
856dataSegs = SegmentList()
858 dataSegs.append(file_segment(url))
869path_to_dag_file = args.dag_file.parent
870dag_filename = args.dag_file.name
871makesfts_sub = path_to_dag_file /
"MakeSFTs.sub"
872movesfts_sub = path_to_dag_file /
"MoveSFTs.sub"
875with open(makesfts_sub,
"w")
as MakeSFTsFID:
876 MakeSFTsLogFile = f
"{args.log_path}/MakeSFTs_{dag_filename}.log"
877 MakeSFTsFID.write(
"universe = vanilla\n")
878 MakeSFTsFID.write(f
"executable = {makeSFTsExe}\n")
879 MakeSFTsFID.write(
"arguments = $(argList)\n")
880 MakeSFTsFID.write(f
"accounting_group = {args.accounting_group}\n")
881 MakeSFTsFID.write(f
"accounting_group_user = {args.accounting_group_user}\n")
882 MakeSFTsFID.write(f
"log = {MakeSFTsLogFile}\n")
883 MakeSFTsFID.write(f
"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
884 MakeSFTsFID.write(f
"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
885 MakeSFTsFID.write(
"notification = never\n")
886 MakeSFTsFID.write(f
"request_memory = {args.request_memory}MB\n")
887 MakeSFTsFID.write(f
"request_disk = {args.request_disk}MB\n")
888 MakeSFTsFID.write(
"RequestCpus = 1\n")
889 MakeSFTsFID.write(
"should_transfer_files = yes\n")
890 if args.transfer_frame_files:
891 MakeSFTsFID.write(
"transfer_input_files = $(cachefile),$(framefiles)\n")
893 MakeSFTsFID.write(
"transfer_input_files = $(cachefile)\n")
894 if "MAKESFTS_PATH" in os.environ
and not args.makesfts_path:
895 MakeSFTsFID.write(
"getenv = MAKESFTS_PATH\n")
896 if args.datafind_urltype ==
"osdf":
897 MakeSFTsFID.write(
"use_oauth_services = scitokens\n")
899 "environment = BEARER_TOKEN_FILE=$$(CondorScratchDir)/.condor_creds/scitokens.use\n"
901 MakeSFTsFID.write(
"queue 1\n")
904with open(movesfts_sub,
"w")
as MoveSFTsFID:
905 MoveSFTsLogFile = f
"{args.log_path}/MoveSFTs_{dag_filename}.log"
906 MoveSFTsFID.write(
"universe = local\n")
907 MoveSFTsFID.write(f
"executable = {moveSFTsExe}\n")
908 MoveSFTsFID.write(
"arguments = ")
909 if not args.validate:
910 MoveSFTsFID.write(
"$(opts) ")
911 MoveSFTsFID.write(
"-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
912 MoveSFTsFID.write(f
"accounting_group = {args.accounting_group}\n")
913 MoveSFTsFID.write(f
"accounting_group_user = {args.accounting_group_user}\n")
914 MoveSFTsFID.write(f
"log = {MoveSFTsLogFile}\n")
915 MoveSFTsFID.write(f
"error = {args.log_path}/MoveSFTs.err\n")
916 MoveSFTsFID.write(f
"output = {args.log_path}/MoveSFTs.out\n")
917 MoveSFTsFID.write(
"notification = never\n")
918 MoveSFTsFID.write(f
"request_memory = 1GB\n")
919 MoveSFTsFID.write(f
"request_disk = 10MB\n")
920 MoveSFTsFID.write(
"RequestCpus = 1\n")
921 if "MOVESFTS_PATH" in os.environ
and not args.movesfts_path:
922 MoveSFTsFID.write(
"getenv = MOVESFTS_PATH\n")
923 MoveSFTsFID.write(
"queue 1\n")
926with open(args.dag_file,
"w")
as dagFID:
927 startTimeAllNodes =
None
928 firstSFTstartTime = 0
939 if adjustSegExtraTime
and not args.synchronize_start:
940 segStartTime = seg[0]
949 segExtraTime = (segEndTime - segStartTime) % args.time_baseline
955 if args.overlap_fraction != 0.0:
956 if (segEndTime - segStartTime) > args.time_baseline:
958 segEndTime - segStartTime - args.time_baseline
959 ) %
int((1.0 - args.overlap_fraction) * args.time_baseline)
963 segExtraStart =
int(segExtraTime / 2)
964 segExtraEnd = segExtraTime - segExtraStart
965 args.analysis_start_time = segStartTime + segExtraStart
969 if args.analysis_start_time > segEndTime:
970 args.analysis_start_time = segEndTime
974 args.analysis_end_time = segEndTime - segExtraEnd
978 if args.analysis_end_time < segStartTime:
979 args.analysis_end_time = segStartTime
984 elif args.synchronize_start:
985 segStartTime = seg[0]
990 if firstSFTstartTime == 0:
991 firstSFTstartTime = segStartTime
995 args.analysis_start_time = (
999 (segStartTime - firstSFTstartTime)
1000 / ((1.0 - args.overlap_fraction) * args.time_baseline)
1002 * (1.0 - args.overlap_fraction)
1003 * args.time_baseline
1011 if args.analysis_start_time > segEndTime:
1012 args.analysis_start_time = segEndTime
1016 args.analysis_end_time = (
1020 (segEndTime - args.analysis_start_time - args.time_baseline)
1021 / ((1.0 - args.overlap_fraction) * args.time_baseline)
1023 * (1.0 - args.overlap_fraction)
1024 * args.time_baseline
1027 + args.time_baseline
1028 + args.analysis_start_time
1033 if args.analysis_end_time < segStartTime:
1034 args.analysis_end_time = segStartTime
1039 args.analysis_start_time = seg[0]
1040 args.analysis_end_time = seg[1]
1044 startTimeThisNode = args.analysis_start_time
1045 endTimeThisNode = args.analysis_start_time
1046 endTimeAllNodes = args.analysis_start_time
1047 while endTimeAllNodes < args.analysis_end_time:
1050 if args.overlap_fraction != 0.0:
1053 endTimeAllNodes = endTimeAllNodes + args.time_baseline
1055 endTimeAllNodes = endTimeAllNodes +
int(
1056 (1.0 - args.overlap_fraction) * args.time_baseline
1060 endTimeAllNodes = endTimeAllNodes + args.time_baseline
1061 if endTimeAllNodes <= args.analysis_end_time:
1064 numThisNode = numThisNode + 1
1065 numThisSeg = numThisSeg + 1
1066 endTimeThisNode = endTimeAllNodes
1067 if numThisNode < args.max_num_per_node:
1071 nodeCount = nodeCount + 1
1074 startTimeAllNodes = startTimeThisNode
1085 if args.overlap_fraction != 0.0:
1087 startTimeThisNode = endTimeThisNode -
int(
1088 (args.overlap_fraction) * args.time_baseline
1092 startTimeThisNode = endTimeThisNode
1098 nodeCount = nodeCount + 1
1101 startTimeAllNodes = startTimeThisNode
1103 dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args
1110 dagFID.write(f
"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
1111 dagFID.write(f
"RETRY MoveSFTs 1\n")
1112 dagFID.write(f
"VARS MoveSFTs ")
1113 if not args.validate:
1114 dagFID.write(
'opts="--no-validate" ')
1116 f
'sourcedirectory="." '
1117 f
"channels=\"{' '.join(args.channel_name)}\" "
1118 f
"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
1121 f
"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
1127endTimeAllNodes = endTimeThisNode
1129if startTimeAllNodes
is None:
1130 raise Exception(
"The startTimeAllNodes == none; the DAG file contains no jobs!")
1132if endTimeAllNodes <= startTimeAllNodes:
1134 "The endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
1137print(startTimeAllNodes, endTimeAllNodes)
DEPRECATED OPTIONS #####.
def __call__(self, parser, namespace, values, option_string=None)
def make_cache(urls, job_seg)
Make a frame list and cache list from a list of URLs.
def sft_name_from_vars(obs, gpsstart, Tsft, channel=None, kind=None, rev=None, window="unknown", par=None, miscstr=None)
Create SFT file name from specification.
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args)
Write one job to DAG file.
def get_urls(args)
Get frame file URL list from gwdatafind or cache file.