class GenerationNode(Node):
    """DAG node describing a single data-generation job."""

    def __init__(self, inputs, trigger_time, idx, dag, parent=None):
        """
        Node for data generation jobs

        Parameters
        ----------
        inputs: bilby_pipe.main.MainInput
            The user-defined inputs
        trigger_time: float
            The trigger time to use in generating analysis data
        idx: int
            The index of the data-generation job, used to label data products
        dag: bilby_pipe.dag.Dag
            The dag structure
        parent: bilby_pipe.job_creation.node.Node (optional)
            Any job to set as the parent to this job - used to enforce
            dependencies

        Raises
        ------
        BilbyPipeError
            If ``generation_pool`` is "igwn-pool" while ``osg`` is not set.
        """
        super().__init__(inputs, retry=3)

        use_igwn_pool = inputs.generation_pool == "igwn-pool"
        if use_igwn_pool and not inputs.osg:
            raise BilbyPipeError(
                "Generation job requested to use the igwn-pool "
                "(OSG, --generation-pool=igwn-pool), but --osg=False"
            )
        self.run_node_on_osg = use_igwn_pool

        # Keep the per-job values on the node: self.idx and self.trigger_time
        # are read back just below when the argument list is built.
        # NOTE(review): resolve_frame_files takes its GPS window from
        # self.inputs.start_time rather than from trigger_time - confirm that
        # inputs already reflects this job's trigger time.
        self.trigger_time = trigger_time
        self.idx = idx
        self.dag = dag

        self.setup_arguments()
        self.arguments.add("label", self.label)
        self.arguments.add("idx", self.idx)
        self.arguments.add("trigger-time", self.trigger_time)
        if self.inputs.injection_file is not None:
            self.arguments.add("injection-file", self.inputs.injection_file)
        if self.inputs.timeslide_file is not None:
            self.arguments.add("timeslide-file", self.inputs.timeslide_file)

        frame_files, success = self.resolve_frame_files
        # When the frame lookup failed for any detector the scitoken lines are
        # requested, independently of what the file-transfer check decides.
        need_scitokens = not success

        if self.inputs.transfer_files or self.inputs.osg:
            input_files_to_transfer = []
            for attr in (
                "complete_ini_file",
                "prior_file",
                "injection_file",
                "gps_file",
                "timeslide_file",
            ):
                value = getattr(self.inputs, attr)
                if value is not None:
                    input_files_to_transfer.append(os.path.abspath(str(value)))
            if self.transfer_container:
                input_files_to_transfer.append(self.inputs.container)
            for value in (
                self.inputs.psd_dict,
                self.inputs.spline_calibration_envelope_dict,
                frame_files,
            ):
                input_files_to_transfer.extend(self.extract_paths_from_dict(value))
            input_files_to_transfer.extend(self.inputs.additional_transfer_paths)

            input_files_to_transfer, need_auth = self.job_needs_authentication(
                input_files_to_transfer
            )
            need_scitokens = need_scitokens or need_auth

            self.extra_lines.extend(
                self._condor_file_transfer_lines(
                    input_files_to_transfer,
                    [
                        self._relative_topdir(
                            self.inputs.outdir, self.inputs.initialdir
                        )
                    ],
                )
            )
            self.arguments.add("outdir", os.path.relpath(self.inputs.outdir))
        else:
            # Without file transfer, frames located through gwdatafind (those
            # whose path starts with the configured URL type) may not be
            # reachable from the generation job, so warn about them.
            new_frames = [
                fname
                for fname in self.extract_paths_from_dict(frame_files)
                if fname.startswith(self.inputs.data_find_urltype)
            ]
            if new_frames:
                logger.warning(
                    "The following frame files were identified by gwdatafind for this analysis. "
                    "These frames may not be found by the data generation stage as file "
                    "transfer is not being used. You should either set transfer-files=True or "
                    "pass these frame files to the data-dict option. You may need to "
                    f"remove a prefix, e.g., file://localhost.\n\t{new_frames}"
                )

        if need_scitokens:
            self.extra_lines.extend(self.scitoken_lines)

        self.process_node()
        if parent:
            self.job.add_parent(parent.job)

    @property
    def resolve_frame_files(self):
        """
        Resolve frame files from frame_type_dict and data_dict.

        For each detector, if the frame filepath(s) is given return the
        filepath(s), otherwise use gwdatafind to resolve the frame files using
        the provided frame type (or the best frame type found for the
        detector's channel when no frame type is given).

        Returns
        -------
        output: dict or list
            Mapping from detector name to a list of frame files. Detectors
            whose channel is "GWOSC", or for which the lookup failed, have no
            entry. An empty list is returned instead when gaussian-noise or
            zero-noise is used.
        success: bool
            True if frame files are resolved successfully for all detectors

        Raises
        ------
        BilbyPipeError
            If real data is requested but no channel-dict was provided.
        """
        from gwdatafind import find_urls
        from gwpy.io.datafind import find_best_frametype
        from requests.exceptions import HTTPError

        success = True
        if self.inputs.gaussian_noise or self.inputs.zero_noise:
            return [], success
        if self.inputs.channel_dict is None:
            raise BilbyPipeError(
                "channel-dict must be provided if not using gaussian-noise or zero-noise"
            )

        # Merge into a fresh dict so that the user-supplied frame_type_dict is
        # not modified; data_dict entries take precedence.
        data = {}
        if self.inputs.frame_type_dict is not None:
            data.update(self.inputs.frame_type_dict)
        if self.inputs.data_dict is not None:
            data.update(self.inputs.data_dict)

        output = {}
        for det in self.inputs.detectors:
            if (
                self.inputs.channel_dict is not None
                and self.inputs.channel_dict[det] == "GWOSC"
            ):
                logger.info(f"Skipping datafind for {det} as GWOSC data is used.")
                continue
            if isinstance(data.get(det, None), list):
                # An explicit list of frame files was given for this detector.
                output[det] = data[det]
                continue
            if os.path.exists(data.get(det, "/not/a/real/file")):
                # A single existing file was given for this detector.
                output[det] = [data[det]]
                continue

            start_time = self.inputs.start_time
            end_time = self.inputs.start_time + self.inputs.duration
            if (
                self.inputs.psd_dict is None
                or self.inputs.psd_dict.get(det, None) is None
            ):
                # No PSD supplied for this detector: extend the window
                # backwards by psd_duration.
                start_time -= self.inputs.psd_duration

            # If data_find_url is not set, use the environment variable
            # GWDATAFIND_SERVER, otherwise use the default from bilby_pipe.utils
            if self.inputs.data_find_url is None:
                datafind_server = os.environ.get("GWDATAFIND_SERVER")
                if datafind_server is None:
                    logger.warning(
                        (
                            "GWDATAFIND_SERVER not set, using default "
                            f"gwdatafind server: {DEFAULT_GWDATAFIND_SERVER}"
                        )
                    )
                    datafind_server = DEFAULT_GWDATAFIND_SERVER
            else:
                datafind_server = self.inputs.data_find_url

            if det not in data:
                channel_name = self.inputs.channel_dict[det]
                if not channel_name.startswith(f"{det}:"):
                    channel_name = f"{det}:{channel_name}"
                frame_type = find_best_frametype(
                    channel_name,
                    start_time,
                    end_time,
                    host=datafind_server,
                )
            else:
                # The remaining value is treated as a frame type name.
                frame_type = data[det]

            kwargs = dict(
                site=det[0],
                gpsstart=start_time,
                gpsend=end_time,
                urltype=self.inputs.data_find_urltype,
                host=datafind_server,
                on_gaps="error",
                frametype=frame_type,
            )
            log_function_call("gwdatafind.find_urls", kwargs)
            try:
                frame_urls = find_urls(**kwargs)
            except (HTTPError, RuntimeError):
                logger.warning(
                    f"Failed to resolve frame files for detector {det}, the generation "
                    "job will attempt with gwpy.get."
                )
                success = False
            else:
                output[det] = frame_urls
                logger.info(f"Found frame files with {frame_type}")
        return output, success

    # NOTE(review): no decorator is visible on this method in this view;
    # confirm how callers are expected to access it.
    def universe(self):
        """
        Return the universe for this job: "local" when local generation is
        requested in the inputs, otherwise the node's ``_universe`` value.
        """
        if self.inputs.local_generation:
            logger.debug(
                "Data generation done locally: please do not use this when "
                "submitting a large number of jobs"
            )
            universe = "local"
        else:
            logger.debug(
                f"All data will be grabbed in the {self._universe} universe"
            )
            universe = self._universe
        return universe