20 """Creates DAGs to run jobs that generates SFTs"""
import argparse
import math
import os
import re

from lalpulsar import git_version
29 __author__ =
"Evan Goetz <evan.goetz@ligo.org>, Greg Mendell"
30 __version__ = git_version.id
31 __date__ = git_version.date
def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args):
    """Append one datafind + MakeSFTs job pair to the open DAG file.

    Parameters
    ----------
    dagFID : file object
        Open, writable DAG file; ``dagFID.name`` is used to locate the
        ``.sub`` files next to the DAG file.
    nodeCount : int
        Index used to build unique job names (``datafind_N``, ``MakeSFTs_N``).
    startTimeThisNode, endTimeThisNode : int
        GPS start/end times of the SFT data span handled by this node.
    site : str
        Single-letter observatory prefix (first letter of the channel name).
    args : argparse.Namespace
        Parsed command-line options.
    """
    datafind = f"datafind_{nodeCount}"
    MakeSFTs = f"MakeSFTs_{nodeCount}"
    startTimeDatafind = startTimeThisNode - args.extra_datafind_time
    endTimeDatafind = endTimeThisNode + args.extra_datafind_time
    tagStringOut = f"{args.tag_string}_{nodeCount}"
    if args.cache_file:
        # A static frame cache was supplied on the command line.
        cacheFile = args.cache_file
    else:
        # Cache file that the gw_data_find job for this node will produce.
        cacheFile = f"{args.cache_path}/{site}-{startTimeDatafind}-{endTimeDatafind}.cache"

    # Build the lalpulsar_MakeSFTs argument list.
    argList = []
    argList.append(f"-O {args.observing_run}")
    if args.observing_run > 0:
        # Public SFTs carry an observing kind and revision ...
        argList.append(f"-K {args.observing_kind}")
        argList.append(f"-R {args.observing_revision}")
    else:
        # ... private SFTs carry a miscellaneous description instead.
        argList.append(f"-X {args.misc_desc}")
    argList.append(f"-f {args.filter_knee_freq}")
    argList.append(f"-t {args.time_baseline}")
    argList.append(f"-p {','.join(args.output_sft_path)}")
    argList.append(f"-C {cacheFile}")
    argList.append(f"-s {startTimeThisNode}")
    argList.append(f"-e {endTimeThisNode}")
    argList.append(f"-N {','.join(args.channel_name)}")
    argList.append(f"-F {args.start_freq}")
    argList.append(f"-B {args.band}")
    if args.comment_field:
        argList.append(f"-c {args.comment_field}")
    if ":" in args.window_type:
        # e.g. "tukey:0.5" -> window type plus window parameter.
        window_type, window_param = args.window_type.split(":")
        argList.append(f"-w {window_type} -r {window_param}")
    else:
        argList.append(f"-w {args.window_type}")
    if args.overlap_fraction:
        argList.append(f"-P {args.overlap_fraction}")
    if args.allow_skipping:
        argList.append("--allow-skipping TRUE")
    argStr = " ".join(argList)

    # datafind job (skipped entirely when a static cache file was supplied).
    if not args.cache_file:
        dagFID.write(
            f"JOB {datafind} {os.path.join(os.path.dirname(dagFID.name), 'datafind.sub')}\n"
        )
        dagFID.write(f"RETRY {datafind} 1\n")
        dagFID.write(
            f'VARS {datafind} gpsstarttime="{startTimeDatafind}" gpsendtime="{endTimeDatafind}" observatory="{site}" inputdatatype="{args.input_data_type}" tagstring="{tagStringOut}"\n'
        )

    # MakeSFTs job; made a child of the datafind job when one exists.
    dagFID.write(
        f"JOB {MakeSFTs} {os.path.join(os.path.dirname(dagFID.name), 'MakeSFTs.sub')}\n"
    )
    dagFID.write(f"RETRY {MakeSFTs} 1\n")
    dagFID.write(f'VARS {MakeSFTs} argList="{argStr}" tagstring="{tagStringOut}"\n')
    if not args.cache_file:
        dagFID.write(f"PARENT {datafind} CHILD {MakeSFTs}\n")
