LALPulsar 7.1.1.1-5e288d3
knope_utils.py
1"""
2 Classes needed for the known pulsar search pipeline.
3
4 (C) 2006, 2015 Matthew Pitkin
5
6"""
7
8# make print statements python 3-proof
9from __future__ import print_function, division
10
11__author__ = "Matthew Pitkin <matthew.pitkin@ligo.org>"
12__date__ = "$Date$"
13__version__ = "$Revision$"
14
15import os
16import re
17from lal import pipeline
18import sys
19import ast
20import json
21import subprocess as sp
22import shutil
23import uuid
24from configparser import RawConfigParser
25import urllib.parse as urlparse
26from copy import deepcopy
27import numpy as np
28import pickle
29from scipy import optimize
30from collections import OrderedDict
31from lalpulsar import pulsarpputils as pppu
32
33# set some specific error codes and messages
34KNOPE_ERROR_GENERAL = -1
35KNOPE_ERROR_NO_SEGMENTS = 2
36
37KNOPE_ERROR_GENERAL_MSG = "Error... an error has occurred during DAG creation"
38KNOPE_ERROR_NO_SEGMENTS_MSG = (
39 "No required data segments were available to perform the analysis"
40)
41
42# dictionary of error code/message pairs
43KNOPE_ERROR = {
44 KNOPE_ERROR_GENERAL: KNOPE_ERROR_GENERAL_MSG,
45 KNOPE_ERROR_NO_SEGMENTS: KNOPE_ERROR_NO_SEGMENTS_MSG,
46}
47
48# set some specific warning codes
49KNOPE_WARNING_NO_SEGMENTS = 102
50
51"""
52Class for setting up the DAG for the whole known pulsar search pipeline
53"""
54
55
56class knopeDAG(pipeline.CondorDAG):
57 def __init__(self, cp, configfilename, pulsarlist=None):
58 """
59 Initialise with ConfigParser cp object and the filename of the config file.
60
61 If an error occurs the error_code variable will be set to -1. The value will stay as 0 on success.
62 """
63
64 self.error_code = (
65 0 # set error_code to KNOPE_ERROR_GENERAL following any failures
66 )
67 self.warning_code = 0
68 self.config = cp
69 if pulsarlist is not None:
70 if isinstance(pulsarlist, list):
71 self.pulsarlist = pulsarlist
72 else:
73 print("Error... 'pulsarlist' argument must be 'None' or a list.")
74 self.error_code = KNOPE_ERROR_GENERAL
75 return
76
77 # if just re-doing post-processing try reading in previous analysis pickle file
78 self.postonly = self.get_config_option(
79 "analysis", "postprocessing_only", cftype="boolean", default=False
80 )
81 prevdag = None
82 if self.postonly:
83 preprocessed_pickle = self.get_config_option(
84 "analysis", "preprocessed_pickle_object"
85 )
86
87 if preprocessed_pickle == None:
88 print(
89 "Error... trying post-processing only, but no previous pickle file is given",
90 file=sys.stderr,
91 )
92 self.error_code = KNOPE_ERROR_GENERAL
93 return
94 else:
95 try:
96 fp = open(preprocessed_pickle, "rb")
97 prevdag = pickle.load(fp)
98 fp.close()
99 except:
100 print(
101 "Error... trying post-processing only, but previous pickle file '%s' cannot be read in"
102 % preprocessed_pickle,
103 file=sys.stderr,
104 )
105 self.error_code = KNOPE_ERROR_GENERAL
106 return
107
108 # Get run directory
109 self.run_dir = self.get_config_option(
110 "analysis", "run_dir", cftype="dir", default=os.getcwd()
111 )
112 if self.error_code != 0:
113 return # quit class if there's been an error
114
115 # a list of allowed IFO names
116 allowed_ifos = ["H1", "H2", "L1", "G1", "V1", "T1", "K1"]
117
118 # Get the interferometers to analyse
119 self.ifos = self.get_config_option("analysis", "ifos", "list")
120 if self.error_code != 0:
121 return # quit class if there's been an error
122
123 # make sure ifo is an allowed value
124 for ifo in self.ifos:
125 if ifo not in allowed_ifos:
126 print(
127 "Error... you have specified an unknown IFO '%s'" % ifo,
128 file=sys.stderr,
129 )
130 self.error_code = KNOPE_ERROR_GENERAL
131 return
132
133 if (
134 self.postonly
135 ): # check that this analysis uses the same, or a subset of, the previous IFOs
136 for ifo in self.ifos:
137 if ifo not in prevdag.ifos:
138 print(
139 "Error... for 'post-processing-only' the current IFOs must be a subset of those in the previous run",
140 file=sys.stderr,
141 )
142 self.error_code = KNOPE_ERROR_GENERAL
143 return
144
145 # Get frequency factors (e.g. 1f and 2f) for analysis (default to twice the rotation frequency)
146 self.freq_factors = self.get_config_option(
147 "analysis", "freq_factors", cftype="list", default=[2.0]
148 )
149 if len(self.freq_factors) > 2: # cannot have more than two values
150 print(
151 "Warning... only up to two frequency factors can be given. Defaulting to [2.]"
152 )
153 self.freq_factors = [2.0]
154 if len(self.freq_factors) == 2: # if there are two values they must be 1 and 2
155 if 1.0 not in self.freq_factors or 2.0 not in self.freq_factors:
156 print(
157 "Warning... if giving two frequency factors they must be [1., 2.]. Defaulting to this"
158 )
159 self.freq_factors = [1.0, 2.0]
160 for ff in self.freq_factors:
161 if ff <= 0.0:
162 print(
163 "Warning... frequency factors cannot be negative. Defaulting to [2.]"
164 )
165 self.freq_factors = [2.0]
166
167 if (
168 self.postonly
169 ): # check that this analysis uses the same, or a subset of, the previous frequency factors
170 for ff in self.freq_factors:
171 if ff not in prevdag.freq_factors:
172 print(
173 "Error... for 'post-processing-only' the current frequency factors must be a subset of those in the previous run",
174 file=sys.stderr,
175 )
176 self.error_code = KNOPE_ERROR_GENERAL
177 return
178
179 # Get the base directory for the preprocessing analysis
180 if not self.postonly:
181 self.preprocessing_base_dir = self.get_config_option(
182 "analysis", "preprocessing_base_dir", cftype="dict"
183 )
184 if self.error_code != 0:
185 return
186 else:
187 self.preprocessing_base_dir = prevdag.preprocessing_base_dir
188
189 # try making directory/check if exists
190 if not self.postonly:
191 for ifo in self.ifos:
192 self.mkdirs(self.preprocessing_base_dir[ifo])
193 if self.error_code != 0:
194 return
195
196 # Get the start and end time of the analysis
197 # see if starttime and endtime are integers, or dictionaries (of integers or lists of integers)
198 self.starttime = self.get_config_option("analysis", "starttime")
199 if self.error_code != 0:
200 return
201 self.starttime = ast.literal_eval(self.starttime) # check if int or dict
202 if isinstance(self.starttime, int): # convert to dictionary
203 stdict = {}
204 for ifo in self.ifos:
205 stdict[ifo] = [self.starttime] # convert to list
206 self.starttime = stdict
207 elif isinstance(self.starttime, dict): # check each detector has a start time
208 for ifo in self.ifos:
209 if ifo not in self.starttime:
210 print(
211 "Error... 'starttime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
212 file=sys.stderr,
213 )
214 self.error_code = KNOPE_ERROR_GENERAL
215 return
216
217 for stkey in dict(self.starttime):
218 if isinstance(self.starttime[stkey], int):
219 # convert to list
220 self.starttime[stkey] = [self.starttime[stkey]]
221 elif isinstance(self.starttime[stkey], list):
222 # check all values are int
223 if len(
224 [v for v in self.starttime[stkey] if isinstance(v, int)]
225 ) != len(self.starttime[stkey]):
226 print(
227 "Error... 'starttime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
228 file=sys.stderr,
229 )
230 self.error_code = KNOPE_ERROR_GENERAL
231 return
232 else:
233 print(
234 "Error... 'starttime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
235 file=sys.stderr,
236 )
237 self.error_code = KNOPE_ERROR_GENERAL
238 return
239 else:
240 print(
241 "Error... 'starttime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
242 file=sys.stderr,
243 )
244 self.error_code = KNOPE_ERROR_GENERAL
245 return
246
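        # Illustrative examples (GPS times invented for illustration) of accepted 'starttime'
        # values in the [analysis] section, based on the parsing above:
        #   starttime = 1129136415                                           # single GPS int, applied to every IFO
        #   starttime = {'H1': 1129136415, 'L1': [1129136415, 1135000000]}   # per-IFO int, or list of ints for multiple datasets
        # 'endtime' below is parsed in exactly the same way, and each IFO must have the same
        # number of start and end times.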
247 self.endtime = self.get_config_option("analysis", "endtime")
248 if self.error_code != 0:
249 return
250 self.endtime = ast.literal_eval(self.endtime) # check if int or dict
251 if isinstance(self.endtime, int): # convert to dictionary
252 etdict = {}
253 for ifo in self.ifos:
254 etdict[ifo] = [self.endtime] # convert to list
255 self.endtime = etdict
256 elif isinstance(self.endtime, dict): # check each detector has an end time
257 for ifo in self.ifos:
258 if ifo not in self.endtime:
259 print(
260 "Error... 'endtime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
261 file=sys.stderr,
262 )
263 self.error_code = KNOPE_ERROR_GENERAL
264 return
265
266 for etkey in dict(self.endtime):
267 if isinstance(self.endtime[etkey], int):
268 # convert to list
269 self.endtime[etkey] = [self.endtime[etkey]]
270 elif isinstance(self.endtime[etkey], list):
271 # check all values are int
272 if len(
273 [v for v in self.endtime[etkey] if isinstance(v, int)]
274 ) != len(self.endtime[etkey]):
275 print(
276 "Error... 'endtime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
277 file=sys.stderr,
278 )
279 self.error_code = KNOPE_ERROR_GENERAL
280 return
281 else:
282 print(
283 "Error... 'endtime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
284 file=sys.stderr,
285 )
286 self.error_code = KNOPE_ERROR_GENERAL
287 return
288 else:
289 print(
290 "Error... 'endtime' either be a single 'int', or a dictionary containing all detectors with an integer or list of integers.",
291 file=sys.stderr,
292 )
293 self.error_code = KNOPE_ERROR_GENERAL
294 return
295
296 # check start time and end time lists are consistent
297 self.ndatasets = (
298 {}
299 ) # dictionary of number of separate datasets (e.g. 2 different observing runs) for each detector
300 for ifo in self.ifos:
301 if len(self.starttime[ifo]) != len(self.endtime[ifo]):
302 print(
303 "Error... 'starttime' and 'endtime' have an inconsistent number of entries.",
304 file=sys.stderr,
305 )
306 self.error_code = KNOPE_ERROR_GENERAL
307 return
308 else:
309 self.ndatasets[ifo] = len(self.starttime[ifo])
310
311 # Get the pre-processing engine (heterodyne or SplInter - default to heterodyne)
312 if not self.postonly:
313 self.engine = self.get_config_option(
314 "analysis", "preprocessing_engine", default="heterodyne"
315 )
316 if self.error_code != 0:
317 return
318
319 if self.engine not in ["heterodyne", "splinter"]:
320 print(
321 "Warning... 'preprocessing_engine' value '%s' not recognised. Defaulting to 'heterodyne'."
322 % (self.engine)
323 )
324 self.engine = "heterodyne"
325 else:
326 self.engine = prevdag.engine
327
328 # Get the solar system ephemeris path
329 try:
330 defaultephempath = os.environ["LALPULSAR_DATADIR"]
331 except:
332 defaultephempath = None
333 self.ephem_path = self.get_config_option(
334 "analysis", "ephem_path", cftype="dir", default=defaultephempath
335 )
336 if self.error_code != 0:
337 return
338
339 # Get Condor accounting info (default to ligo.prod.o1.cw.targeted.bayesian)
340 self.accounting_group = self.get_config_option(
341 "condor", "accounting_group", default="ligo.prod.o1.cw.targeted.bayesian"
342 )
343 if self.error_code != 0:
344 return
345
346 if "cw.targeted.bayesian" not in self.accounting_group:
347 print(
348 "Error... the 'accounting_group' should contain 'cw.targeted.bayesian'",
349 file=sys.stderr,
350 )
351 self.error_code = KNOPE_ERROR_GENERAL
352 return
353
354 if cp.has_option("condor", "accounting_group_user"):
355 self.accounting_group_user = cp.get("condor", "accounting_group_user")
356
357 if len(self.accounting_group_user) == 0:
358 self.accounting_group_user = None # user not specified
359 else:
360 self.accounting_group_user = None # user not specified
361
362 # Get the analysis run directory (where the DAG and sub files will be created and run from) (default to current working directory)
363 self.run_dir = self.get_config_option(
364 "analysis", "run_dir", default=os.getcwd()
365 )
366 self.mkdirs(self.run_dir)
367 if self.error_code != 0:
368 return
369
370 # Get the analysis log directory
371 self.log_dir = self.get_config_option("analysis", "log_dir")
372 self.mkdirs(self.log_dir)
373 if self.error_code != 0:
374 return
375
376 uniqueid = str(uuid.uuid4().hex)
377 daglog = self.get_config_option(
378 "analysis", "dag_name", default="knope-" + uniqueid + ".log"
379 )
380 if (
381 len(daglog) == 0
382 ): # if no dag_name was given, but dag_name was still in .ini file
383 daglog = "known_pulsar_pipeline-" + uniqueid + ".log"
384 self.daglogfile = os.path.join(self.log_dir, daglog)
385 # initialise DAG
386 pipeline.CondorDAG.__init__(self, self.daglogfile)
387
388 # set dag file
389 dagname = self.get_config_option(
390 "analysis", "dag_name", default="knope-" + uniqueid
391 )
392 if (
393 len(dagname) == 0
394 ): # if no dag_name was given, but dag_name was still in .ini file
395 dagname = "known_pulsar_pipeline-" + uniqueid
396 self.set_dag_file(os.path.join(self.run_dir, dagname))
397
398 # Check if running in autonomous mode
399 self.autonomous = self.get_config_option(
400 "analysis", "autonomous", cftype="boolean", default=False
401 )
402 if self.error_code != 0:
403 return
404
405 if self.autonomous:
406 initialstart = self.get_config_option(
407 "analysis", "autonomous_initial_start", cftype="int"
408 )
409 if self.error_code != 0:
410 return
411
412 # convert to dictionary (to be consistent with start time)
413 self.initial_start = {}
414 for ifo in self.ifos:
415 self.initial_start[ifo] = [
416 initialstart
417 ] # convert to list to be consistent with self.starttimes
418 self.initial_start = self.initial_start
419 else:
420 self.initial_start = self.starttime
421
422 # Get pulsars to analyse (in the future this should be able to get pulsars from a database based on
423 # certain parameters)
424 self.pulsar_param_dir = self.get_config_option("analysis", "pulsar_param_dir")
425 if not os.path.isdir(self.pulsar_param_dir) and not os.path.isfile(
426 self.pulsar_param_dir
427 ):
428 print(
429 "Error... pulsar parameter file/directory '%s' does not exist!"
430 % self.pulsar_param_dir,
431 file=sys.stderr,
432 )
433 self.error_code = KNOPE_ERROR_GENERAL
434 return
435
436 if self.postonly:
437 if prevdag.pulsar_param_dir != self.pulsar_param_dir:
438 print(
439 "Error... for 'post-processing-only' the pulsar parameter directory must be that same as in the previous run",
440 file=sys.stderr,
441 )
442 self.error_code = KNOPE_ERROR_GENERAL
443 return
444
445 self.unmodified_pulsars = (
446 []
447 ) # a list of pulsar .par files that have not been modified since the last run
448 self.modified_pulsars = (
449 []
450 ) # a list of pulsar .par files that are new, or have been modified since the last run
451 self.modification_files = (
452 []
453 ) # a list of dictionaries (one for each pulsar) containing the modification time file and modification time
454
455 # Go through pulsar directory and check for any modified/new files
456 if os.path.isdir(self.pulsar_param_dir):
457 self.param_files = [
458 os.path.join(self.pulsar_param_dir, pf)
459 for pf in os.listdir(self.pulsar_param_dir)
460 if ".par" in pf
461 and os.path.isfile(os.path.join(self.pulsar_param_dir, pf))
462 and ".mod_" not in pf
463 ]
464
465 if len(self.param_files) == 0:
466 print(
467 "Error... no pulsar parameter files found in '%s'"
468 % self.pulsar_param_dir,
469 file=sys.stderr,
470 )
471 self.error_code = KNOPE_ERROR_GENERAL
472 return
473
474 self.param_files.sort() # sort the files into alphabetical order
475 elif os.path.isfile(
476 self.pulsar_param_dir
477 ): # if a single file convert into list
478 self.param_files = [self.pulsar_param_dir]
479 else:
480 print(
481 "Error... pulsar parameter file or directory '%s' does not exist"
482 % self.pulsar_param_dir,
483 file=sys.stderr,
484 )
485 self.error_code = KNOPE_ERROR_GENERAL
486 return
487
488 self.analysed_pulsars = (
489 {}
490 ) # dictionary keyed by PSRJ and containing the .par file path for all analysed pulsars
491 self.skipped_pulsars = (
492 {}
493 ) # dictionary keyed on .par files for all skipped pulsars (including reason for exclusion)
494
495 if not self.postonly:
496 for par in self.param_files:
497 # check that there is a PSRJ name in the .par file - if not then skip this pulsar
498 # Get par file data
499 try:
500 psr = pppu.psr_par(par)
501 except:
502 print(
503 "Could not read in parameter file '%s'. Skipping this pulsar."
504 % par
505 )
506 self.skipped_pulsars[par] = "Could not read in parameter file"
507 continue
508
509 # check that there is a PSRJ name in the .par file (this is required)
510 if "PSRJ" not in psr.__dict__:
511 print(
512 "Could not read 'PSRJ' value from '%s'. Skipping this pulsar."
513 % par
514 )
515 self.skipped_pulsars[par] = "Could not read 'PSRJ' value"
516 continue
517
518 # check if we're just using selected input pulsars
519 if pulsarlist is not None:
520 if psr["PSRJ"] not in pulsarlist:
521 continue
522
523 # check that (if the pulsar is in a binary) it is an allowed binary type (given what's coded in lalpulsar)
524 if "BINARY" in psr.__dict__:
525 bintype = psr["BINARY"]
526 if bintype not in [
527 "BT",
528 "BT1P",
529 "BT2P",
530 "BTX",
531 "ELL1",
532 "DD",
533 "DDS",
534 "MSS",
535 "T2",
536 ]:
537 print(
538 "Binary type '%s' in '%s' is not recognised. Skipping this pulsar."
539 % (bintype, par)
540 )
541 self.skipped_pulsars[par] = (
542 "Binary type '%s' is currenlty not recognised" % bintype
543 )
544 continue
545
546 # check that .par file contains ephemeris information and units (if not present defaults will be used - see get_ephemeris)
547 if psr["EPHEM"] != None:
548 if psr["EPHEM"] not in [
549 "DE200",
550 "DE405",
551 "DE414",
552 "DE421",
553 "DE430",
554 "DE435",
555 "DE436",
556 ]:
557 print(
558 "Unregconised ephemeris '%s' in '%s'. Skipping this source"
559 % (psr["EPHEM"], par)
560 )
561 self.skipped_pulsars[par] = (
562 "Unregconised ephemeris '%s'" % psr["EPHEM"]
563 )
564 continue
565
566 if psr["UNITS"] != None:
567 if psr["UNITS"] not in ["TCB", "TDB"]:
568 print(
569 "Unregconised time units '%s' in '%s'. Skipping this source"
570 % (psr["UNITS"], par)
571 )
572 self.skipped_pulsars[par] = (
573 "Unregconised ephemeris '%s'" % psr["UNITS"]
574 )
575 continue
576
577 self.analysed_pulsars[psr["PSRJ"]] = par
578
579 # create parfile modification time file
580 modtimefile = os.path.join(
581 os.path.dirname(par), ".mod_" + os.path.basename(par)
582 )
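            # e.g. (illustrative) a parameter file '<dir>/J0534+2200.par' gets a hidden companion
            # file '<dir>/.mod_J0534+2200.par' holding its last modification time, which is
            # compared below to decide whether the .par file has changed since the last run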
583
584 # Check if modification time file exists
585 if not os.path.isfile(modtimefile):
586 # add par file to modified pulsars list
587 self.modified_pulsars.append(par)
588
589 # create modification time file
590 modtime = os.stat(par).st_mtime
591 self.modification_files.append(
592 {"file": modtimefile, "time": str(modtime)}
593 )
594 else: # check whether par file modification time is consistent with value in modtimefile
595 parmodtime = str(os.stat(par).st_mtime)
596 fm = open(modtimefile, "r")
597 try:
598 oldmodtime = fm.readline().strip()
599 except:
600 print(
601 "Warning... could not read modification time from '%s'. Assuming file is modified"
602 % modtimefile
603 )
604 oldmodtime = -1.23456789
605 fm.close()
606
607 if parmodtime == oldmodtime: # file is unmodified
608 self.unmodified_pulsars.append(par)
609 else: # file is modified
610 self.modification_files.append(
611 {"file": modtimefile, "time": parmodtime}
612 ) # update the time in the .mod file
613 self.modified_pulsars.append(par)
614
615 if pulsarlist is not None:
616 if len(self.analysed_pulsars) == 0:
617 print(
618 "Could not find any of the listed pulsars '[%s]' in the .par file directory '%s'."
619 % (", ".join(pulsarlist), self.pulsar_param_dir)
620 )
621 self.error_code = KNOPE_ERROR_GENERAL
622 return
623
624 self.segment_file_update = (
625 []
626 ) # to contain a list of pairs of segment files - the former to be concatenated into the latter if required
627
628 if not self.postonly:
629 # Setup pre-processing jobs (datafind, segment finding and heterodyne/splinter running) for each pulsar
630 self.setup_preprocessing()
631 if self.error_code != 0:
632 return
633
634 # Check whether only the preprocessing (heterodyne or splinter) jobs are required
635 self.preprocessing_only = self.get_config_option(
636 "analysis", "preprocessing_only", cftype="boolean", default=False
637 )
638 if self.error_code != 0:
639 return
640 else:
641 # use information from previous run
642 self.preprocessing_only = False
643 self.remove_job = None
644 if pulsarlist is None:
645 self.unmodified_pulsars = prevdag.unmodified_pulsars
646 self.modified_pulsars = prevdag.modified_pulsars
647 self.analysed_pulsars = prevdag.analysed_pulsars
648 self.skipped_pulsars = prevdag.skipped_pulsars
649 self.processed_files = prevdag.processed_files
650 else: # if analysing only selected pulsars (from pulsarlist) just get information on them
651 self.processed_files = {}
652 for psrl in pulsarlist:
653 if psrl not in prevdag.analysed_pulsars:
654 print(
655 "Error... specified pulsar '%s' could not be found in previous run pickle file '%s'."
656 % (psrl, preprocessed_pickle)
657 )
658 self.error_code = KNOPE_ERROR_GENERAL
659 return
660
661 if psrl in prevdag.unmodified_pulsars:
662 self.unmodified_pulsars[psrl] = prevdag.unmodified_pulsars[psrl]
663 if psrl in prevdag.modified_pulsars:
664 self.modified_pulsars[psrl] = prevdag.modified_pulsars[psrl]
665
666 self.analysed_pulsars[psrl] = prevdag.analysed_pulsars[psrl]
667 self.processed_files[psrl] = prevdag.processed_files[psrl]
668
669 if self.preprocessing_only: # end class initialisation
670 # output par file modification times (do this now, so that modification files are overwritten if the script had failed earlier)
671 for pitem in self.modification_files:
672 fm = open(pitem["file"], "w")
673 fm.write(pitem["time"])
674 fm.close()
675
676 # output amended segment list files
677 for sfs in self.segment_file_update:
678 p = sp.Popen("cat " + sfs[0] + " >> " + sfs[1], shell=True)
679 p.communicate()
680 if p.returncode != 0:
681 print(
682 "Warning... could not append segments to previous segments file. No log of previous segments will be available."
683 )
684 return
685
686 # setup parameter estimation
687 self.setup_parameter_estimation()
688 if self.error_code != 0:
689 return
690
691 # setup post-processing page creation
692 self.setup_results_pages()
693
694 ### FINAL CLOSE OUT ITEMS
695
696 # output par file modification times (do this now, so that modification files are overwritten if the script had failed earlier)
697 for pitem in self.modification_files:
698 fm = open(pitem["file"], "w")
699 fm.write(pitem["time"])
700 fm.close()
701
702 # output amended segment list files
703 for sfs in self.segment_file_update:
704 p = sp.Popen("cat " + sfs[0] + " >> " + sfs[1], shell=True)
705 p.communicate()
706 if p.returncode != 0:
707 print(
708 "Warning... could not append segments to previous segments file. No log of previous segments will be available."
709 )
710
711 # output dictionaries of analysed pulsars and skipped pulsars to JSON files
712 fpa = open(os.path.join(self.run_dir, "analysed_pulsars.txt"), "w")
713 json.dump(self.analysed_pulsars, fpa, indent=2)
714 fpa.close()
715
716 fps = open(os.path.join(self.run_dir, "skipped_pulsars.txt"), "w")
717 json.dump(self.skipped_pulsars, fps, indent=2)
718 fps.close()
719
720 # email notification that the analysis has finished if required
721 email = self.get_config_option("analysis", "email", allownone=True)
722 if email != None:
723 if "@" not in email:
724 print(
725 "Warning... email address '%s' is invalid. No notification will be sent."
726 % email
727 )
728 else:
729 import smtplib
730 import socket
731
732 # try setting email server
733 try:
734 # set up sender
735 try:
736 HOST = socket.getfqdn()
737 USER = os.environ["USER"]
738 FROM = USER + "@" + HOST
739 except: # default the 'from' address to 'matthew.pitkin@ligo.org'
740 FROM = "matthew.pitkin@ligo.org"
741
742 subject = "lalpulsar_knope: successful setup"
743 messagetxt = (
744 "Hi User,\n\nYour analysis using configuration file '%s' has successfully setup the analysis. Once complete the results will be found at %s.\n\nRegards\n\nlalpulsar_knope\n"
745 % (configfilename, self.results_url)
746 )
747
748 emailtemplate = "From: {0}\nTo: {1}\nSubject: {2}\n\n{3}"
749 message = emailtemplate.format(FROM, email, subject, messagetxt)
750 server = smtplib.SMTP("localhost")
751 server.sendmail(FROM, email, message)
752 server.quit()
753 except:
754 print("Warning... could not send notification email.")
755
756 def setup_results_pages(self):
757 """
758 Setup the results webpage creation
759 """
760
761 # get directory for output results
762 self.results_basedir = self.get_config_option("results_page", "web_dir")
763 self.results_pulsar_dir = (
764 {}
765 ) # dictionary of results directories for individual pulsars
766 self.results_pulsar_ini = (
767 {}
768 ) # dictionary of results configuration files for individual pulsars
769 self.results_pulsar_url = {} # dictionary of results page URLs
770 self.mkdirs(self.results_basedir)
771 if self.error_code != 0:
772 return
773
774 # get URL for output results
775 self.results_url = self.get_config_option("results_page", "base_url")
776
777 # run individual pulsar results page creation
778 self.results_exec = self.get_config_option(
779 "results_page",
780 "results_exec",
781 default="/usr/bin/lalpulsar_knope_result_page",
782 )
783 self.results_universe = self.get_config_option(
784 "results_page", "universe", default="local"
785 )
786
787 # check file exists and is executable
788 if not os.path.isfile(self.results_exec) or not os.access(
789 self.results_exec, os.X_OK
790 ):
791 print(
792 "Warning... 'results_exec' in '[results_page]' does not exist or is not an executable. Try finding code in path."
793 )
794 resultexec = self.find_exec_file("lalpulsar_knope_result_page")
795
796 if resultexec == None:
797 print(
798 "Error... could not find 'lalpulsar_knope_result_page' in 'PATH'",
799 file=sys.stderr,
800 )
801 self.error_code = KNOPE_ERROR_GENERAL
802 return
803 else:
804 self.results_exec = resultexec
805
806 self.collate_exec = self.get_config_option(
807 "results_page",
808 "collate_exec",
809 default="/usr/bin/lalpulsar_knope_collate_results",
810 )
811
812 # check file exists and is executable
813 if not os.path.isfile(self.collate_exec) or not os.access(
814 self.collate_exec, os.X_OK
815 ):
816 print(
817 "Warning... 'collate_exec' in '[results_page]' does not exist or is not an executable. Try finding code in path."
818 )
819 collateexec = self.find_exec_file("lalpulsar_knope_collate_results")
820
821 if collateexec == None:
822 print(
823 "Error... could not find 'lalpulsar_knope_collate_results' in 'PATH'",
824 file=sys.stderr,
825 )
826 self.error_code = KNOPE_ERROR_GENERAL
827 return
828 else:
829 self.collate_exec = collateexec
830
831 # check if running on an injection
832 self.injection = self.get_config_option(
833 "analysis", "injections", cftype="boolean", default=False
834 )
835 self.use_gw_phase = self.get_config_option(
836 "pe", "use_gw_phase", cftype="boolean", default=False
837 )
838 if self.error_code != 0:
839 return
840
841 # check upper limit credible interval to use
842 self.upper_limit = self.get_config_option(
843 "results_page", "upper_limit", cftype="int", default=95
844 )
845
846 # check whether to show joint posterior plot for all parameters
847 self.show_all_posteriors = self.get_config_option(
848 "results_page", "show_all_posteriors", cftype="boolean", default=False
849 )
850
851 # check whether to subtract injected/heterodyned values from phase parameters for plots
852 self.subtract_truths = self.get_config_option(
853 "results_page", "subtract_truths", cftype="boolean", default=False
854 )
855
856 # check whether to plot priors on 1D posteriors plots
857 self.show_priors = self.get_config_option(
858 "results_page", "show_priors", cftype="boolean", default=False
859 )
860
861 # check whether to copy "all" files used to create results (fine heterodyne files, par file, prior file, posterior files)
862 # into the results page directory
863 self.copy_all_files = self.get_config_option(
864 "results_page", "copy_all_files", cftype="boolean", default=False
865 )
866 cpjob = None
867 if self.copy_all_files:
868 # create job for copying files
869 cpjob = copyJob(
870 accgroup=self.accounting_group,
871 accuser=self.accounting_group_user,
872 logdir=self.log_dir,
873 rundir=self.run_dir,
874 )
875
876 # create parameter estimation job
877 resultpagejob = resultpageJob(
878 self.results_exec,
879 univ=self.results_universe,
880 accgroup=self.accounting_group,
881 accuser=self.accounting_group_user,
882 logdir=self.log_dir,
883 rundir=self.run_dir,
884 )
885 collatejob = collateJob(
886 self.collate_exec,
887 univ=self.results_universe,
888 accgroup=self.accounting_group,
889 accuser=self.accounting_group_user,
890 logdir=self.log_dir,
891 rundir=self.run_dir,
892 )
893
894 # create config file for collating results into a results table
895 cpc = RawConfigParser() # create config parser to output .ini file
896 # create configuration .ini file
897 cinifile = os.path.join(self.results_basedir, "collate.ini")
898
899 # create sections
900 cpc.add_section("output")
901 cpc.add_section("input")
902 cpc.add_section("general")
903
904 cpc.set("output", "path", self.results_basedir) # set output directory
905 cpc.set(
906 "input", "path", self.results_basedir
907 ) # set directory containing individual results directories
908
909 # get sort type and direction
910 sorttype = self.get_config_option(
911 "results_page", "sort_value", default="name"
912 ) # default sorting on name
913 cpc.set("general", "sort_value", sorttype)
914 sortdirection = self.get_config_option(
915 "results_page", "sort_direction", default="ascending"
916 ) # default sorting in ascending order
917 cpc.set("general", "sort_direction", sortdirection)
918 if self.pe_coherent_only:
919 cpc.set("general", "detectors", ["Joint"])
920 elif self.pe_incoherent_only:
921 cpc.set("general", "detectors", self.ifos)
922 else:
923 cdets = deepcopy(self.ifos)
924 cdets.append("Joint")
925 cpc.set("general", "detectors", cdets)
926
927 # get pulsar parameters to output
928 paramout = self.get_config_option(
929 "results_page", "parameters", cftype="list", default=["f0"]
930 )
931 cpc.set("general", "parameters", paramout)
932
933 # get results to output
934 resout = self.get_config_option(
935 "results_page", "results", cftype="list", default=["h0ul"]
936 )
937 cpc.set("general", "results", resout)
938
939 # write and set ini file
940 try:
941 fp = open(cinifile, "w")
942 cpc.write(fp)
943 fp.close()
944 except:
945 print(
946 "Error... could not write configuration file '%s' for results collation page"
947 % cinifile,
948 file=sys.stderr,
949 )
950 self.error_code = KNOPE_ERROR_GENERAL
951 return
952
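        # Illustrative content of the generated collate.ini (a sketch only; values depend on
        # the [results_page] configuration and the detectors being analysed):
        #   [output]
        #   path = <web_dir>
        #   [input]
        #   path = <web_dir>
        #   [general]
        #   sort_value = name
        #   sort_direction = ascending
        #   detectors = ['H1', 'L1', 'Joint']
        #   parameters = ['f0']
        #   results = ['h0ul']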
953 # loop through pulsars
954 for pname in self.analysed_pulsars:
955 # create results directory for pulsar
956 self.results_pulsar_dir[pname] = os.path.join(self.results_basedir, pname)
957 self.results_pulsar_url[pname] = urlparse.urljoin(self.results_url, pname)
958
959 self.mkdirs(self.results_pulsar_dir[pname])
960 if self.error_code != 0:
961 return
962
963 copydir = None
964 if self.copy_all_files: # create directory for file copies
965 copydir = os.path.join(self.results_pulsar_dir[pname], "files")
966 self.mkdirs(copydir)
967 if self.error_code != 0:
968 return
969
970 # copy par file
971 shutil.copy(
972 self.analysed_pulsars[pname], os.path.join(copydir, pname + ".par")
973 )
974
975 # copy prior file
976 shutil.copy(self.pe_prior_files[pname], copydir)
977
978 # copy correlation coefficient file
979 if pname in self.pe_cor_files:
980 shutil.copy(self.pe_cor_files[pname], copydir)
981
982 # if in autonomous mode, and a previous JSON results file already exists, make a copy of it
983 jsonfile = os.path.join(self.results_pulsar_dir[pname], pname + ".json")
984 if self.autonomous:
985 if os.path.isfile(jsonfile):
986 # append starttime (which will be the end time of the previous results) timestamp to JSON file
987 try:
988 shutil.copyfile(
989 jsonfile,
990 jsonfile + "_%d" % list(self.starttime.values())[0],
991 )
992 except:
993 print(
994 "Warning... could not copy previous results JSON file '%s'. Previous results may get overwritten."
995 % jsonfile,
996 file=sys.stderr,
997 )
998
999 # try getting some pulsar information (dist, p1_I and assoc) from the ATNF catalogue for use in lalpulsar_knope_result_page
1000 # NOTE: this is mainly required because on the ARCCA cluster the nodes cannot access the internet
1001 pinfo = pppu.get_atnf_info(pname)
1002 if pinfo is not None:
1003 dist, p1_I, assoc, _ = pinfo # unpack values
1004 psrinfo = {}
1005 psrinfo["Pulsar data"] = {}
1006 psrinfo["Pulsar data"]["DIST"] = dist
1007 psrinfo["Pulsar data"]["P1_I"] = p1_I
1008 psrinfo["Pulsar data"]["ASSOC"] = assoc
1009
1010 try:
1011 fp = open(jsonfile, "w")
1012 json.dump(psrinfo, fp, indent=2)
1013 fp.close()
1014 except:
1015 print(
1016 "Warning... could not write out ATNF catalogue information to JSON file '%s'."
1017 % jsonfile,
1018 file=sys.stderr,
1019 )
1020
1021 cp = RawConfigParser() # create config parser to output .ini file
1022 # create configuration .ini file
1023 inifile = os.path.join(self.results_pulsar_dir[pname], pname + ".ini")
1024
1025 # add sections
1026 cp.add_section("general")
1027 cp.add_section("parameter_estimation")
1028 cp.add_section("data")
1029 cp.add_section("output")
1030 cp.add_section("plotting")
1031
1032 cp.set(
1033 "output", "path", self.results_pulsar_dir[pname]
1034 ) # set output directory
1035 cp.set(
1036 "output",
1037 "indexpage",
1038 os.path.relpath(self.results_basedir, self.results_pulsar_dir[pname]),
1039 )
1040
1041 cp.set(
1042 "general", "parfile", self.analysed_pulsars[pname]
1043 ) # set the pulsar parameter file
1044 cp.set("general", "detectors", self.ifos) # set the detectors
1045 cp.set(
1046 "general", "upper_limit", self.upper_limit
1047 ) # set the upper limit credible interval
1048
1049 if self.pe_coherent_only:
1050 cp.set(
1051 "general", "joint_only", True
1052 ) # only output the joint multi-detector analysis
1053 else:
1054 cp.set("general", "joint_only", False)
1055
1056 if self.pe_incoherent_only:
1057 cp.set(
1058 "general", "with_joint", False
1059 ) # only output individual detector analyses
1060 else:
1061 cp.set(
1062 "general", "with_joint", True
1063 ) # include joint multi-detector analysis
1064
1065 if self.pe_num_background > 0:
1066 cp.set(
1067 "general", "with_background", True
1068 ) # include background analysis
1069 else:
1070 cp.set(
1071 "general", "with_background", False
1072 ) # no background analysis present
1073
1074 if self.injection:
1075 cp.set("general", "injection", True) # set if an injection or not
1076 else:
1077 cp.set("general", "injection", False)
1078
1079 if self.use_gw_phase:
1080 cp.set(
1081 "general", "use_gw_phase", True
1082 ) # use GW initial phase (rather than rotational phase) e.g. for hardware injections
1083 else:
1084 cp.set("general", "use_gw_phase", False)
1085
1086 cp.set(
1087 "general", "harmonics", self.freq_factors
1088 ) # set frequency harmonics in analysis
1089 cp.set(
1090 "general", "model_type", self.pe_model_type
1091 ) # set 'waveform' or 'source' model type
1092 cp.set(
1093 "general", "biaxial", self.pe_biaxial
1094 ) # set if using a biaxial source model
1095
1096 if self.show_priors:
1097 cp.set(
1098 "general", "priorfile", self.pe_prior_files[pname]
1099 ) # set the prior file
1100
1101 # get posterior files (and background directories)
1102 posteriorsfiles = {}
1103 backgrounddir = {}
1104 copydirpos = None
1105 if copydir is not None:
1106 copydirpos = os.path.join(copydir, "posteriors")
1107 self.mkdirs(copydirpos)
1108
1109 for i, comb in enumerate(self.pe_combinations):
1110 dets = comb["detectors"]
1111 detprefix = comb["prefix"]
1112
1113 if len(dets) == 1:
1114 det = dets[0]
1115 else:
1116 det = (
1117 "Joint" # use 'Joint' as the term for a multi-detector analysis
1118 )
1119
1120 posteriorsfiles[det] = os.path.join(self.pe_posterior_basedir, pname)
1121 posteriorsfiles[det] = os.path.join(posteriorsfiles[det], detprefix)
1122
1123 if copydirpos is not None:
1124 self.mkdirs(os.path.join(copydirpos, detprefix))
1125
1126 if self.pe_num_background > 0:
1127 backgrounddir[det] = os.path.join(
1128 self.pe_posterior_background_basedir, pname
1129 )
1130 backgrounddir[det] = os.path.join(backgrounddir[det], detprefix)
1131
1132 dirpostfix = ""
1133 if len(self.freq_factors) > 1: # coherent multi-frequency analysis
1134 dirpostfix = "multiharmonic"
1135 else:
1136 if (
1137 not self.freq_factors[0] % 1.0
1138 ): # for integers just output directory as e.g. 2f
1139 dirpostfix = "%df" % int(self.freq_factors[0])
1140 else:
1141 dirpostfix = "%.2ff" % self.freq_factors[0]
1142
1143 posteriorsfiles[det] = os.path.join(posteriorsfiles[det], dirpostfix)
1144 posteriorsfiles[det] = os.path.join(
1145 posteriorsfiles[det], "posterior_samples_%s.hdf" % pname
1146 )
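                # e.g. (illustrative) for pulsar J0534+2200 analysed jointly with H1 and L1 at the
                # 2f harmonic only, this resolves to
                #   <n2p_output_dir>/J0534+2200/H1L1/2f/posterior_samples_J0534+2200.hdf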
1147
1148 if copydirpos is not None:
1149 copydirposp = os.path.join(copydirpos, detprefix)
1150 copydirposp = os.path.join(copydirposp, dirpostfix)
1151 self.mkdirs(copydirposp)
1152 cpnode = copyNode(cpjob)
1153 cpnode.set_source(posteriorsfiles[det])
1154 cpnode.set_destination(copydirposp)
1155 for n2pnode in self.pe_nest2pos_nodes[pname]:
1156 cpnode.add_parent(n2pnode)
1157 self.add_node(cpnode)
1158
1159 # if in autonomous mode copy previous posterior files
1160 if self.autonomous:
1161 if os.path.isfile(posteriorsfiles[det]):
1162 try: # copy to file with the start time (i.e. the end time of the previous analysis for which the posterior file belongs) appended
1163 shutil.copyfile(
1164 posteriorsfiles[det],
1165 posteriorsfiles[det].strip(".hdf")
1166 + "_%d.hdf" % list(self.starttime.values())[0],
1167 )
1168 except:
1169 print(
1170 "Warning... could not create copy of current posterior samples file '%s'. This will get overwritten on next autonomous run."
1171 % posteriorsfiles[det],
1172 file=sys.stderr,
1173 )
1174
1175 if self.pe_num_background > 0:
1176 backgrounddir[det] = os.path.join(backgrounddir[det], dirpostfix)
1177
1178 cp.set("parameter_estimation", "posteriors", posteriorsfiles)
1179
1180 if self.pe_num_background > 0:
1181 cp.set("parameter_estimation", "background", backgrounddir)
1182
1183 # copy fine heterodyned/spectrally interpolated files if required
1184 copydirhet = None
1185 if copydir is not None:
1186 copydirhet = os.path.join(copydir, "data")
1187 self.mkdirs(copydirhet)
1188
1189 datafiles = {}
1190 for ifo in self.ifos:
1191 if copydir is not None:
1192 copydirhet = os.path.join(copydirhet, ifo)
1193 self.mkdirs(copydirhet)
1194
1195 filelist = []
1196 for ff in self.freq_factors:
1197 filelist.append(self.processed_files[pname][ifo][ff][-1])
1198
1199 # copy fine heterodyned/spectrally interpolated files
1200 if copydir is not None:
1201 if not ff % 1.0: # for integers just output directory as e.g. 2f
1202 ffdir = os.path.join(copydirhet, "%df" % int(ff))
1203 else: # for non-integers use 2 d.p. for dir name
1204 ffdir = os.path.join(copydirhet, "%.2ff" % ff)
1205
1206 self.mkdirs(ffdir)
1207 cpnode = copyNode(cpjob)
1208 cpnode.set_source(self.processed_files[pname][ifo][ff][-1])
1209 cpnode.set_destination(ffdir)
1210 if self.engine == "heterodyne" and not self.postonly:
1211 if pname in self.concat_nodes and self.autonomous:
1212 cpnode.add_parent(self.concat_nodes[pname][ifo][ff])
1213 else:
1214 cpnode.add_parent(
1215 self.fine_heterodyne_nodes[pname][ifo][ff]
1216 )
1217 elif self.engine == "splinter" and not self.postonly:
1218 cpnode.add_parent(self.splinter_nodes_modified[ifo][ff])
1219 self.add_node(cpnode)
1220
1221 if len(self.freq_factors) == 1:
1222 filelist = filelist[0] # don't need list if only one value
1223 datafiles[ifo] = filelist
1224
1225 if copydir is not None:
1226 copydirhet = os.path.join(copydir, "data") # reset to base path
1227
1228 cp.set("data", "files", datafiles)
1229
1230 cp.set("plotting", "all_posteriors", self.show_all_posteriors)
1231 cp.set("plotting", "subtract_truths", self.subtract_truths)
1232
1233 # output configuration file
1234 try:
1235 fp = open(inifile, "w")
1236 cp.write(fp)
1237 fp.close()
1238 except:
1239 print(
1240 "Error... could not write configuration file '%s' for results page"
1241 % inifile,
1242 file=sys.stderr,
1243 )
1244 self.error_code = KNOPE_ERROR_GENERAL
1245 return
1246
1247 # create dag node
1248 resultsnode = resultpageNode(resultpagejob)
1249 resultsnode.set_config(inifile)
1250
1251 # add parent lalinference_nest2pos job
1252 for n2pnode in self.pe_nest2pos_nodes[pname]:
1253 resultsnode.add_parent(n2pnode)
1254
1255 if self.pe_num_background > 0:
1256 for n2pnode in self.pe_nest2pos_background_nodes[pname]:
1257 resultsnode.add_parent(n2pnode)
1258
1259 self.add_node(resultsnode)
1260
1261 # add as parent to the collation node
1262 collatenode = collateNode(
1263 collatejob
1264 ) # create a collate node for each pulsar, so that the table gets regenerated as each new result comes in
1265 collatenode.set_config(cinifile)
1266 collatenode.add_parent(resultsnode)
1267 self.add_node(collatenode)
1268
1269 def setup_preprocessing(self):
1270 """
1271 Setup the preprocessing analysis: data finding, segment finding and heterodyne/splinter data processing
1272 """
1273
1274 # set the data find job and nodes
1275 self.setup_datafind()
1276 if self.error_code != 0:
1277 return
1278
1279 # loop through the pulsars and setup required jobs for each
1280 if self.engine == "heterodyne":
1281 self.setup_heterodyne()
1282 if self.error_code != 0:
1283 return
1284
1285 if self.engine == "splinter":
1286 self.setup_splinter()
1287 if self.error_code != 0:
1288 return
1289
1290 # create jobs to concatenate output fine heterodyned/Splinter files if required
1291 self.remove_job = None
1292 self.concatenate_files()
1293 if self.error_code != 0:
1294 return
1295
1297 """
1298 Setup parameter estimation jobs/nodes for signal and background analyses
1299 """
1300
1301 # get executable
1302 self.pe_exec = self.get_config_option(
1303 "pe", "pe_exec", default="lalpulsar_parameter_estimation_nested"
1304 )
1305 if self.error_code != 0:
1306 return
1307
1308 # check file exists and is executable
1309 if not os.path.isfile(self.pe_exec) or not os.access(self.pe_exec, os.X_OK):
1310 print(
1311 "Warning... 'pe_exec' in '[pe]' does not exist or is not an executable. Try finding code in path."
1312 )
1313 peexec = self.find_exec_file("lalpulsar_parameter_estimation_nested")
1314
1315 if peexec == None:
1316 print(
1317 "Error... could not find 'lalpulsar_parameter_estimation_nested' in 'PATH'",
1318 file=sys.stderr,
1319 )
1320 self.error_code = KNOPE_ERROR_GENERAL
1321 return
1322 else:
1323 self.pe_exec = peexec
1324
1325 # Get Condor universe (default to vanilla)
1326 self.pe_universe = self.get_config_option("pe", "universe", default="vanilla")
1327 if self.error_code != 0:
1328 return
1329
1330 self.pe_nest2pos_nodes = (
1331 {}
1332 ) # condor nodes for the lalinference_nest2pos jobs (needed to use as parents for results processing jobs)
1333 self.pe_nest2pos_background_nodes = {} # nodes for background analysis
1334
1335 # see whether to run just as independent detectors, or independently AND coherently over all detectors
1336 if len(self.ifos) == 1:
1337 self.pe_incoherent_only = True # set to True for one detector
1338 else:
1339 self.pe_incoherent_only = self.get_config_option(
1340 "analysis", "incoherent_only", cftype="boolean", default=False
1341 )
1342
1343 # see whether to run only the coherent multi-detector analysis
1344 self.pe_coherent_only = False
1345 if len(self.ifos) > 1:
1346 self.pe_coherent_only = self.get_config_option(
1347 "analysis", "coherent_only", cftype="boolean", default=False
1348 )
1349
1350 # get the number of background analyses to perform
1351 self.pe_num_background = self.get_config_option(
1352 "analysis", "num_background", cftype="int", default=0
1353 )
1354 if self.pe_num_background < 0:
1355 print(
1356 "Warning... 'num_background' is a negative value. Defaulting to zero background runs"
1357 )
1358 self.pe_num_background = 0
1359
1360 # get the PE output directory
1361 self.pe_output_basedir = self.get_config_option(
1362 "pe", "pe_output_dir", cftype="dir"
1363 )
1364
1365 # Make directory
1366 self.mkdirs(self.pe_output_basedir)
1367 if self.error_code != 0:
1368 return
1369
1370 # set background run directories if required
1371 self.pe_output_background_basedir = self.get_config_option(
1372 "pe", "pe_output_dir_background", cftype="dir", allownone=True
1373 )
1374 if self.pe_num_background != 0:
1375 if self.pe_output_background_basedir == None:
1376 print(
1377 "Error... no background analysis directory has been set",
1378 file=sys.stderr,
1379 )
1380 self.error_code = KNOPE_ERROR_GENERAL
1381 return
1382 else:
1383 self.mkdirs(self.pe_output_background_basedir)
1384 if self.error_code != 0:
1385 return
1386
1387 # get some general PE parameters
1388 self.pe_nlive = self.get_config_option(
1389 "pe", "n_live", cftype="int", default=2048
1390 ) # number of live points
1391 self.pe_nruns = self.get_config_option(
1392 "pe", "n_runs", cftype="int", default=1
1393 ) # number of parallel runs
1394 self.pe_tolerance = self.get_config_option(
1395 "pe", "tolerance", cftype="float", default=0.1
1396 ) # nested sampling stopping criterion
1397 self.pe_random_seed = self.get_config_option(
1398 "pe", "random_seed", cftype="int", allownone=True
1399 ) # random number generator seed
1400 self.pe_nmcmc = self.get_config_option(
1401 "pe", "n_mcmc", cftype="int", allownone=True
1402 ) # number of MCMC steps for each nested sample
1403 self.pe_nmcmc_initial = self.get_config_option(
1404 "pe", "n_mcmc_initial", cftype="int", default=500
1405 )
1406 self.pe_non_gr = self.get_config_option(
1407 "pe", "non_gr", cftype="boolean", default=False
1408 ) # use non-GR parameterisation (default to False)
1409
1410 # check if only using a section of data
1412 "pe", "starttime", cftype="float", allownone=True
1413 )
1415 "pe", "endtime", cftype="float", allownone=True
1416 )
1418 "pe", "truncate_time", cftype="float", allownone=True
1419 )
1421 "pe", "truncate_samples", cftype="int", allownone=True
1422 )
1424 "pe", "truncate_fraction", cftype="float", allownone=True
1425 )
1426
1427 # parameters for background runs
1429 "pe", "n_runs_background", cftype="int", default=1
1430 )
1432 "pe", "n_live_background", cftype="int", default=1024
1433 )
1434
1435 # parameters for ROQ
1437 "pe", "use_roq", cftype="boolean", default=False
1438 ) # check if using Reduced Order Quadrature (ROQ)
1440 "pe", "roq_ntraining", cftype="int", default=2500
1441 ) # number of training waveforms for reduced basis generation
1443 "pe", "roq_tolerance", cftype="float", default=5e-12
1444 ) # mis-match tolerance when producing reduced basis
1446 "pe", "roq_uniform", cftype="boolean", default=False
1447 ) # check if setting uniform distributions for sprinkling traning waveform parameters
1449 "pe", "roq_chunkmax", cftype="int", default=1440
1450 ) # maximum data chunk length for creating ROQ
1451
1452 # FIXME: Currently this won't run with non-GR parameters, so output a warning and default back to GR!
1453 if self.pe_non_gr:
1454 print(
1455 "Warning... currently this will not run with non-GR parameters. Reverting to GR-mode."
1456 )
1457 self.pe_non_gr = False
1458
1459 # if searching at both the rotation frequency and twice rotation frequency set which parameterisation to use
1461 "pe", "model_type", default="waveform"
1462 )
1463 if self.pe_model_type not in ["waveform", "source"]:
1464 print(
1465 "Warning... the given 'model_type' '%s' is not allowed. Defaulting to 'waveform'"
1466 % self.pe_model_type
1467 )
1468 self.pe_model_type = "waveform"
1469
1470 self.pe_biaxial = False
1471 # check whether using a biaxial signal
1472 if len(self.freq_factors) == 2 and self.pe_non_gr == False:
1473 self.pe_biaxial = self.get_config_option(
1474 "pe", "biaxial", cftype="boolean", default=False
1475 ) # use a biaxial signal model
1476
1477 # check whether using the Student's t-likelihood or Gaussian likelihood
1479 "pe", "gaussian_like", cftype="boolean", default=False
1480 )
1481 if (
1482 self.engine == "splinter"
1483 ): # always use the Gaussian likelihood for the SplInter processed data
1484 self.pe_gaussian_like = True
1485
1486 # check if there is a pre-made prior file to use for all pulsars
1488 "pe", "prior_options", cftype="dict"
1489 )
1491 "pe", "premade_prior_file", allownone=True
1492 )
1493
1494 if self.pe_premade_prior_file is not None:
1495 if not os.path.isfile(self.pe_premade_prior_file):
1496 print(
1497 "Error... pre-made prior file '{}' does not exist!".format(
1498 self.pe_premade_prior_file
1499 ),
1500 file=sys.stderr,
1501 )
1502 self.error_code = KNOPE_ERROR_GENERAL
1503 return
1504
1506 "pe", "derive_amplitude_prior", cftype="boolean", default=False
1507 )
1508
1510 # get JSON file containing any previous amplitude upper limits for pulsars
1512 "pe", "amplitude_prior_file", allownone=True
1513 )
1514
1515 # get JSON file containing any previous posterior files from which to use amplitude vs cosiota samples as GMM prior
1518 "pe", "previous_posteriors_file", allownone=True
1519 )
1520
1521 self.pe_prior_info = None
1523
1524 # get file, or dictionary of amplitude spectral density files (e.g. from a previous run) to derive amplitude priors
1525 try:
1526 self.pe_amplitude_prior_asds = ast.literal_eval(
1527 self.get_config_option("pe", "amplitude_prior_asds", allownone=True)
1528 )
1529 except:
1530 self.pe_amplitude_prior_asds = self.get_config_option(
1531 "pe", "amplitude_prior_asds", allownone=True
1532 )
1533
1534 try:
1535 self.pe_amplitude_prior_obstimes = ast.literal_eval(
1536 self.get_config_option(
1537 "pe", "amplitude_prior_obstimes", allownone=True
1538 )
1539 )
1540 except:
1541 self.pe_amplitude_prior_obstimes = self.get_config_option(
1542 "pe", "amplitude_prior_obstimes", allownone=True
1543 )
1544
1546 "pe", "amplitude_prior_type", default="fermidirac"
1547 )
1548
1549 # check if using par file errors in parameter estimation
1550 self.pe_use_parameter_errors = self.get_config_option(
1551 "pe", "use_parameter_errors", cftype="boolean", default=False
1552 )
1553
1554 # check for executable for merging nested sample files/converting them to posteriors
1555 self.pe_n2p_exec = self.get_config_option(
1556 "pe", "n2p_exec", default="lalinference_nest2pos"
1557 )
1558 if self.error_code != 0:
1559 return
1560
1561 # check file exists and is executable
1562 if not os.path.isfile(self.pe_n2p_exec) or not os.access(
1563 self.pe_n2p_exec, os.X_OK
1564 ):
1565 print(
1566 "Warning... 'pe_n2p_exec' in '[pe]' does not exist or is not an executable. Try finding code in path."
1567 )
1568 pen2pexec = self.find_exec_file("lalinference_nest2pos")
1569
1570 if pen2pexec == None:
1571 print(
1572 "Error... could not find 'lalinference_nest2pos' in 'PATH'",
1573 file=sys.stderr,
1574 )
1575 self.error_code = KNOPE_ERROR_GENERAL
1576 return
1577 else:
1578 self.pe_n2p_exec = pen2pexec
1579
1580 self.pe_posterior_basedir = self.get_config_option("pe", "n2p_output_dir")
1581 if self.pe_posterior_basedir == None:
1582 print(
1583 "Error... no 'n2p_output_dir' specified in '[pe]' giving path for posterior sample outputs",
1584 file=sys.stderr,
1585 )
1586 self.error_code = KNOPE_ERROR_GENERAL
1587 return
1588 self.mkdirs(self.pe_posterior_basedir)
1589 if self.error_code != 0:
1590 return
1591
1592 if self.pe_num_background > 0:
1593 self.pe_posterior_background_basedir = self.get_config_option(
1594 "pe", "n2p_output_dir_background"
1595 )
1596 if self.pe_posterior_background_basedir == None:
1597 print(
1598 "Error... no 'n2p_output_dir_background' specified in '[pe]' giving path for posterior sample outputs",
1599 file=sys.stderr,
1600 )
1601 self.error_code = KNOPE_ERROR_GENERAL
1602 return
1603 self.mkdirs(self.pe_posterior_background_basedir)
1604 if self.error_code != 0:
1605 return
1606
1607 # check whether to remove all the nested sample files after the run
1608 self.pe_clean_nest_samples = self.get_config_option(
1609 "pe", "clean_nest_samples", cftype="boolean", default=False
1610 )
1611
1612 # check if there are memory requirements for the PE jobs
1613 self.pe_request_memory = self.get_config_option(
1614 "pe", "pe_request_memory", cftype="int", allownone=True
1615 )
1616
1617 # create parameter estimation job
1618 pejob = ppeJob(
1619 self.pe_exec,
1620 univ=self.pe_universe,
1621 accgroup=self.accounting_group,
1622 accuser=self.accounting_group_user,
1623 logdir=self.log_dir,
1624 rundir=self.run_dir,
1625 requestmemory=self.pe_request_memory,
1626 )
1627
1628 # create job to merge nested samples and convert to posterior samples
1629 n2pjob = nest2posJob(
1630 self.pe_n2p_exec,
1631 univ="local",
1632 accgroup=self.accounting_group,
1633 accuser=self.accounting_group_user,
1634 logdir=self.log_dir,
1635 rundir=self.run_dir,
1636 )
1637
1638 # create job for moving SNR files
1639 mvjob = moveJob(
1640 accgroup=self.accounting_group,
1641 accuser=self.accounting_group_user,
1642 logdir=self.log_dir,
1643 rundir=self.run_dir,
1644 )
1645
1646 # create job for removing nested samples (use previous remove job if existing)
1647 if self.remove_job == None:
1648 rmjob = removeJob(
1649 accgroup=self.accounting_group,
1650 accuser=self.accounting_group_user,
1651 logdir=self.log_dir,
1652 rundir=self.run_dir,
1653 )
1654 else:
1655 rmjob = self.remove_job
1656
1657 # work out combinations of parameter estimation jobs to run (FIXME: if non-GR mode was able to be enabled then another
1658 # combination would include those below, but in both GR and non-GR flavours)
1659 # NOTE: if running with two frequency factors (which have been fixed, so they can only be 1f and 2f), this just runs
1660 # for a multi-harmonic search at both frequencies. If you want to run at a single frequency then that should be set up
1661 # independently.
1662 self.pe_combinations = []
1663 for ifo in self.ifos:
1664 if not self.pe_coherent_only:
1665 self.pe_combinations.append(
1666 {"detectors": [ifo], "prefix": ifo}
1667 ) # all individual detector runs
1668
1669 if not self.pe_incoherent_only:
1670 self.pe_combinations.append(
1671 {"detectors": self.ifos, "prefix": "".join(self.ifos)}
1672 ) # add joint multi-detector run
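        # Illustrative example: with self.ifos = ['H1', 'L1'] and neither 'coherent_only' nor
        # 'incoherent_only' set, self.pe_combinations becomes
        #   [{'detectors': ['H1'], 'prefix': 'H1'},
        #    {'detectors': ['L1'], 'prefix': 'L1'},
        #    {'detectors': ['H1', 'L1'], 'prefix': 'H1L1'}]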
1673
1674 self.pe_prior_files = {} # dictionary to contain prior files for each pulsar
1675 self.pe_cor_files = (
1676 {}
1677 ) # dictionary to contain correlation coefficient matrices for pulsars as required
1678
1679 # dictionary of nest2pos nodes
1680 n2pnodes = {}
1681
1682 # loop over the total number of jobs (single signal job and background jobs)
1683 njobs = self.pe_num_background + 1
1684 for j in range(njobs):
1685 # loop through each pulsar that has been analysed
1686 for pname in self.analysed_pulsars:
1687 psr = pppu.psr_par(self.analysed_pulsars[pname])
1688
1689 # directories for output nested sample files
1690 if j == 0: # first iteration of outer loop is the signal job
1691 psrdir = os.path.join(self.pe_output_basedir, pname)
1692 else: # subsequent iterations are for background jobs
1693 psrdir = os.path.join(self.pe_output_background_basedir, pname)
1694
1695 # directories for output posterior files
1696 if j == 0: # first iteration of outer loop is the signal job
1697 psrpostdir = os.path.join(self.pe_posterior_basedir, pname)
1698 else: # subsequent iterations are for background jobs
1699 psrpostdir = os.path.join(
1700 self.pe_posterior_background_basedir, pname
1701 )
1702
1703 if self.pe_roq:
1704 nroqruns = 1 # add one run for the ROQ weights calculation
1705 else:
1706 nroqruns = 0
1707
1708 # create directory for that pulsar
1709 self.mkdirs(psrdir)
1710 if self.error_code != 0:
1711 return
1712
1713 self.mkdirs(psrpostdir)
1714 if self.error_code != 0:
1715 return
1716
1717 n2pnodes[
1718 pname
1719 ] = (
1720 []
1721 ) # list of nodes for lalinference_nest2pos jobs for a given pulsar
1722
1723 for comb in self.pe_combinations:
1724 dets = comb["detectors"]
1725 detprefix = comb["prefix"]
1726
1727 # set up directories for detectors
1728 detdir = os.path.join(psrdir, detprefix)
1729 self.mkdirs(detdir)
1730 if self.error_code != 0:
1731 return
1732
1733 detpostdir = os.path.join(psrpostdir, detprefix)
1734 self.mkdirs(detpostdir)
1735 if self.error_code != 0:
1736 return
1737
1738 # make directories for frequency factors
1739 if len(self.freq_factors) > 1: # coherent multi-frequency analysis
1740 ffdir = os.path.join(detdir, "multiharmonic")
1741 ffpostdir = os.path.join(detpostdir, "multiharmonic")
1742 else:
1743 if (
1744 not self.freq_factors[0] % 1.0
1745 ): # for integers just output directory as e.g. 2f
1746 ffdir = os.path.join(
1747 detdir, "%df" % int(self.freq_factors[0])
1748 )
1749 ffpostdir = os.path.join(
1750 detpostdir, "%df" % int(self.freq_factors[0])
1751 )
1752 else:
1753 ffdir = os.path.join(detdir, "%.2ff" % self.freq_factors[0])
1754 ffpostdir = os.path.join(
1755 detpostdir, "%.2ff" % self.freq_factors[0]
1756 )
1757
1758 # add integer numbering to directories if running background analysis jobs
1759 if j > 0:
1760 ffdir = os.path.join(
1761 ffdir, "%05d" % (j - 1)
1762 ) # start directory names as 00000, 00001, 00002, etc
1763 ffpostdir = os.path.join(ffpostdir, "%05d" % (j - 1))
1764
1765 self.mkdirs(ffdir)
1766 if self.error_code != 0:
1767 return
1768
1769 self.mkdirs(ffpostdir)
1770 if self.error_code != 0:
1771 return
1772
1773 randomiseseed = ""
1774 if j == 0:
1775 # create prior file for analysis (use this same file for all background runs)
1776 priorfile = self.create_prior_file(
1777 psr, psrdir, dets, self.freq_factors, ffdir
1778 )
1779 if pname not in self.pe_prior_files:
1780 self.pe_prior_files[
1781 pname
1782 ] = priorfile # set prior file (just use first one as they should be the same for each combination of detectors)
1783
1784 nruns = self.pe_nruns
1785 nlive = self.pe_nlive
1786 else:
1787 nruns = self.pe_nruns_background
1788 nlive = self.pe_nlive_background
1789 # set seed for randomising data (this needs to be the same over each nruns)
1790 randomiseseed = "".join(
1791 [str(f) for f in np.random.randint(1, 10, size=15).tolist()]
1792 )
1793
1794 # set output ROQ weights file
1795 if self.pe_roq:
1796 roqweightsfile = os.path.join(ffdir, "roqweights.bin")
1797
1798 nestfiles = [] # list of nested sample file names
1799 penodes = []
1800 roqinputnode = None
1801
1802 # setup job(s)
1803 i = counter = 0
1804 while (
1805 counter < nruns + nroqruns
1806 ): # loop over the required number of runs
1807 penode = ppeNode(pejob, psrname=pname)
1808 if self.pe_random_seed is not None:
1809 penode.set_randomseed(
1810 self.pe_random_seed
1811 ) # set seed for RNG
1812 penode.set_detectors(",".join(dets)) # add detectors
1813 penode.set_par_file(
1814 self.analysed_pulsars[pname]
1815 ) # add parameter file
1816 if pname in self.pe_cor_files:
1817 penode.set_cor_file(
1818 self.pe_cor_files[pname]
1819 ) # set parameter correlation coefficient file
1820 penode.set_prior_file(priorfile) # set prior file
1821 penode.set_harmonics(
1822 ",".join([str(ffv) for ffv in self.freq_factors])
1823 ) # set frequency harmonics
1824
1825 penode.set_Nlive(nlive) # set number of live points
1826 if self.pe_nmcmc is not None:
1827 penode.set_Nmcmc(
1828 self.pe_nmcmc
1829 ) # set number of MCMC iterations for choosing new points
1830 penode.set_Nmcmcinitial(
1831 self.pe_nmcmc_initial
1832 ) # set initial number of MCMC iterations for resampling the prior
1833 penode.set_tolerance(
1834 self.pe_tolerance
1835 ) # set tolerance for ending nested sampling
1836
1837 if self.pe_starttime is not None:
1838 penode.set_start_time(
1839 self.pe_starttime
1840 ) # set start time of data to use
1841
1842 if self.pe_endtime is not None:
1843 penode.set_end_time(
1844 self.pe_endtime
1845 ) # set end time of data to use
1846
1847 if self.pe_starttime is None and self.pe_endtime is None:
1848 if self.pe_truncate_time is not None:
1849 penode.set_truncate_time(self.pe_truncate_time)
1850 elif self.pe_truncate_samples is not None:
1851 penode.set_truncate_samples(self.pe_truncate_samples)
1852 elif self.pe_truncate_fraction is not None:
1853 penode.set_truncate_fraction(self.pe_truncate_fraction)
1854
1855 # set the output nested samples file
1856 nestfiles.append(
1857 os.path.join(
1858 ffdir, "nested_samples_%s_%05d.hdf" % (pname, i)
1859 )
1860 )
1861 penode.set_outfile(nestfiles[i])
1862
1863 if self.pe_roq:
1864 penode.set_roq()
1865 penode.set_roq_chunkmax(str(self.pe_roq_chunkmax))
1866
1867 if counter == 0: # first time round just output weights
1868 penode.set_roq_ntraining(str(self.pe_roq_ntraining))
1869 penode.set_roq_tolerance(str(self.pe_roq_tolerance))
1870 penode.set_roq_outputweights(roqweightsfile)
1871 if self.pe_roq_uniform:
1872 penode.set_roq_uniform()
1873 else: # use pre-created weights file
1874 penode.set_roq_inputweights(roqweightsfile)
1875
1876 # for background runs set the randomise seed
1877 if j > 0:
1878 penode.set_randomise(randomiseseed)
1879
1880 # add input files
1881 inputfiles = []
1882 for det in dets:
1883 for ff in self.freq_factors:
1884 inputfiles.append(
1885 self.processed_files[pname][det][ff][-1]
1886 )
1887 penode.set_input_files(",".join(inputfiles))
1888
1889 if self.pe_gaussian_like or self.engine == "splinter":
1890 # use Gaussian likelihood
1891 penode.set_gaussian_like()
1892
1893 if (
1894 len(self.freq_factors) == 2
1895 and 1.0 in self.freq_factors
1896 and 2.0 in self.freq_factors
1897 ):
1898 # set whether using source model
1899 if self.pe_model_type == "source":
1900 penode.set_source_model()
1901
1902 # set whether using a biaxial signal
1903 if self.pe_biaxial:
1904 penode.set_biaxial()
1905
1906 # set Earth, Sun and time ephemeris files
1907 earthfile, sunfile, timefile = self.get_ephemeris(psr)
1908 penode.set_ephem_earth(earthfile)
1909 penode.set_ephem_sun(sunfile)
1910 penode.set_ephem_time(timefile)
1911
1912 # add parents (unless just doing post-processing)
1913 if not self.postonly:
1914 for ff in self.freq_factors:
1915 for det in dets:
1916 # set parents of node
1917 if pname in self.concat_nodes:
1918 penode.add_parent(
1919 self.concat_nodes[pname][det][ff]
1920 )
1921 else:
1922 if self.engine == "heterodyne":
1923 try: # fine heterodyne nodes might not exist if (in automated mode) no new segments were found, but previous data was available
1924 penode.add_parent(
1925 self.fine_heterodyne_nodes[pname][
1926 det
1927 ][ff]
1928 )
1929 except:
1930 pass
1931 if self.engine == "splinter":
1932 try: # splinter nodes might not exist if (in automated mode) no new segments were found, but previous data was available
1933 if pname in self.splinter_modified_pars:
1934 penode.add_parent(
1936 det
1937 ][ff]
1938 )
1939 elif (
1940 pname
1942 ):
1943 penode.add_parent(
1945 det
1946 ][ff]
1947 )
1948 except:
1949 pass
1950
1951 # if using ROQ add first PE node as parent to the rest
1952 if self.pe_roq:
1953 if (
1954 roqinputnode is not None
1955 ): # add first penode (which generates the ROQ interpolant) as a parent to subsequent nodes
1956 penode.add_parent(roqinputnode)
1957
1958 if (
1959 counter == 0
1960 ): # get first penode (generating and outputting the ROQ interpolant) to add as parents to subsequent nodes
1961 roqinputnode = penode
1962 self.add_node(penode)
1963 counter = (
1964 counter + 1
1965 ) # increment "counter", but not "i"
1966 del nestfiles[-1] # remove last element of nestfiles
1967 continue
1968
1969 # add node to dag
1970 self.add_node(penode)
1971 penodes.append(penode)
1972
1973 # move SNR files into posterior directory
1974 mvnode = moveNode(mvjob)
1975 snrsourcefile = (
1976 os.path.splitext(nestfiles[i])[0] + "_SNR"
1977 ) # source SNR file
1978 snrdestfile = os.path.join(
1979 ffpostdir, "SNR_%05d.txt" % i
1980 ) # destination SNR file in posterior directory
1981 mvnode.set_source(snrsourcefile)
1982 mvnode.set_destination(snrdestfile)
1983 mvnode.add_parent(penode)
1984 self.add_node(mvnode)
1985
1986 counter = counter + 1
1987 i = i + 1
1988
1989 # add lalinference_nest2pos node to combine outputs/convert to posterior samples
1990 n2pnode = nest2posNode(n2pjob)
1991 postfile = os.path.join(
1992 ffpostdir, "posterior_samples_%s.hdf" % pname
1993 )
1994 n2pnode.set_outfile(postfile) # output posterior file
1995 n2pnode.set_nest_files(nestfiles) # nested sample files
1996
1997 n2pnodes[pname].append(n2pnode)
1998
1999 # add parents
2000 for pn in penodes:
2001 n2pnode.add_parent(pn)
2002
2003 self.add_node(n2pnode) # add node
2004
2005 # check whether to remove the nested sample files
2006 if self.pe_clean_nest_samples:
2007 rmnode = removeNode(rmjob)
2008 # add name of header file
2009 rmnode.set_files(nestfiles)
2010 rmnode.add_parent(n2pnode)
2011 self.add_node(rmnode)
2012
2013 if j == 0:
2014 # add main n2p nodes
2015 for pname in self.analysed_pulsars:
2016 self.pe_nest2pos_nodes[pname] = n2pnodes[pname]
2017 else:
2018 # add background n2p nodes
2019 for pname in self.analysed_pulsars:
2020 self.pe_nest2pos_background_nodes[pname] = n2pnodes[pname]
2021
2022 def create_prior_file(
2023 self, psr, psrdir, detectors, freqfactors, outputpath, scalefactor=25.0
2024 ):
2025 """
2026 Create the prior file to use for a particular job defined by a set of detectors, or a single detector, and
2027 a set of frequency factors, or a single frequency factor. If creating a prior limit based on a set of given
2028 amplitude spectral densities (by calculating an estimate of the 95% UL they would produce) then it will
2029 additionally be scaled by a factor of `scalefactor`.
2030
2031 Return the full path of the output prior file.
2032 """
2033
2034 # create the output file
2035 pname = psr["PSRJ"]
2036 outfile = os.path.join(outputpath, "%s.prior" % pname)
2037
2038 # if using a pre-made prior file then just create a symbolic link to that file into outputpath
2039 if self.pe_premade_prior_file is not None:
2040 try:
2041 os.symlink(self.pe_premade_prior_file, outfile)
2042 except:
2043 print(
2044 "Error... could not create symbolic link to prior file '%s'"
2045 % self.pe_premade_prior_file,
2046 file=sys.stderr,
2047 )
2048 self.error_code = KNOPE_ERROR_GENERAL
2049
2050 return outfile
2051
2052 # check if requiring to add parameters with errors in the .par file to the prior options
2053 prior_options = {}
2054 if self.pe_prior_options is not None:
2055 prior_options = deepcopy(self.pe_prior_options)
2056
2058 # create and output a covariance matrix (in the pulsar directory) if it does not already exist
2059 corfile = os.path.join(psrdir, "%s.cor" % pname)
2060 fp = open(corfile, "w")
2061 fp.write("\t") # indent the list of parameters
2062
2063 erritems = [] # list of values with errors
2064
2065 ignore_pars = [
2066 "DM",
2067 "START",
2068 "FINISH",
2069 "NTOA",
2070 "TRES",
2071 "TZRMJD",
2072 "TZRFRQ",
2073 "TZRSITE",
2074 "NITS",
2075 "ELAT",
2076 "ELONG",
2077 ] # keys to ignore from par file
2078
2079 for paritem in pppu.float_keys:
2080 if paritem in ignore_pars:
2081 continue
2082 if (
2083 psr["%s_ERR" % paritem] != None
2084 and psr["%s_FIT" % paritem] != None
2085 ): # get values with a given error (suffixed with _ERR)
2086 if psr["%s_FIT" % paritem] == 1:
2087 # set Gaussian prior with mean being the parameter value and sigma being the error
2088 if paritem in ["RA_RAD", "DEC_RAD"]:
2089 itemname = paritem.replace("_RAD", "")
2090 else:
2091 itemname = paritem
2092 prior_options[itemname] = {
2093 "priortype": "gaussian",
2094 "ranges": [psr[paritem], psr["%s_ERR" % paritem]],
2095 }
2096
2097 fp.write(itemname + " ")
2098 erritems.append(itemname)
2099
2100 if len(erritems) > 0:
2101 fp.write("\n")
2102
2103 for i, ei in enumerate(
2104 erritems
2105 ): # have all values uncorrelated except w0 and T0 or wdot and Pb, which should be highly correlated for small eccentricity binaries
2106 fp.write(ei + "\t")
2107 for j in range(i + 1):
2108 if i == j:
2109 fp.write(
2110 "1 "
2111 ) # diagonal values of correlation coefficient matrix
2112 else:
2113 # check if an eccentricity of between 0 and 0.001
2114 ecc = 0.0
2115 if psr["E"] != None:
2116 ecc = psr["E"]
2117 elif psr["ECC"] != None:
2118 ecc = psr["ECC"]
2119
2120 if ecc >= 0.0 and ecc < 0.001:
2121 if (
2122 (ei == "T0" and erritems[j] == "OM")
2123 or (ei == "OM" and erritems[j] == "T0")
2124 ) and ("T0" in erritems and "OM" in erritems):
2125 fp.write(
2126 "0.9999 "
2127 ) # set parameters to be highly correlated (although not fully due to issues with fully correlated parameters)
2128 elif (
2129 (ei == "PB" and erritems[j] == "OMDOT")
2130 or (ei == "OMDOT" and erritems[j] == "PB")
2131 ) and ("PB" in erritems and "OMDOT" in erritems):
2132 fp.write(
2133 "0.9999 "
2134 ) # set parameters to be highly correlated (although not fully due to issues with fully correlated parameters)
2135 else:
2136 fp.write("0 ")
2137 else:
2138 fp.write(
2139 "0 "
2140 ) # set everything else to be uncorrelated
2141 fp.write("\n")
2142 fp.close()
2143
2144 if len(erritems) > 0:
2145 self.pe_cor_files[pname] = corfile
2146 else: # no error values were found so remove the corfile
2147 os.remove(corfile)
2148
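# For illustration (a sketch of the file layout implied by the write calls above, not an
# extra output): for a small-eccentricity binary pulsar with fitted F0, T0 and OM the loop
# above would produce a '<PSRJ>.cor' file holding a tab-indented parameter header and the
# rows of a lower-triangular correlation matrix, e.g.
#
#       F0 T0 OM
#   F0  1
#   T0  0 1
#   OM  0 0.9999 1
#
# with the T0-OM pair set to 0.9999 rather than 1 to avoid problems with fully correlated
# parameters.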
2149 # check if deriving amplitude priors
2150 if self.pe_derive_amplitude_prior:
2151 # check if there is a file containing a previous posterior to use for amplitude vs cosiota prior
2152 posteriorfile = None
2153 if self.pe_previous_posterior_files is None:
2154 if self.pe_previous_posteriors_file is not None:
2155 try:
2156 fp = open(self.pe_previous_posteriors_file, "r")
2157 self.pe_previous_posterior_files = json.load(fp)
2158 fp.close()
2159 posteriorfile = self.pe_previous_posterior_files[pname]
2160 except:
2161 print(
2162 "Error... could not open file '%s' listing previous posterior files."
2164 file=sys.stderr,
2165 )
2166 self.error_code = KNOPE_ERROR_GENERAL
2167 return outfile
2168 else:
2169 # check if current pulsar has a previous posterior sample file to use as prior
2170 if pname in self.pe_previous_posterior_files:
2171 posteriorfile = self.pe_previous_posterior_files[pname]
2172
2173 # open output prior file
2174 try:
2175 fp = open(outfile, "w")
2176 except:
2177 print(
2178 "Error... could not open prior file '%s'" % outfile, file=sys.stderr
2179 )
2180 self.error_code = KNOPE_ERROR_GENERAL
2181 return outfile
2182
2183 # write out any priors that have been given
2184 for prioritem in prior_options:
2185 if "priortype" not in prior_options[prioritem]:
2186 print(
2187 "Error... no 'priortype' given for parameter '%s'" % prioritem,
2188 file=sys.stderr,
2189 )
2190 self.error_code = KNOPE_ERROR_GENERAL
2191 return outfile
2192
2193 ptype = prior_options[prioritem]["priortype"]
2194
2195 if ptype != "gmm":
2196 if "ranges" not in prior_options[prioritem]:
2197 print(
2198 "Error... no 'ranges' given for parameter '%s'" % prioritem,
2199 file=sys.stderr,
2200 )
2201 self.error_code = KNOPE_ERROR_GENERAL
2202 return outfile
2203
2204 rangevals = prior_options[prioritem]["ranges"]
2205
2206 if len(rangevals) != 2:
2207 print(
2208 "Error... 'ranges' for parameter '%s' must be a list or tuple with two entries"
2209 % prioritem,
2210 file=sys.stderr,
2211 )
2212 self.error_code = KNOPE_ERROR_GENERAL
2213 return outfile
2214
2215 # ignore cosiota if using previous posterior file as prior
2216 if posteriorfile is not None and prioritem.upper() == "COSIOTA":
2217 continue
2218 else:
2219 fp.write(
2220 "%s\t%s\t%.16le\t%.16le\n"
2221 % (prioritem, ptype, rangevals[0], rangevals[1])
2222 )
2223 else:
2224 # check if item is prior for multiple parameters
2225 npars = len(prioritem.split(":"))
2226
2227 # output if a Gaussian Mixture Model prior is set
2228 if "nmodes" not in prior_options[prioritem]:
2229 print(
2230 "Error... no 'nmodes' given for parameter '{}'".format(
2231 prioritem
2232 ),
2233 file=sys.stderr,
2234 )
2235 self.error_code = KNOPE_ERROR_GENERAL
2236 return outfile
2237
2238 nmodes = prior_options[prioritem]["nmodes"]
2239
2240 if "means" not in prior_options[prioritem]:
2241 print(
2242 "Error... no 'means' given for parameter '{}'".format(
2243 prioritem
2244 ),
2245 file=sys.stderr,
2246 )
2247 self.error_code = KNOPE_ERROR_GENERAL
2248 return outfile
2249
2250 means = prior_options[prioritem]["means"]
2251
2252 if len(means) != nmodes:
2253 print(
2254 "Error... number of mean values must be equal to the number of modes",
2255 file=sys.stderr,
2256 )
2257 self.error_code = KNOPE_ERROR_GENERAL
2258 return outfile
2259
2260 for mean in means:
2261 if len(mean) != npars:
2262 print(
2263 "Error... number of mean values must be equal to the number of parameters",
2264 file=sys.stderr,
2265 )
2266 self.error_code = KNOPE_ERROR_GENERAL
2267 return outfile
2268
2269 if "covs" not in prior_options[prioritem]:
2270 print(
2271 "Error... no 'covs' given for parameter '{}'".format(
2272 prioritem
2273 ),
2274 file=sys.stderr,
2275 )
2276 self.error_code = KNOPE_ERROR_GENERAL
2277 return outfile
2278
2279 covs = prior_options[prioritem]["covs"]
2280
2281 if len(covs) != nmodes:
2282 print(
2283 "Error... number of covariance matrices must be equal to the number of modes",
2284 file=sys.stderr,
2285 )
2286 self.error_code = KNOPE_ERROR_GENERAL
2287 return outfile
2288
2289 for cov in covs:
2290 npcov = np.array(cov)
2291 if npcov.shape[0] != npcov.shape[1] and npcov.shape[1] != npars:
2292 print(
2293 "Error... number of covariance matrices rows/columns must be equal to the number of parameters",
2294 file=sys.stderr,
2295 )
2296 self.error_code = KNOPE_ERROR_GENERAL
2297 return outfile
2298
2299 if "weights" not in prior_options[prioritem]:
2300 print(
2301 "Error... no 'weights' given for parameter '{}'".format(
2302 prioritem
2303 ),
2304 file=sys.stderr,
2305 )
2306 self.error_code = KNOPE_ERROR_GENERAL
2307 return outfile
2308
2309 weights = prior_options[prioritem]["weights"]
2310
2311 if len(weights) != nmodes:
2312 print(
2313 "Error... number of weights must be equal to the number of modes",
2314 file=sys.stderr,
2315 )
2316 self.error_code = KNOPE_ERROR_GENERAL
2317 return outfile
2318
2319 if "ranges" in prior_options[prioritem]:
2320 ranges = prior_options[prioritem]["ranges"]
2321
2322 if len(ranges) != npars:
2323 print(
2324 "Error... number of ranges must be equal to the number of parameters",
2325 file=sys.stderr,
2326 )
2327 self.error_code = KNOPE_ERROR_GENERAL
2328 return outfile
2329
2330 for rangevals in ranges:
2331 if len(rangevals) != 2:
2332 print(
2333 "Error... ranges must have two values",
2334 file=sys.stderr,
2335 )
2336 self.error_code = KNOPE_ERROR_GENERAL
2337 return outfile
2338 else:
2339 ranges = None
2340
2341 fp.write("{}\tgmm\t".format(prioritem))
2342 fp.write("{}\t".format(nmodes))
2343 fp.write("{}\t".format(re.sub(r"\s+", "", str(means))))
2344 fp.write("{}\t".format(re.sub(r"\s+", "", str(covs))))
2345 fp.write("{}".format(re.sub(r"\s+", "", str(weights))))
2346
2347 if ranges is not None:
2348 for rangevals in ranges:
2349 fp.write("\t")
2350 fp.write("{}".format(re.sub(r"\s+", "", str(rangevals))))
2351
2352 fp.write("\n")
2353
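# For illustration (an assumed example, not taken from a real configuration): a
# `pe_prior_options` entry such as
#
#   {"PHI0": {"priortype": "uniform", "ranges": [0.0, 3.141592653589793]},
#    "H0:COSIOTA": {"priortype": "gmm", "nmodes": 1,
#                   "means": [[1e-25, 0.0]], "covs": [[[1e-52, 0.0], [0.0, 0.01]]],
#                   "weights": [1.0], "ranges": [[0.0, 1e-20], [-1.0, 1.0]]}}
#
# would be written out by the loop above as two whitespace-separated prior file lines:
#
#   PHI0  uniform  0.0000000000000000e+00  3.1415926535897931e+00
#   H0:COSIOTA  gmm  1  [[1e-25,0.0]]  [[[1e-52,0.0],[0.0,0.01]]]  [1.0]  [0.0,1e-20]  [-1.0,1.0]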
2354 # set the required amplitude priors or parameters to estimate from previous posterior samples (and limits)
2355 requls = {} # dictionary to contain the required upper limits
2356 gmmpars = OrderedDict()
2357
2358 if self.pe_model_type == "waveform":
2359 if 2.0 in self.freq_factors:
2360 requls["C22"] = None
2361 gmmpars["C22"] = [0.0, np.inf] # limits on C22 prior
2362 if 1.0 in self.freq_factors:
2363 requls["C21"] = None
2364 gmmpars["C21"] = [0.0, np.inf] # limits on C21 prior
2365 elif self.pe_model_type == "source":
2366 if len(self.freq_factors) == 1:
2367 requls["H0"] = None
2368 gmmpars["H0"] = [0.0, np.inf] # limits on H0 prior
2369 if len(self.freq_factors) == 2:
2370 if 1.0 in self.freq_factors and 2.0 in self.freq_factors:
2371 requls["I21"] = None
2372 requls["I31"] = None
2373 gmmpars["I21"] = [0.0, np.inf]
2374 gmmpars["I31"] = [0.0, np.inf]
2375 gmmpars["COSIOTA"] = [
2376 -1.0,
2377 1.0,
2378 ] # limits on COSIOTA prior (required in all cases)
2379
2380 if len(requls) == 0:
2381 print(
2382 "Error... unknown frequency factors or model type in configuration file.",
2383 file=sys.stderr,
2384 )
2385 self.error_code = KNOPE_ERROR_GENERAL
2386 return outfile
2387
2388 # try and get the file containing previous upper limits
2389 if os.path.isfile(self.pe_amplitude_prior_file):
2390 # check file can be read
2391 if self.pe_prior_info is None:
2392 try:
2393 fpp = open(self.pe_amplitude_prior_file, "r")
2394 self.pe_prior_info = json.load(fpp) # should be JSON file
2395 fpp.close()
2396 except:
2397 print(
2398 "Error... could not parse prior file '%s'."
2400 file=sys.stderr,
2401 )
2402 self.error_code = KNOPE_ERROR_GENERAL
2403 return outfile
2404
2405 # see if pulsar is in prior file
2406 if pname in self.pe_prior_info:
2407 uls = self.pe_prior_info[pname]
2408 for ult in requls:
2409 if ult == "C22":
2410 if "C22UL" not in uls and "H0UL" in uls:
2411 # use 'H0' value for 'C22' if present
2412 requls["C22"] = uls["H0UL"]
2413 else:
2414 if ult + "UL" in uls:
2415 requls[ult] = uls[ult + "UL"]
2416
2417 # if there are some required amplitude limits that have not been obtained try and get amplitude spectral densities
2418 freq = psr["F0"]
2419
2420 if None in requls.values() and freq > 0.0 and posteriorfile is None:
2421 if (
2422 self.pe_amplitude_prior_asds is not None
2423 and self.pe_amplitude_prior_obstimes is not None
2424 ):
2425 asdfiles = self.pe_amplitude_prior_asds
2426 obstimes = self.pe_amplitude_prior_obstimes
2427
2428 if not isinstance(
2429 asdfiles, dict
2430 ): # if a single file is given convert into dictionary
2431 asdfilestmp = {}
2432 obstimestmp = {}
2433 asdfilestmp["det"] = asdfiles
2434 obstimestmp["det"] = float(obstimes)
2435 asdfiles = asdfilestmp
2436 obstimes = obstimestmp
2437
2438 asdlist = []
2439 for dk in asdfiles: # read in all the ASD files
2440 if dk not in obstimes:
2441 print(
2442 "Error... no corresponding observation times for detector '%s'"
2443 % dk,
2444 file=sys.stderr,
2445 )
2446 self.error_code = KNOPE_ERROR_GENERAL
2447 return outfile
2448 else:
2449 if not isinstance(obstimes[dk], float) and not isinstance(
2450 obstimes[dk], int
2451 ):
2452 print(
2453 "Error... observation time must be a float or int.",
2454 file=sys.stderr,
2455 )
2456 self.error_code = KNOPE_ERROR_GENERAL
2457 return outfile
2458 if dk not in self.pe_prior_asds:
2459 if not os.path.isfile(asdfiles[dk]):
2460 print(
2461 "Error... ASD file '%s' does not exist."
2462 % asdfiles[dk],
2463 file=sys.stderr,
2464 )
2465 self.error_code = KNOPE_ERROR_GENERAL
2466 return outfile
2467 else:
2468 try:
2469 self.pe_prior_asds[dk] = np.loadtxt(
2470 asdfiles[dk], comments=["%", "#"]
2471 )
2472 except:
2473 print(
2474 "Error... could not load file '%s'."
2475 % asdfiles[dk],
2476 file=sys.stderr,
2477 )
2478 self.error_code = KNOPE_ERROR_GENERAL
2479 return outfile
2480
2481 asd = self.pe_prior_asds[dk]
2482 asdv = [] # empty array
2483 if 1.0 in self.freq_factors and (
2484 asd[0, 0] <= freq and asd[-1, 0] >= freq
2485 ): # add ASD at 1f
2486 idxf = (
2487 np.abs(asd[:, 0] - freq)
2488 ).argmin() # get value nearest required frequency
2489 asdv.append(asd[idxf, 1])
2490 if 2.0 in self.freq_factors and (
2491 asd[0, 0] <= 2.0 * freq and asd[-1, 0] >= 2.0 * freq
2492 ):
2493 idxf = (
2494 np.abs(asd[:, 0] - 2.0 * freq)
2495 ).argmin() # get value nearest required frequency
2496 asdv.append(asd[idxf, 1])
2497
2498 if len(asdv) > 0:
2499 asdlist.append(
2500 np.array(asdv) ** 2 / (obstimes[dk] * 86400.0)
2501 )
2502 else:
2503 print(
2504 "Error... frequency range in ASD file does not span pulsar frequency.",
2505 file=sys.stderr,
2506 )
2507 self.error_code = KNOPE_ERROR_GENERAL
2508 return outfile
2509
2510 # get upper limit spectrum (harmonic mean of all the weighted spectra)
2511 mspec = np.zeros(len(self.freq_factors))
2512 for asdv in asdlist:
2513 # interpolate frequencies
2514 mspec = mspec + (1.0 / asdv)
2515
2516 mspec = np.sqrt(1.0 / mspec) # final weighted spectrum
2517 ulspec = (
2518 10.8 * mspec
2519 ) # scaled to given "averaged" 95% upper limit estimate
2520
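# For reference (an equivalent way of writing the estimate above, not additional code): for
# a single detector with one-sided amplitude spectral density S^(1/2)(f) at the relevant
# frequency and T days of observation, the weighted spectrum above reduces to the standard
# estimate
#
#   h95 ~ 10.8 * sqrt( S(f) / (T * 86400) )
#
# e.g. S^(1/2)(f) = 1e-23 Hz^(-1/2) over T = 365 days gives h95 ~ 1.9e-26, which is then
# multiplied by `scalefactor` (default 25) below to give a deliberately generous prior bound.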
2521 # set upper limits for creating priors
2522 if self.pe_model_type == "waveform":
2523 if 1.0 in self.freq_factors:
2524 if requls["C21"] == None:
2525 requls["C21"] = (
2526 ulspec[self.freq_factors.index(1.0)] * scalefactor
2527 )
2528 if 2.0 in self.freq_factors:
2529 if requls["C22"] == None:
2530 requls["C22"] = (
2531 ulspec[self.freq_factors.index(2.0)] * scalefactor
2532 )
2533 if self.pe_model_type == "source":
2534 if len(self.freq_factors) == 1:
2535 if requls["H0"] == None:
2536 requls["H0"] = ulspec[0] * scalefactor
2537 else:
2538 if 1.0 in self.freq_factors and 2.0 in self.freq_factors:
2539 # set both I21 and I31 to use the maximum of the 1f and 2f estimate
2540 if requls["I21"] == None:
2541 requls["I21"] = np.max(ulspec) * scalefactor
2542 if requls["I31"] == None:
2543 requls["I31"] = np.max(ulspec) * scalefactor
2544
2545 # get amplitude prior type
2546 if (
2547 self.pe_amplitude_prior_type not in ["fermidirac", "uniform"]
2548 and posteriorfile is None
2549 ):
2550 print(
2551 "Error... prior type must be 'fermidirac' or 'uniform'",
2552 file=sys.stderr,
2553 )
2554 self.error_code = KNOPE_ERROR_GENERAL
2555 return outfile
2556
2557 if posteriorfile is None:
2558 # go through required upper limits and output a Fermi-Dirac prior that also has a 95% limit at that value
2559 for ult in requls:
2560 if requls[ult] == None:
2561 print(
2562 "Error... a required upper limit for '%s' is not available."
2563 % ult,
2564 file=sys.stderr,
2565 )
2566 self.error_code = KNOPE_ERROR_GENERAL
2567 return outfile
2568 else:
2569 if self.pe_amplitude_prior_type == "fermidirac":
2570 try:
2571 b, a = self.fermidirac_rsigma(requls[ult])
2572 except:
2573 print(
2574 "Error... problem deriving the Fermi-Dirac prior for '%s'."
2575 % ult,
2576 file=sys.stderr,
2577 )
2578 self.error_code = KNOPE_ERROR_GENERAL
2579 return outfile
2580 else:
2581 a = 0.0 # uniform prior bound at 0
2582 b = requls[ult] / 0.95 # stretch limit to ~100% bound
2583
2584 fp.write(
2585 "%s\t%s\t%.16le\t%.16le\n"
2586 % (ult, self.pe_amplitude_prior_type, a, b)
2587 )
2588 else:
2589 # try and fit Gaussian Mixture Model to required amplitude parameters
2590 means, covs, weights, _ = self.gmm_prior(
2591 posteriorfile, gmmpars, taper="elliptical", decaywidth=1.0
2592 )
2593 if self.error_code == -1:
2594 print(
2595 "Error... could not set GMM prior using previous posterior samples.",
2596 file=sys.stderr,
2597 )
2598 return outfile
2599 # output GMM prior
2600 parssep = ":".join(gmmpars.keys())
2601 fp.write("%s\tgmm\t%d\t" % (parssep, len(means)))
2602 # write out means
2603 meanstr = ",".join(
2604 [
2605 "[" + ",".join([str(v) for v in vs.tolist()]) + "]"
2606 for vs in means
2607 ]
2608 )
2609 fp.write("[%s]\t" % meanstr)
2610 # write out covariance matrices
2611 covstr = ",".join(
2612 [
2613 "["
2614 + ",".join(
2615 [
2616 "[" + ",".join([str(ca) for ca in c]) + "]"
2617 for c in cs.tolist()
2618 ]
2619 )
2620 + "]"
2621 for cs in covs
2622 ]
2623 )
2624 fp.write("[%s]\t" % covstr)
2625 # write out weights
2626 fp.write("[%s]\t" % ",".join([str(w) for w in weights]))
2627 # write out limits for each parameter in turn
2628 for gp in gmmpars:
2629 fp.write("[%s]\t" % ",".join([str(lim) for lim in gmmpars[gp]]))
2630 fp.write("\n")
2631
2632 fp.close()
2633 else:
2634 # make prior file from parameters
2635 try:
2636 fp = open(outfile, "w")
2637 except:
2638 print(
2639 "Error... could not write prior file '%s'" % outfile,
2640 file=sys.stderr,
2641 )
2642 self.error_code = KNOPE_ERROR_GENERAL
2643 return outfile
2644
2645 for prioritem in prior_options:
2646 ptype = prior_options[prioritem]["priortype"]
2647
2648 if ptype != "gmm":
2649 rangevals = prior_options[prioritem]["ranges"]
2650
2651 if len(rangevals) != 2:
2652 print(
2653 "Error... the ranges in the prior for '%s' are not set properly"
2654 % prioritem,
2655 file=sys.stderr,
2656 )
2657 self.error_code = KNOPE_ERROR_GENERAL
2658 return outfile
2659 fp.write(
2660 "%s\t%s\t%.16e\t%.16e\n"
2661 % (prioritem, ptype, rangevals[0], rangevals[1])
2662 )
2663 else:
2664 # check if item is prior for multiple parameters
2665 npars = len(prioritem.split(":"))
2666
2667 # output if a Gaussian Mixture Model prior is set
2668 if "nmodes" not in prior_options[prioritem]:
2669 print(
2670 "Error... no 'nmodes' given for parameter '{}'".format(
2671 prioritem
2672 ),
2673 file=sys.stderr,
2674 )
2675 self.error_code = KNOPE_ERROR_GENERAL
2676 return outfile
2677
2678 nmodes = prior_options[prioritem]["nmodes"]
2679
2680 if "means" not in prior_options[prioritem]:
2681 print(
2682 "Error... no 'means' given for parameter '{}'".format(
2683 prioritem
2684 ),
2685 file=sys.stderr,
2686 )
2687 self.error_code = KNOPE_ERROR_GENERAL
2688 return outfile
2689
2690 means = prior_options[prioritem]["means"]
2691
2692 if len(means) != nmodes:
2693 print(
2694 "Error... number of mean values must be equal to the number of modes",
2695 file=sys.stderr,
2696 )
2697 self.error_code = KNOPE_ERROR_GENERAL
2698 return outfile
2699
2700 for mean in means:
2701 if len(mean) != npars:
2702 print(
2703 "Error... number of mean values must be equal to the number of parameters",
2704 file=sys.stderr,
2705 )
2706 self.error_code = KNOPE_ERROR_GENERAL
2707 return outfile
2708
2709 if "covs" not in prior_options[prioritem]:
2710 print(
2711 "Error... no 'covs' given for parameter '{}'".format(
2712 prioritem
2713 ),
2714 file=sys.stderr,
2715 )
2716 self.error_code = KNOPE_ERROR_GENERAL
2717 return outfile
2718
2719 covs = prior_options[prioritem]["covs"]
2720
2721 if len(covs) != nmodes:
2722 print(
2723 "Error... number of covariance matrices must be equal to the number of modes",
2724 file=sys.stderr,
2725 )
2726 self.error_code = KNOPE_ERROR_GENERAL
2727 return outfile
2728
2729 for cov in covs:
2730 npcov = np.array(cov)
2731 if npcov.shape[0] != npcov.shape[1] and npcov.shape[1] != npars:
2732 print(
2733 "Error... number of covariance matrices rows/columns must be equal to the number of parameters",
2734 file=sys.stderr,
2735 )
2736 self.error_code = KNOPE_ERROR_GENERAL
2737 return outfile
2738
2739 if "weights" not in prior_options[prioritem]:
2740 print(
2741 "Error... no 'weights' given for parameter '{}'".format(
2742 prioritem
2743 ),
2744 file=sys.stderr,
2745 )
2746 self.error_code = KNOPE_ERROR_GENERAL
2747 return outfile
2748
2749 weights = prior_options[prioritem]["weights"]
2750
2751 if len(weights) != nmodes:
2752 print(
2753 "Error... number of weights must be equal to the number of modes",
2754 file=sys.stderr,
2755 )
2756 self.error_code = KNOPE_ERROR_GENERAL
2757 return outfile
2758
2759 if "ranges" in prior_options[prioritems]:
2760 ranges = prior_options[prioritem]["ranges"]
2761
2762 if len(ranges) != npars:
2763 print(
2764 "Error... number of ranges must be equal to the number of parameters",
2765 file=sys.stderr,
2766 )
2767 self.error_code = KNOPE_ERROR_GENERAL
2768 return outfile
2769
2770 for rangevals in ranges:
2771 if len(rangevals) != 2:
2772 print(
2773 "Error... ranges must have two values",
2774 file=sys.stderr,
2775 )
2776 self.error_code = KNOPE_ERROR_GENERAL
2777 return outfile
2778 else:
2779 ranges = None
2780
2781 fp.write("{}\tgmm\t".format(prioritem))
2782 fp.write("{}\t".format(nmodes))
2783 fp.write("{}\t".format(re.sub(r"\s+", "", str(means))))
2784 fp.write("{}\t".format(re.sub(r"\s+", "", str(covs))))
2785 fp.write("{}".format(re.sub(r"\s+", "", str(weights))))
2786
2787 if ranges is not None:
2788 for rangevals in ranges:
2789 fp.write("\t")
2790 fp.write("{}".format(re.sub(r"\s+", "", str(rangevals))))
2791
2792 fp.write("\n")
2793
2794 fp.close()
2795
2796 return outfile
2797
2798 def fermidirac_rsigma(self, ul, mufrac=0.4, cdf=0.95):
2799 """
2800 Calculate the r and sigma parameter of the Fermi-Dirac distribution to be used.
2801
2802 Based on the definition of the distribution given in https://www.authorea.com/users/50521/articles/65214/_show_article
2803 the distribution will be defined by a mu parameter at which the distribution has 50% of its maximum
2804 probability, and mufrac which is the fraction of mu defining the range over which the distribution falls from
2805 97.5% of the maximum down to 2.5%. Given an upper limit defining the required cdf of the distribution, the
2806 parameters r and sigma will be returned.
2807 """
2808
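# For reference (assuming the Fermi-Dirac prior definition from the article cited above):
# the density being parameterised here is
#
#   p(h | sigma, r) = 1 / (sigma * ln(1 + exp(r))) * 1 / (exp(h / sigma - r) + 1),  h >= 0,
#
# where mu = r * sigma is the point at which the density falls to half its maximum value.
# With the default mufrac = 0.4 the line below fixes r = 0.5 * 7.33 / 0.4 ~ 9.2, and the
# subsequent root find solves for the sigma that places the requested fraction (`cdf`,
# default 95%) of the prior probability below the supplied upper limit `ul`.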
2809 Z = 7.33 # factor that defines the 97.5% -> 2.5% probability attenuation band around mu
2810 r = 0.5 * Z / mufrac # set r
2811
2812 # using the Fermi-Dirac CDF to find sigma given a distribution where the cdf value is found at ul
2813 solution = optimize.root(
2814 lambda s: cdf * np.log(1.0 + np.exp(r))
2815 - np.log(1.0 + np.exp(-r))
2816 - (ul / s)
2817 - np.log(1.0 + np.exp((ul / s) - r)),
2818 ul,
2819 )
2820 sigma = solution.x[0]
2821
2822 return r, sigma
2823
2824 def gmm_prior(self, prevpostfile, pardict, ncomps=20, taper=None, decaywidth=5.0):
2825 """
2826 Create an ND Gaussian Mixture Model for use as a prior.
2827
2828 This will use the BayesianGaussianMixture Model from scikit-learn, which fits a Dirichlet Process Gaussian
2829 Mixture Model to the input data, inferring the number of components required. The input to this should be
2830 a previously calculated posterior sample file, or a numpy array of samples. If a file is given then the
2831 parameters given as keys in the `pardict` ordered dictionary will be extracted. For each parameter name
2832 key in `pardict` there should be a pair of hard upper and lower limits for that particular parameter.
2833 If any of these are not +/-infinity then the samples will be duplicated and reflected around that limit. This
2834 is to avoid edge effects for the inferred Gaussian distributions. `ncomps` sets the hyperparameter used in
2835 the Dirichlet process related to the number of Gaussian components.
2836
2837 `taper` sets whether or not to taper-off any reflected samples, and how that tapering happens. Tapering can
2838 use: a 'gaussian' taper, where the half-width of the Gaussian is set by the range of the samples multiplied
2839 by `decaywidth`; a 'triangular' taper, which falls from one to zero over the range of the samples; an
2840 'exponential' taper, where the decay constant is defined by 'decaywidth' multiplied by the range of the
2841 samples; or, an 'elliptical' taper, where the axis of the ellipse is set by 'decaywidth' multiplied by the
2842 range of the samples. The default is that no tapering is applied, and it should be noted that tapering can
2843 still leave artifacts in the final GMM.
2844
2845 The means, covariance matrices and weights of the Gaussian components will be returned, along with
2846 the full set of points (including reflected points) used for the estimation.
2847
2848 An example of using this would be for "H0" versus "COSIOTA", in which case the `pardict` might be:
2849 >> pardict = OrderedDict()
2850 >> pardict['H0'] = [0., np.inf]
2851 >> pardict['COSIOTA'] = [-1., 1.]
2852 """
2853
2854 try:
2855 from sklearn import mixture
2856 except:
2857 print("Error... could not import scikit-learn.", file=sys.stderr)
2858 self.error_code = KNOPE_ERROR_GENERAL
2859 return None, None, None, None
2860
2861 means = None
2862 covs = None
2863 weights = None
2864
2865 if not isinstance(pardict, OrderedDict):
2866 print("Error... Input must be an ordered dictionary", file=sys.stderr)
2867 self.error_code = KNOPE_ERROR_GENERAL
2868 return means, covs, weights, None
2869
2870 npars = len(pardict)
2871 allsamples = []
2872
2873 # get samples
2874 try:
2875 if not isinstance(prevpostfile, (np.ndarray, np.generic)):
2876 # get samples from posterior sample hdf5 file
2877 if not os.path.isfile(prevpostfile):
2878 print(
2879 "Error... previous posterior sample file '%s' does not exist"
2880 % prevpostfile,
2881 file=sys.stderr,
2882 )
2883 self.error_code = KNOPE_ERROR_GENERAL
2884 return means, covs, weights, None
2885
2886 possamps, _, _ = pppu.pulsar_nest_to_posterior(prevpostfile)
2887 for par in pardict:
2888 allsamples.append(possamps[par.upper()].samples)
2889 else: # get samples from a numpy array
2890 if prevpostfile.shape[1] == npars:
2891 for i in range(npars):
2892 allsamples.append(
2893 prevpostfile[:, i].reshape(len(prevpostfile), 1)
2894 )
2895 else:
2896 print(
2897 "Error... input numpy array does not contain correct number of parameters",
2898 file=sys.stderr,
2899 )
2900 self.error_code = KNOPE_ERROR_GENERAL
2901 return means, covs, weights, None
2902 except:
2903 print(
2904 "Error... could not extract posterior samples from file or numpy array",
2905 file=sys.stderr,
2906 )
2907 self.error_code = KNOPE_ERROR_GENERAL
2908 return means, covs, weights, None
2909
2910 # reflect and duplicate samples if required for all parameters (for each reflection add to ncomp)
2911 allsamplesnp = np.copy(allsamples).squeeze().T
2912 for i, p in enumerate(pardict.keys()):
2913 refsamples = None
2914
2915 for lim in pardict[p]:
2916 if np.isfinite(lim):
2917 maxp = np.max(allsamples[i])
2918 minp = np.min(allsamples[i])
2919
2920 sigmap = decaywidth * (maxp - minp)
2921
2922 dist = lim - allsamplesnp[:, i]
2923
2924 refidxs = np.ones(len(allsamplesnp[:, i]), dtype=bool)
2925 # reflect about this limit (with given tapering if required)
2926 if taper is not None:
2927 if lim > maxp:
2928 deltav = allsamplesnp[:, i] + 2.0 * dist - maxp
2929 elif lim < minp:
2930 deltav = minp - (allsamplesnp[:, i] + 2.0 * dist)
2931 else:
2932 print(
2933 "Warning... limit is inside the extent of the samples",
2934 file=sys.stderr,
2935 )
2936 continue
2937
2938 probkeep = np.ones(len(allsamplesnp[:, i]))
2939 if taper == "gaussian":
2940 probkeep = np.exp(-0.5 * (deltav) ** 2 / sigmap**2)
2941 elif taper == "triangular":
2942 probkeep = 1.0 - (deltav) / (maxp - minp)
2943 elif taper == "exponential":
2944 probkeep = np.exp(-(deltav) / sigmap)
2945 elif taper == "elliptical":
2946 probkeep = np.zeros(len(allsamplesnp[:, i]))
2947 probkeep[deltav < sigmap] = np.sqrt(
2948 1.0 - (deltav[deltav < sigmap] / sigmap) ** 2
2949 )
2950 else:
2951 print(
2952 "Warning... unknown tapering has been set, so none will be applied",
2953 file=sys.stderr,
2954 )
2955
2956 refidxs = (
2957 np.random.rand(len(allsamplesnp[:, i])) < probkeep
2958 ).flatten()
2959
2960 thesesamples = allsamplesnp[refidxs, :]
2961 thesesamples[:, i] += 2.0 * dist[refidxs]
2962 if refsamples is None:
2963 refsamples = np.copy(thesesamples)
2964 else:
2965 refsamples = np.concatenate((refsamples, thesesamples))
2966
2967 # stack samples
2968 if refsamples is not None:
2969 allsamplesnp = np.concatenate((allsamplesnp, refsamples))
2970
2971 # scale parameters to avoid dynamic range issues
2972 parscales = np.std(allsamplesnp, axis=0)
2973 scalesmat = np.identity(npars) * parscales
2974 scaledsamples = allsamplesnp / parscales
2975
2976 # run DPGMM (tolerance and max_iter are increased over the default values
2977 # to aid convergence, although "unconverged" initialisations are generally
2978 # still fine to use)
2979 dpgmm = mixture.BayesianGaussianMixture(
2980 n_components=ncomps, covariance_type="full", tol=5e-2, max_iter=500
2981 ).fit(scaledsamples)
2982
2983 # predict the GMM components in which the samples live
2984 parpred = dpgmm.predict(scaledsamples)
2985
2986 # get the means, covariances and weights of the GMM components which actually contain predicted samples
2987 means = []
2988 covs = []
2989 weights = []
2990 for i, (mean, covar, weight) in enumerate(
2991 zip(dpgmm.means_, dpgmm.covariances_, dpgmm.weights_)
2992 ):
2993 if np.any(
2994 parpred == i
2995 ): # check if any samples predicted to be in component i
2996 # check that the mode is within 3 sigma of the limits, otherwise don't include it
2997 outlims = 0
2998 for mus, sigs, lowlim, highlim in zip(
2999 mean * parscales,
3000 parscales * np.sqrt(np.diag(covar)),
3001 [pardict[p][0] for p in pardict],
3002 [pardict[p][1] for p in pardict],
3003 ):
3004 if mus < lowlim - 3.0 * sigs or mus > highlim + 3.0 * sigs:
3005 outlims += 1
3006
3007 if outlims == 2:
3008 continue
3009
3010 # rescale to get back to true values
3011 means.append(mean * parscales)
3012 covs.append(np.dot(scalesmat, np.dot(covar, scalesmat)))
3013 weights.append(weight)
3014
3015 if len(means) == 0:
3016 print("Error... no GMM components returned", file=sys.stderr)
3017 self.error_code = KNOPE_ERROR_GENERAL
3018
3019 return means, covs, weights, allsamplesnp
3020
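# A minimal, self-contained sketch (for illustration only, not part of the pipeline) of the
# reflect-then-fit idea used by gmm_prior() above, for a single parameter with a hard lower
# bound at zero:
#
#   import numpy as np
#   from sklearn import mixture
#
#   samples = np.abs(np.random.randn(5000, 1))         # toy posterior samples, bounded below by 0
#   reflected = np.concatenate((samples, -samples))     # mirror about the bound to reduce edge effects
#   dpgmm = mixture.BayesianGaussianMixture(
#       n_components=10, covariance_type="full", tol=5e-2, max_iter=500
#   ).fit(reflected)
#   used = np.unique(dpgmm.predict(reflected))          # keep only components that claim samples
#   means = dpgmm.means_[used]
#   covs = dpgmm.covariances_[used]
#   weights = dpgmm.weights_[used]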
3021 def setup_heterodyne(self):
3022 """
3023 Setup the coarse and fine heterodyne jobs/nodes.
3024 """
3025
3026 # get executable
3027 self.heterodyne_exec = self.get_config_option(
3028 "heterodyne", "heterodyne_exec", default="lalpulsar_heterodyne"
3029 )
3030 if self.error_code != 0:
3031 return
3032
3033 # check file exists and is executable
3034 if not os.path.isfile(self.heterodyne_exec) or not os.access(
3035 self.heterodyne_exec, os.X_OK
3036 ):
3037 print(
3038 "Warning... 'heterodyne_exec' in '[heterodyne]' does not exist or is not an executable. Try finding code in path."
3039 )
3040 hetexec = self.find_exec_file("lalpulsar_heterodyne")
3041
3042 if hetexec == None:
3043 print(
3044 "Error... could not find 'lalpulsar_heterodyne' in 'PATH'",
3045 file=sys.stderr,
3046 )
3047 self.error_code = KNOPE_ERROR_GENERAL
3048 return
3049 else:
3050 self.heterodyne_exec = hetexec
3051
3052 # Get Condor universe (default to vanilla)
3053 self.heterodyne_universe = self.get_config_option(
3054 "heterodyne", "universe", default="vanilla"
3055 )
3056 if self.error_code != 0:
3057 return
3058
3059 # Get some general setup data
3060 self.coarse_heterodyne_filter_knee = self.get_config_option(
3061 "heterodyne", "filter_knee", "float", default=0.25
3062 )
3063 self.coarse_heterodyne_sample_rate = self.get_config_option(
3064 "heterodyne", "coarse_sample_rate", "int", default=16384
3065 )
3066 self.coarse_heterodyne_resample_rate = self.get_config_option(
3067 "heterodyne", "coarse_resample_rate", "float", default=1.0
3068 )
3069 self.coarse_heterodyne_channels = self.get_config_option(
3070 "heterodyne", "channels", "dict"
3071 )
3072 self.coarse_heterodyne_binary_output = self.get_config_option(
3073 "heterodyne", "binary_output", "boolean", default=True
3074 )
3075 self.coarse_heterodyne_gzip_output = self.get_config_option(
3076 "heterodyne", "gzip_coarse_output", "boolean", default=False
3077 )
3078 self.coarse_heterodyne_request_memory = self.get_config_option(
3079 "heterodyne", "coarse_request_memory", "int", allownone=True
3080 )
3081 self.coarse_heterodyne_max_data_length = self.get_config_option(
3082 "heterodyne", "coarse_max_data_length", "int", default=512
3083 )
3084 if self.error_code != 0:
3085 return
3086 if self.coarse_heterodyne_binary_output and self.coarse_heterodyne_gzip_output:
3087 print(
3088 "Warning... cannot output coarse heterodyned data as gzip and binary. Defaulting to binary output."
3089 )
3090 self.coarse_heterodyne_gzip_output = False
3091
3092 self.fine_heterodyne_resample_rate = self.get_config_option(
3093 "heterodyne", "fine_resample_rate", "string", default="1/60"
3094 )
3095 self.fine_heterodyne_stddev_thresh = self.get_config_option(
3096 "heterodyne", "stddev_thresh", "float", default=3.5
3097 )
3098 self.fine_heterodyne_gzip_output = self.get_config_option(
3099 "heterodyne", "gzip_fine_output", "boolean", default=True
3100 )
3101 self.fine_heterodyne_request_memory = self.get_config_option(
3102 "heterodyne", "fine_request_memory", "int", allownone=True
3103 )
3104 if self.error_code != 0:
3105 return
3106
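# For illustration (an assumed configuration snippet; channel names are examples only), the
# options read above correspond to a '[heterodyne]' section of the knope configuration file
# along the lines of:
#
#   [heterodyne]
#   universe = vanilla
#   heterodyne_exec = lalpulsar_heterodyne
#   filter_knee = 0.25
#   coarse_sample_rate = 16384
#   coarse_resample_rate = 1.0
#   channels = {'H1': 'H1:GDS-CALIB_STRAIN', 'L1': 'L1:GDS-CALIB_STRAIN'}
#   binary_output = True
#   gzip_coarse_output = False
#   coarse_max_data_length = 512
#   fine_resample_rate = 1/60
#   stddev_thresh = 3.5
#   gzip_fine_output = True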
3107 # create heterodyne job
3108 chetjob = heterodyneJob(
3109 self.heterodyne_exec,
3110 univ=self.heterodyne_universe,
3111 accgroup=self.accounting_group,
3112 accuser=self.accounting_group_user,
3113 logdir=self.log_dir,
3114 rundir=self.run_dir,
3115 subprefix="coarse",
3116 requestmemory=self.coarse_heterodyne_request_memory,
3117 )
3118 fhetjob = heterodyneJob(
3119 self.heterodyne_exec,
3120 univ=self.heterodyne_universe,
3121 accgroup=self.accounting_group,
3122 accuser=self.accounting_group_user,
3123 logdir=self.log_dir,
3124 rundir=self.run_dir,
3125 subprefix="fine",
3126 requestmemory=self.fine_heterodyne_request_memory,
3127 )
3128
3129 self.processed_files = {} # dictionary of processed (fine heterodyned files)
3130 self.fine_heterodyne_nodes = (
3131 {}
3132 ) # dictionary for fine heterodyne nodes to use as parents to later jobs
3133
3135 getmodsciencesegs = (
3136 {}
3137 ) # flag for modified pulsars setting whether script needs to get the science segment list
3138 segfiletimes = (
3139 {}
3140 ) # (for unmodified pulsars) store segment files that have already been generated (keyed by the start time) to save repeated segment list finding calls
3141 for ifo in self.ifos:
3142 getmodsciencesegs[ifo] = True
3143 segfiletimes[ifo] = {}
3144
3145 # check if channels is a single value for each IFO or a list
3146 for ifo in self.ifos:
3147 if ifo not in self.coarse_heterodyne_channels:
3148 print(
3149 "Error... could channel not specified for '{}'".format(ifo),
3150 file=sys.stderr,
3151 )
3152 self.error_code = KNOPE_ERROR_GENERAL
3153 return
3154 else:
3155 if isinstance(self.coarse_heterodyne_channels[ifo], str):
3156 # convert to list
3157 self.coarse_heterodyne_channels[ifo] = [
3158 self.coarse_heterodyne_channels[ifo]
3159 ]
3160 elif not isinstance(self.coarse_heterodyne_channels[ifo], list):
3161 print(
3162 "Error... channel must be a string or a list of strings",
3163 file=sys.stderr,
3164 )
3165 self.error_code = KNOPE_ERROR_GENERAL
3166 return
3167
3168 # loop through all pulsars
3169 for pname in self.analysed_pulsars:
3170 par = self.analysed_pulsars[pname]
3171 modified_pulsar = unmodified_pulsar = False
3172
3173 # check if pulsar is new/has modified par file, or has an unmodified par file
3174 if par in self.modified_pulsars:
3175 modified_pulsar = True
3176 elif par in self.unmodified_pulsars:
3177 unmodified_pulsar = True
3178
3179 # get ephemeris
3180 earthfile, sunfile, timefile = self.get_ephemeris(pppu.psr_par(par))
3181 if self.error_code != 0:
3182 return
3183
3184 segfiles = {}
3185
3186 self.fine_heterodyne_nodes[pname] = {}
3187 self.processed_files[pname] = {}
3188
3189 # loop through detectors and set up directory structure and get science segments on first pass
3190 for ifo in self.ifos[
3191 :
3192 ]: # use self.ifos[:] to create a copy of the IFOs list, so that removal can be done if required https://stackoverflow.com/a/8544571/1862861
3193 psrdir = os.path.join(self.preprocessing_base_dir[ifo], pname)
3194 self.mkdirs(psrdir)
3195 if self.error_code != 0:
3196 return
3197
3198 datadir = os.path.join(psrdir, "data")
3199 self.mkdirs(datadir)
3200 if self.error_code != 0:
3201 return
3202
3203 coarsedir = os.path.join(datadir, "coarse")
3204 self.mkdirs(coarsedir)
3205 if self.error_code != 0:
3206 return
3207
3208 finedir = os.path.join(datadir, "fine")
3209 self.mkdirs(finedir)
3210 if self.error_code != 0:
3211 return
3212
3213 segfiles[ifo] = os.path.join(psrdir, "segments.txt")
3214
3215 # get segment list
3216 if modified_pulsar:
3217 if getmodsciencesegs[ifo]:
3218 self.modified_pulsars_segment_list_tmp[ifo] = os.path.join(
3219 self.preprocessing_base_dir[ifo], "segments_tmp.txt"
3220 )
3221 self.find_data_segments(
3222 self.initial_start[ifo],
3223 self.endtime[ifo],
3224 ifo,
3225 self.modified_pulsars_segment_list_tmp[ifo],
3226 )
3227 if self.error_code != 0:
3228 return
3229
3230 # copy segment list into pulsar directories
3231 try:
3232 shutil.copy2(
3233 self.modified_pulsars_segment_list_tmp[ifo], segfiles[ifo]
3234 )
3235 except:
3236 print(
3237 "Error... could not copy segment list into pulsar directory",
3238 file=sys.stderr,
3239 )
3240 self.error_code = KNOPE_ERROR_GENERAL
3241 return
3242
3243 if getmodsciencesegs[ifo]:
3244 getmodsciencesegs[ifo] = False
3245
3246 # remove any "previous_segments.txt" file if it exists
3247 prevsegs = os.path.join(
3248 os.path.dirname(segfiles[ifo]), "previous_segments.txt"
3249 )
3250 if os.path.isfile(prevsegs):
3251 try:
3252 os.remove(prevsegs)
3253 except:
3254 print(
3255 "Warning... previous segment list file '%s' could not be removed"
3256 % prevsegs
3257 )
3258 elif unmodified_pulsar:
3259 # append current segment file (if it exists) to file containing previous segments
3260 if os.path.isfile(segfiles[ifo]):
3261 # get filename for list containing all previous segments
3262 prevsegs = os.path.join(
3263 os.path.dirname(segfiles[ifo]), "previous_segments.txt"
3264 )
3265 if not os.path.isfile(prevsegs):
3266 shutil.copy2(
3267 segfiles[ifo], prevsegs
3268 ) # copy current (unupdated) segfile into previous segments file if it does not already exist
3269 self.segment_file_update.append(
3270 (segfiles[ifo], prevsegs)
3271 ) # add to list of files to be concatenated together at the end (i.e. provided no errors have occurred)
3272
3273 if self.autonomous:
3274 # if running in automatic mode make sure we don't miss data by using the end time in the previous segment
3275 # file as the new start time (rather than self.starttime)
3276 if os.path.isfile(segfiles[ifo]):
3277 # get end time from segment file
3278 p = sp.check_output("tail -1 " + segfiles[ifo], shell=True)
3279 if len(p) != 0:
3280 self.starttime[ifo] = [
3281 int(p.split()[1])
3282 ] # add as a list for consistency
3283 else:
3284 print(
3285 "Error... could not get end time out of previous segment file '%s'"
3286 % segfiles[ifo],
3287 file=sys.stderr,
3288 )
3289 self.error_code = KNOPE_ERROR_GENERAL
3290 return
3291
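# For illustration (the exact file format is assumed here): the science segment files read
# above and below are plain-text lists with one segment per line, giving GPS start and end
# times as the first two whitespace-separated columns, e.g.
#
#   1126051217 1126087417
#   1126094317 1126137617
#
# so the 'tail -1' call above followed by p.split()[1] picks out the end time of the most
# recent segment to use as the new start time.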
3292 # check if a segment file for the given start time has already been created, and if so use that
3293 if self.starttime[ifo][0] in segfiletimes[ifo]:
3294 shutil.copy2(
3295 segfiletimes[ifo][self.starttime[ifo][0]], segfiles[ifo]
3296 )
3297 else:
3298 segfiletimes[ifo][self.starttime[ifo][0]] = segfiles[ifo]
3299
3300 # create new segment file
3301 self.find_data_segments(
3302 self.starttime[ifo], self.endtime[ifo], ifo, segfiles[ifo]
3303 )
3304 if self.error_code != 0:
3305 return
3306
3307 # check if generated data segment file contains any segments and not in autonomous mode
3308 if (
3309 self.warning_code == KNOPE_WARNING_NO_SEGMENTS
3310 and not self.autonomous
3311 ):
3312 self.ifos.remove(ifo) # remove IFO for which no segments exist
3313 if len(self.ifos) == 0:
3314 print(
3315 "Error... no segments were available for any required IFOs",
3316 file=sys.stderr,
3317 )
3318 self.error_code = KNOPE_ERROR_NO_SEGMENTS
3319 return
3320 else:
3321 self.warning_code = 0
3322 continue
3323
3324 self.fine_heterodyne_nodes[pname][ifo] = {}
3325 self.processed_files[pname][ifo] = {}
3326
3327 # loop through frequency factors for analysis
3328 for freqfactor in self.freq_factors:
3329 if (
3330 not freqfactor % 1.0
3331 ): # for integers just output directory as e.g. 2f
3332 freqfacdircoarse = os.path.join(
3333 coarsedir, "%df" % int(freqfactor)
3334 )
3335 freqfacdirfine = os.path.join(finedir, "%df" % int(freqfactor))
3336 else: # for non-integers use 2 d.p. for dir name
3337 freqfacdircoarse = os.path.join(coarsedir, "%.2ff" % freqfactor)
3338 freqfacdirfine = os.path.join(finedir, "%.2ff" % freqfactor)
3339
3340 self.mkdirs(freqfacdircoarse)
3341 if self.error_code != 0:
3342 return
3343 self.mkdirs(freqfacdirfine)
3344 if self.error_code != 0:
3345 return
3346
3347 # loop over number of "datasets"
3348 finetmpfiles = (
3349 []
3350 ) # file list to concatenate if there is more than one "dataset"
3351 if (
3352 self.ndatasets[ifo] > 1
3353 ): # create concatenation node for fine heterodyne
3354 fineoutput = os.path.join(
3355 freqfacdirfine,
3356 "fine-%s-%d-%d.txt"
3357 % (
3358 ifo,
3359 int(self.starttime[ifo][0]),
3360 int(self.endtime[ifo][-1]),
3361 ),
3362 )
3363 if self.fine_heterodyne_gzip_output:
3364 fineoutput += ".gz"
3365 concatjob = concatJob(
3366 subfile=os.path.join(freqfacdirfine, "concat.sub"),
3367 output=fineoutput,
3368 accgroup=self.accounting_group,
3369 accuser=self.accounting_group_user,
3370 logdir=self.log_dir,
3371 )
3372 for k in range(self.ndatasets[ifo]):
3373 # create output files
3374 if (
3375 self.warning_code != KNOPE_WARNING_NO_SEGMENTS
3376 ): # only add coarse heterodyne if there is segment data
3377 if self.coarse_heterodyne_binary_output:
3378 coarseoutput = os.path.join(
3379 freqfacdircoarse,
3380 "coarse-%s-%d-%d.bin"
3381 % (
3382 ifo,
3383 int(self.starttime[ifo][k]),
3384 int(self.endtime[ifo][k]),
3385 ),
3386 )
3387 else:
3388 coarseoutput = os.path.join(
3389 freqfacdircoarse,
3390 "coarse-%s-%d-%d.txt"
3391 % (
3392 ifo,
3393 int(self.starttime[ifo][k]),
3394 int(self.endtime[ifo][k]),
3395 ),
3396 )
3397
3398 # create coarse heterodyne node
3399 coarsenode = heterodyneNode(chetjob)
3400
3401 # add data find parent to coarse job
3402 if self.datafind_nodes is not None:
3403 coarsenode.add_parent(self.datafind_nodes[ifo][k])
3404
3405 # set data, segment location, channel and output location
3406 coarsenode.set_data_file(self.cache_files[ifo][k])
3407 coarsenode.set_max_data_length(
3408 self.coarse_heterodyne_max_data_length
3409 )
3410 coarsenode.set_seg_list(segfiles[ifo])
3411 coarsenode.set_channel(
3412 self.coarse_heterodyne_channels[ifo][k]
3413 )
3414 coarsenode.set_output_file(coarseoutput)
3415 if self.coarse_heterodyne_binary_output:
3416 coarsenode.set_binoutput()
3417 if self.coarse_heterodyne_gzip_output:
3418 coarsenode.set_gzip_output()
3419
3420 # set some coarse heterodyne info
3421 coarsenode.set_ifo(ifo) # detector
3422 coarsenode.set_het_flag(0) # perform coarse heterodyne
3423 coarsenode.set_pulsar(pname) # pulsar name
3424 coarsenode.set_param_file(par) # pulsar parameter file
3425 coarsenode.set_filter_knee(
3426 self.coarse_heterodyne_filter_knee
3427 )
3428 coarsenode.set_sample_rate(
3429 self.coarse_heterodyne_sample_rate
3430 )
3431 coarsenode.set_resample_rate(
3432 self.coarse_heterodyne_resample_rate
3433 )
3434 coarsenode.set_freq_factor(
3435 freqfactor
3436 ) # multiplicative factor for the rotation frequency
3437
3438 self.add_node(coarsenode)
3439
3440 if (
3441 self.warning_code != KNOPE_WARNING_NO_SEGMENTS
3442 ): # only add fine heterodyne if there is segment data
3443 # create fine heterodyne node
3444 finenode = heterodyneNode(fhetjob)
3445
3446 if self.ndatasets[ifo] > 1:
3447 # create concatnode
3448 if k == 0:
3449 concatnode = concatNode(concatjob)
3450 concatnode.add_parent(
3451 finenode
3452 ) # add fine heterodyne as parent
3453
3454 # add coarse parent to fine job
3455 finenode.add_parent(coarsenode)
3456
3457 # create output files
3458 fineoutput = os.path.join(
3459 freqfacdirfine,
3460 "fine-%s-%d-%d.txt"
3461 % (
3462 ifo,
3463 int(self.starttime[ifo][k]),
3464 int(self.endtime[ifo][k]),
3465 ),
3466 )
3467 finetmpfiles.append(fineoutput)
3468
3469 # set data, segment location and output location
3470 if self.coarse_heterodyne_gzip_output:
3471 finenode.set_data_file(coarseoutput + ".gz")
3472 else:
3473 finenode.set_data_file(coarseoutput)
3474
3475 if self.coarse_heterodyne_binary_output:
3476 finenode.set_bininput()
3477
3478 finenode.set_seg_list(segfiles[ifo])
3479 finenode.set_output_file(fineoutput)
3480
3481 # set fine heterodyne info
3482 finenode.set_ifo(ifo)
3483 finenode.set_het_flag(1) # perform fine heterodyne
3484 finenode.set_pulsar(pname) # pulsar name
3485 finenode.set_param_file(par) # pulsar parameter file
3486 finenode.set_sample_rate(
3487 self.coarse_heterodyne_resample_rate
3488 ) # use resample rate from coarse heterodyne
3489 finenode.set_resample_rate(
3490 self.fine_heterodyne_resample_rate
3491 )
3492 finenode.set_filter_knee(self.coarse_heterodyne_filter_knee)
3493 finenode.set_freq_factor(freqfactor)
3494 finenode.set_stddev_thresh(
3495 self.fine_heterodyne_stddev_thresh
3496 )
3497 finenode.set_ephem_earth_file(earthfile)
3498 finenode.set_ephem_sun_file(sunfile)
3499 finenode.set_ephem_time_file(timefile)
3500
3501 if self.fine_heterodyne_gzip_output:
3502 finenode.set_gzip_output()
3503 finetmpfiles[-1] += ".gz"
3504
3505 self.add_node(finenode)
3506
3507 if self.ndatasets[ifo] > 1:
3508 if k == self.ndatasets[ifo] - 1:
3509 concatnode.set_files(finetmpfiles)
3510 self.add_node(concatnode)
3511 # reset output name to that for the concatenated file
3512 fineoutput = os.path.join(
3513 freqfacdirfine,
3514 "fine-%s-%d-%d.txt"
3515 % (
3516 ifo,
3517 int(self.starttime[ifo][0]),
3518 int(self.endtime[ifo][-1]),
3519 ),
3520 )
3521
3522 # remove the separate fine heterodyned files
3523 rmjob = removeJob(
3524 accgroup=self.accounting_group,
3525 accuser=self.accounting_group_user,
3526 logdir=self.log_dir,
3527 rundir=self.run_dir,
3528 ) # job for removing old files
3529 rmnode = removeNode(rmjob)
3530 rmnode.set_files(finetmpfiles)
3531 rmnode.add_parent(concatnode)
3532 self.add_node(rmnode)
3533
3534 # record fine nodes and files
3535 if self.ndatasets[ifo] > 1:
3536 self.fine_heterodyne_nodes[pname][ifo][
3537 freqfactor
3538 ] = concatnode # set concatenation node as parent of future children
3539 else:
3540 self.fine_heterodyne_nodes[pname][ifo][
3541 freqfactor
3542 ] = finenode
3543 if self.fine_heterodyne_gzip_output:
3544 fineoutput = (
3545 fineoutput + ".gz"
3546 ) # append .gz to recorded file name
3547
3548 if modified_pulsar:
3549 if self.warning_code != KNOPE_WARNING_NO_SEGMENTS:
3550 self.processed_files[pname][ifo][freqfactor] = [fineoutput]
3551 elif unmodified_pulsar:
3552 # check for other fine heterodyned files already in the directory
3553 hetfilescheck = [
3554 os.path.join(freqfacdirfine, hf)
3555 for hf in os.listdir(freqfacdirfine)
3556 if "fine" in hf
3557 ]
3558 hetfilescheck.sort()
3559
3560 # if gzipping the current file, but older files are not gzipped, gzip them now
3561 if self.fine_heterodyne_gzip_output:
3562 for i, hf in enumerate(hetfilescheck):
3563 if ".gz" not in hf:
3564 # try gzipping file
3565 p = sp.Popen("gzip " + hf, shell=True)
3566 out, err = p.communicate()
3567 if p.returncode != 0:
3568 print(
3569 "Error... could not gzip previous fine heterodyned file '%s': %s, %s"
3570 % (hf, out, err),
3571 file=sys.stderr,
3572 )
3573 self.error_code = KNOPE_ERROR_GENERAL
3574 return
3575
3576 hetfilescheck[i] = hf + ".gz" # add .gz suffix
3577 else: # alternatively, if not gzipping current file, but older files are gzipped then gunzip them now
3578 for i, hf in enumerate(hetfilescheck):
3579 if ".gz" in hf:
3580 # try gunzipping file
3581 p = sp.Popen("gunzip " + hf, shell=True)
3582 out, err = p.communicate()
3583 if p.returncode != 0:
3584 print(
3585 "Error... could not gunzip previous fine heterodyned file '%s': %s, %s"
3586 % (hf, out, err),
3587 file=sys.stderr,
3588 )
3589 self.error_code = KNOPE_ERROR_GENERAL
3590 return
3591
3592 self.processed_files[pname][ifo][freqfactor] = hetfilescheck
3593 if (
3594 self.warning_code == KNOPE_WARNING_NO_SEGMENTS
3595 ): # if there are no segments this time, and also no previous fine heterodyned data then ignore this IFO
3596 if len(hetfilescheck) == 0:
3597 self.ifos.remove(
3598 ifo
3599 ) # remove IFO for which no segments exist
3600 if len(self.ifos) == 0:
3601 print(
3602 "Error... no segments were available for any required IFOs",
3603 file=sys.stderr,
3604 )
3605 self.error_code = KNOPE_ERROR_NO_SEGMENTS
3606 return
3607 else:
3608 self.warning_code = 0
3609 continue
3610
3611 self.processed_files[pname][ifo][freqfactor].append(
3612 fineoutput
3613 ) # append new file
3614
3615 # remove temporary segment file
3616 if len(self.modified_pulsars) > 0:
3617 for ifo in self.ifos:
3618 try:
3619 if os.path.isfile(self.modified_pulsars_segment_list_tmp[ifo]):
3620 os.remove(self.modified_pulsars_segment_list_tmp[ifo])
3621 except:
3622 print(
3623 "Warning... could not remove temporary segment list '%s'"
3625 )
3626
3627 def setup_splinter(self):
3628 """
3629 Setup the Spectral Interpolation jobs/nodes.
3630 """
3631
3632 # NOTE: splinter runs on multiple pulsars at once, so requires a directory containing the necessary pulsars,
3633 # therefore new/modified par files need to be linked to from one directory, and unmodified par files need
3634 # to be linked to from another directory
3635
3636 # Spectral interpolation output files have the format Splinter_PSRJNAME_DET e.g. Splinter_J0534+2200_H1
3637
3638 # get executable
3639 self.splinter_exec = self.get_config_option(
3640 "splinter", "splinter_exec", default="lalpulsar_SplInter"
3641 )
3642 if self.error_code != 0:
3643 return
3644
3647
3648 # check file exists and is executable
3649 if not os.path.isfile(self.splinter_exec) or not os.access(
3650 self.splinter_exec, os.X_OK
3651 ):
3652 print(
3653 "Warning... 'splinter_exec' in '[splinter]' does not exist or is not an executable. Try finding code in path."
3654 )
3655
3656 splexec = self.find_exec_file("lalpulsar_SplInter")
3657
3658 if splexec == None:
3659 print(
3660 "Error... could not find 'lalpulsar_SplInter' in 'PATH'",
3661 file=sys.stderr,
3662 )
3663 self.error_code = KNOPE_ERROR_GENERAL
3664 return
3665 else:
3666 self.splinter_exec = splexec
3667
3668 # Get Condor universe (default to vanilla)
3669 self.splinter_universe = self.get_config_option(
3670 "splinter", "universe", default="vanilla"
3671 )
3672 if self.error_code != 0:
3673 return
3674
3675 # check any memory requirements for a splinter job
3676 self.splinter_request_memory = self.get_config_option(
3677 "splinter", "splinter_request_memory", "int", allownone=True
3678 )
3679
3680 # create splinter job
3681 spljob = splinterJob(
3682 self.splinter_exec,
3683 univ=self.splinter_universe,
3684 accgroup=self.accounting_group,
3685 accuser=self.accounting_group_user,
3686 logdir=self.log_dir,
3687 rundir=self.run_dir,
3688 requestmemory=self.splinter_request_memory,
3689 )
3690
3691 # create job for moving splinter output files
3692 mvjob = moveJob(
3693 accgroup=self.accounting_group,
3694 accuser=self.accounting_group_user,
3695 logdir=self.log_dir,
3696 rundir=self.run_dir,
3697 )
3698
3699 # get splinter options
3700 self.splinter_bandwidth = self.get_config_option(
3701 "splinter", "bandwidth", "float", 0.3
3702 )
3703 self.splinter_freq_range = self.get_config_option(
3704 "splinter", "freq_range", "list", [30.0, 2000.0]
3705 )
3706 self.splinter_stddev_thresh = self.get_config_option(
3707 "splinter", "stddev_thresh", "float", 3.5
3708 )
3709 self.splinter_min_seg_length = self.get_config_option(
3710 "splinter", "min_seg_length", "int", 1800
3711 )
3712 self.splinter_max_seg_length = self.get_config_option(
3713 "splinter", "max_seg_length", "int", 21600
3714 ) # max seg length defaults to 1/4 day
3715 self.splinter_gzip_output = self.get_config_option(
3716 "splinter", "gzip_output", "boolean", False
3717 )
3718 if self.error_code != 0:
3719 return
3720
3721 if self.splinter_min_seg_length > self.splinter_max_seg_length:
3722 print(
3723 "Error... minimum segment length ({}) for SplInter is larger than maximum segment length ({})".format(
3724 self.splinter_min_seg_length, self.splinter_max_seg_length
3725 ),
3726 file=sys.stderr,
3727 )
3728 self.error_code = KNOPE_ERROR_GENERAL
3729 return
3730
3731 # create splinter nodes (one for unmodified pulsars and one for modified pulsars)
3732 self.splinter_nodes_modified = {}
3733 self.splinter_nodes_unmodified = {}
3734 self.splinter_modified_pars = {}
3735 self.splinter_unmodified_pars = {}
3736
3737 # set location for SplInter data to eventually be copied to
3738 self.splinter_data_location = {}
3739
3740 self.processed_files = {} # dictionary of processed (splinter) files
3741
3742 # check for whether there are any modified or unmodified pulsars
3743 modpars = False
3744 unmodpars = False
3745 if len(self.modified_pulsars) > 0:
3746 modpars = True
3747 if len(self.unmodified_pulsars) > 0:
3748 unmodpars = True
3749
3750 for ifo in self.ifos[
3751 :
3752 ]: # iterate over copy of self.ifos as may need to remove IFOs if no segments found
3753 # Create splinter base directory to contain data products and par file directories
3754 self.splinter_dir = os.path.join(
3755 self.preprocessing_base_dir[ifo], "splinter"
3756 )
3757 self.mkdirs(self.splinter_dir)
3758 if self.error_code != 0:
3759 return
3760
3761 # Create par file directories: one containing links to files for modified pulsars and one for unmodified pulsars - if directories already exist then remove files from them
3762 for modsuffix in ["modified", "unmodified"]:
3763 pardir = os.path.join(self.splinter_dir, modsuffix)
3764
3765 if modsuffix == "modified":
3766 if modpars is True:
3767 self.splinter_modified_pulsar_dir = pardir
3768 parlist = self.modified_pulsars
3769 self.splinter_nodes_modified[ifo] = {}
3770 else:
3771 continue
3772
3773 if modsuffix == "unmodified":
3774 if unmodpars is True:
3775 self.splinter_unmodified_pulsar_dir = pardir
3776 parlist = self.unmodified_pulsars
3777 self.splinter_nodes_unmodified[ifo] = {}
3778 else:
3779 continue
3780
3781 if os.path.isdir(pardir):
3782 # remove all files within the directory
3783 for f in os.listdir(pardir):
3784 try:
3785 os.remove(os.path.join(pardir, f))
3786 except:
3787 print(
3788 "Warning... problem removing par file '%s' from '%s'. This file may be overwritten."
3789 % (f, pardir)
3790 )
3791 else: # make the directory and make symbolic links to all the par files into it
3792 self.mkdirs(pardir)
3793 if self.error_code != 0:
3794 return
3795
3796 for par in parlist:
3797 try:
3798 psr = pppu.psr_par(par)
3799 pname = psr["PSRJ"]
3800
3801 parlink = os.path.join(pardir, os.path.basename(par))
3802 if modsuffix == "modified":
3803 self.splinter_modified_pars[pname] = parlink
3804 if modsuffix == "unmodified":
3805 self.splinter_unmodified_pars[pname] = parlink
3806 os.symlink(par, parlink)
3807
3808 # create dictionary to hold list of names of the output files that will be created by lalpulsar_SplInter
3809 if pname not in self.processed_files:
3810 self.processed_files[pname] = {}
3811 self.splinter_data_location[pname] = {}
3812
3813 self.processed_files[pname][ifo] = {}
3814 self.splinter_data_location[pname][ifo] = {}
3815
3816 # create directory for the output file to eventually moved into
3817 psrdir = os.path.join(self.preprocessing_base_dir[ifo], pname)
3818 self.mkdirs(psrdir)
3819 if self.error_code != 0:
3820 return
3821
3822 datadir = os.path.join(psrdir, "data")
3823 self.mkdirs(datadir)
3824 if self.error_code != 0:
3825 return
3826
3827 splintercpydir = os.path.join(datadir, "splinter")
3828 self.mkdirs(splintercpydir)
3829 if self.error_code != 0:
3830 return
3831
3832 for freqfactor in self.freq_factors:
3833 if (
3834 not freqfactor % 1.0
3835 ): # for integers just name the output directory as e.g. 2f
3836 ffdir = os.path.join(
3837 splintercpydir, "%df" % int(freqfactor)
3838 )
3839 else: # for non-integers use 2 d.p. for dir name
3840 ffdir = os.path.join(
3841 splintercpydir, "%.2ff" % freqfactor
3842 )
3843
3844 # the name of the file that will be output by lalpulsar_SplInter
3845 self.processed_files[pname][ifo][freqfactor] = [
3846 os.path.join(ffdir, "SplInter_%s_%s" % (pname, ifo))
3847 ]
3848 if self.splinter_gzip_output:
3849 self.processed_files[pname][ifo][freqfactor][
3850 0
3851 ] += ".gz" # add gz suffix for gzipped files
3852
3853 # the location to move that file to
3854 self.mkdirs(ffdir)
3855 if self.error_code != 0:
3856 return
3857 self.splinter_data_location[pname][ifo][freqfactor] = ffdir
3858 except:
3859 print(
3860 "Warning... could not create link to par file '%s' in '%s'. This file may be overwritten."
3861 % (par, pardir)
3862 )
3863
3864 # reset modpars and unmodpars based on whether any files are in self.splinter_(un)modified_pars
3865 if len(self.splinter_modified_pars) == 0:
3866 modpars = False
3867 if len(self.splinter_unmodified_pars) == 0:
3868 unmodpars = False
3869
3870 # Create segment list for modified pulsars
3871 modsegfile = None
3872 if modpars:
3873 modsegfile = os.path.join(
3874 self.preprocessing_base_dir[ifo], "segments_modified.txt"
3875 )
3876 self.find_data_segments(
3877 self.initial_start[ifo], self.endtime[ifo], ifo, modsegfile
3878 )
3879 if self.error_code != 0:
3880 return
3881
3882 if self.warning_code == KNOPE_WARNING_NO_SEGMENTS:
3883 self.ifos.remove(ifo) # remove current IFO from job
3884 if len(self.ifos) == 0:
3885 print(
3886 "Error... no segments could were available for any required IFOs",
3887 file=sys.stderr,
3888 )
3889 self.error_code = KNOPE_ERROR_NO_SEGMENTS
3890 return
3891 else:
3892 self.warning_code = 0
3893 continue
3894
3895 # Create segment list for unmodified pulsars
3896 unmodsegfile = None
3897 if unmodpars:
3898 unmodsegfile = os.path.join(
3899 self.preprocessing_base_dir[ifo], "segments_unmodified.txt"
3900 )
3901
3902 if self.autonomous:
3903 # if running in automatic mode make sure we don't miss data by using the end time in the previous segment
3904 # file as the new start time (rather than self.starttime)
3905 if os.path.isfile(unmodsegfile):
3906 # get end time from segment list
3907 p = sp.check_output("tail -1 " + unmodsegfile, shell=True)
3908 if len(p) != 0:
3909 self.starttime[ifo] = int(p.split()[1])
3910 else:
3911 print(
3912 "Error... could not get end time out of previous segment file '%s'"
3913 % unmodsegfile,
3914 file=sys.stderr,
3915 )
3916 self.error_code = KNOPE_ERROR_GENERAL
3917 return
3918
3919 # create new segment file
3920 self.find_data_segments(
3921 self.starttime[ifo], self.endtime[ifo], ifo, unmodsegfile
3922 )
3923 if self.error_code != 0:
3924 return
3925
3926 if self.warning_code == KNOPE_WARNING_NO_SEGMENTS:
3927 self.ifos.remove(ifo) # remove current IFO from job
3928 if len(self.ifos) == 0:
3929 print(
3930 "Error... no segments were available for any required IFOs",
3931 file=sys.stderr,
3932 )
3933 self.error_code = KNOPE_ERROR_NO_SEGMENTS
3934 return
3935 else:
3936 self.warning_code = 0
3937 continue
3938
3939 # append segments to file containing all previously analysed segments
3940 # get filename for list containing all previous segments
3941 prevsegs = os.path.join(
3942 self.preprocessing_base_dir[ifo], "previous_segments.txt"
3943 )
3944 if not os.path.isfile(prevsegs):
3945 shutil.copy2(
3946 unmodsegfile, prevsegs
3947 ) # copy current (unupdated) segfile into previous segments file if it does not already exist
3948 self.segment_file_update.append(
3949 (unmodsegfile, prevsegs)
3950 ) # add to list of files to be concatenated together at the end (i.e. provided no errors have occurred)
3951
3952 # loop through frequency factors for analysis
3953 for freqfactor in self.freq_factors:
3954 if not freqfactor % 1.0: # for integers just name the output directory as e.g. 2f
3955 splinterdir = os.path.join(
3956 self.splinter_dir, "%df" % int(freqfactor)
3957 )
3958 else: # for non-integers use 2 d.p. for dir name
3959 splinterdir = os.path.join(self.splinter_dir, "%.2ff" % freqfactor)
3960
3961 self.mkdirs(splinterdir)
3962 if self.error_code != 0:
3963 return
3964
3965 # create nodes (first for modified pulsars and second for unmodified)
3966 splsegfiles = [modsegfile, unmodsegfile]
3967 splpardirs = [
3968 self.splinter_modified_pulsar_dir,
3969 self.splinter_unmodified_pulsar_dir,
3970 ]
3971
3972 for idx, splbool in enumerate([modpars, unmodpars]):
3973 if splbool:
3974 splnode = splinterNode(spljob)
3975
3976 splnode.add_parent(self.datafind_nodes[ifo][0])
3977
3978 splnode.set_sft_lalcache(
3979 self.cache_files[ifo]
3980 ) # SFT cache file
3981 splnode.set_output_dir(splinterdir) # output directory
3982 splnode.set_param_dir(
3983 splpardirs[idx]
3984 ) # pulsar parameter file directory
3985 splnode.set_seg_list(
3986 splsegfiles[idx]
3987 ) # science segment list file
3988 splnode.set_freq_factor(freqfactor) # frequency scaling factor
3989 splnode.set_ifo(ifo) # detector
3990
3991 splnode.set_bandwidth(
3992 self.splinter_bandwidth
3993 ) # bandwidth of data to use
3994 splnode.set_min_seg_length(
3995 self.splinter_min_seg_length
3996 ) # minimum length of usable science segments
3997 splnode.set_max_seg_length(
3998 self.splinter_max_seg_length
3999 ) # maximum segment length (used to stop too many SFTs being read in at once and eating all the memory)
4000 splnode.set_start_freq(
4001 self.splinter_freq_range[0]
4002 ) # low frequency cut-off to read from SFTs
4003 splnode.set_end_freq(
4004 self.splinter_freq_range[1]
4005 ) # high frequency cut-off to read from SFTs
4006 splnode.set_stddev_thresh(
4007 self.splinter_stddev_thresh
4008 ) # threshold for vetoing data
4009 splnode.set_ephem_dir(
4010 self.ephem_path
4011 ) # path to solar system ephemeris files
4012 if self.splinter_gzip_output:
4013 splnode.set_gzip_output() # gzip the output files
4014
4015 if idx == 0:
4016 self.splinter_nodes_modified[ifo][freqfactor] = splnode
4017 else:
4018 self.splinter_nodes_unmodified[ifo][freqfactor] = splnode
4019 self.add_node(splnode) # add node to DAG
4020
4021 # move all created files into the required directory structure
4022 for par in parlist:
4023 psr = pppu.psr_par(par)
4024 pname = psr["PSRJ"]
4025 mvnode = moveNode(mvjob)
4026 splinterfile = os.path.join(
4027 splinterdir, "SplInter_%s_%s" % (pname, ifo)
4028 )
4029 if self.splinter_gzip_output:
4030 splinterfile += ".gz"
4031 mvnode.set_source(splinterfile)
4032 mvnode.set_destination(
4033 self.processed_files[pname][ifo][freqfactor][0]
4034 )
4035 mvnode.add_parent(splnode)
4036 self.add_node(mvnode)
4037
4038 def get_ephemeris(self, psr):
4039 """
4040 Get the ephemeris file information based on the content of the psr file
4041 """
4042
4043 # get ephemeris value from par file
4044 ephem = psr["EPHEM"]
4045 if ephem is None: # if not present default to DE405
4046 ephem = "DE405"
4047
4048 # get the time units (TDB or TCB)
4049 units = psr["UNITS"]
4050 if units is None: # default to TEMPO2 standard of TCB
4051 units = "TCB"
4052
4053 earthfile = os.path.join(self.ephem_path, "earth00-40-%s.dat.gz" % ephem)
4054 sunfile = os.path.join(self.ephem_path, "sun00-40-%s.dat.gz" % ephem)
4055
4056 if units == "TDB":
4057 timefile = os.path.join(self.ephem_path, "tdb_2000-2040.dat.gz")
4058 else:
4059 timefile = os.path.join(self.ephem_path, "te405_2000-2040.dat.gz")
4060
4061 if not os.path.isfile(earthfile):
4062 print(
4063 "Error... Earth ephemeris file '%s' does not exist" % earthfile,
4064 file=sys.stderr,
4065 )
4066 self.error_code = KNOPE_ERROR_GENERAL
4067 return
4068
4069 if not os.path.isfile(sunfile):
4070 print(
4071 "Error... Sun ephemeris file '%s' does not exist" % sunfile,
4072 file=sys.stderr,
4073 )
4074 self.error_code = KNOPE_ERROR_GENERAL
4075 return
4076
4077 if not os.path.isfile(timefile):
4078 print(
4079 "Error... time ephemeris file '%s' does not exist" % timefile,
4080 file=sys.stderr,
4081 )
4082 self.error_code = KNOPE_ERROR_GENERAL
4083 return
4084
4085 return earthfile, sunfile, timefile
4086
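# An illustrative example (not part of the pipeline itself) of how get_ephemeris()
# resolves file names; the par file name and its EPHEM/UNITS values are hypothetical:
#
#   psr = pppu.psr_par("J0534+2200.par")   # par file containing EPHEM=DE421, UNITS=TDB
#   earth, sun, time = self.get_ephemeris(psr)
#   # earth -> <ephem_path>/earth00-40-DE421.dat.gz
#   # sun   -> <ephem_path>/sun00-40-DE421.dat.gz
#   # time  -> <ephem_path>/tdb_2000-2040.dat.gz   (the TE405 file would be used for TCB units)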
4087 def concatenate_files(self):
4088 """
4089 Create jobs to concatenate fine heterodyned/Splinter data files in cases where multiple files exist (e.g. in automated search).
4090 Go through the processed file list and find ones that have more than one file, and also remove the previous files.
4091 """
4092
4093 rmjob = removeJob(
4094 accgroup=self.accounting_group,
4095 accuser=self.accounting_group_user,
4096 logdir=self.log_dir,
4097 rundir=self.run_dir,
4098 ) # job for removing old files
4099 self.remove_job = rmjob
4100
4101 self.concat_nodes = {} # concatenation job nodes
4102
4103 # loop over pulsars
4104 for pitem in self.processed_files: # pitems will be pulsar names
4105 pifos = self.processed_files[pitem]
4106
4107 # loop through detectors
4108 for pifo in pifos:
4109 ffs = pifos[pifo]
4110
4111 # loop through frequency factors
4112 for ff in ffs:
4113 finefiles = ffs[ff]
4114 concatnode = None
4115 subloc = None
4116 prevfile = None
4117 newfinefile = None
4118 catfiles = None
4119
4120 # check for more than one file
4121 if self.engine == "heterodyne": # do this for heterodyned data
4122 if len(finefiles) > 1:
4123 if pitem not in self.concat_nodes:
4124 self.concat_nodes[pitem] = {}
4125
4126 if pifo not in self.concat_nodes[pitem]:
4127 self.concat_nodes[pitem][pifo] = {}
4128
4129 # get start time of first file and end time of last file (assuming files have name fine-det-start-end.txt)
4130 firststart = os.path.basename(finefiles[0]).split("-")[2]
4131 lastend = os.path.basename(finefiles[-1]).split("-")[3]
4132 lastend = lastend.replace(".txt", "")
4133 lastend = lastend.replace(".gz", "")
4134
4135 # create new file name for concatenated file
4136 newfinefile = os.path.join(
4137 os.path.dirname(finefiles[0]),
4138 "fine-%s-%s-%s.txt" % (pifo, firststart, lastend),
4139 )
4140 if self.fine_heterodyne_gzip_output: # keep the '.gz' suffix if the fine heterodyne output is gzipped
4141 newfinefile = newfinefile + ".gz"
4142
4143 catfiles = finefiles
4144 try: # fine heterodyne node may not exist if (in autonomous mode) no new segments were found, but previous data did exist
4145 parent = self.fine_heterodyne_nodes[pitem][pifo][
4146 ff
4147 ] # add equivalent fine heterodyned node as parent
4148 except:
4149 parent = None
4150
4151 # create a new concat job for each case (each one uses its own sub file)
4152 subloc = os.path.join(
4153 os.path.dirname(finefiles[0]), "concat.sub"
4154 )
4155 elif self.engine == "splinter": # do this for splinter data
4156 # check if final location already contains a file
4157 prevfile = os.listdir(
4158 self.splinter_data_location[pitem][pifo][ff]
4159 )
4160
4161 if len(prevfile) > 1:
4162 print(
4163 "Error... more than one previous Splinter file in directory '%s'"
4164 % self.splinter_data_location[pitem][pifo][ff]
4165 )
4166 self.error_code = KNOPE_ERROR_GENERAL
4167 return
4168
4169 if (
4170 pitem in self.splinter_modified_pars
4171 ): # for modified pulsars set start time (for file name) from run values
4172 startname = str(self.starttime[pifo][0])
4173 else: # if a previous file exists use the start time from that (for the file name)
4174 if len(prevfile) == 1:
4175 try:
4176 startname = os.path.basename(prevfile[0]).split("-")[1]
4177 except:
4178 print(
4179 "Warning... could not get previous start time from Splinter file name '%s'. Using start time from this run: '%d'"
4180 % (prevfile, self.starttime[pifo])
4181 )
4182 startname = str(self.starttime[pifo][0])
4183
4184 catfiles = [
4185 os.path.join(
4186 self.splinter_data_location[pitem][pifo][ff],
4187 prevfile[0],
4188 )
4189 ] + finefiles # previous file followed by the newly produced file(s)
4191 else:
4192 startname = str(self.starttime[pifo][0])
4193 try: # splinter node may not exist if (in autonomous mode) no new segments were found, but previous data did exist
4194 parent = self.splinter_nodes_unmodified[pifo][ff]
4195 except:
4196 parent = None
4197
4198 endname = str(self.endtime[pifo][-1])
4199
4200 # create new file name containing analysis time stamps
4201 newfinefile = os.path.join(
4202 self.splinter_data_location[pitem][pifo][ff],
4203 "%s-%s-%s.txt"
4204 % (
4205 (os.path.basename(finefiles[0])).strip(".gz"),
4206 startname,
4207 endname,
4208 ),
4209 )
4210 if self.splinter_gzip_output:
4211 newfinefile = newfinefile + ".gz"
4212
4213 # create a new concat job for each case (each one uses its own sub file)
4214 subloc = os.path.join(
4215 self.splinter_data_location[pitem][pifo][ff], "concat.sub"
4216 )
4217
4218 # create a new concat job for each case (each one uses its own sub file)
4219 if subloc is not None and catfiles is not None:
4220 concatjob = concatJob(
4221 subfile=subloc,
4222 output=newfinefile,
4223 accgroup=self.accounting_group,
4224 accuser=self.accounting_group_user,
4225 logdir=self.log_dir,
4226 )
4227 concatnode = concatNode(concatjob)
4228 concatnode.set_files(catfiles)
4229 if parent is not None:
4230 concatnode.add_parent(parent)
4231 self.add_node(concatnode)
4232
4233 self.concat_nodes[pitem][pifo][ff] = concatnode
4234
4235 # make the processed file now just contain the final file location
4236 self.processed_files[pitem][pifo][ff] = [newfinefile]
4237
4238 # remove original files
4239 if (
4240 len(finefiles) > 1 or self.engine == "splinter"
4241 ) and concatnode != None:
4242 rmnode = removeNode(rmjob)
4243
4244 if prevfile != None: # remove previous files
4245 finefiles.append(
4246 os.path.join(
4247 self.splinter_data_location[pitem][pifo][ff],
4248 prevfile[0],
4249 )
4250 )
4251
4252 rmnode.set_files(finefiles)
4253 if subloc is not None and catfiles is not None:
4254 rmnode.add_parent(concatnode) # only do after concatenation
4255 self.add_node(rmnode)
4256
4257 def get_config_option(
4258 self, section, option, cftype=None, default=None, allownone=False
4259 ):
4260 """
4261 Get a value of type cftype ('string', 'int', 'float', 'boolean', 'list', 'dict' or 'dir') from the configuration parser object.
4263 Return value on success and None on failure
4264 """
4265
4266 value = None # return value for failure to parse option
4267
4268 if cftype == None or cftype == "string" or cftype == "dir":
4269 try:
4270 value = self.config.get(section, option)
4271 except:
4272 if not allownone:
4273 if not isinstance(default, str):
4274 print(
4275 "Error... could not parse '%s' option from '[%s]' section."
4276 % (option, section),
4277 file=sys.stderr,
4278 )
4279 self.error_code = KNOPE_ERROR_GENERAL
4280 else:
4281 print(
4282 "Warning... could not parse '%s' option from '[%s]' section. Defaulting to %s."
4283 % (option, section, default)
4284 )
4285 value = default
4286 elif cftype == "float":
4287 try:
4288 value = self.config.getfloat(section, option)
4289 except:
4290 if not allownone:
4291 if not isinstance(default, float):
4292 print(
4293 "Error... could not parse '%s' float option from '[%s]' section."
4294 % (option, section),
4295 file=sys.stderr,
4296 )
4297 self.error_code = KNOPE_ERROR_GENERAL
4298 else:
4299 print(
4300 "Warning... could not parse '%s' float option from '[%s]' section. Defaulting to %f."
4301 % (option, section, default)
4302 )
4303 value = default
4304 elif cftype == "boolean":
4305 try:
4306 value = self.config.getboolean(section, option)
4307 except:
4308 if not allownone:
4309 if not isinstance(default, bool):
4310 print(
4311 "Error... could not parse '%s' boolean option from '[%s]' section."
4312 % (option, section),
4313 file=sys.stderr,
4314 )
4315 self.error_code = KNOPE_ERROR_GENERAL
4316 else:
4317 print(
4318 "Warning... could not parse '%s' boolean option from '[%s]' section. Defaulting to %r."
4319 % (option, section, default)
4320 )
4321 value = default
4322 elif cftype == "int":
4323 try:
4324 value = self.config.getint(section, option)
4325 except:
4326 if not allownone:
4327 if not isinstance(default, int):
4328 print(
4329 "Error... could not parse '%s' int option from '[%s]' section."
4330 % (option, section),
4331 file=sys.stderr,
4332 )
4333 self.error_code = KNOPE_ERROR_GENERAL
4334 else:
4335 print(
4336 "Warning... could not parse '%s' int option from '[%s]' section. Defaulting to %d."
4337 % (option, section, default)
4338 )
4339 value = default
4340 elif cftype == "list":
4341 try:
4342 value = ast.literal_eval(self.config.get(section, option))
4343 if not isinstance(value, list):
4344 value = [value]
4345 except:
4346 if not allownone:
4347 if not isinstance(default, list):
4348 print(
4349 "Error... could not parse '%s' list option from '[%s]' section."
4350 % (option, section),
4351 file=sys.stderr,
4352 )
4353 self.error_code = KNOPE_ERROR_GENERAL
4354 else:
4355 print(
4356 "Warning... could not parse '%s' list option from '[%s]' section. Defaulting to [%s]."
4357 % (option, section, ", ".join(default))
4358 )
4359 value = default
4360 elif cftype == "dict":
4361 try:
4362 value = ast.literal_eval(self.config.get(section, option))
4363
4364 if not isinstance(value, dict) and isinstance(default, dict):
4365 print(
4366 "Warning... could not parse '%s' dictionary option from '[%s]' section. Defaulting to %s."
4367 % (option, section, str(default))
4368 )
4369 value = default
4370 elif not isinstance(value, dict) and not isinstance(default, dict):
4371 if not allownone:
4372 print(
4373 "Error... could not parse '%s' dictionary option from '[%s]' section."
4374 % (option, section),
4375 file=sys.stderr,
4376 )
4377 self.error_code = KNOPE_ERROR_GENERAL
4378 else:
4379 value = None
4380 except:
4381 if not allownone:
4382 if not isinstance(default, dict):
4383 print(
4384 "Error... could not parse '%s' dictionary option from '[%s]' section."
4385 % (option, section),
4386 file=sys.stderr,
4387 )
4388 self.error_code = KNOPE_ERROR_GENERAL
4389 else:
4390 print(
4391 "Warning... could not parse '%s' dictionary option from '[%s]' section. Defaulting to %s."
4392 % (option, section, str(default))
4393 )
4394 value = default
4395 else:
4396 print(
4397 "Error... unknown trying to get unknown type '%s' from configuration file"
4398 % cftype,
4399 file=sys.stderr,
4400 )
4401 self.error_code = KNOPE_ERROR_GENERAL
4402
4403 return value
4404
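# Illustrative usage of get_config_option() (a sketch with a hypothetical config
# section; the option names below are examples only, not options read by the pipeline):
#
#   [example]
#   threshold = 3.5
#   detectors = ['H1', 'L1']
#
#   thresh = self.get_config_option("example", "threshold", cftype="float", default=5.0)
#   dets = self.get_config_option("example", "detectors", cftype="list", default=["H1"])
#   # a missing option with allownone=True simply returns None without setting error_code
#   other = self.get_config_option("example", "missing", allownone=True)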
4405 def check_universe(self, universe):
4406 """
4407 Check Condor universe is 'local', 'vanilla' or 'standard'
4408 """
4409 if universe not in ["local", "vanilla", "standard"]:
4410 return True
4411 else:
4412 return False
4413
4414 def setup_datafind(self):
4415 """
4416 Create and setup the data find jobs.
4417 """
4418
4419 # Create data find job
4420 self.cache_files = {}
4421 self.datafind_job = None
4422
4423 if self.config.has_option("condor", "datafind"):
4424 # check if a dictionary of ready made cache files has been given
4425 datafind = self.get_config_option(
4426 "condor", "datafind", cftype="dict", allownone=True
4427 )
4428
4429 if datafind is not None:
4430 # check there is a file for each detector and that they exist
4431 for ifo in self.ifos:
4432 if ifo not in datafind:
4433 print(
4434 "Warning... no frame/SFT cache file given for %s, try using system gw_data_find instead"
4435 % ifo
4436 )
4437 datafindexec = self.find_exec_file("gw_data_find")
4438 if datafindexec is None:
4439 print(
4440 "Error... could not find 'gw_data_find' in your 'PATH'",
4441 file=sys.stderr,
4442 )
4443 self.error_code = KNOPE_ERROR_GENERAL
4444 return
4445 else:
4446 self.config.set(
4447 "condor", "datafind", datafindexec
4448 ) # set value in config file parser
4449 else:
4450 if not isinstance(datafind[ifo], list):
4451 datafind[ifo] = [datafind[ifo]]
4452
4453 self.cache_files[ifo] = []
4454 for cachefile in datafind[ifo]:
4455 if not os.path.isfile(cachefile):
4456 print(
4457 "Warning... frame/SFT cache file '%s' does not exist, try using system gw_data_find instead"
4458 % cachefile
4459 )
4460 datafindexec = self.find_exec_file("gw_data_find")
4461 if datafindexec is None:
4462 print(
4463 "Error... could not find 'gw_data_find' in your 'PATH'",
4464 file=sys.stderr,
4465 )
4466 self.error_code = KNOPE_ERROR_GENERAL
4467 return
4468 else:
4469 self.config.set(
4470 "condor", "datafind", datafindexec
4471 ) # set value in config file parser
4472 else:
4473 self.cache_files[ifo].append(cachefile)
4474
4475 # check if a datafind job is needed for any of the detectors
4476 if len(self.cache_files) < len(self.ifos):
4477 self.datafind_job = pipeline.LSCDataFindJob(
4478 list(self.preprocessing_base_dir.values())[0],
4479 self.log_dir,
4480 self.config,
4481 )
4482 else: # a data find executable has been given
4483 datafind = self.get_config_option("condor", "datafind")
4484
4485 if os.path.isfile(datafind) and os.access(datafind, os.X_OK):
4486 self.datafind_job = pipeline.LSCDataFindJob(
4487 list(self.preprocessing_base_dir.values())[0],
4488 self.log_dir,
4489 self.config,
4490 )
4491 else:
4492 print(
4493 "Warning... data find executable '%s' does not exist, or is not executable, try using system gw_data_find instead"
4494 % datafind
4495 )
4496 datafindexec = self.find_exec_file("gw_data_find")
4497 if datafindexec is None:
4498 print(
4499 "Error... could not find 'gw_data_find' in your 'PATH'",
4500 file=sys.stderr,
4501 )
4502 self.error_code = KNOPE_ERROR_GENERAL
4503 return
4504 else:
4505 self.config.set(
4506 "condor", "datafind", datafindexec
4507 ) # set value in config file parser
4508 self.datafind_job = pipeline.LSCDataFindJob(
4509 list(self.preprocessing_base_dir.values())[0],
4510 self.log_dir,
4511 self.config,
4512 )
4513 else:
4514 # if no data find is specified try using the system gw_data_find
4515 datafindexec = self.find_exec_file("gw_data_find")
4516 if datafindexec is None:
4517 print(
4518 "Error... could not find 'gw_data_find' in your 'PATH'",
4519 file=sys.stderr,
4520 )
4521 self.error_code = KNOPE_ERROR_GENERAL
4522 return
4523 else:
4524 self.config.set(
4525 "condor", "datafind", datafindexec
4526 ) # set value in config file parser
4527 self.datafind_job = pipeline.LSCDataFindJob(
4528 list(self.preprocessing_base_dir.values())[0],
4529 self.log_dir,
4530 self.config,
4531 )
4532
4533 # add additional options to data find job
4534 if self.datafind_job is not None:
4535 self.datafind_job.add_condor_cmd("accounting_group", self.accounting_group)
4536
4537 if self.accounting_group_user is not None:
4538 self.datafind_job.add_condor_cmd(
4539 "accounting_group_user", self.accounting_group_user
4540 )
4541
4542 # reset the sub file location
4543 self.datafind_job.set_sub_file(os.path.join(self.run_dir, "datafind.sub"))
4544
4545 # Set gw_data_find nodes (one for each detector)
4546 if len(self.cache_files) < len(self.ifos):
4547 self.set_datafind_nodes()
4548 else:
4549 self.datafind_nodes = None
4550
4551 def set_datafind_nodes(self):
4552 """
4553 Add data find nodes to a dictionary of nodes
4554 """
4555
4556 self.datafind_nodes = {}
4557
4558 if self.config.has_option("datafind", "type"):
4559 frtypes = ast.literal_eval(self.config.get("datafind", "type"))
4560
4561 if not isinstance(frtypes, dict):
4562 print(
4563 "Error... the data find 'types' must be a dictionary of values for each detector",
4564 file=sys.stderr,
4565 )
4566 self.error_code = KNOPE_ERROR_GENERAL
4567 return
4568 else:
4569 print("Error... no frame 'type' specified for data find", file=sys.stderr)
4570 self.error_code = KNOPE_ERROR_GENERAL
4571 return
4572
4573 for ifo in self.ifos:
4574 frtypelist = [] # list of frame types
4575 if ifo not in frtypes:
4576 print("Error... no data find type for %s" % ifo, file=sys.stderr)
4577 self.error_code = KNOPE_ERROR_GENERAL
4578 return
4579 else:
4580 if isinstance(frtypes[ifo], str):
4581 for i in range(self.ndatasets[ifo]):
4582 frtypelist.append(frtypes[ifo])
4583 elif isinstance(frtypes[ifo], list):
4584 frtypelist = frtypes[ifo]
4585 if len(frtypelist) != self.ndatasets[ifo]:
4586 print(
4587 "Error... the number of frame types must be the same as the number of start times",
4588 file=sys.stderr,
4589 )
4590 self.error_code = KNOPE_ERROR_GENERAL
4591 return
4592 else:
4593 print(
4594 "Error... frame types for '{}' must be a single string or a list of strings".format(
4595 ifo
4596 ),
4597 file=sys.stderr,
4598 )
4599 self.error_code = KNOPE_ERROR_GENERAL
4600 return
4601
4602 # check if cache file is already set for the given detector
4603 if ifo in self.cache_files:
4604 continue
4605
4606 self.datafind_nodes[ifo] = []
4607 self.cache_files[ifo] = []
4608
4609 if self.engine == "splinter":
4610 # for splinter we can concatenate the data find output SFT caches together
4611 concatjob = concatJob(
4612 subfile=os.path.join(
4613 self.preprocessing_base_dir[ifo], "concat.sub"
4614 ),
4615 output=os.path.join(self.preprocessing_base_dir[ifo], "cache.lcf"),
4616 accgroup=self.accounting_group,
4617 accuser=self.accounting_group_user,
4618 logdir=self.log_dir,
4619 )
4620
4621 for i in range(self.ndatasets[ifo]):
4622 dfnode = pipeline.LSCDataFindNode(self.datafind_job)
4623 dfnode.set_observatory(ifo[0])
4624 dfnode.set_type(frtypelist[i])
4625 dfnode.set_start(self.initial_start[ifo][i])
4626 dfnode.set_end(self.endtime[ifo][i])
4627
4628 # reset the default LSCDataFindJob output cache filename
4629 if self.ndatasets[ifo] == 1:
4630 cachefile = os.path.join(
4631 self.preprocessing_base_dir[ifo], "cache.lcf"
4632 )
4633 else:
4634 cachefile = os.path.join(
4635 self.preprocessing_base_dir[ifo], "cache_{}.lcf".format(i)
4636 )
4637 self.cache_files[ifo].append(cachefile)
4638 dfnode.set_output(cachefile)
4639
4640 self.datafind_nodes[ifo].append(dfnode)
4641 self.add_node(dfnode) # add nodes into DAG
4642
4643 if self.engine == "splinter":
4644 # for splinter we can concatenate the data find output SFT caches together
4645 if self.ndatasets[ifo] > 1: # concatenation is only needed if more than one cache file was produced
4646 concatnode = concatNode(concatjob)
4647 concatnode.set_files(self.cache_files[ifo])
4648 for dfn in self.datafind_nodes[ifo]:
4649 concatnode.add_parent(dfn)
4650 self.add_node(concatnode)
4651
4652 # reset the self.datafind_nodes to the concatnode for future parents to use
4653 self.datafind_nodes[ifo] = [concatnode]
4654
4655 # reset name of cache file
4656 self.cache_files[ifo] = [
4657 os.path.join(self.preprocessing_base_dir[ifo], "cache.lcf")
4658 ]
4659
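# Illustrative [datafind] configuration consumed above (a sketch; the frame type
# names are hypothetical examples):
#
#   [datafind]
#   type = {'H1': 'H1_HOFT_C00', 'L1': ['L1_HOFT_C00', 'L1_HOFT_C01']}
#
# A single string is repeated for every dataset of that detector, while a list must
# contain one frame type per start/end time pair (i.e. self.ndatasets[ifo] entries).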
4660 def find_data_segments(self, starttime, endtime, ifo, outfile):
4661 """
4662 Find data segments and output them to an ASCII text segment file containing the start and end times of
4663 each segment. Pairs of start and end times in the `starttime` and `endtime` lists will be iterated over
4664 and all segments within those pairs will be concatenated into the final file.
4665
4666 Args:
4667 starttime (list): a list of GPS start times
4668 endtime (list): a list of GPS end times
4669 ifo (str): a detector name, e.g., 'H1'
4670 outfile (str): the file to output the segment list file to
4671 """
4672
4673 # check if segment file(s) given (as a dictionary)
4674 segfiles = self.get_config_option(
4675 "segmentfind", "segfind", cftype="dict", allownone=True
4676 )
4677 if segfiles is not None: # check if it is a dictionary
4678 if ifo not in segfiles:
4679 print("Error... No segment file given for '%s'" % ifo)
4680 self.error_code = KNOPE_ERROR_GENERAL
4681 return
4682 else:
4683 segfile = segfiles[ifo]
4684
4685 # check segment file exists
4686 if not os.path.isfile(segfile):
4687 print("Error... segment file '%s' does not exist." % segfile)
4688 self.error_code = KNOPE_ERROR_GENERAL
4689 return
4690 else:
4691 # copy segment file to the 'outfile' location
4692 try:
4693 shutil.copyfile(segfile, outfile)
4694 except:
4695 print(
4696 "Error... could not copy segment file to location of '%s'."
4697 % outfile
4698 )
4699 self.error_code = KNOPE_ERROR_GENERAL
4700 return # exit function
4701
4702 # otherwise try and get the segment list using gwpy
4703 try:
4704 from gwpy.segments import DataQualityFlag
4705 except ImportError:
4706 print("Error... gwpy is required for finding segment lists")
4707 self.error_code = KNOPE_ERROR_GENERAL
4708 return
4709
4710 # get server
4711 server = self.get_config_option(
4712 "segmentfind", "server", default="https://segments.ligo.org"
4713 )
4714
4715 # get segment types to include
4716 segmenttypes = self.get_config_option(
4717 "segmentfind", "segmenttype", cftype="dict"
4718 )
4719
4720 # get segment types to exclude
4721 excludesegs = self.get_config_option(
4722 "segmentfind", "excludetype", cftype="dict", allownone=True
4723 )
4724
4725 # check segment types dictionary contains type for the given detector
4726 if ifo not in segmenttypes:
4727 print("Error... No segment type for %s" % ifo, file=sys.stderr)
4728 self.error_code = KNOPE_ERROR_GENERAL
4729 return
4730
4731 if isinstance(starttime, int) and isinstance(endtime, int):
4732 sts = [starttime] # have start time as a list
4733 ets = [endtime] # have end time as a list
4734 elif isinstance(starttime, list) and isinstance(endtime, list):
4735 if len(starttime) != len(endtime):
4736 print(
4737 "Error... list of start and end times for the segments are not the same lengths",
4738 file=sys.stderr,
4739 )
4740 self.error_code = KNOPE_ERROR_GENERAL
4741 return
4742
4743 for st, et in zip(starttime, endtime):
4744 if st > et:
4745 print("Error... start time comes before end time!", file=sys.stderr)
4746 self.error_code = KNOPE_ERROR_GENERAL
4747 return
4748 sts = starttime
4749 ets = endtime
4750 else:
4751 print("Error... start and end times must be integers or lists of integers")
4752 self.error_code = KNOPE_ERROR_GENERAL
4753 return
4754
4755 sidx = 0
4756 try:
4757 segfp = open(outfile, "w")
4758 except IOError:
4759 print("Error... could not open segment list file")
4760 self.error_code = KNOPE_ERROR_GENERAL
4761 return
4762
4763 # loop over start and end time pairs
4764 for st, et in zip(sts, ets):
4765 # check whether there are different segment types for the different times
4766 if isinstance(segmenttypes[ifo], list):
4767 if not isinstance(segmenttypes[ifo][sidx], str):
4768 print("Error... segment types must be a string")
4769 self.error_code = KNOPE_ERROR_GENERAL
4770 return
4771 elif len(segmenttypes[ifo]) != len(sts):
4772 print(
4773 "Error... number of segment types is not the same as the number of start times"
4774 )
4775 self.error_code = KNOPE_ERROR_GENERAL
4776 return
4777 else:
4778 segmenttype = segmenttypes[ifo][sidx]
4779 else:
4780 if not isinstance(segmenttypes[ifo], str):
4781 print("Error... segment types must be a string")
4782 self.error_code = KNOPE_ERROR_GENERAL
4783 return
4784 else:
4785 segmenttype = segmenttypes[ifo]
4786
4787 # if multiple segment types are specified (comma separated string), loop
4788 # over them and find the intersection
4789 segtypes = [
4790 sts.strip() for sts in segmenttype.split(",")
4791 ] # split and strip whitespace
4792
4793 segquery = None
4794 for i in range(len(segtypes)):
4795 # create query
4796 query = DataQualityFlag.query_dqsegdb(segtypes[i], st, et, url=server)
4797 query = query.active # use "active" segment list
4798
4799 if excludesegs is not None:
4800 # check whether there are different exclusion types for the different times
4801 if ifo in excludesegs:
4802 if isinstance(excludesegs[ifo], list):
4803 if not isinstance(excludesegs[ifo][sidx], str):
4804 print("Error... exclude types must be a string")
4805 self.error_code = KNOPE_ERROR_GENERAL
4806 return
4807 elif len(excludesegs[ifo]) != len(sts):
4808 print(
4809 "Error... number of exclude types is not the same as the number of start times"
4810 )
4811 self.error_code = KNOPE_ERROR_GENERAL
4812 return
4813 else:
4814 excludetype = excludesegs[ifo][sidx]
4815 else:
4816 if not isinstance(excludesegs[ifo], str):
4817 print("Error... exclude types must be a string")
4818 self.error_code = KNOPE_ERROR_GENERAL
4819 return
4820 else:
4821 excludetype = excludesegs[ifo]
4822
4823 if len(excludetype) > 0:
4824 segextypes = [
4825 sts.strip() for sts in excludetype.split(",")
4826 ] # split and strip whitespace
4827
4828 for j in range(len(segextypes)):
4829 exquery = DataQualityFlag.query_dqsegdb(
4830 segextypes[j], st, et, url=server
4831 )
4832
4833 # exclude segments
4834 query = query & ~exquery.active
4835
4836 if segquery is None:
4837 segquery = query.copy()
4838 else:
4839 # get intersection of segments
4840 segquery = segquery & query
4841
4842 # write out segment list
4843 for thisseg in segquery:
4844 print(int(thisseg[0]), int(thisseg[1]), file=segfp)
4845
4846 sidx += 1
4847
4848 segfp.close()
4849
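# Illustrative [segmentfind] configuration for find_data_segments() (a sketch; the
# data-quality flag names are hypothetical examples of dqsegdb flags):
#
#   [segmentfind]
#   server = https://segments.ligo.org
#   segmenttype = {'H1': 'H1:DMT-ANALYSIS_READY:1'}
#   excludetype = {'H1': 'H1:DCH-BAD_DATA:1'}
#
# The resulting segment file is plain two-column ASCII, one "start end" GPS pair per
# line, e.g.:
#
#   1126051217 1126087417
#   1126089000 1126094500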
4850 def find_exec_file(self, filename):
4851 """
4852 Search through the PATH environment for the first instance of the executable file "filename"
4853 """
4854
4855 if os.path.isfile(filename) and os.access(filename, os.X_OK):
4856 return filename # already have the executable file
4857
4858 # search through PATH environment variable
4859 for d in os.environ["PATH"].split(os.pathsep):
4860 filecheck = os.path.join(d, filename)
4861 if os.path.isfile(filecheck) and os.access(filecheck, os.X_OK):
4862 return filecheck
4863
4864 return None
4865
4866 def mkdirs(self, path):
4867 """
4868 Helper function. Make the given directory, creating intermediate
4869 dirs if necessary, and don't complain about it already existing.
4870 """
4871 if os.access(path, os.W_OK) and os.path.isdir(path):
4872 return
4873 else:
4874 try:
4875 os.makedirs(path)
4876 except:
4877 print("Error... cannot make directory '%s'" % path, file=sys.stderr)
4878 self.error_code = KNOPE_ERROR_GENERAL
4879
4880
4881class heterodyneJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
4882 """
4883 A lalpulsar_heterodyne job to heterodyne the data.
4884 """
4885
4886 def __init__(
4887 self,
4888 execu,
4889 univ="vanilla",
4890 accgroup=None,
4891 accuser=None,
4892 logdir=None,
4893 rundir=None,
4894 subprefix="",
4895 requestmemory=None,
4896 ):
4897 self.__executable = execu
4898 self.__universe = univ
4899 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
4900 pipeline.AnalysisJob.__init__(self, None)
4901
4902 if accgroup != None:
4903 self.add_condor_cmd("accounting_group", accgroup)
4904 if accuser != None:
4905 self.add_condor_cmd("accounting_group_user", accuser)
4906
4907 self.add_condor_cmd("getenv", "True")
4908
4909 if requestmemory is not None:
4910 if isinstance(requestmemory, int):
4911 self.add_condor_cmd("request_memory", requestmemory)
4912
4913 # set log files for job
4914 if logdir != None:
4915 self.set_stdout_file(os.path.join(logdir, "heterodyne-$(cluster).out"))
4916 self.set_stderr_file(os.path.join(logdir, "heterodyne-$(cluster).err"))
4917 else:
4918 self.set_stdout_file("heterodyne-$(cluster).out")
4919 self.set_stderr_file("heterodyne-$(cluster).err")
4920
4921 if rundir != None:
4922 self.set_sub_file(os.path.join(rundir, subprefix + "heterodyne.sub"))
4923 else:
4924 self.set_sub_file(subprefix + "heterodyne.sub")
4925
4926
4927class heterodyneNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
4928 """
4929 A heterodyneNode runs an instance of lalpulsar_heterodyne in a condor DAG.
4930 """
4931
4932 def __init__(self, job):
4933 """
4934 job = A CondorDAGJob that can run an instance of lalpulsar_heterodyne
4935 """
4936 pipeline.CondorDAGNode.__init__(self, job)
4937 pipeline.AnalysisNode.__init__(self)
4938
4939 # initialise job variables
4940 self.__ifo = None
4941 self.__param_file = None
4942 self.__freq_factor = None
4943 self.__filter_knee = None
4944 self.__sample_rate = None
4945 self.__resample_rate = None
4946 self.__data_file = None
4947 self.__channel = None
4948 self.__seg_list = None
4949 self.__data_file = None
4950 self.__output_file = None
4951 self.__het_flag = None
4952 self.__pulsar = None
4953 self.__high_pass = None
4954 self.__scale_fac = None
4955 self.__manual_epoch = None
4956 self.__gzip = False
4957 self.__max_data_length = None
4958
4959 def set_data_file(self, data_file):
4960 # set file containing data to be heterodyned (either list of frames or coarse het output)
4961 self.add_var_opt("data-file", data_file)
4962 self.__data_file = data_file
4963
4964 def set_max_data_length(self, maxdatalength):
4965 # set the maximum length of data to be read from a frame file
4966 self.add_var_opt("data-chunk-length", maxdatalength)
4967 self.__max_data_length = maxdatalength
4968
4969 def set_output_file(self, output_file):
4970 # set output directory
4971 self.add_var_opt("output-file", output_file)
4972 self.__output_file = output_file
4973
4974 def set_seg_list(self, seg_list):
4975 # set segment list to be used
4976 self.add_var_opt("seg-file", seg_list)
4977 self.__seg_list = seg_list
4978
4979 def set_ifo(self, ifo):
4980 # set detector
4981 self.add_var_opt("ifo", ifo)
4982 self.__ifo = ifo
4983
4984 def set_param_file(self, param_file):
4985 # set pulsar parameter file
4986 self.add_var_opt("param-file", param_file)
4987 self.__param_file = param_file
4988
4989 def set_freq_factor(self, freq_factor):
4990 # set the factor by which to multiply the pulsar spin frequency (normally 2.0)
4991 self.add_var_opt("freq-factor", freq_factor)
4992 self.__freq_factor = freq_factor
4993
4994 def set_param_file_update(self, param_file_update):
4995 # set file containing updated pulsar parameters
4996 self.add_var_opt("param-file-update", param_file_update)
4997
4998 def set_manual_epoch(self, manual_epoch):
4999 # set manual pulsar epoch
5000 self.add_var_opt("manual-epoch", manual_epoch)
5001 self.__manual_epoch = manual_epoch
5002
5003 def set_ephem_earth_file(self, ephem_earth_file):
5004 # set the file containing the earth's ephemeris
5005 self.add_var_opt("ephem-earth-file", ephem_earth_file)
5006
5007 def set_ephem_sun_file(self, ephem_sun_file):
5008 # set the file containing the sun's ephemeris
5009 self.add_var_opt("ephem-sun-file", ephem_sun_file)
5010
5011 def set_ephem_time_file(self, ephem_time_file):
5012 # set the file containing the Einstein delay correction ephemeris
5013 self.add_var_opt("ephem-time-file", ephem_time_file)
5014
5015 def set_pulsar(self, pulsar):
5016 # set pulsar name
5017 self.add_var_opt("pulsar", pulsar)
5018 self.__pulsar = pulsar
5019
5020 def set_het_flag(self, het_flag):
5021 # set heterodyne flag
5022 self.add_var_opt("heterodyne-flag", het_flag)
5023 self.__het_flag = het_flag
5024
5025 def set_filter_knee(self, filter_knee):
5026 # set filter knee frequency
5027 self.add_var_opt("filter-knee", filter_knee)
5028 self.__filter_knee = filter_knee
5029
5030 def set_channel(self, channel):
5031 # set channel containing data from frames
5032 self.add_var_opt("channel", channel)
5033 self.__channel = channel
5034
5035 def set_sample_rate(self, sample_rate):
5036 # set sample rate of input data
5037 self.add_var_opt("sample-rate", sample_rate)
5038 self.__sample_rate = sample_rate
5039
5040 def set_resample_rate(self, resample_rate):
5041 # set resample rate for output data
5042 self.add_var_opt("resample-rate", resample_rate)
5043 self.__resample_rate = resample_rate
5044
5045 def set_stddev_thresh(self, stddev_thresh):
5046 # set standard deviation threshold at which to remove outliers
5047 self.add_var_opt("stddev-thresh", stddev_thresh)
5048
5049 def set_calibrate(self):
5050 # set calibration flag
5051 self.add_var_opt("calibrate", "") # no variable required
5052
5053 def set_verbose(self):
5054 # set verbose flag
5055 self.add_var_opt("verbose", "") # no variable required
5056
5057 def set_bininput(self):
5058 # set binary input file flag
5059 self.add_var_opt("binary-input", "") # no variable required
5060
5061 def set_binoutput(self):
5062 # set binary output file flag
5063 self.add_var_opt("binary-output", "") # no variable required
5064
5065 def set_gzip_output(self):
5066 # set gzipped output file flag
5067 self.add_var_opt("gzip-output", "") # no variable required
5068 self.__gzip = True
5069
5070 def set_response_function(self, response_function):
5071 # set response function file
5072 self.add_var_opt("response-file", response_function)
5073
5074 def set_coefficient_file(self, coefficient_file):
5075 # set the file containing the calibration coefficients (e.g alpha and gammas)
5076 self.add_var_opt("coefficient-file", coefficient_file)
5077
5078 def set_sensing_function(self, sensing_function):
5079 # set file containing the sensing function for calibration
5080 self.add_var_opt("sensing-function", sensing_function)
5081
5082 def set_open_loop_gain(self, open_loop_gain):
5083 # set file containing the open loop gain for calibration
5084 self.add_var_opt("open-loop-gain", open_loop_gain)
5085
5086 def set_scale_fac(self, scale_fac):
5087 # set scale factor for calibrated data
5088 self.add_var_opt("scale-factor", scale_fac)
5089
5090 def set_high_pass(self, high_pass):
5091 # set high-pass frequency for calibrated data
5092 self.add_var_opt("high-pass-freq", high_pass)
5093
5094
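# Illustrative sketch (not part of the original pipeline): minimal wiring of a
# heterodyneJob and heterodyneNode into a Condor DAG. All paths, the accounting tag,
# channel name, GPS times and the heterodyne-flag value are hypothetical placeholders.
def _example_heterodyne_dag():
    dag = pipeline.CondorDAG("example_heterodyne.log")  # DAG log file (placeholder name)
    dag.set_dag_file("example_heterodyne")  # basename for the generated .dag file

    hetjob = heterodyneJob(
        "lalpulsar_heterodyne",  # assumed to be findable on the PATH
        univ="vanilla",
        accgroup="ligo.prod.o1.cw.targeted.bayesian",  # hypothetical accounting group
        accuser="albert.einstein",  # hypothetical accounting user
        logdir="logs",
        rundir=".",
    )

    hetnode = heterodyneNode(hetjob)
    hetnode.set_ifo("H1")
    hetnode.set_pulsar("J0534+2200")
    hetnode.set_param_file("J0534+2200.par")  # hypothetical par file
    hetnode.set_freq_factor(2.0)  # heterodyne at twice the rotation frequency
    hetnode.set_data_file("H1_frames.cache")  # hypothetical frame cache
    hetnode.set_seg_list("segments_H1.txt")  # hypothetical science segment list
    hetnode.set_channel("H1:GDS-CALIB_STRAIN")  # hypothetical strain channel
    hetnode.set_sample_rate(16384)  # typical input/output rates for a coarse stage
    hetnode.set_resample_rate(1.0)
    hetnode.set_filter_knee(0.25)
    hetnode.set_het_flag(0)  # coarse heterodyne stage (flag value assumed)
    hetnode.set_output_file("coarse-H1-1128000000-1128086400.bin")  # hypothetical output
    dag.add_node(hetnode)
    return dag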
5095class splinterJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
5096 """
5097 A lalpulsar_SplInter job to process SFT data.
5098 """
5099
5100 def __init__(
5101 self,
5102 execu,
5103 univ="vanilla",
5104 accgroup=None,
5105 accuser=None,
5106 logdir=None,
5107 rundir=None,
5108 requestmemory=None,
5109 ):
5110 self.__executable = execu
5111 self.__universe = univ
5112 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
5113 pipeline.AnalysisJob.__init__(self, None)
5114
5115 if accgroup != None:
5116 self.add_condor_cmd("accounting_group", accgroup)
5117 if accuser != None:
5118 self.add_condor_cmd("accounting_group_user", accuser)
5119
5120 self.add_condor_cmd("getenv", "True")
5121
5122 if requestmemory is not None:
5123 if isinstance(requestmemory, int):
5124 self.add_condor_cmd("request_memory", requestmemory)
5125
5126 # set log files for job
5127 if logdir != None:
5128 self.set_stdout_file(os.path.join(logdir, "splinter-$(cluster).out"))
5129 self.set_stderr_file(os.path.join(logdir, "splinter-$(cluster).err"))
5130 else:
5131 self.set_stdout_file("splinter-$(cluster).out")
5132 self.set_stderr_file("splinter-$(cluster).err")
5133
5134 if rundir != None:
5135 self.set_sub_file(os.path.join(rundir, "splinter.sub"))
5136 else:
5137 self.set_sub_file("splinter.sub")
5138
5139
5140class splinterNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
5141 """
5142 A splinterNode runs an instance of lalpulsar_SplInter in a condor DAG.
5143 """
5144
5145 def __init__(self, job):
5146 """
5147 job = A CondorDAGJob that can run an instance of lalpulsar_SplInter
5148 """
5149 pipeline.CondorDAGNode.__init__(self, job)
5150 pipeline.AnalysisNode.__init__(self)
5151
5152 # initialise job variables
5153 self.__ifo = None
5154 self.__param_file = None
5155 self.__param_dir = None
5156 self.__start_freq = None
5157 self.__end_freq = None
5158 self.__freq_factor = None
5159 self.__data_file = None
5160 self.__seg_list = None
5161 self.__sft_cache = None
5162 self.__sft_loc = None
5163 self.__sft_lalcache = None
5164 self.__output_dir = None
5165 self.__stddev_thresh = None
5166 self.__bandwidth = None
5167 self.__min_seg_length = None
5168 self.__max_seg_length = None
5169 self.__starttime = None
5170 self.__endtime = None
5171 self.__ephem_dir = None
5172 self.__gzip = False
5173
5174 def set_start_freq(self, f):
5175 # set the start frequency of the SFTs
5176 self.add_var_opt("start-freq", f)
5177 self.__start_freq = f
5178
5179 def set_end_freq(self, f):
5180 # set the end frequency of the SFTs
5181 self.add_var_opt("end-freq", f)
5182
5183 def set_sft_cache(self, f):
5184 # set file containing list of SFTs
5185 self.add_var_opt("sft-cache", f)
5186 self.__sft_cache = f
5187
5188 def set_sft_lalcache(self, f):
5189 # set file containing list of SFTs in LALCache format
5190 self.add_var_opt("sft-lalcache", f)
5191 self.__sft_lalcache = f
5192
5193 def set_sft_loc(self, f):
5194 # set directory of files containing list of SFTs
5195 self.add_var_opt("sft-loc", f)
5196 self.__sft_loc = f
5197
5198 def set_output_dir(self, f):
5199 # set output directory
5200 self.add_var_opt("output-dir", f)
5201 self.__output_dir = f
5202
5203 def set_seg_list(self, f):
5204 # set segment list to be used
5205 self.add_var_opt("seg-file", f)
5206 self.__seg_list = f
5207
5208 def set_ifo(self, ifo):
5209 # set detector
5210 self.add_var_opt("ifo", ifo)
5211 self.__ifo = ifo
5212
5213 def set_param_file(self, f):
5214 # set pulsar parameter file
5215 self.add_var_opt("param-file", f)
5216 self.__param_file = f
5217
5218 def set_param_dir(self, f):
5219 # set pulsar parameter directory
5220 self.add_var_opt("param-dir", f)
5221 self.__param_dir = f
5222
5223 def set_freq_factor(self, f):
5224 # set the factor by which to multiply the pulsar spin frequency (normally 2.0)
5225 self.add_var_opt("freq-factor", f)
5226 self.__freq_factor = f
5227
5228 def set_bandwidth(self, f):
5229 # set the bandwidth to use in the interpolation
5230 self.add_var_opt("bandwidth", f)
5231 self.__bandwidth = f
5232
5233 def set_min_seg_length(self, f):
5234 # set the minimum science segment length to use
5235 self.add_var_opt("min-seg-length", f)
5236 self.__min_seg_length = f
5237
5238 def set_max_seg_length(self, f):
5239 # set the maximum length of a science segment to use
5240 self.add_var_opt("max-seg-length", f)
5241 self.__max_seg_length = f
5242
5243 def set_ephem_dir(self, f):
5244 # set the directory containing the solar system ephemeris files
5245 self.add_var_opt("ephem-dir", f)
5246 self.__ephem_dir = f
5247
5248 def set_stddev_thresh(self, f):
5249 # set standard deviation threshold at which to remove outliers
5250 self.add_var_opt("stddevthresh", f)
5251 self.__stddev_thresh = f
5252
5253 def set_gzip_output(self):
5254 # set gzipped output file flag
5255 self.add_var_opt("gzip", "") # no variable required
5256 self.__gzip = True
5257
5258 def set_starttime(self, f):
5259 # set the start time of data to use
5260 self.add_var_opt("starttime", f)
5261 self.__starttime = f
5262
5263 def set_endtime(self, f):
5264 # set the end time of data to use
5265 self.add_var_opt("endtime", f)
5266 self.__endtime = f
5267
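# Illustrative sketch (not part of the original pipeline): minimal wiring of a
# splinterJob and splinterNode, using the default option values from setup_splinter().
# All paths and the accounting tag are hypothetical placeholders.
def _example_splinter_node():
    spljob = splinterJob(
        "lalpulsar_SplInter",  # assumed to be findable on the PATH
        univ="vanilla",
        accgroup="ligo.prod.o1.cw.targeted.bayesian",  # hypothetical accounting group
        accuser="albert.einstein",  # hypothetical accounting user
        logdir="logs",
        rundir=".",
    )

    splnode = splinterNode(spljob)
    splnode.set_ifo("H1")
    splnode.set_sft_lalcache("cache.lcf")  # hypothetical SFT cache in LALCache format
    splnode.set_param_dir("splinter/modified")  # directory of linked par files
    splnode.set_seg_list("segments_modified.txt")  # hypothetical science segment list
    splnode.set_output_dir("splinter/2f")
    splnode.set_freq_factor(2.0)
    splnode.set_start_freq(30.0)
    splnode.set_end_freq(2000.0)
    splnode.set_bandwidth(0.3)
    splnode.set_min_seg_length(1800)
    splnode.set_max_seg_length(21600)
    splnode.set_stddev_thresh(3.5)
    splnode.set_ephem_dir("/usr/share/lalpulsar")  # hypothetical ephemeris directory
    return splnode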
5268
5269"""
5270 Job for concatenating processed (heterodyned or spectrally interpolated) files
5271"""
5272
5273
5274class concatJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
5275 """
5276 A concatenation job (using "cat" and output to stdout - a new job is needed for each pulsar)
5277 """
5278
5279 def __init__(
5280 self, subfile=None, output=None, accgroup=None, accuser=None, logdir=None
5281 ):
5282 self.__executable = "/bin/cat" # use cat
5283 self.__universe = "local" # use local as "cat" should not be compute intensive
5284
5285 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
5286 pipeline.AnalysisJob.__init__(self, None)
5287
5288 if accgroup != None:
5289 self.add_condor_cmd("accounting_group", accgroup)
5290 if accuser != None:
5291 self.add_condor_cmd("accounting_group_user", accuser)
5292
5293 if subfile == None:
5294 print(
5295 "Error... Condor sub file required for concatenation job",
5296 file=sys.stderr,
5297 )
5298 sys.exit(1)
5299 else:
5300 self.set_sub_file(subfile)
5301
5302 if output == None:
5303 print(
5304 "Error... output file required for concatenation job", file=sys.stderr
5305 )
5306 sys.exit(1)
5307 else:
5308 self.set_stdout_file(output)
5309
5310 if logdir != None:
5311 self.set_stderr_file(os.path.join(logdir, "concat-$(cluster).err"))
5312 else:
5313 self.set_stderr_file("concat-$(cluster).err")
5314
5315 self.add_arg("$(macrocatfiles)") # macro for input files to be concatenated
5316
5317
5318class concatNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
5319 """
5320 A node for a concatJob
5321 """
5322
5323 def __init__(self, job):
5324 """
5325 job = A CondorDAGJob that can run an instance of the concatenation (cat) job.
5326 """
5327 pipeline.CondorDAGNode.__init__(self, job)
5328 pipeline.AnalysisNode.__init__(self)
5329
5330 self.__files = None
5331
5332 def set_files(self, files):
5333 # set the files to be concatenated ("files" is a list of files to be concatenated)
5334 self.add_macro("macrocatfiles", " ".join(files))
5335 self.__files = files
5336
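# Illustrative sketch (not part of the original pipeline): a concatJob/concatNode pair
# that joins two fine heterodyne files. The effective command is
# "/bin/cat <file1> <file2>" with stdout redirected by Condor to the 'output' file.
# All file names below are hypothetical placeholders.
def _example_concat_node():
    concatjob = concatJob(
        subfile="concat.sub",  # per-pulsar Condor submit file
        output="fine-H1-1128000000-1128259200.txt.gz",  # concatenated output file
        accgroup="ligo.prod.o1.cw.targeted.bayesian",  # hypothetical accounting group
        accuser="albert.einstein",  # hypothetical accounting user
        logdir="logs",
    )
    concatnode = concatNode(concatjob)
    concatnode.set_files(
        ["fine-H1-1128000000-1128086400.txt.gz", "fine-H1-1128086400-1128259200.txt.gz"]
    )
    return concatnode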
5337
5338"""
5339 Job for removing files
5340"""
5341
5342
5343class removeJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
5344 """
5345 A remove job (using "rm" to remove files)
5346 """
5347
5348 def __init__(self, accgroup=None, accuser=None, logdir=None, rundir=None):
5349 self.__executable = "/bin/rm" # use "rm"
5350 self.__universe = (
5351 "local" # rm should not be compute intensive, so use local universe
5352 )
5353
5354 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
5355 pipeline.AnalysisJob.__init__(self, None)
5356
5357 if accgroup != None:
5358 self.add_condor_cmd("accounting_group", accgroup)
5359 if accuser != None:
5360 self.add_condor_cmd("accounting_group_user", accuser)
5361
5362 # set log files for job
5363 if logdir != None:
5364 self.set_stdout_file(os.path.join(logdir, "rm-$(cluster).out"))
5365 self.set_stderr_file(os.path.join(logdir, "rm-$(cluster).err"))
5366 else:
5367 self.set_stdout_file("rm-$(cluster).out")
5368 self.set_stderr_file("rm-$(cluster).err")
5369
5370 self.add_arg(
5371 "-f $(macrormfiles)"
5372 ) # macro for input files to be removed (use "-f" to force removal)
5373
5374 if rundir != None:
5375 self.set_sub_file(os.path.join(rundir, "rm.sub"))
5376 else:
5377 self.set_sub_file("rm.sub")
5378
5379
5380class removeNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
5381 """
5382 An instance of a removeJob in a condor DAG.
5383 """
5384
5385 def __init__(self, job):
5386 """
5387 job = A CondorDAGJob that can run an instance of rm.
5388 """
5389 pipeline.CondorDAGNode.__init__(self, job)
5390 pipeline.AnalysisNode.__init__(self)
5391
5392 self.__files = None
5393 self.set_retry(1) # retry the node once
5394
5395 def set_files(self, files):
5396 # set file(s) to be removed, where "files" is a list containing all files
5397 self.add_macro("macrormfiles", " ".join(files))
5398 self.__files = files
5399
5400
5401"""
5402 Job for moving files
5403"""
5404
5405
5406class moveJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
5407 """
5408 A move job (using "mv" to move files)
5409 """
5410
5411 def __init__(self, accgroup=None, accuser=None, logdir=None, rundir=None):
5412 self.__executable = "/bin/mv" # use "mv"
5413 self.__universe = (
5414 "local" # mv should not be compute intensive, so use local universe
5415 )
5416
5417 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
5418 pipeline.AnalysisJob.__init__(self, None)
5419
5420 if accgroup != None:
5421 self.add_condor_cmd("accounting_group", accgroup)
5422 if accuser != None:
5423 self.add_condor_cmd("accounting_group_user", accuser)
5424
5425 # set log files for job
5426 if logdir != None:
5427 self.set_stdout_file(os.path.join(logdir, "mv-$(cluster).out"))
5428 self.set_stderr_file(os.path.join(logdir, "mv-$(cluster).err"))
5429 else:
5430 self.set_stdout_file("mv-$(cluster).out")
5431 self.set_stderr_file("mv-$(cluster).err")
5432
5433 self.add_arg(
5434 "$(macrosource) $(macrodestination)"
5435 ) # macro for source and destination files
5436
5437 if rundir != None:
5438 self.set_sub_file(os.path.join(rundir, "mv.sub"))
5439 else:
5440 self.set_sub_file("mv.sub")
5441
5442
5443class moveNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
5444 """
5445 An instance of a moveJob in a condor DAG.
5446 """
5447
5448 def __init__(self, job):
5449 """
5450 job = A CondorDAGJob that can run an instance of mv.
5451 """
5452 pipeline.CondorDAGNode.__init__(self, job)
5453 pipeline.AnalysisNode.__init__(self)
5454
5455 self.__sourcefile = None
5456 self.__destinationfile = None
5457 self.set_retry(1) # retry the node once
5458
5459 def set_source(self, sfile):
5460 # set file to be moved
5461 self.add_macro("macrosource", sfile)
5462 self.__sourcefile = sfile
5463
5464 def set_destination(self, dfile):
5465 # set destination of file to be moved
5466 self.add_macro("macrodestination", dfile)
5467 self.__destinationfile = dfile
5468
5469
5470"""
5471 Job for copying files
5472"""
5473
5474
5475class copyJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
5476 """
5477 A copy job (using "cp" to copy files)
5478 """
5479
5480 def __init__(self, accgroup=None, accuser=None, logdir=None, rundir=None):
5481 self.__executable = "/bin/cp" # use "cp"
5482 self.__universe = (
5483 "local" # cp should not be compute intensive, so use local universe
5484 )
5485
5486 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
5487 pipeline.AnalysisJob.__init__(self, None)
5488
5489 if accgroup != None:
5490 self.add_condor_cmd("accounting_group", accgroup)
5491 if accuser != None:
5492 self.add_condor_cmd("accounting_group_user", accuser)
5493
5494 # set log files for job
5495 if logdir != None:
5496 self.set_stdout_file(os.path.join(logdir, "cp-$(cluster).out"))
5497 self.set_stderr_file(os.path.join(logdir, "cp-$(cluster).err"))
5498 else:
5499 self.set_stdout_file("cp-$(cluster).out")
5500 self.set_stderr_file("cp-$(cluster).err")
5501
5502 self.add_arg(
5503 "$(macrosource) $(macrodestination)"
5504 ) # macro for source and destination files
5505
5506 if rundir != None:
5507 self.set_sub_file(os.path.join(rundir, "cp.sub"))
5508 else:
5509 self.set_sub_file("cp.sub")
5510
5511
5512class copyNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
5513 """
5514 An instance of a copyJob in a condor DAG.
5515 """
5516
5517 def __init__(self, job):
5518 """
5519 job = A CondorDAGJob that can run an instance of cp.
5520 """
5521 pipeline.CondorDAGNode.__init__(self, job)
5522 pipeline.AnalysisNode.__init__(self)
5523
5524 self.__sourcefile = None
5525 self.__destinationfile = None
5526 self.set_retry(1) # retry the node once
5527
5528 def set_source(self, sfile):
5529 # set file to be copied
5530 self.add_macro("macrosource", sfile)
5531 self.__sourcefile = sfile
5532
5533 def set_destination(self, dfile):
5534 # set destination of file to be copied
5535 self.add_macro("macrodestination", dfile)
5536 self.__destinationfile = dfile
5537
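# --- Editor's note: hedged usage sketch (illustrative only, not part of knope_utils.py) ---
# copyJob/copyNode are used in exactly the same way as moveJob/moveNode above,
# with "cp" run in place of "mv". A minimal sketch, with hypothetical paths and
# assuming the standard pipeline.CondorDAG.add_node() interface:
def _example_copy_node(dag, cpjob):
    cpnode = copyNode(cpjob)                           # cpjob is a copyJob instance
    cpnode.set_source("/path/to/original.txt")         # hypothetical source file
    cpnode.set_destination("/path/to/duplicate.txt")   # hypothetical destination
    dag.add_node(cpnode)
    return cpnode
# -------------------------------------------------------------------------------------------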
5538
5539"""
5540 Pulsar parameter estimation pipeline utilities
5541"""
5542
5543
5544class ppeJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
5545 """
5546 A parameter estimation job
5547 """
5548
5549 def __init__(
5550 self,
5551 execu,
5552 univ="vanilla",
5553 accgroup=None,
5554 accuser=None,
5555 logdir=None,
5556 rundir=None,
5557 requestmemory=None,
5558 ):
5559 self.__executable = execu
5560 self.__universe = univ
5561 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
5562 pipeline.AnalysisJob.__init__(self, None)
5563
5564 if accgroup != None:
5565 self.add_condor_cmd("accounting_group", accgroup)
5566 if accuser != None:
5567 self.add_condor_cmd("accounting_group_user", accuser)
5568
5569 self.add_condor_cmd("getenv", "True")
5570
5571 if requestmemory is not None:
5572 if isinstance(requestmemory, int):
5573 self.add_condor_cmd("request_memory", requestmemory)
5574
5575 # set log files for job
5576 if logdir != None:
5577 self.set_stdout_file(os.path.join(logdir, "ppe$(logname)-$(cluster).out"))
5578 self.set_stderr_file(os.path.join(logdir, "ppe$(logname)-$(cluster).err"))
5579 else:
5580 self.set_stdout_file("ppe$(logname)-$(cluster).out")
5581 self.set_stderr_file("ppe$(logname)-$(cluster).err")
5582
5583 if rundir != None:
5584 self.set_sub_file(os.path.join(rundir, "ppe.sub"))
5585 else:
5586 self.set_sub_file("ppe.sub")
5587
5588 self.add_arg(
5589 "$(macroargs)"
5590 ) # macro for additional command line arguments that will not always be used
5591
5592
5593class ppeNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
5594 """
5595 A ppeNode runs an instance of the parameter estimation code in a condor DAG.
5596 """
5597
5598 def __init__(self, job, psrname=""):
5599 """
5600 job = A CondorDAGJob that can run an instance of parameter estimation code.
5601 """
5602 pipeline.CondorDAGNode.__init__(self, job)
5603 pipeline.AnalysisNode.__init__(self)
5604
5605 # initialise job variables
5606 self.__detectors = None
5607 self.__par_file = None
5608 self.__cor_file = None
5609 self.__input_files = None
5610 self.__outfile = None
5611 self.__chunk_min = None
5612 self.__chunk_max = None
5613 self.__psi_bins = None
5614 self.__time_bins = None
5615 self.__prior_file = None
5616 self.__ephem_earth = None
5617 self.__ephem_sun = None
5618 self.__ephem_time = None
5619 self.__harmonics = None
5620 self.__biaxial = False
5621 self.__gaussian_like = False
5622 self.__randomise = None
5623 self.__starttime = None
5624 self.__endtime = None
5625 self.__truncate_time = None
5626 self.__truncate_samples = None
5627 self.__truncate_fraction = None
5628 self.__veto_threshold = None
5629
5630 self.__Nlive = None
5631 self.__Nmcmc = None
5632 self.__Nmcmcinitial = None
5633 self.__Nruns = None
5634 self.__tolerance = None
5635 self.__randomseed = None
5636
5637 self.__use_roq = False
5638 self.__roq_ntraining = None
5639 self.__roq_tolerance = None
5640 self.__roq_uniform = False
5641 self.__roq_output_weights = None
5642 self.__roq_input_weights = None
5643
5644 self.__temperature = None
5645 self.__ensemble_walk = None
5646 self.__ensemble_stretch = None
5647 self.__diffev = None
5648
5649 self.__inject_file = None
5650 self.__inject_output = None
5651 self.__fake_data = None
5652 self.__fake_psd = None
5653 self.__fake_starts = None
5654 self.__fake_lengths = None
5655 self.__fake_dt = None
5656 self.__scale_snr = None
5657
5658 self.__sample_files = None
5659 self.__sample_nlives = None
5660 self.__prior_cell = None
5661
5662 # legacy inputs
5663 self.__oldChunks = None
5664 self.__sourceModel = False
5665
5666 self.__verbose = False
5667
5668 # set macroargs to be empty
5669 self.add_macro("macroargs", "")
5670
5671 # set unique log name (set to pulsar name)
5672 if len(psrname) > 0:
5673 self.add_macro("logname", "-" + psrname)
5674 else:
5675 self.add_macro("logname", "")
5676
5677 def set_detectors(self, detectors):
5678 # set detectors
5679 self.add_var_opt("detectors", detectors)
5680 self.__detectors = detectors
5681
5682 # add detector names to logname
5683 if "logname" in self.get_opts():
5684 curmacroval = self.get_opts()["logname"]
5685 else:
5686 curmacroval = ""
5687 curmacroval = curmacroval + "-" + detectors.replace(",", "")
5688 self.add_macro("logname", curmacroval)
5689
5690 def set_verbose(self):
5691 # set to run code in verbose mode
5692 self.add_var_opt("verbose", "")
5693 self.__verbose = True
5694
5695 def set_par_file(self, parfile):
5696 # set pulsar parameter file
5697 self.add_var_opt("par-file", parfile)
5698 self.__par_file = parfile
5699
5700 def set_cor_file(self, corfile):
5701 # set pulsar parameter correlation matrix file
5702
5703 # add this into the generic 'macroargs' macro as it is not a value that is always required
5704 if "macroargs" in self.get_opts():
5705 curmacroval = self.get_opts()["macroargs"]
5706 else:
5707 curmacroval = ""
5708 curmacroval = curmacroval + " --cor-file " + corfile
5709 self.add_macro("macroargs", curmacroval)
5710 # self.add_var_opt('cor-file',corfile)
5711 self.__cor_file = corfile
5712
5713 def set_input_files(self, inputfiles):
5714 # set data files for analysing
5715 self.add_var_opt("input-files", inputfiles)
5716 self.__input_files = inputfiles
5717
5718 def set_outfile(self, of):
5719 # set the output file
5720 self.add_var_opt("outfile", of)
5721 self.__outfile = of
5722
5723 def set_chunk_min(self, cmin):
5724 # set the minimum chunk length
5725 self.add_var_opt("chunk-min", cmin)
5726 self.__chunk_min = cmin
5727
5728 def set_chunk_max(self, cmax):
5729 # set the maximum chunk length
5730 self.add_var_opt("chunk-max", cmax)
5731 self.__chunk_max = cmax
5732
5733 def set_start_time(self, starttime):
5734 # set the start time of data to use
5735 self.add_var_opt("start-time", starttime)
5736 self.__starttime = starttime
5737
5738 def set_end_time(self, endtime):
5739 # set the end time of data to use
5740 self.add_var_opt("end-time", endtime)
5741 self.__endtime = endtime
5742 self.__truncate_time = endtime
5743
5744 def set_truncate_time(self, trunc):
5745 # set the truncation time to use (same as end time)
5746 if self.__endtime is None:
5747 self.add_var_opt("truncate-time", trunc)
5748 self.__truncate_time = trunc
5749 self.__endtime = trunc
5750
5751 def set_truncate_samples(self, trunc):
5752 # set the truncation based on number of samples
5753 self.add_var_opt("truncate-samples", trunc)
5754 self.__truncate_samples = trunc
5755
5756 def set_truncate_fraction(self, trunc):
5757 # set truncation based on fraction of the data
5758 self.add_var_opt("truncate-fraction", trunc)
5759 self.__truncate_fraction = trunc
5760
5761 def set_veto_threshold(self, veto):
5762 # set value above which to veto data samples
5763 self.add_var_opt("veto-threshold", veto)
5764 self.__veto_threshold = veto
5765
5766 def set_psi_bins(self, pb):
5767 # set the number of bins in psi for the psi vs time lookup table
5768 self.add_var_opt("psi-bins", pb)
5769 self.__psi_bins = pb
5770
5771 def set_time_bins(self, tb):
5772 # set the number of bins in time for the psi vs time lookup table
5773 self.add_var_opt("time-bins", tb)
5774 self.__time_bins = tb
5775
5776 def set_prior_file(self, pf):
5777 # set the prior ranges file
5778 self.add_var_opt("prior-file", pf)
5779 self.__prior_file = pf
5780
5781 def set_ephem_earth(self, ee):
5782 # set earth ephemeris file
5783 self.add_var_opt("ephem-earth", ee)
5784 self.__ephem_earth = ee
5785
5786 def set_ephem_sun(self, es):
5787 # set sun ephemeris file
5788 self.add_var_opt("ephem-sun", es)
5789 self.__ephem_sun = es
5790
5791 def set_ephem_time(self, et):
5792 # set time correction ephemeris file
5793 self.add_var_opt("ephem-timecorr", et)
5794 self.__ephem_time = et
5795
5796 def set_harmonics(self, h):
5797 # set model frequency harmonics
5798 self.add_var_opt("harmonics", h)
5799 self.__harmonics = h
5800
5801 def set_Nlive(self, nl):
5802 # set number of live points
5803 self.add_var_opt("Nlive", nl)
5804 self.__Nlive = nl
5805
5806 def set_Nmcmc(self, nm):
5807 # set number of MCMC iterations
5808 self.add_var_opt("Nmcmc", nm)
5809 self.__Nmcmc = nm
5810
5811 def set_Nmcmcinitial(self, nm):
5812 # set number of initial MCMC iterations
5813 self.add_var_opt("Nmcmcinitial", nm)
5814 self.__Nmcmcinitial = nm
5815
5816 def set_Nruns(self, nr):
5817 # set number of internal nested sample runs
5818 self.add_var_opt("Nruns", nr)
5819 self.__Nruns = nr
5820
5821 def set_tolerance(self, tol):
5822 # set tolerance criterion for finishing nested sampling
5823 self.add_var_opt("tolerance", tol)
5824 self.__tolerance = tol
5825
5826 def set_randomseed(self, rs):
5827 # set random number generator seed
5828 self.add_var_opt("randomseed", rs)
5829 self.__randomseed = rs
5830
5831 def set_ensemble_stretch(self, f):
5832 # set the fraction of time to use ensemble stretch moves as proposal
5833 self.add_var_opt("ensembleStretch", f)
5834 self.__ensemble_stretch = f
5835
5836 def set_ensemble_walk(self, f):
5837 # set the fraction of time to use ensemble walk moves as proposal
5838 self.add_var_opt("ensembleWalk", f)
5839 self.__ensemble_walk = f
5840
5841 def set_temperature(self, temp):
5842 # set temperature scale for covariance proposal
5843 self.add_var_opt("temperature", temp)
5844 self.__temperature = temp
5845
5846 def set_diffev(self, de):
5847 # set fraction of time to use differential evolution as proposal
5848 self.add_var_opt("diffev", de)
5849 self.__diffev = de
5850
5851 def set_inject_file(self, ifil):
5852 # set a pulsar parameter file from which to make an injection
5853 self.add_var_opt("inject-file", ifil)
5854 self.__inject_file = ifil
5855
5856 def set_inject_output(self, iout):
5857 # set filename in which to output the injected signal
5858 self.add_var_opt("inject-output", iout)
5859 self.__inject_output = iout
5860
5861 def set_fake_data(self, fd):
5862 # set the detectors from which to generate fake data
5863 self.add_var_opt("fake-data", fd)
5864 self.__fake_data = fd
5865
5866 def set_fake_psd(self, fp):
5867 # set the PSDs of the fake data
5868 self.add_var_opt("fake-psd", fp)
5869 self.__fake_psd = fp
5870
5871 def set_fake_starts(self, fs):
5872 # set the start times of the fake data
5873 self.add_var_opt("fake-starts", fs)
5874 self.__fake_starts = fs
5875
5876 def set_fake_lengths(self, fl):
5877 # set the lengths of the fake data
5878 self.add_var_opt("fake-lengths", fl)
5879 self.__fake_lengths = fl
5880
5881 def set_fake_dt(self, fdt):
5882 # set the sample interval of the fake data
5883 self.add_var_opt("fake-dt", fdt)
5884 self.__fake_dt = fdt
5885
5886 def set_scale_snr(self, ssnr):
5887 # set the SNR of the injected signal
5888 self.add_var_opt("scale-snr", ssnr)
5889 self.__scale_snr = ssnr
5890
5891 def set_sample_files(self, ssf):
5892 # set the nested sample files to be used as a prior
5893 self.add_var_opt("sample-files", ssf)
5894 self.__sample_files = ssf
5895
5896 def set_sample_nlives(self, snl):
5897 # set the number of live points for the nested sample files
5898 self.add_var_opt("sample-nlives", snl)
5899 self.__sample_nlives = snl
5900
5901 def set_prior_cell(self, pc):
5902 # set the k-d tree cell size for the prior
5903 self.add_var_opt("prior-cell", pc)
5904 self.__prior_cell = pc
5905
5906 def set_OldChunks(self):
5907 # use the old data segmentation routine i.e. 30 min segments
5908 self.add_var_opt("oldChunks", "")
5909 self.__oldChunks = True
5910
5911 def set_source_model(self):
5912 # use the physical parameter model from Jones
5913
5914 # add this into the generic 'macroargs' macro as it is not a value that is always required
5915 if "macroargs" in self.get_opts():
5916 curmacroval = self.get_opts()["macroargs"]
5917 else:
5918 curmacroval = ""
5919 curmacroval = curmacroval + " --source-model"
5920 self.add_macro("macroargs", curmacroval)
5921 self.__sourceModel = True
5922
5923 def set_biaxial(self):
5924 # the model is a biaxial star using the amplitude/phase waveform parameterisation
5925
5926 # add this into the generic 'macroargs' macro as it is not a value that is always required
5927 if "macroargs" in self.get_opts():
5928 curmacroval = self.get_opts()["macroargs"]
5929 else:
5930 curmacroval = ""
5931 curmacroval = curmacroval + " --biaxial"
5932 self.add_macro("macroargs", curmacroval)
5933 self.__biaxial = True
5934
5935 def set_gaussian_like(self):
5936 # use a Gaussian likelihood rather than the Student's t likelihood
5937 self.add_var_opt("gaussian-like", "")
5938 self.__gaussian_like = True
5939
5940 def set_randomise(self, f):
5941 # set flag to randomise the data time stamps for "background" analyses
5942
5943 # add this into the generic 'macroargs' macro as it is not a value that is always required
5944 if "macroargs" in self.get_opts():
5945 curmacroval = self.get_opts()["macroargs"]
5946 else:
5947 curmacroval = ""
5948 curmacroval = curmacroval + " --randomise " + f
5949 self.add_macro("macroargs", curmacroval)
5950 self.__randomise = f
5951
5952 def set_roq(self):
5953 # set to use Reduced Order Quadrature (ROQ)
5954
5955 # add this into the generic 'macroargs' macro as it is not a value that is always required
5956 if "macroargs" in self.get_opts():
5957 curmacroval = self.get_opts()["macroargs"]
5958 else:
5959 curmacroval = ""
5960 curmacroval = curmacroval + " --roq"
5961 self.add_macro("macroargs", curmacroval)
5962 self.__use_roq = True
5963
5964 def set_roq_ntraining(self, f):
5965 # set the number of training waveforms to use in ROQ basis generation
5966 # add this into the generic 'macroargs' macro as it is not a value that is always required
5967 if "macroargs" in self.get_opts():
5968 curmacroval = self.get_opts()["macroargs"]
5969 else:
5970 curmacroval = ""
5971 curmacroval = curmacroval + " --ntraining " + f
5972 self.add_macro("macroargs", curmacroval)
5973 self.__roq_ntraining = f
5974
5975 def set_roq_tolerance(self, f):
5976 # set the tolerance to use in ROQ basis generation
5977 # add this into the generic 'macroargs' macro as it is not a value that is always required
5978 if "macroargs" in self.get_opts():
5979 curmacroval = self.get_opts()["macroargs"]
5980 else:
5981 curmacroval = ""
5982 curmacroval = curmacroval + " --roq-tolerance " + f
5983 self.add_macro("macroargs", curmacroval)
5984 self.__roq_tolerance = f
5985
5986 def set_roq_uniform(self):
5987 # set to use uniform distributions when sprinkling (phase) parameters for ROQ training basis generation
5988 # add this into the generic 'macroargs' macro as it is not a value that is always required
5989 if "macroargs" in self.get_opts():
5990 curmacroval = self.get_opts()["macroargs"]
5991 else:
5992 curmacroval = ""
5993 curmacroval = curmacroval + " --roq-uniform"
5994 self.add_macro("macroargs", curmacroval)
5995 self.__roq_uniform = True
5996
5997 def set_roq_inputweights(self, f):
5998 # set the location of the file containing pregenerated ROQ interpolants
5999 # add this into the generic 'macroargs' macro as it is not a value that is always required
6000 if "macroargs" in self.get_opts():
6001 curmacroval = self.get_opts()["macroargs"]
6002 else:
6003 curmacroval = ""
6004 curmacroval = curmacroval + " --input-weights " + f
6005 self.add_macro("macroargs", curmacroval)
6006 self.__roq_input_weights = f
6007
6008 def set_roq_outputweights(self, f):
6009 # set the location of the file to output ROQ interpolants
6010 # add this into the generic 'macroargs' macro as it is not a value that is always required
6011 if "macroargs" in self.get_opts():
6012 curmacroval = self.get_opts()["macroargs"]
6013 else:
6014 curmacroval = ""
6015 curmacroval = curmacroval + " --output-weights " + f
6016 self.add_macro("macroargs", curmacroval)
6017 self.__roq_output_weights = f
6018
6019 def set_roq_chunkmax(self, f):
6020 # set the maximum chunk length when using ROQ (passed via the chunk-max argument)
6021 # add this into the generic 'macroargs' macro as it is not a value that is always required
6022 if "macroargs" in self.get_opts():
6023 curmacroval = self.get_opts()["macroargs"]
6024 else:
6025 curmacroval = ""
6026 curmacroval = curmacroval + " --chunk-max " + f
6027 self.add_macro("macroargs", curmacroval)
6028 self.__chunk_max = int(f)
6029
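# --- Editor's note: hedged usage sketch (illustrative only, not part of knope_utils.py) ---
# A minimal sketch of wiring up a parameter estimation node. The executable
# path, pulsar name, data/prior file locations and sampler settings are all
# hypothetical; only the setter methods come from the ppeJob and ppeNode
# classes above, and dag.add_node() is the standard pipeline.CondorDAG interface.
def _example_ppe_node(dag, logdir, rundir):
    ppejob = ppeJob(
        "/path/to/parameter_estimation_executable",  # hypothetical executable path
        univ="vanilla",
        logdir=logdir,
        rundir=rundir,
        requestmemory=4096,  # request 4096 MB (must be an int to be passed on)
    )
    ppenode = ppeNode(ppejob, psrname="J0000+0000")  # hypothetical pulsar name
    ppenode.set_detectors("H1,L1")                   # comma-separated detector list
    ppenode.set_par_file("/path/to/J0000+0000.par")  # hypothetical file paths below
    ppenode.set_input_files("/path/to/finehet_J0000+0000_H1,/path/to/finehet_J0000+0000_L1")
    ppenode.set_prior_file("/path/to/prior.txt")
    ppenode.set_Nlive(2048)                          # number of nested sampling live points
    ppenode.set_outfile("/path/to/nest_J0000+0000.hdf")
    dag.add_node(ppenode)
    return ppenode
# -------------------------------------------------------------------------------------------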
6030
6031"""
6032 Job for creating the result page for a particular source
6033"""
6034
6035
6036class resultpageJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
6037 def __init__(
6038 self, execu, univ="local", accgroup=None, accuser=None, logdir=None, rundir=None
6039 ):
6040 self.__executable = execu
6041 self.__universe = univ
6042 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
6043 pipeline.AnalysisJob.__init__(self, None)
6044
6045 if accgroup != None:
6046 self.add_condor_cmd("accounting_group", accgroup)
6047 if accuser != None:
6048 self.add_condor_cmd("accounting_group_user", accuser)
6049
6050 self.add_condor_cmd("getenv", "True")
6051
6052 # set log files for job
6053 if logdir != None:
6054 self.set_stdout_file(os.path.join(logdir, "resultpage-$(cluster).out"))
6055 self.set_stderr_file(os.path.join(logdir, "resultpage-$(cluster).err"))
6056 else:
6057 self.set_stdout_file("resultpage-$(cluster).out")
6058 self.set_stderr_file("resultpage-$(cluster).err")
6059
6060 if rundir != None:
6061 self.set_sub_file(os.path.join(rundir, "resultpage.sub"))
6062 else:
6063 self.set_sub_file("resultpage.sub")
6064
6065 self.add_arg("$(macroconfigfile)") # macro for input configuration file
6066
6067
6068class resultpageNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
6069 """
6070 A resultpageNode runs an instance of the result page script in a condor DAG.
6071 """
6072
6073 def __init__(self, job):
6074 """
6075 job = A CondorDAGJob that can run an instance of lalpulsar_knope_result_page
6076 """
6077 pipeline.CondorDAGNode.__init__(self, job)
6078 pipeline.AnalysisNode.__init__(self)
6079
6080 self.__configfile = None
6081
6082 def set_config(self, configfile):
6083 self.add_macro("macroconfigfile", configfile)
6084 self.__configfile = configfile
6085
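# --- Editor's note: hedged usage sketch (illustrative only, not part of knope_utils.py) ---
# The result page node is driven entirely by a configuration file. A minimal
# sketch, assuming a hypothetical executable and config path and the standard
# pipeline.CondorDAG.add_node() interface:
def _example_resultpage_node(dag, execu, logdir, rundir):
    rpjob = resultpageJob(execu, univ="local", logdir=logdir, rundir=rundir)
    rpnode = resultpageNode(rpjob)
    rpnode.set_config("/path/to/resultpage_J0000+0000.ini")  # hypothetical config file
    dag.add_node(rpnode)
    return rpnode
# -------------------------------------------------------------------------------------------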
6086
6087"""
6088 Job for creating the collated results page for all sources
6089"""
6090
6091
6092class collateJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
6093 def __init__(
6094 self, execu, univ="local", accgroup=None, accuser=None, logdir=None, rundir=None
6095 ):
6096 self.__executable = execu
6097 self.__universe = univ
6098 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
6099 pipeline.AnalysisJob.__init__(self, None)
6100
6101 if accgroup != None:
6102 self.add_condor_cmd("accounting_group", accgroup)
6103 if accuser != None:
6104 self.add_condor_cmd("accounting_group_user", accuser)
6105
6106 self.add_condor_cmd("getenv", "True")
6107
6108 # set log files for job
6109 if logdir != None:
6110 self.set_stdout_file(os.path.join(logdir, "collate-$(cluster).out"))
6111 self.set_stderr_file(os.path.join(logdir, "collate-$(cluster).err"))
6112 else:
6113 self.set_stdout_file("collate-$(cluster).out")
6114 self.set_stderr_file("collate-$(cluster).err")
6115
6116 if rundir != None:
6117 self.set_sub_file(os.path.join(rundir, "collate.sub"))
6118 else:
6119 self.set_sub_file("collate.sub")
6120
6121 self.add_arg("$(macroconfigfile)") # macro for input configuration file
6122
6123
6124class collateNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
6125 """
6126 A collateNode runs an instance of the result page collation script in a condor DAG.
6127 """
6128
6129 def __init__(self, job):
6130 """
6131 job = A CondorDAGJob that can run an instance of lalpulsar_knope_collate_results
6132 """
6133 pipeline.CondorDAGNode.__init__(self, job)
6134 pipeline.AnalysisNode.__init__(self)
6135
6136 self.__configfile = None
6137
6138 def set_config(self, configfile):
6139 self.add_macro("macroconfigfile", configfile)
6140 self.__configfile = configfile
6141
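# --- Editor's note: hedged usage sketch (illustrative only, not part of knope_utils.py) ---
# collateJob/collateNode follow the same pattern as the result page classes
# above: construct the job with the collation executable, then point the node
# at a configuration file. The paths here are hypothetical.
def _example_collate_node(dag, execu, logdir, rundir):
    coljob = collateJob(execu, univ="local", logdir=logdir, rundir=rundir)
    colnode = collateNode(coljob)
    colnode.set_config("/path/to/collate_results.ini")  # hypothetical config file
    dag.add_node(colnode)
    return colnode
# -------------------------------------------------------------------------------------------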
6142
6143class nest2posJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
6144 """
6145 A merge nested sampling files job to use lalinference_nest2pos
6146 """
6147
6148 def __init__(
6149 self, execu, univ="local", accgroup=None, accuser=None, logdir=None, rundir=None
6150 ):
6151 self.__executable = execu
6152 self.__universe = univ
6153 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
6154 pipeline.AnalysisJob.__init__(self, None)
6155
6156 if accgroup != None:
6157 self.add_condor_cmd("accounting_group", accgroup)
6158 if accuser != None:
6159 self.add_condor_cmd("accounting_group_user", accuser)
6160
6161 self.add_condor_cmd("getenv", "True")
6162
6163 # set log files for job
6164 if logdir != None:
6165 self.set_stdout_file(os.path.join(logdir, "n2p-$(cluster).out"))
6166 self.set_stderr_file(os.path.join(logdir, "n2p-$(cluster).err"))
6167 else:
6168 self.set_stdout_file("n2p-$(cluster).out")
6169 self.set_stderr_file("n2p-$(cluster).err")
6170
6171 self.add_arg("--non-strict-versions") # force use of --non-strict-versions flag
6172 self.add_arg("$(macroinputfiles)") # macro for input nested sample files
6173
6174 if rundir != None:
6175 self.set_sub_file(os.path.join(rundir, "n2p.sub"))
6176 else:
6177 self.set_sub_file("n2p.sub")
6178
6179
6180class nest2posNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
6181 """
6182 A nest2posNode runs an instance of lalinference_nest2pos to combine individual nested
6183 sample files in a condor DAG.
6184 """
6185
6186 def __init__(self, job):
6187 """
6188 job = A CondorDAGJob that can run an instance of the nested sample combination script
6189 """
6190 pipeline.CondorDAGNode.__init__(self, job)
6191 pipeline.AnalysisNode.__init__(self)
6192 self.set_retry(1) # retry the node once
6193
6194 # initialise job variables
6195 self.__nest_files = None
6196 self.__nest_live = None
6197 self.__outfile = None
6198 self.__header = None
6199 self.__npos = None
6200 self.__gzip = False
6201
6202 def set_nest_files(self, nestfiles):
6203 # set all the nested sample files
6204 self.__nest_files = nestfiles
6205
6206 fe = os.path.splitext(nestfiles[0])[-1].lower()
6207 # set header file (only if not using hdf5 output)
6208 if fe != ".hdf" and fe != ".h5":
6209 header = nestfiles[0].rstrip(".gz") + "_params.txt"
6210 self.__header = header
6211 self.add_var_opt("headers", header)
6212 self.add_macro("macroinputfiles", " ".join(nestfiles))
6213
6214 def set_nest_live(self, nestlive):
6215 # set the number of live points from each file
6216 self.add_var_opt("Nlive", nestlive)
6217 self.__nest_live = nestlive
6218
6219 def set_outfile(self, outfile):
6220 # set the output file for the posterior file
6221 self.add_var_opt("pos", outfile)
6222 self.__outfile = outfile
6223
6224 def set_numpos(self, npos):
6225 # set the number of posterior samples for the posterior file
6226 self.add_var_opt("npos", npos)
6227 self.__npos = npos
6228
6229 def set_gzip(self):
6230 self.add_var_opt("gzip", "")
6231 self.__gzip = True
6232
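# --- Editor's note: hedged usage sketch (illustrative only, not part of knope_utils.py) ---
# A minimal sketch of combining per-run nested sample files into one posterior
# file. File names and sample counts are hypothetical, and the executable is
# assumed to be the lalinference_nest2pos script on the user's PATH.
def _example_nest2pos_node(dag, logdir, rundir):
    n2pjob = nest2posJob("lalinference_nest2pos", univ="local",
                         logdir=logdir, rundir=rundir)
    n2pnode = nest2posNode(n2pjob)
    # with HDF5 inputs no separate parameter header file is required
    n2pnode.set_nest_files(["/path/to/nest_01.hdf", "/path/to/nest_02.hdf"])
    n2pnode.set_nest_live(2048)    # live points used by each individual run
    n2pnode.set_numpos(5000)       # number of posterior samples to produce
    n2pnode.set_outfile("/path/to/posterior_samples.hdf")
    dag.add_node(n2pnode)
    return n2pnode
# -------------------------------------------------------------------------------------------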
6233
6234# DEPRECATED
6235class createresultspageJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
6236 """
6237 A job to create an individual pulsar results page
6238 """
6239
6240 def __init__(self, execu, logpath, accgroup, accuser):
6241 self.__executable = execu
6242 self.__universe = "vanilla"
6243 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
6244 pipeline.AnalysisJob.__init__(self, None)
6245
6246 self.add_condor_cmd("getenv", "True")
6247 self.add_condor_cmd("accounting_group", accgroup)
6248 self.add_condor_cmd("accounting_group_user", accuser)
6249
6250 self.set_stdout_file(logpath + "/create_results_page-$(cluster).out")
6251 self.set_stderr_file(logpath + "/create_results_page-$(cluster).err")
6252 self.set_sub_file("create_results_page.sub")
6253
6254 # additional required args
6255 self.add_arg("$(macrom)") # macro for MCMC directories
6256 self.add_arg("$(macrobk)") # macro for Bk (fine heterodyne file) directories
6257 self.add_arg("$(macroi)") # macro for IFOs
6258 self.add_arg("$(macrof)") # macro for nested sampling files
6259 self.add_arg("$(macrow)") # macro to say if a hardware injection
6260 self.add_arg("$(macros)") # macro to say if a software injection
6261
6262
6263class createresultspageNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
6264 """
6265 A createresultspage node to run as part of a condor DAG.
6266 """
6267
6268 def __init__(self, job):
6269 """
6270 job = A CondorDAGJob that can run an instance of the results page creation script
6271 """
6272 pipeline.CondorDAGNode.__init__(self, job)
6273 pipeline.AnalysisNode.__init__(self)
6274
6275 # initialise job variables
6276 self.__outpath = None
6277 self.__domcmc = False
6278 self.__mcmcdirs = []
6279 self.__donested = False
6280 self.__nestedfiles = []
6281 self.__parfile = None
6282 self.__Bkfiles = []
6283 self.__priorfile = None
6284 self.__ifos = []
6285 self.__histbins = None
6286 self.__epsout = False
6287
6288 def set_outpath(self, val):
6289 # set the output path
6290 self.add_var_opt("o", val, short=True)
6291 self.__outpath = val
6292
6293 def set_domcmc(self):
6294 # set to say using MCMC chains as input
6295 self.add_var_opt("M", "", short=True)
6296 self.__domcmc = True
6297
6298 def set_mcmcdir(self, val):
6299 # set the MCMC file directories
6300 macroval = ""
6301 for f in val:
6302 macroval = "%s-m %s " % (macroval, f)
6303
6304 self.add_macro("macrom", macroval)
6305 self.add_macro("macrof", "") # empty macro for nested files
6306 self.__mcmcdirs = val
6307
6308 def set_donested(self):
6309 # set to say using nested sampling results as input
6310 self.add_var_opt("nested", "")
6311 self.__donested = True
6312
6313 def set_nestedfiles(self, val):
6314 # set the nested sampling files
6315 macroval = ""
6316 for f in val:
6317 macroval = "%s-f %s " % (macroval, f)
6318
6319 self.add_macro("macrof", macroval)
6320 self.add_macro("macrom", "") # empty macro for mcmc directories
6321 self.__nestedfiles = val
6322
6323 def set_parfile(self, val):
6324 # set the pulsar parameter file
6325 self.add_var_opt("p", val, short=True)
6326 self.__parfile = val
6327
6328 def set_bkfiles(self, val):
6329 # set the fine heterodyned data files
6330 macroval = ""
6331 for f in val:
6332 macroval = "%s-b %s " % (macroval, f)
6333
6334 self.add_macro("macrobk", macroval)
6335 self.__Bkfiles = val
6336
6337 def set_priordir(self, val):
6338 # set the prior file directory
6339 self.add_var_opt("r", val, short=True)
6340 self.__priordir = val
6341
6342 def set_ifos(self, val):
6343 # set the IFOs to analyse
6344 macroval = ""
6345 for f in val:
6346 macroval = "%s-i %s " % (macroval, f)
6347
6348 self.add_macro("macroi", macroval)
6349 self.__ifos = val
6350
6351 def set_histbins(self, val):
6352 # set the number of histogram bins
6353 self.add_var_opt("n", val, short=True)
6354 self.__histbins = val
6355
6356 def set_epsout(self):
6357 # set to output eps figs
6358 self.add_var_opt("e", "", short=True)
6359 self.__epsout = True
6360
6361 def set_swinj(self, isswinj):
6362 # set to say that analysing software injection
6363 if isswinj:
6364 self.add_macro("macros", "--sw-inj")
6365 else:
6366 self.add_macro("macros", "")
6367
6368 def set_hwinj(self, ishwinj):
6369 # set to say that analysing hardware injection
6370 if ishwinj:
6371 self.add_macro("macrow", "--hw-inj")
6372 else:
6373 self.add_macro("macrow", "")
6374
6375
6376class collateresultsJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
6377 """
6378 A job to collate all the individual pulsar results pages
6379 """
6380
6381 def __init__(self, execu, logpath, accgroup, accuser):
6382 self.__executable = execu
6383 self.__universe = "vanilla"
6384 pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
6385 pipeline.AnalysisJob.__init__(self, None)
6386
6387 self.add_condor_cmd("getenv", "True")
6388 self.add_condor_cmd("accounting_group", accgroup)
6389 self.add_condor_cmd("accounting_group_user", accuser)
6390
6391 self.set_stdout_file(logpath + "/collate_results-$(cluster).out")
6392 self.set_stderr_file(logpath + "/collate_results-$(cluster).err")
6393 self.set_sub_file("collate_results.sub")
6394
6395 # some required argument macros
6396 self.add_arg("$(macroifo)") # for IFOs
6397 # self.add_arg('$(macrou)') # for output upper limits
6398 # self.add_arg('$(macron)') # for output pulsar values
6399
6400
6401class collateresultsNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
6402 """
6403 A collateresults node to run as part of a condor DAG.
6404 """
6405
6406 def __init__(self, job):
6407 """
6408 job = A CondorDAGJob that can run an instance of the results collation script
6409 """
6410 pipeline.CondorDAGNode.__init__(self, job)
6411 pipeline.AnalysisNode.__init__(self)
6412
6413 # initialise job variables
6414 self.__outpath = None
6415 self.__inpath = None
6416 self.__parfile = None
6417 self.__compilelatex = False
6418 self.__sorttype = None
6419 self.__ifos = []
6420 self.__outputlims = []
6421 self.__outputvals = []
6422 self.__outputhist = False
6423 self.__outputulplot = False
6424 self.__withprior = False
6425 self.__epsout = False
6426
6427 def set_outpath(self, val):
6428 # set the output path
6429 self.add_var_opt("o", val, short=True)
6430 self.__outpath = val
6431
6432 def set_inpath(self, val):
6433 # set the input path
6434 self.add_var_opt("z", val, short=True)
6435 self.__inpath = val
6436
6437 def set_parfile(self, val):
6438 # set the pulsar parameter file directory
6439 self.add_var_opt("p", val, short=True)
6440 self.__parfile = val
6441
6442 def set_compilelatex(self):
6443 # set to compile LaTeX results table
6444 self.add_var_opt("l", "", short=True)
6445 self.__compilelatex = True
6446
6447 def set_sorttype(self, val):
6448 # set the sorting order of the output results
6449 self.add_var_opt("s", val, short=True)
6450 self.__sorttype = val
6451
6452 def set_ifos(self, val):
6453 # set the list of IFOs to output results for
6454 macroval = ""
6455 for f in val:
6456 macroval = "%s-i %s " % (macroval, f)
6457
6458 self.add_macro("macroifo", macroval)
6459 self.__ifos = val
6460
6461 def set_outputlims(self, val):
6462 # set the upper limit results to output
6463 macroval = ""
6464 for f in val:
6465 macroval = "%s-u %s " % (macroval, f)
6466
6467 self.add_macro("macrou", macroval)
6468 self.__outputlims = val
6469
6470 def set_outputvals(self, val):
6471 # set the pulsar parameter values to output
6472 macroval = ""
6473 for f in val:
6474 macroval = "%s-n %s " % (macroval, f)
6475
6476 self.add_macro("macron", macroval)
6477 self.__outputvals = val
6478
6479 def set_outputhist(self):
6480 # set to output histograms of the results
6481 self.add_var_opt("k", "", short=True)
6482 self.__outputhist = True
6483
6484 def set_outputulplot(self):
6485 # set to output a plot of the ULs
6486 self.add_var_opt("t", "", short=True)
6487 self.__outputulplot = True
6488
6489 def set_withprior(self):
6490 # set to output prior values with the hist and UL plots
6491 self.add_var_opt("w", "", short=True)
6492 self.__withprior = True
6493
6494 def set_epsout(self):
6495 # set to output plots in eps format as well as png
6496 self.add_var_opt("e", "", short=True)
6497 self.__epsout = True