Source code for bilby_pipe.job_creation.nodes.pe_summary_node

import os

from ...utils import BilbyPipeError, logger
from ..node import Node


class PESummaryNode(Node):
    """Job node that runs the ``summarypages`` executable on merged results.

    The node builds the ``summarypages`` argument list from the run
    configuration held in ``inputs``, optionally registers the Condor file
    transfer lines, and makes every merged-result job a parent of this job.

    Parameters
    ----------
    inputs:
        Run configuration object. Attributes read here include ``label``,
        ``transfer_files``, ``osg``, ``initialdir``, ``webdir``, ``email``,
        ``complete_ini_file``, ``waveform_approximant``, ``existing_dir``,
        ``psd_dict``, ``spline_calibration_envelope_dict``, ``container``,
        ``additional_transfer_paths`` and ``summarypages_arguments``.
    merged_node_list: list
        Nodes providing ``result_file``, ``label`` and ``job``; one result
        is passed to ``summarypages`` per node.
    generation_node_list: list
        Nodes providing ``data_dump_file``. The data dump is only passed on
        when the list holds exactly one node.
    dag:
        Stored on ``self.dag``.

    Raises
    ------
    BilbyPipeError
        If ``summarypages_arguments`` contains ``labels`` and the number of
        labels differs from the number of result files.
    """

    run_node_on_osg = True

    # Keys of ``summarypages_arguments`` forwarded without a value.
    _SUMMARYPAGES_FLAGS = frozenset(
        {"gw", "no_ligo_skymap", "kde_plot", "include_prior", "publication"}
    )
    # Keys of ``summarypages_arguments`` forwarded together with their value.
    _SUMMARYPAGES_OPTIONS = frozenset(
        {"nsamples_for_skymap", "burnin", "gracedb", "palette", "notes"}
    )

    def __init__(self, inputs, merged_node_list, generation_node_list, dag):
        super().__init__(inputs)
        self.dag = dag
        self.job_name = f"{inputs.label}_pesummary"
        self.request_cpus = 1

        n_results = len(merged_node_list)
        result_files = [merged_node.result_file for merged_node in merged_node_list]
        labels = [merged_node.label for merged_node in merged_node_list]

        if self.inputs.transfer_files or self.inputs.osg:
            input_files_to_transfer = [
                self._relative_topdir(fname, self.inputs.initialdir)
                for fname in result_files
            ] + inputs.additional_transfer_paths
            files = [self.inputs.complete_ini_file]
            if len(generation_node_list) == 1:
                files.append(generation_node_list[0].data_dump_file)
            input_files_to_transfer.extend(
                self._relative_topdir(fname, self.inputs.initialdir)
                for fname in files
            )
            for value in [
                self.inputs.psd_dict,
                self.inputs.spline_calibration_envelope_dict,
            ]:
                input_files_to_transfer.extend(self.extract_paths_from_dict(value))
            if self.transfer_container:
                input_files_to_transfer.append(self.inputs.container)
            input_files_to_transfer, need_scitokens = self.job_needs_authentication(
                input_files_to_transfer
            )
            if need_scitokens:
                self.extra_lines.extend(self.scitoken_lines)
            # The web directory is the only output transferred back.
            self.extra_lines.extend(
                self._condor_file_transfer_lines(
                    input_files_to_transfer,
                    [self._relative_topdir(self.inputs.webdir, self.inputs.initialdir)],
                )
            )

        self.setup_arguments(
            add_ini=False, add_unknown_args=False, add_command_line_args=False
        )
        self.arguments.add("webdir", self.inputs.webdir)
        if self.inputs.email is not None:
            self.arguments.add("email", self.inputs.email)
        # One config entry per result file, all pointing at the same ini file.
        self.arguments.add(
            "config", " ".join([self.inputs.complete_ini_file] * n_results)
        )
        result_files = [os.path.relpath(fname) for fname in result_files]
        self.arguments.add("samples", " ".join(result_files))

        # Using append here as summary pages doesn't take a full name for
        # approximant
        self.arguments.append("-a")
        self.arguments.append(" ".join([self.inputs.waveform_approximant] * n_results))

        if len(generation_node_list) == 1:
            self.arguments.add(
                "gwdata", os.path.relpath(generation_node_list[0].data_dump_file)
            )
        elif len(generation_node_list) > 1:
            logger.info(
                "Not adding --gwdata to PESummary job as there are multiple files"
            )

        existing_dir = self.inputs.existing_dir
        if existing_dir is not None:
            self.arguments.add("existing_webdir", existing_dir)

        if isinstance(self.inputs.summarypages_arguments, dict):
            self._add_summarypages_arguments(
                self.inputs.summarypages_arguments, labels, n_results
            )

        self.process_node()
        for merged_node in merged_node_list:
            self.job.add_parent(merged_node.job)

    def _add_summarypages_arguments(self, user_arguments, labels, n_results):
        """Forward the user-supplied ``summarypages_arguments`` to the job.

        Parameters
        ----------
        user_arguments: dict
            Mapping of ``summarypages`` option name to value.
        labels: list
            Labels of the merged nodes, used when the user gives none.
        n_results: int
            Number of result files passed to ``summarypages``.

        Raises
        ------
        BilbyPipeError
            If the user supplies ``labels`` whose count differs from
            ``n_results``.
        """
        user_labels = None
        if "labels" in user_arguments:
            user_labels = user_arguments["labels"]
            # A bare string must not be iterated character by character.
            if isinstance(user_labels, str):
                user_labels = user_labels.split()
            else:
                user_labels = list(user_labels)
            # Validate the labels the user actually supplied; the node labels
            # always match the result files by construction.
            if len(user_labels) != n_results:
                raise BilbyPipeError(
                    "Please provide the same number of labels for postprocessing "
                    "as result files"
                )
        else:
            self.arguments.append(f"--labels {' '.join(labels)}")

        not_recognised_arguments = {}
        for key, val in user_arguments.items():
            if key in self._SUMMARYPAGES_FLAGS:
                self.arguments.add_flag(key)
            elif key in self._SUMMARYPAGES_OPTIONS:
                self.arguments.add(key, val)
            elif key == "labels":
                self.arguments.add("labels", " ".join(user_labels))
            else:
                not_recognised_arguments[key] = val

        if not_recognised_arguments:
            logger.warning(
                f"Did not recognise the summarypages_arguments {not_recognised_arguments}. To find "
                "the full list of available arguments, please run "
                "summarypages --help"
            )

    @property
    def executable(self):
        """Path of the ``summarypages`` executable."""
        return self._get_executable_path("summarypages")

    @property
    def request_memory(self):
        """Memory request for the job, fixed at ``"16 GB"``."""
        return "16 GB"

    @property
    def log_directory(self):
        """Log directory, taken from ``inputs.summary_log_directory``."""
        return self.inputs.summary_log_directory