# Command-line interface.
# NOTE(review): reconstructed from a mangled extraction; short option
# letters, required flags, and defaults not visible in the fragment should
# be confirmed against upstream lalpulsar_MakeSFTDAG.
parser = argparse.ArgumentParser(
    description="This script creates datafind.sub, MakeSFTs.sub, and a dag \
        file that generates SFTs based on the options given.",
    fromfile_prefix_chars="@",
)
parser.add_argument(
    "-O",
    "--observing-run",
    required=True,
    type=int,
    help="For public SFTs, observing run data the SFTs are generated from, or \
        (in the case of mock data challenge data) the observing \
        run on which the data is most closely based",
)
parser.add_argument(
    "-K",
    "--observing-kind",
    type=str,
    choices=["RUN", "AUX", "SIM", "DEV"],
    help='For public SFTs, one of: "RUN" for production SFTs of h(t) channels; \
        "AUX" for SFTs of non-h(t) channels; \
        "SIM" for mock data challenge or other simulated data; or \
        "DEV" for development/testing purposes',
)
parser.add_argument(
    "-R",
    "--observing-revision",
    type=int,
    help="For public SFTs: revision number starts at 1, and should be incremented once \
        SFTs have been widely distributed across clusters, advertised \
        as being ready for use, etc. For example, if mistakes are found \
        in the initial SFT production run after they have been published, \
        regenerated SFTs should have a revision number of at least 2",
)
parser.add_argument(
    "-X",
    "--misc-desc",
    type=str,
    help="For private SFTs, miscellaneous part of the SFT \
        description field in the filename",
)
parser.add_argument(
    "-a",
    "--analysis-start-time",
    type=int,
    help="GPS start time of data from which to generate \
        SFTs (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-b",
    "--analysis-end-time",
    type=int,
    help="GPS end time of data from which to generate SFTs \
        (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-f",
    "--dag-file",
    required=True,
    type=str,
    help="filename for .dag file (should end in .dag)",
)
parser.add_argument(
    "-G",
    "--tag-string",
    required=True,
    type=str,
    help="tag string used in names of various files unique to \
        jobs that will run under the DAG",
)
parser.add_argument(
    "-d",
    "--input-data-type",
    required=True,
    type=str,
    help="input data type for use with the gw_data_find --type \
        option",
)
parser.add_argument(
    "-x",
    "--extra-datafind-time",
    type=int,
    default=0,
    help="extra time to subtract/add from/to start/end time \
        arguments of gw_data_find",
)
parser.add_argument(
    "-M",
    "--datafind-match",
    type=str,
    help="string to use with the gw_data_find --match option",
)
parser.add_argument(
    "-y",
    "--synchronize-start",
    action="store_true",
    help="synchronize the start times of the SFTs so that the \
        start times are synchronized when there are gaps in the \
        data",
)
parser.add_argument(
    "-k",
    "--filter-knee-freq",
    required=True,
    type=int,
    help="high pass filter knee frequency used on time domain \
        data before generating SFTs",
)
parser.add_argument(
    "-T",
    "--time-baseline",
    required=True,
    type=int,
    help="time baseline of SFTs (e.g., 60 or 1800 seconds)",
)
parser.add_argument(
    "-p", "--output-sft-path", nargs="+", type=str, help="path to output SFTs"
)
parser.add_argument(
    "-e",
    "--cache-path",
    type=str,
    default="cache",
    help="path to cache files that will be produced by \
        gw_data_find (default is $PWD/cache; this directory is \
        created if it does not exist and must agree with that \
        given in .sub files)",
)
parser.add_argument(
    "-t",
    "--cache-file",
    type=str,
    help="path and filename to frame cache file to use instead \
        of gw_data_find",
)
parser.add_argument(
    "-o",
    "--log-path",
    type=str,
    default="logs",
    help="path to log, output, and error files (default \
        is $PWD/logs; this directory is created if it does not \
        exist and usually should be under a local file system)",
)
parser.add_argument(
    "-N",
    "--channel-name",
    required=True,
    nargs="+",
    type=str,
    help="name of input time-domain channel to read from \
        frames",
)
parser.add_argument(
    "--allow-skipping",
    action="store_true",
    help="allow channels to be skipped if not in frames or too low sampling \
        frequency",
)
parser.add_argument("-c", "--comment-field", type=str, help="comment for SFT header")
parser.add_argument(
    "-F", "--start-freq", type=int, default=10, help="start frequency of the SFTs"
)
parser.add_argument(
    "-B", "--band", type=int, default=1990, help="frequency band of the SFTs"
)
parser.add_argument(
    "-w",
    "--window-type",
    type=str,
    help='type of windowing of time-domain to do \
        before generating SFTs, e.g. "rectangular", \
        "hann", "tukey:<beta in [0,1], required>"; \
        if unspecified use lalpulsar_MakeSFTs defaults',
)
parser.add_argument(
    "-P",
    "--overlap-fraction",
    type=float,
    default=0,
    help="overlap fraction (for use with windows; e.g., use \
        --overlap-fraction 0.5 with --window-type hann windows)",
)
parser.add_argument(
    "-m",
    "--max-num-per-node",
    type=int,
    default=1,
    help="maximum number of SFTs to generate on one node",
)
parser.add_argument(
    "-L",
    "--max-length-all-jobs",
    type=int,
    help="maximum total amount of data to process, in seconds \
        (optional and unused if a segment file is given)",
)
parser.add_argument(
    "-g",
    "--segment-file",
    type=str,
    help="alternative file with segments to use, rather than \
        the input times",
)
parser.add_argument(
    "-l",
    "--min-seg-length",
    type=int,
    default=0,
    help="minimum length segments to process in seconds (used \
        only if a segment file is given)",
)
parser.add_argument(
    "-q",
    "--list-of-nodes",
    type=str,
    help="file with list of nodes on which to output SFTs",
)
parser.add_argument(
    "-Q",
    "--node-path",
    type=str,
    help="path to nodes to output SFTs; the node name is \
        appended to this path, followed by path given by the -p \
        option; for example, if -q point to file with the list \
        node1 node2 ... and the -Q /data/ -p /frames/S5/sfts/LHO \
        options are given, the first output file will go into \
        /data/node1/frames/S5/sfts/LHO; the next node in the list \
        is used in constructing the path when the number of jobs \
        given by the -r option reached, and so on",
)
parser.add_argument(
    "-r",
    "--output-jobs-per-node",
    type=int,
    default=0,
    help="number of jobs to output per node in the list of \
        nodes given with the -q option",
)
parser.add_argument(
    "--datafind-path",
    type=str,
    help="string specifying the gw_data_find executable, \
        or a path to it; if not set, will use \
        LSC_DATAFIND_PATH env variable or system default (in \
        that order)",
)
parser.add_argument(
    "--makesfts-path",
    type=str,
    help="string specifying the lalpulsar_MakeSFTs executable, \
        or a path to it; if not set, will use \
        MAKESFTS_PATH env variable or system default (in that \
        order)",
)
parser.add_argument(
    "--request-memory",
    type=int,
    default=2048,
    help="memory allocation in MB to request from condor for \
        lalpulsar_MakeSFTs step",
)
parser.add_argument(
    "--request-disk",
    type=int,
    default=1024,
    help="disk space allocation in MB to request from condor \
        for lalpulsar_MakeSFTs step",
)
parser.add_argument(
    "-A",
    "--accounting-group",
    required=True,
    type=str,
    help="Condor tag for the production of SFTs",
)
parser.add_argument(
    "-U",
    "--accounting-group-user",
    required=True,
    type=str,
    help="albert.einstein username (do not add @LIGO.ORG)",
)
432 def __call__(self, parser, namespace, values, option_string=None):
434 f
"Argument {self.option_strings} has been deprecated in lalpulsar_MakeSFTs"
440 "--frame-struct-type",
442 action=DeprecateAction,
443 help=
"DEPRECATED. No longer required; \
444 the frame channel type is determined automatically",
450 action=DeprecateAction,
451 help=
"DEPRECATED. No longer required; \
452 the frame channel type is determined automatically",
458 action=DeprecateAction,
459 help=
"DEPRECATED. No longer required; \
460 the detector prefix is deduced from the channel name",
466 action=DeprecateAction,
467 help=
"DEPRECATED. No longer supported",
473 action=DeprecateAction,
474 help=
"DEPRECATED. Default behaviour",
480 action=DeprecateAction,
481 help=
"DEPRECATED. No longer supported",
487 action=DeprecateAction,
488 help=
"DEPRECATED. No longer supported",
args = parser.parse_args()

