[docs]classBilbyOnline(Bilby):""" Bilby Pipeline---Online Replication Configuration. Parameters ---------- production : :class:`asimov.Production` The production object. category : str, optional, default="C01_offline" The category of the job. Raises ------ PipelineException Should a non BilbyOnline production be used to initiate the run """
def__init__(self,production,category=None):super(Bilby,self).__init__(production,category)self.logger.info("Using the bilby pipeline in online replication configuration")
ifnotproduction.pipeline.lower()=="bilbyonline":raisePipelineException("Pipeline does not match")
[docs]defbuild_dag(self,dryrun=False):""" Construct a DAG file in order to submit a production to the condor scheduler using `bilby_pipe_gracedb`. Parameters ---------- dryrun : bool, optional, default=False If true commands will not be run and will instead be printed to standard out. Raises ------ PipelineException Raised if the construction of the DAG fails. """self.logger.info(f"Working in {pathlib.Path.cwd()}")ifnotself.production.rundir:self.production.rundir=pathlib.Path.expanduser("~").joinpath(self.production.event.name,self.production.name,)ifnotpathlib.Path(self.production.rundir).is_dir():pathlib.Path(self.production.rundir).mkdir(parents=True)self.event_id=self.resolve_grace_id()json_data=read_from_gracedb(self.event_id,config.get("gracedb","url"),self.production.rundir)json_file=str(pathlib.Path(self.production.rundir).joinpath(f"{self.event_id}.json"))psd_file=self.psd_file(json_data)settings_fp,likelihood_mode=self.mass_settings(json_data)webdir=str(pathlib.Path(config.get("general","webroot")).joinpath(f"{self.production.event.name}",f"{self.production.name}"))if"channels"inself.production.meta:channel_dict=self.production.meta["channels"]else:channel_dict="online"command=[str(pathlib.Path(config.get("pipelines","environment")).joinpath("bin","bilby_pipe_gracedb")),"--settings",settings_fp,"--cbc-likelihood-mode",likelihood_mode,"--webdir",webdir,"--outdir",self.production.rundir,"--json",json_file,"--psd-file",psd_file,"--channel-dict",channel_dict,]ifdryrun:print(" ".join(command))else:self.logger.info(" ".join(command))pipe=subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)out,err=pipe.communicate()self.logger.info(out)iferror"DAG generation complete"notinstr(out):self.production.status="stuck"self.logger.error(err)raisePipelineException(f"DAG file could not be created.\n{command}\n{out}\n{err}",production=self.production.name,)returnPipelineLogger(message=out,production=self.production.name)returnNone
[docs]defresolve_grace_id(self)->str:""" Establish the correct GID for the selected event Returns -------- grace_id : str GraceDB Event ID (Gname) Raises ------ ValueError Raised when GraceID cannot be identified from metadata or cannot be found on GraceDB """grace_id=Noneif"ligo"inself.production.meta:ligo_dict=self.production.meta["ligo"]if"gname"inligo_dict:grace_id=ligo_dict["gname"]elif"preferred event"inligo_dict:grace_id=ligo_dict["preferred event"]elif"sname"inligo_dict:grace_id=self.get_gname_from_sname(grace_id)ifgrace_idisNone:ifself.production.event.name.startswith("G"):grace_id=self.production.event.nameelifself.production.event.name.startswith("S"):grace_id=self.get_gname_from_sname(self.production.event.name)else:raiseValueError("Unable to resolve GraceDB ID from provided information")returngrace_id
[docs]defget_gname_from_sname(self,sname)->str:""" Gets the preferred event Gname from the given Sname. Will retrieve the preferred event. Parameters ---------- sname : str GraceDB ID for the Superevent (Sname). Returns ------- gname : str GraceDB ID for the preferred Event (Gname). Raises ------ ValueError If Sname does not recover an associated Gname """gracedb_server=config.get("gracedb","url")gracedb=GraceDb(service_url=gracedb_server)try:superevent_data=gracedb.superevent(sname).json()gname=superevent_data["preferred_event_data"]["graceid"]exceptHTTPErrorasexc:raiseHTTPError(f"Unable to retrieve {sname} from gracedb")fromexcreturngname
[docs]defmass_settings(self,json_data)->tuple[str,str]:""" Determines settings for run based on best fitting template chirp mass. Parameters ---------- json_data : dict GraceDB meta data. Returns ------- settings_fp : str Path to settings for binary type. likelihood_mode : str Contains the ROQ type to use. Raises ------ ValueError If settings cannot be found or the chirp mass is incompatible with setting types """mchirp=float(json_data["extra_attributes"]["CoincInspiral"]["mchirp"])mass_settings=Noneif"mass settings"inself.production.meta:mass_settings=self.production.meta["mass settings"]elif"pipelines"inself.production.meta:pipelines=self.production.meta["pipelines"]if"bilbyonline"inself.production.meta["pipelines"]:if"mass settings"inpipelines["bilbyonline"]:mass_settings=pipelines["bilbyonline"]["mass settings"]ifmass_settingsisNone:raiseValueError("No mass settings available")defaults=mass_settings.pop("defaults",None)ifdefaults:settings_fp=defaults["settings file"]likelihood_mode=defaults["likelihood mode"]forkey,settingsinmass_settings.items():lower=float(settings["low mass bound"])higher=float(settings["high mass bound"])iflower<=mchirp<higher:settings_fp=settings["settings file"]likelihood_mode=settings["likelihood mode"]breakelse:ifnotdefaults:raiseValueError(f"{mchirp} did not have associated settings nor were defaults""available")returnsettings_fp,likelihood_mode
[docs]defpsd_file(self,json_data)->str:""" Establishes which file contains the PSD information Parameters ---------- json_data : dict Contains the metadata retrieved from GraceDB Returns ------- psd_file : str Path to XML file containing PSD information Raises ------ ValueError If unable to retrieve a PSD XML from GraceDB """psd_file=Noneif"coinc_file"injson_data:coinc=json_data["coinc_file"]ifos=json_data["instruments"].split(",")psds_present=0forifoinifos:try:_=FrequencySeries.read(coinc,instrument=ifo)psds_present+=1breakexceptValueError:continueifbool(psds_present):psd_file=coincifpsd_fileisNone:gracedb_server=config.get("gracedb","url")gracedb=GraceDb(service_url=gracedb_server)try:data=gracedb.files(self.event_id,"psd.xml.gz")exceptHTTPErrorasexc:raiseValueError(f"Unable to retrieve PSDs for {self.event_id}")fromexcpsd_file=pathlib.Path(self.production.rundir).joinpath(f"{self.production.event.name}_psd.xml.gz")withopen(psd_file,"wb")asfb:fb.write(data.read())returnstr(psd_file)
[docs]defsubmit_dag(self,dryrun=False):""" Submit a DAG file to the condor cluster Parameters ---------- dryrun : bool, optional, default=False If true, the DAG will not be submitted but all commands will be printed to standard out. Returns ------- int The cluster ID assigned to the running DAG file. PipelineLogger The pipeline logger message. Raises ------ PipelineException Raised if the pipeline fails to submit the job. """self.logger.info(f"Working in {pathlib.Path.cwd()}")self.before_submit()try:dag_filename=f"dag_{self.event_id}.submit"command=["condor_submit_dag","-batch-name",f"bilby_online/{self.production.event.name}/{self.production.name}",str(pathlib.Path(self.production.rundir).joinpath("submit",dag_filename)),]ifdryrun:print(" ".join(command))else:self.logger.info(" ".join(command))dagman=subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)out,err=dagman.communicate()if"submitted to cluster"instr(out):cluster=re.search(r"submitted to cluster ([\d]+)",str(out)).groups()[0]self.logger.info("Submitted successfully."f"Running with job ID {int(cluster)}")self.production.status="running"self.production.job_id=int(cluster)returncluster,PipelineLogger(out)self.logger.error("Could not submit the job to the cluster")self.logger.info(out)self.logger.error(err)raisePipelineException("The DAG file could not be submitted.",)exceptFileNotFoundErroraserror:self.logger.exception(error)raisePipelineException("It looks like condor isn't installed on this system\n"f"I wanted to run {' '.join(command)}")fromerror