23"""Creates DAGs to run jobs that generates SFTs"""
29from pathlib
import Path
30from urllib.parse
import urlparse
32from gwdatafind
import find_urls
33from gwdatafind.utils
import filename_metadata, file_segment
35from igwn_segments
import segment, segmentlist
37from lalpulsar
import (
40 FillSFTFilenameSpecStrings,
41 BuildSFTFilenameFromSpec,
44__author__ =
"Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
45__version__ = git_version.id
46__date__ = git_version.date
48cache_re = re.compile(
r"^([A-Z])(\s+)(\w+)(\s+)(\d+)(\s+)(\d+)(\s+)(.+gwf)")
113 """Create SFT file name from specification"""
115 spec = SFTFilenameSpec()
117 FillSFTFilenameSpecStrings(
121 detector=channel[:2],
127 spec.pubObsRun = obs
or 0
128 spec.pubRevision = rev
or 0
129 spec.window_param = par
or 0
131 spec.SFTtimebase = Tsft
132 spec.gpsStart = gpsstart
135 return BuildSFTFilenameFromSpec(spec)
139 """Get frame file URL list from gwdatafind or cache file"""
141 if not args.cache_file:
144 args.input_data_type,
147 match=args.datafind_match,
148 urltype=args.datafind_urltype,
152 with open(args.cache_file,
"r")
as f:
154 m = cache_re.match(line)
156 framefile = m.group(9)
157 urls.append(framefile)
166 """Make a frame list and cache list from a list of URLs"""
170 for idx, url
in enumerate(urls):
171 obs, desc, dataseg = filename_metadata(url)
172 dataseg = segment(dataseg)
173 if dataseg.disjoint(job_seg) < 0:
175 if dataseg.disjoint(job_seg) > 0:
177 if dataseg.intersects(job_seg):
178 framefileurl = urlparse(url)
179 framefilepath = Path(framefileurl.path)
183 if "/home" in str(framefilepath.parent)
or "osdf" in framefileurl.scheme:
185 f
"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{framefilepath.name}"
188 newcache = f
"{obs}\t{desc}\t{dataseg[0]}\t{abs(dataseg)}\t{url}"
189 cache.append(newcache)
191 if "/home" in str(framefilepath.parent):
192 frames.append(framefilepath)
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args):
    """Write one job to DAG file

    Parameters
    ----------
    dagFID : file object
        Open handle on the DAG file being written
    nodeCount : int
        Index of this node, used to build unique job/tag names
    startTimeThisNode, endTimeThisNode : int
        GPS span of SFTs generated by this node
    urls : list of str
        Frame file URLs available for this analysis
    args : argparse.Namespace
        Parsed command line arguments
    """
    MakeSFTs = f"MakeSFTs_{nodeCount}"
    tagStringOut = f"{args.tag_string}_{nodeCount}"

    # Pad the job segment so datafind covers edge effects.
    job_segment = segment(
        startTimeThisNode - args.extra_datafind_time,
        endTimeThisNode + args.extra_datafind_time,
    )

    # Write a per-job cache file restricted to frames overlapping this job.
    frames, cache = make_cache(urls, job_segment)
    obs, desc, dataseg = filename_metadata(urls[0])
    cacheFile = args.cache_path / f"{obs}-{job_segment[0]}-{job_segment[1]}.cache"
    with open(cacheFile, "w") as f:
        for line in cache:
            f.write(f"{line}\n")

    # Build the lalpulsar_MakeSFTs argument list.
    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    elif args.misc_desc:
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    # SFTs are written to the job scratch directory and moved afterwards,
    # hence '.' for every output path.
    argList.append(f"-p {','.join(['.' for p in args.output_sft_path])}")
    argList.append(f"-C {cacheFile.name}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if ":" in args.window_type:
        # e.g. "tukey:0.001" -> window type plus numeric parameter
        window_type, window_param = args.window_type.split(":")
        window_param = float(window_param)
        argList.append(f"-w {window_type} -r {window_param}")
    else:
        window_type = args.window_type
        window_param = None
        argList.append(f"-w {window_type}")
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)

    # Predict the SFT file names this node will produce, and the remap
    # entries that send each file to its destination directory.
    outputfiles = []
    remap = []
    sft_start = startTimeThisNode
    sft_end = sft_start + args.time_baseline
    while sft_end <= endTimeThisNode:
        for idx, c in enumerate(args.channel_name):
            filename = sft_name_from_vars(
                args.observing_run,
                sft_start,
                args.time_baseline,
                channel=c,
                kind=args.observing_kind,
                rev=args.observing_revision,
                window=window_type,
                par=window_param,
                miscstr=args.misc_desc,
            )
            outputfiles.append(filename)
            # NOTE(review): the left-hand side of this remap entry was lost in
            # the damaged source ("(unknown)"); the produced file name is the
            # natural key — confirm against upstream lalpulsar_MakeSFTDAG.
            remap.append(f"{filename}={args.output_sft_path[idx] / filename}")
        if args.overlap_fraction:
            sft_start += int(round((1 - args.overlap_fraction) * args.time_baseline))
        else:
            sft_start += args.time_baseline
        sft_end = sft_start + args.time_baseline

    # Emit the DAG entry for this node.
    dagFID.write(f"JOB {MakeSFTs} {Path(dagFID.name).parent / 'MakeSFTs.sub'}\n")
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" cachefile="{cacheFile}" ')
    if args.transfer_frame_files:
        framefiles = ",".join([str(fr) for fr in frames])
        dagFID.write(f'framefiles="{framefiles}" ')
    # NOTE(review): any VARS entries publishing outputfiles/remap were lost in
    # the damaged source; restore from upstream if MakeSFTs.sub consumes them.
    dagFID.write(f'tagstring="{tagStringOut}"\n')
301parser = argparse.ArgumentParser(
302 description=
"This script creates MakeSFTs.sub, MoveSFTs.sub, and a dag \
303 file that generates SFTs based on the options given.",
304 fromfile_prefix_chars=
"@",
307dag_group = parser.add_argument_group(
308 "DAG organization",
"Options for workflow control"
310datafind_group = parser.add_argument_group(
311 "Datafind",
"Options for locating frame files"
313makesfts_group = parser.add_argument_group(
314 "SFT creation",
"Options for SFT creation and output"
316deprecated_group = parser.add_argument_group(
"DEPRECATED")
318dag_group.add_argument(
323 help=
"filename for .dag file (should end in .dag)",
325dag_group.add_argument(
330 help=
"tag string used in names of various files unique to \
331 jobs that will run under the DAG",
333dag_group.add_argument(
335 "--analysis-start-time",
337 help=
"GPS start time of data from which to generate \
338 SFTs (optional and unused if a segment file is given)",
340dag_group.add_argument(
342 "--analysis-end-time",
344 help=
"GPS end time of data from which to generate SFTs \
345 (optional and unused if a segment file is given)",
347dag_group.add_argument(
349 "--max-length-all-jobs",
351 help=
"maximum total amount of data to process, in seconds \
352 (optional and unused if a segment file is given)",
354dag_group.add_argument(
358 help=
"alternative file with segments to use, rather than \
361dag_group.add_argument(
366 help=
"minimum length segments to process in seconds (used \
367 only if a segment file is given)",
369dag_group.add_argument(
371 "--synchronize-start",
373 help=
"synchronize the start times of the SFTs so that the \
374 start times are synchronized when there are gaps in the \
377dag_group.add_argument(
382 help=
"path to log, output, and error files (default \
383 is $PWD/logs; this directory is created if it does not \
384 exist and usually should be under a local file system)",
386dag_group.add_argument(
388 "--max-num-per-node",
391 help=
"maximum number of SFTs to generate on one node",
393dag_group.add_argument(
397 help=
"string specifying the lalpulsar_MakeSFTs executable, \
398 or a path to it; if not set, will use \
399 MAKESFTS_PATH env variable or system default (in that \
402dag_group.add_argument(
405 help=
"string specifying the lalpulsar_MoveSFTs executable, \
406 or a path to it; if not set, will use \
407 MOVESFTS_PATH env variable or system default (in that \
410dag_group.add_argument(
415 help=
"memory allocation in MB to request from condor for \
416 lalpulsar_MakeSFTs step",
418dag_group.add_argument(
423 help=
"disk space allocation in MB to request from condor \
424 for lalpulsar_MakeSFTs step",
426dag_group.add_argument(
428 "--accounting-group",
431 help=
"Condor tag for the production of SFTs",
433dag_group.add_argument(
435 "--accounting-group-user",
438 help=
"albert.einstein username (do not add @LIGO.ORG)",
440dag_group.add_argument(
442 "--transfer-frame-files",
444 help=
"Transfer frame files via HTCondor file transfer system. \
445 This should be specified if frames are not visible to the \
446 compute node file system. Ex. this should be specified if \
447 frames are on /home or running on the open science grid. \
448 Usually frame files are visible on CIT, LHO, LLO clusters \
449 so that this does not need to be specified in that case.",
452datafind_group.add_argument(
457 help=
"input data type for use with the gw_data_find --type \
460datafind_group.add_argument(
462 "--extra-datafind-time",
465 help=
"extra time to subtract/add from/to start/end time \
466 arguments of gw_data_find",
468datafind_group.add_argument(
472 help=
"string to use with the gw_data_find --match option",
474datafind_group.add_argument(
475 "--datafind-urltype",
478 choices=[
"file",
"osdf"],
479 help=
"String for the gw_data_find --urltype option. \
480 Use 'file' if creating SFTs on a local LDG cluster. \
481 Use 'osdf' if creating SFTs on the open science grid",
483datafind_group.add_argument(
487 help=
"path and filename to frame cache file to use instead \
491makesfts_group.add_argument(
496 help=
"For public SFTs, observing run data the SFTs are generated from, or \
497 (in the case of mock data challenge data) the observing \
498 run on which the data is most closely based",
500makesfts_group.add_argument(
504 choices=[
"RUN",
"AUX",
"SIM",
"DEV"],
505 help=
'For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
506 "AUX" for SFTs of non-h(t) channels; \
507 "SIM" for mock data challenge or other simulated data; or \
508 "DEV" for development/testing purposes',
510makesfts_group.add_argument(
512 "--observing-revision",
514 help=
"For public SFTs: revision number starts at 1, and should be incremented once \
515 SFTs have been widely distributed across clusters, advertised \
516 as being ready for use, etc. For example, if mistakes are found \
517 in the initial SFT production run after they have been published, \
518 regenerated SFTs should have a revision number of at least 2",
520makesfts_group.add_argument(
524 help=
"For private SFTs, miscellaneous part of the SFT \
525 description field in the filename",
527makesfts_group.add_argument(
529 "--filter-knee-freq",
532 help=
"high pass filter knee frequency used on time domain \
533 data before generating SFTs",
535makesfts_group.add_argument(
540 help=
"time baseline of SFTs (e.g., 60 or 1800 seconds)",
542makesfts_group.add_argument(
547 help=
"Path where to save the SFT files. Can specify multiple options, \
548 If specifying multiple options then it is required to specify the \
549 same number of output-sft-path options as the number of channels. \
550 The first listed channel will have the SFTs go into the first \
551 listed output-sft-path. Otherwise specify only one output path. \
552 If one path is specified and more than 1 channels are specified \
553 then --observing-run must be >= 1 and --observing-kind and \
554 --observing-revision must be set",
556makesfts_group.add_argument(
561 help=
"path to cache files that will be produced by \
562 gw_data_find (default is $PWD/cache; this directory is \
563 created if it does not exist and must agree with that \
564 given in .sub files)",
566makesfts_group.add_argument(
571 help=
"Name of input time-domain channel to read from frames. \
572 Can specify multiple options. The number of channels must be \
573 equal to the number of output-sft-path options given. The \
574 first listed channel will have the SFTs go to the first listed \
575 output-sft-path. Can only specify one channel when generating \
576 private SFTs (--observing-run=0)",
578makesfts_group.add_argument(
579 "-c",
"--comment-field", type=str, help=
"comment for SFT header"
581makesfts_group.add_argument(
582 "-F",
"--start-freq", type=int, default=10, help=
"start frequency of the SFTs"
584makesfts_group.add_argument(
585 "-B",
"--band", type=int, default=1990, help=
"frequency band of the SFTs"
587makesfts_group.add_argument(
591 default=
"tukey:0.001",
592 help=
'type of windowing of time-domain to do \
593 before generating SFTs, e.g. "rectangular", \
594 "hann", "tukey:<beta in [0,1], required>"; \
595 (default is "tukey:0.001", standard choice for LVK production SFTs)',
597makesfts_group.add_argument(
599 "--overlap-fraction",
602 help=
"overlap fraction (for use with windows; e.g., use \
603 --overlap-fraction 0.5 with --window-type hann windows)",
605makesfts_group.add_argument(
608 help=
"allow channels to be skipped if not in frames or too low sampling \
611makesfts_group.add_argument(
614 action=
"store_false",
615 help=
"do not validate created SFTs",
class DeprecateAction(argparse.Action):
    """argparse action that rejects deprecated options with a clear error.

    NOTE(review): the class header line was lost in the damaged source; it is
    reconstructed as an argparse.Action that aborts parsing via parser.error —
    confirm against upstream lalpulsar_MakeSFTDAG.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        # parser.error prints usage plus the message and exits with status 2.
        parser.error(
            f"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
        )
626deprecated_group.add_argument(
628 "--frame-struct-type",
630 action=DeprecateAction,
631 help=
"DEPRECATED. No longer required; \
632 the frame channel type is determined automatically",
634deprecated_group.add_argument(
638 action=DeprecateAction,
639 help=
"DEPRECATED. No longer required; \
640 the frame channel type is determined automatically",
642deprecated_group.add_argument(
646 action=DeprecateAction,
647 help=
"DEPRECATED. No longer required; \
648 the detector prefix is deduced from the channel name",
650deprecated_group.add_argument(
654 action=DeprecateAction,
655 help=
"DEPRECATED. No longer supported",
657deprecated_group.add_argument(
661 action=DeprecateAction,
662 help=
"DEPRECATED. Default behaviour",
664deprecated_group.add_argument(
668 action=DeprecateAction,
669 help=
"DEPRECATED. No longer supported",
671deprecated_group.add_argument(
675 action=DeprecateAction,
676 help=
"DEPRECATED. No longer supported",
678deprecated_group.add_argument(
682 action=DeprecateAction,
683 help=
"DEPCRECATED. No longer supported",
685deprecated_group.add_argument(
689 action=DeprecateAction,
690 help=
"DEPCRECATED. No longer supported",
692deprecated_group.add_argument(
694 "--output-jobs-per-node",
697 action=DeprecateAction,
698 help=
"DEPRECATED. No longer supported",
args = parser.parse_args()

# Sanity-check the command line arguments before doing any work.
if args.observing_run < 0:
    raise parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    raise parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    raise parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    raise parser.error("--observing-revision must be > 0")

# --misc-desc only makes sense for private SFTs (observing run 0).
if args.observing_run > 0 and args.misc_desc:
    raise parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    raise parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    raise parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    raise parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    raise parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    raise parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    raise parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    raise parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    raise parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    raise parser.error("--max-num-per-node must be > 0")

# Channels and output paths must pair up one-to-one, unless a single output
# path is shared by all channels.
if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    raise parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )

if len(args.channel_name) > 1 and args.observing_run == 0:
    raise parser.error(
        "When creating SFTs from multiple channels, public SFT naming "
        "convention must be used: --observing-run > 0 and set "
        "--observing-kind and --observing-revision"
    )

# OSDF-served frames are only visible inside the job sandbox, so they must be
# transferred by HTCondor.
if args.datafind_urltype == "osdf" and not args.transfer_frame_files:
    raise parser.error(
        "--transfer-frame-files must be specified when --datafind-urltype=osdf"
    )
# Resolve the lalpulsar_MakeSFTs executable, in priority order:
# 1) --makesfts-path (file or containing directory),
# 2) MAKESFTS_PATH environment variable,
# 3) build-time default location.
makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if args.makesfts_path.is_file():
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = args.makesfts_path / makeSFTsExe
elif "MAKESFTS_PATH" in os.environ:
    # The literal $ENV(...) is expanded by HTCondor at submit time, not here.
    makeSFTsExe = Path("$ENV(MAKESFTS_PATH)") / makeSFTsExe
else:
    # @LALSUITE_BINDIR@ is substituted at build time.
    makeSFTsExe = Path("@LALSUITE_BINDIR@") / makeSFTsExe

# Resolve the lalpulsar_MoveSFTs executable with the same priority scheme.
moveSFTsExe = "lalpulsar_MoveSFTs"
if args.movesfts_path:
    if args.movesfts_path.is_file():
        moveSFTsExe = args.movesfts_path
    else:
        moveSFTsExe = args.movesfts_path / moveSFTsExe
elif "MOVESFTS_PATH" in os.environ:
    moveSFTsExe = Path("$ENV(MOVESFTS_PATH)") / moveSFTsExe
else:
    moveSFTsExe = Path("@LALSUITE_BINDIR@") / moveSFTsExe

# Create the output directories (the parent directories must already exist).
args.log_path.mkdir(exist_ok=True)
args.cache_path.mkdir(exist_ok=True)
for p in args.output_sft_path:
    p.mkdir(exist_ok=True)
# Build the list of GPS segments to analyze: either read from a segment file
# or derived from --analysis-start-time/--analysis-end-time.
segList = segmentlist()
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        raise parser.error("--min-seg-length must be >= 0")

    # Segments from a file get their extra time trimmed later in the DAG loop.
    adjustSegExtraTime = True
    with open(args.segment_file) as fp_segfile:
        for line in fp_segfile:
            splitLine = line.split()
            oneSeg = segment(int(splitLine[0]), int(splitLine[1]))
            if abs(oneSeg) >= args.min_seg_length:
                segList.append(oneSeg)

    # NOTE(review): the guard condition line was lost in the damaged source;
    # an empty segment list is the only state that can reach this error.
    if len(segList) < 1:
        raise ValueError(f"No segments found in segment file: {args.segment_file}")
else:
    if args.analysis_start_time is None:
        raise parser.error(
            "--analysis-start-time must be specified if no segment file is " "given"
        )
    if args.analysis_end_time is None:
        # BUG FIX: this message previously (incorrectly) named
        # --analysis-start-time.
        raise parser.error(
            "--analysis-end-time must be specified if no segment file is " "given"
        )
    if args.max_length_all_jobs is None:
        raise parser.error(
            "--max-length-all-jobs must be specified if no segment file is " "given"
        )

    # Cap the total analysis span at --max-length-all-jobs.
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = segment(args.analysis_start_time, args.analysis_end_time)
    segList.append(oneSeg)
844site = args.channel_name[0][0]
850if not args.transfer_frame_files:
854 "--transfer-frame-files must be specified when frame files are in /home"
858dataSegs = segmentlist()
860 dataSegs.append(file_segment(url))
# The .sub files live next to the DAG file.
path_to_dag_file = args.dag_file.parent
dag_filename = args.dag_file.name
makesfts_sub = path_to_dag_file / "MakeSFTs.sub"
movesfts_sub = path_to_dag_file / "MoveSFTs.sub"

# Create the MakeSFTs.sub HTCondor submit file.
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("should_transfer_files = yes\n")
    # Frame files are only transferred when the execute node cannot see them.
    if args.transfer_frame_files:
        MakeSFTsFID.write("transfer_input_files = $(cachefile),$(framefiles)\n")
    else:
        MakeSFTsFID.write("transfer_input_files = $(cachefile)\n")
    if "MAKESFTS_PATH" in os.environ and not args.makesfts_path:
        MakeSFTsFID.write("getenv = MAKESFTS_PATH\n")
    # OSDF access requires a scitokens credential inside the job sandbox.
    if args.datafind_urltype == "osdf":
        MakeSFTsFID.write("use_oauth_services = scitokens\n")
        MakeSFTsFID.write(
            "environment = BEARER_TOKEN_FILE=$$(CondorScratchDir)/.condor_creds/scitokens.use\n"
        )
    MakeSFTsFID.write("queue 1\n")
# Create the MoveSFTs.sub HTCondor submit file (runs in the local universe on
# the submit host, since it moves files on the shared file system).
with open(movesfts_sub, "w") as MoveSFTsFID:
    MoveSFTsLogFile = f"{args.log_path}/MoveSFTs_{dag_filename}.log"
    MoveSFTsFID.write("universe = local\n")
    MoveSFTsFID.write(f"executable = {moveSFTsExe}\n")
    MoveSFTsFID.write("arguments = ")
    # $(opts) carries the optional --no-validate flag set in the DAG VARS.
    if not args.validate:
        MoveSFTsFID.write("$(opts) ")
    MoveSFTsFID.write("-s $(sourcedirectory) -c $(channels) -d $(destdirectory)\n")
    MoveSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MoveSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MoveSFTsFID.write(f"log = {MoveSFTsLogFile}\n")
    MoveSFTsFID.write(f"error = {args.log_path}/MoveSFTs.err\n")
    MoveSFTsFID.write(f"output = {args.log_path}/MoveSFTs.out\n")
    MoveSFTsFID.write("notification = never\n")
    MoveSFTsFID.write("request_memory = 1GB\n")
    MoveSFTsFID.write("request_disk = 10MB\n")
    MoveSFTsFID.write("RequestCpus = 1\n")
    if "MOVESFTS_PATH" in os.environ and not args.movesfts_path:
        MoveSFTsFID.write("getenv = MOVESFTS_PATH\n")
    MoveSFTsFID.write("queue 1\n")
928with open(args.dag_file,
"w")
as dagFID:
929 startTimeAllNodes =
None
930 firstSFTstartTime = 0
941 if adjustSegExtraTime
and not args.synchronize_start:
942 segStartTime = seg[0]
951 segExtraTime = (segEndTime - segStartTime) % args.time_baseline
957 if args.overlap_fraction != 0.0:
958 if (segEndTime - segStartTime) > args.time_baseline:
960 segEndTime - segStartTime - args.time_baseline
961 ) %
int((1.0 - args.overlap_fraction) * args.time_baseline)
965 segExtraStart =
int(segExtraTime / 2)
966 segExtraEnd = segExtraTime - segExtraStart
967 args.analysis_start_time = segStartTime + segExtraStart
971 if args.analysis_start_time > segEndTime:
972 args.analysis_start_time = segEndTime
976 args.analysis_end_time = segEndTime - segExtraEnd
980 if args.analysis_end_time < segStartTime:
981 args.analysis_end_time = segStartTime
986 elif args.synchronize_start:
987 segStartTime = seg[0]
992 if firstSFTstartTime == 0:
993 firstSFTstartTime = segStartTime
997 args.analysis_start_time = (
1001 (segStartTime - firstSFTstartTime)
1002 / ((1.0 - args.overlap_fraction) * args.time_baseline)
1004 * (1.0 - args.overlap_fraction)
1005 * args.time_baseline
1013 if args.analysis_start_time > segEndTime:
1014 args.analysis_start_time = segEndTime
1018 args.analysis_end_time = (
1022 (segEndTime - args.analysis_start_time - args.time_baseline)
1023 / ((1.0 - args.overlap_fraction) * args.time_baseline)
1025 * (1.0 - args.overlap_fraction)
1026 * args.time_baseline
1029 + args.time_baseline
1030 + args.analysis_start_time
1035 if args.analysis_end_time < segStartTime:
1036 args.analysis_end_time = segStartTime
1041 args.analysis_start_time = seg[0]
1042 args.analysis_end_time = seg[1]
1046 startTimeThisNode = args.analysis_start_time
1047 endTimeThisNode = args.analysis_start_time
1048 endTimeAllNodes = args.analysis_start_time
1049 while endTimeAllNodes < args.analysis_end_time:
1052 if args.overlap_fraction != 0.0:
1055 endTimeAllNodes = endTimeAllNodes + args.time_baseline
1057 endTimeAllNodes = endTimeAllNodes +
int(
1058 (1.0 - args.overlap_fraction) * args.time_baseline
1062 endTimeAllNodes = endTimeAllNodes + args.time_baseline
1063 if endTimeAllNodes <= args.analysis_end_time:
1066 numThisNode = numThisNode + 1
1067 numThisSeg = numThisSeg + 1
1068 endTimeThisNode = endTimeAllNodes
1069 if numThisNode < args.max_num_per_node:
1073 nodeCount = nodeCount + 1
1076 startTimeAllNodes = startTimeThisNode
1087 if args.overlap_fraction != 0.0:
1089 startTimeThisNode = endTimeThisNode -
int(
1090 (args.overlap_fraction) * args.time_baseline
1094 startTimeThisNode = endTimeThisNode
1100 nodeCount = nodeCount + 1
1103 startTimeAllNodes = startTimeThisNode
1105 dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args
1112 dagFID.write(f
"JOB MoveSFTs {Path(dagFID.name).parent / 'MoveSFTs.sub'}\n")
1113 dagFID.write(f
"RETRY MoveSFTs 1\n")
1114 dagFID.write(f
"VARS MoveSFTs ")
1115 if not args.validate:
1116 dagFID.write(
'opts="--no-validate" ')
1118 f
'sourcedirectory="." '
1119 f
"channels=\"{' '.join(args.channel_name)}\" "
1120 f
"destdirectory=\"{' '.join([str(p) for p in args.output_sft_path])}\"\n"
1123 f
"PARENT {' '.join([f'MakeSFTs_{n}' for n in range(1, nodeCount+1)])} CHILD MoveSFTs\n"
# Record the end of the span actually covered by the generated jobs.
endTimeAllNodes = endTimeThisNode

# If no node was ever written, the DAG is empty — fail loudly.
if startTimeAllNodes is None:
    raise Exception("The startTimeAllNodes == none; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "The endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

# Report the overall GPS span covered by the DAG.
print(startTimeAllNodes, endTimeAllNodes)
DEPRECATED OPTIONS #####.
def __call__(self, parser, namespace, values, option_string=None)
def make_cache(urls, job_seg)
Make a frame list and cache list from a list of URLs.
def sft_name_from_vars(obs, gpsstart, Tsft, channel=None, kind=None, rev=None, window="unknown", par=None, miscstr=None)
Create SFT file name from specification.
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, urls, args)
Write one job to DAG file.
def get_urls(args)
Get frame file URL list from gwdatafind or cache file.