# Sanity-check the parsed options.
# BUGFIX: the original called ``raise argparse.error(...)``; the argparse
# module has no ``error`` attribute, so every failed check would raise
# AttributeError instead of a usage message.  ``parser.error()`` is the
# intended call: it prints the message and exits with status 2.
if args.observing_run < 0:
    parser.error("--observing-run must be >= 0")

if args.observing_run > 0 and not args.observing_kind:
    parser.error("--observing-run requires --observing-kind")

if args.observing_run > 0 and not args.observing_revision:
    parser.error("--observing-run requires --observing-revision")

if args.observing_revision and args.observing_revision <= 0:
    parser.error("--observing-revision must be > 0")

# Public SFTs (observing_run > 0) use kind/revision, not a misc description.
if args.observing_run > 0 and args.misc_desc:
    parser.error(
        f"--observing-run={args.observing_run} incompatible with --misc-desc"
    )

# The misc description becomes part of an SFT filename, so it is
# restricted to alphanumeric characters.
if args.misc_desc and not re.compile(r"^[A-Za-z0-9]+$").match(args.misc_desc):
    parser.error("--misc-desc may only contain A-Z, a-z, 0-9 characters")

if args.extra_datafind_time < 0:
    parser.error("--extra-datafind-time must be >= 0")

if args.filter_knee_freq < 0:
    parser.error("--filter-knee-freq must be >= 0")

if args.time_baseline <= 0:
    parser.error("--time-baseline must be > 0")

if args.overlap_fraction < 0.0 or args.overlap_fraction >= 1.0:
    parser.error("--overlap-fraction must be in the range [0,1)")

if args.start_freq < 0.0 or args.start_freq >= 7192.0:
    parser.error("--start-freq must be in the range [0,7192)")

if args.band <= 0 or args.band >= 8192.0:
    parser.error("--band must be in the range (0,8192)")

if args.start_freq + args.band >= 8192.0:
    parser.error("--start-freq + --band must be < 8192")

if args.max_num_per_node <= 0:
    parser.error("--max-num-per-node must be > 0")

# Each channel gets its own output path, unless one shared path is given.
if (
    len(args.channel_name) != len(args.output_sft_path)
    and len(args.output_sft_path) != 1
):
    parser.error(
        "--channel-name and --output-sft-path must be the "
        "same length or --output-sft-path must be length of 1"
    )
# Resolve the gw_data_find executable: explicit path/dir option first,
# then the LSC_DATAFIND_PATH environment variable, then the system default.
dataFindExe = "gw_data_find"
if args.datafind_path:
    if os.path.isfile(args.datafind_path):
        dataFindExe = args.datafind_path
    else:
        dataFindExe = os.path.join(args.datafind_path, dataFindExe)
elif "LSC_DATAFIND_PATH" in os.environ:
    # $ENV(...) is expanded by HTCondor when the .sub file is read.
    dataFindExe = os.path.join("$ENV(LSC_DATAFIND_PATH)", dataFindExe)
else:
    dataFindExe = os.path.join("/usr/bin", dataFindExe)

# Resolve the lalpulsar_MakeSFTs executable the same way.
makeSFTsExe = "lalpulsar_MakeSFTs"
if args.makesfts_path:
    if os.path.isfile(args.makesfts_path):
        makeSFTsExe = args.makesfts_path
    else:
        makeSFTsExe = os.path.join(args.makesfts_path, makeSFTsExe)
elif "MAKESFTS_PATH" in os.environ:
    makeSFTsExe = os.path.join("$ENV(MAKESFTS_PATH)", makeSFTsExe)
else:
    # @LALSUITE_BINDIR@ is substituted at LALSuite build time.
    makeSFTsExe = os.path.join("@LALSUITE_BINDIR@", makeSFTsExe)

# Create the log directory, and the cache directory when gw_data_find
# jobs (rather than a static cache file) will be used.
if not os.path.exists(args.log_path):
    os.mkdir(args.log_path)
if not args.cache_file:
    if not os.path.exists(args.cache_path):
        os.mkdir(args.cache_path)
# Optional list of nodes on which to place the output SFTs.
# NOTE(review): reconstructed from a mangled extraction — confirm the
# initializations against upstream lalpulsar_MakeSFTDAG.
nodeList = []
nodeListIndex = 0
useNodeList = False
savedOutputSFTPath = None
if args.list_of_nodes is not None:
    if args.node_path is None:
        parser.error("Node file list given, but no node path specified")

    if args.output_jobs_per_node < 1:
        parser.error(
            "Node file list given, but invalid output jobs per node specified"
        )

    # First whitespace-delimited token of each line is the node name.
    with open(args.list_of_nodes) as fp_nodelist:
        for idx, line in enumerate(fp_nodelist):
            splitLine = line.split()
            nodeList.append(splitLine[0])
    if len(nodeList) < 1:
        raise Exception(
            "No nodes found in node list file: {}".format(args.list_of_nodes)
        )

    # Remember the user-supplied output path; it is re-prefixed with a
    # node-specific path for every job that is written.
    useNodeList = True
    savedOutputSFTPath = args.output_sft_path
# Build the segment list: either from a segment file, or from the
# analysis start/end times given on the command line.
segList = []
adjustSegExtraTime = False
if args.segment_file is not None:
    if args.min_seg_length < 0:
        parser.error("--min-seg-length must be >= 0")

    # When segments come from a file, time that cannot hold a whole SFT
    # is later trimmed from the segment ends.
    adjustSegExtraTime = True

    with open(args.segment_file) as fp_segfile:
        for idx, line in enumerate(fp_segfile):
            splitLine = line.split()
            oneSeg = []
            oneSeg.append(int(splitLine[0]))
            oneSeg.append(int(splitLine[1]))
            # Keep only segments long enough to be worth processing.
            if (oneSeg[1] - oneSeg[0]) >= args.min_seg_length:
                segList.append(oneSeg)

    if len(segList) < 1:
        raise Exception(
            "No segments found in segment file: {}".format(args.segment_file)
        )
else:
    if args.analysis_start_time is None:
        parser.error(
            "--analysis-start-time must be specified if no segment file is given"
        )

    # BUGFIX: this branch previously reported "--analysis-start-time" —
    # a copy-paste error; it checks --analysis-end-time.
    if args.analysis_end_time is None:
        parser.error(
            "--analysis-end-time must be specified if no segment file is given"
        )

    if args.max_length_all_jobs is None:
        parser.error(
            "--max-length-all-jobs must be specified if no segment file is given"
        )

    # Cap the requested span at the maximum total amount of data allowed.
    if args.analysis_end_time > (args.analysis_start_time + args.max_length_all_jobs):
        args.analysis_end_time = args.analysis_start_time + args.max_length_all_jobs

    oneSeg = []
    oneSeg.append(args.analysis_start_time)
    oneSeg.append(args.analysis_end_time)
    segList.append(oneSeg)
# The observatory site is the first letter of the (first) channel name,
# e.g. "H" from "H1:...".
site = args.channel_name[0][0]

# Running count of DAG nodes; used to build unique job names.
nodeCount = 0

# The .sub files are written next to the DAG file.
path_to_dag_file = os.path.dirname(args.dag_file)
dag_filename = os.path.basename(args.dag_file)
datafind_sub = os.path.join(path_to_dag_file, "datafind.sub")
makesfts_sub = os.path.join(path_to_dag_file, "MakeSFTs.sub")
# Write datafind.sub — only needed when frame caches are produced by
# gw_data_find jobs rather than supplied as a static cache file.
if not args.cache_file:
    with open(datafind_sub, "w") as datafindFID:
        datafindLogFile = f"{args.log_path}/datafind_{dag_filename}.log"
        datafindFID.write("universe = vanilla\n")
        datafindFID.write(f"executable = {dataFindExe}\n")
        # $(...) macros below are substituted per-job from the DAG VARS lines.
        datafindFID.write("arguments = ")
        datafindFID.write("--observatory $(observatory) --url-type file ")
        datafindFID.write("--gps-start-time $(gpsstarttime) ")
        datafindFID.write("--gps-end-time $(gpsendtime) --lal-cache --gaps ")
        datafindFID.write("--type $(inputdatatype)")
        if args.datafind_match:
            datafindFID.write(f" --match {args.datafind_match}\n")
        else:
            datafindFID.write("\n")
        # Pass through the environment needed for data discovery and auth.
        datafindFID.write(
            "getenv = *DATAFIND*, KRB5*, X509*, BEARER_TOKEN*, SCITOKEN*\n"
        )
        datafindFID.write("request_disk = 5MB\n")
        datafindFID.write("request_memory = 2000MB\n")
        datafindFID.write(f"accounting_group = {args.accounting_group}\n")
        datafindFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
        datafindFID.write(f"log = {datafindLogFile}\n")
        datafindFID.write(f"error = {args.log_path}/datafind_$(tagstring).err\n")
        # stdout of gw_data_find IS the cache file MakeSFTs will read.
        datafindFID.write(f"output = {args.cache_path}/")
        datafindFID.write("$(observatory)-$(gpsstarttime)-$(gpsendtime).cache\n")
        datafindFID.write("notification = never\n")
        datafindFID.write("queue 1\n")
# Write MakeSFTs.sub.  Formatting unified to f-strings for consistency
# with the datafind.sub section (the original mixed str.format and
# f-strings); the emitted bytes are unchanged.
with open(makesfts_sub, "w") as MakeSFTsFID:
    MakeSFTsLogFile = f"{args.log_path}/MakeSFTs_{dag_filename}.log"
    MakeSFTsFID.write("universe = vanilla\n")
    MakeSFTsFID.write(f"executable = {makeSFTsExe}\n")
    # $(argList) and $(tagstring) are substituted per-job from DAG VARS lines.
    MakeSFTsFID.write("arguments = $(argList)\n")
    MakeSFTsFID.write(f"accounting_group = {args.accounting_group}\n")
    MakeSFTsFID.write(f"accounting_group_user = {args.accounting_group_user}\n")
    MakeSFTsFID.write(f"log = {MakeSFTsLogFile}\n")
    MakeSFTsFID.write(f"error = {args.log_path}/MakeSFTs_$(tagstring).err\n")
    MakeSFTsFID.write(f"output = {args.log_path}/MakeSFTs_$(tagstring).out\n")
    MakeSFTsFID.write("notification = never\n")
    MakeSFTsFID.write(f"request_memory = {args.request_memory}MB\n")
    MakeSFTsFID.write(f"request_disk = {args.request_disk}MB\n")
    MakeSFTsFID.write("RequestCpus = 1\n")
    MakeSFTsFID.write("queue 1\n")
# Write the DAG: walk every segment, packing up to --max-num-per-node
# SFTs into each node, and emit one datafind+MakeSFTs job pair per node.
# NOTE(review): reconstructed from a mangled extraction with many lines
# missing; the control flow (especially placement of the final
# ``endTimeAllNodes = endTimeThisNode``) should be confirmed against
# upstream lalpulsar_MakeSFTDAG.
with open(args.dag_file, "w") as dagFID:
    startTimeAllNodes = None
    firstSFTstartTime = 0  # used only with --synchronize-start
    endTimeThisNode = 0

    for seg in segList:
        numThisNode = 0
        numThisSeg = 0

        if adjustSegExtraTime and not args.synchronize_start:
            # Case 1: segment file given, no synchronized starts.  Trim
            # time that cannot hold a whole (possibly overlapping) SFT
            # equally from both ends of the segment.
            segStartTime = seg[0]
            segEndTime = seg[1]

            segExtraTime = (segEndTime - segStartTime) % args.time_baseline

            if args.overlap_fraction != 0.0:
                # With overlap, the leftover is measured against the
                # overlap-adjusted stride after the first full baseline.
                if (segEndTime - segStartTime) > args.time_baseline:
                    segExtraTime = (
                        segEndTime - segStartTime - args.time_baseline
                    ) % int((1.0 - args.overlap_fraction) * args.time_baseline)

            segExtraStart = int(segExtraTime / 2)
            segExtraEnd = segExtraTime - segExtraStart
            args.analysis_start_time = segStartTime + segExtraStart
            # Clamp to the segment in degenerate (too-short) cases.
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            args.analysis_end_time = segEndTime - segExtraEnd
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        elif args.synchronize_start:
            # Case 2: align every SFT start onto the grid defined by the
            # first SFT start time and the overlap-adjusted stride.
            segStartTime = seg[0]
            segEndTime = seg[1]

            if firstSFTstartTime == 0:
                firstSFTstartTime = segStartTime

            args.analysis_start_time = (
                int(
                    math.ceil(
                        (segStartTime - firstSFTstartTime)
                        / ((1.0 - args.overlap_fraction) * args.time_baseline)
                    )
                    * (1.0 - args.overlap_fraction)
                    * args.time_baseline
                )
                + firstSFTstartTime
            )
            if args.analysis_start_time > segEndTime:
                args.analysis_start_time = segEndTime

            args.analysis_end_time = (
                int(
                    math.floor(
                        (segEndTime - args.analysis_start_time - args.time_baseline)
                        / ((1.0 - args.overlap_fraction) * args.time_baseline)
                    )
                    * (1.0 - args.overlap_fraction)
                    * args.time_baseline
                )
                + args.time_baseline
                + args.analysis_start_time
            )
            if args.analysis_end_time < segStartTime:
                args.analysis_end_time = segStartTime

        else:
            # Case 3: no trimming; use the segment bounds as-is.
            args.analysis_start_time = seg[0]
            args.analysis_end_time = seg[1]

        # Pack SFTs into nodes across this segment.
        startTimeThisNode = args.analysis_start_time
        endTimeThisNode = args.analysis_start_time
        endTimeAllNodes = args.analysis_start_time
        while endTimeAllNodes < args.analysis_end_time:
            # Advance by one SFT: a full baseline for the first SFT of a
            # segment, otherwise the overlap-adjusted stride.
            if args.overlap_fraction != 0.0:
                if numThisSeg == 0:
                    endTimeAllNodes = endTimeAllNodes + args.time_baseline
                else:
                    endTimeAllNodes = endTimeAllNodes + int(
                        (1.0 - args.overlap_fraction) * args.time_baseline
                    )
            else:
                endTimeAllNodes = endTimeAllNodes + args.time_baseline
            if endTimeAllNodes <= args.analysis_end_time:
                numThisNode = numThisNode + 1
                numThisSeg = numThisSeg + 1
                endTimeThisNode = endTimeAllNodes
                if numThisNode < args.max_num_per_node:
                    continue

                # Node is full: emit its jobs.
                nodeCount = nodeCount + 1

                if useNodeList:
                    # NOTE(review): savedOutputSFTPath comes from a
                    # nargs="+" option (a list); string + list here looks
                    # suspect — confirm intended types upstream.
                    args.output_sft_path = (
                        args.node_path
                        + nodeList[nodeListIndex]
                        + savedOutputSFTPath
                    )
                    if (nodeCount % args.output_jobs_per_node) == 0:
                        nodeListIndex = nodeListIndex + 1

                if nodeCount == 1:
                    startTimeAllNodes = startTimeThisNode
                writeToDag(
                    dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args
                )
                # Reset for the next node of this segment, rewinding the
                # start time by the overlap when overlapping SFTs are made.
                numThisNode = 0
                if args.overlap_fraction != 0.0:
                    startTimeThisNode = endTimeThisNode - int(
                        (args.overlap_fraction) * args.time_baseline
                    )
                else:
                    startTimeThisNode = endTimeThisNode

        # Flush a final, partially-filled node for this segment.
        if numThisNode > 0:
            nodeCount = nodeCount + 1

            if useNodeList:
                args.output_sft_path = (
                    args.node_path + nodeList[nodeListIndex] + savedOutputSFTPath
                )
                if (nodeCount % args.output_jobs_per_node) == 0:
                    nodeListIndex = nodeListIndex + 1

            if nodeCount == 1:
                startTimeAllNodes = startTimeThisNode
            writeToDag(
                dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args
            )

    # The DAG ends with the last emitted node's end time.
    endTimeAllNodes = endTimeThisNode

if startTimeAllNodes is None:
    raise Exception("The startTimeAllNodes == none; the DAG file contains no jobs!")

if endTimeAllNodes <= startTimeAllNodes:
    raise Exception(
        "The endTimeAllNodes <= startTimeAllNodes; the DAG file contains no jobs!"
    )

# Report the GPS span covered by the SFTs in this DAG.
print(startTimeAllNodes, endTimeAllNodes)
# ##### DEPRECATED OPTIONS #####
# (extraction artifact — index lines preserved as comments:)
# def __call__(self, parser, namespace, values, option_string=None)
# def writeToDag(dagFID, nodeCount, startTimeThisNode, endTimeThisNode, site, args)