Loading [MathJax]/extensions/TeX/AMSsymbols.js
LALPulsar 7.1.1.1-5e288d3
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
lalpulsar_knope_automation_script.py
Go to the documentation of this file.
1##python
2## \file
3## \ingroup lalpulsar_bin_HeterodyneSearch
4"""
5The KNOwn Pulsar pipelinE - lalpulsar_knope_automation_script
6
7Script to set up and launch a cron job for the automated running of the known pulsar analysis.
8
9The script requires an initial configuration file. Each time the script will be re-run by cron it will itself
10automatically update the times in the configuration file.
11
12(C) Matthew Pitkin 2015
13"""
14
15# make print statements python 3-proof
16from __future__ import print_function, division
17
18import sys
19import os
20import ast
21import calendar
22import numpy as np
23import subprocess as sp
24import socket
25import smtplib
26import stat
27
28import argparse
29from configparser import RawConfigParser
30
31from lalpulsar import git_version
32
33# try importing astropy
34try:
35 from astropy.time import Time
36except ImportError:
37 print(
38 "Could not import astropy: make sure astropy is installed (e.g. 'pip install astropy') and in the PYTHONPATH",
39 file=sys.stderr,
40 )
41 sys.exit(1)
42
43
44# try import python-crontab
45try:
46 from crontab import CronTab
47except ImportError:
48 print(
49 "Could not import python-crontab: make sure it is installed (e.g. 'pip install python-crontab') and in the PYTHONPATH",
50 file=sys.stderr,
51 )
52 sys.exit(1)
53
54
55__author__ = "Matthew Pitkin <matthew.pitkin@ligo.org>"
56__version__ = "git id %s" % git_version.id
57__date__ = git_version.date
58
59
# Number of seconds spanned by each allowed time step value. The "monthly"
# entry is the length of the calendar month reported by Time.now() when the
# module is loaded.
_tsnow = Time.now().datetime  # read the clock once so year and month always agree
tsdic = {
    "hourly": 3600.0,  # 60 * 60 seconds (was 1440.0, the number of minutes in a day)
    "daily": 86400.0,
    "weekly": 7.0 * 86400.0,
    "monthly": 86400.0 * calendar.monthrange(_tsnow.year, _tsnow.month)[1],
}
68
69
70# function to remove crontab job
def remove_cron(cronid):
    """Remove every job in the user's crontab whose comment is ``cronid``.

    This is a best-effort operation: any failure is reported as a warning on
    stdout and is not propagated to the caller.

    Args:
        cronid: the comment string identifying the crontab job(s) to remove.

    Returns:
        None.
    """
    try:
        cron = CronTab(user=True)
        cron.remove_all(comment=cronid)
        cron.write()
    except Exception:
        # do not use a bare "except", which would also swallow
        # KeyboardInterrupt and SystemExit
        print("Warning... could not remove crontab job with comment ID '%s'" % cronid)

    return
80
81
82# function to send an email
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True):
    """Send a plain-text notification email through an existing server object.

    Sending is best-effort: a failure to send (or to close the connection) is
    reported as a warning on stdout and is not propagated to the caller.

    Args:
        FROM: the sender address.
        TO: the recipient address.
        SUBJECT: the subject line.
        MESSAGE: the message body.
        server: an object providing ``sendmail(from, to, message)`` and
            ``quit()`` (the script passes an ``smtplib.SMTP`` instance).
        quitserver: if True (the default) call ``server.quit()`` afterwards.

    Returns:
        None.
    """
    # set a notification email template
    emailtemplate = "From: {0}\nTo: {1}\nSubject: {2}\n\n{3}"
    message = emailtemplate.format(FROM, TO, SUBJECT, MESSAGE)

    try:
        server.sendmail(FROM, TO, message)
    except Exception:
        print("Warning... unable to send email.")

    if quitserver:
        # callers invoke this immediately before exiting on an error, so a
        # failure to close the connection must not turn into a traceback
        try:
            server.quit()
        except Exception:
            print("Warning... unable to close connection to email server.")
95
96
97# main function
def _abort(errmsg, cronid=None, email=None, sender=None, server=None):
    """Report a fatal error and terminate the script with exit status 1.

    Args:
        errmsg: the message to print to stderr (and to email, if requested).
        cronid: if not None, the crontab job with this comment ID is removed.
        email: if not None, the address to which errmsg is also emailed.
        sender: the "From" address used for the email.
        server: the mail server object handed to send_email().
    """
    print(errmsg, file=sys.stderr)
    if cronid is not None:
        remove_cron(cronid)
    if email is not None:
        send_email(sender, email, sys.argv[0] + ": Error message", errmsg, server)
    sys.exit(1)


def _delay_cron_by_a_day(cronid, timestep, now):
    """Re-schedule a 'weekly' or 'monthly' crontab job for one day after now.

    Raises an exception if the crontab cannot be read or written, or if it
    holds no job with comment ID cronid.
    """
    cron = CronTab(user=True)
    thisjob = None
    for job in cron.find_comment(cronid):
        thisjob = job  # if several jobs match, the last one is used
    if thisjob is None:
        raise RuntimeError("No crontab job with comment ID '%s'" % cronid)

    # get a time difference of one day
    oneday = Time("2010-01-02 00:00:00") - Time("2010-01-01 00:00:00")
    newcrontime = now + oneday
    if timestep == "weekly":
        thisjob.dow.on(newcrontime.datetime.strftime("%a").upper())  # new day of the week
    else:
        thisjob.day.on(newcrontime.datetime.day)  # new day of the month
    cron.write()


def _wait_and_exit(message, errmsg, timestep, cronid, now, email, sender, server):
    """Exit with status 0 so that the DAG check is repeated on a later cron run.

    For 'hourly' and 'daily' steps the next scheduled run is soon enough, so
    only message is printed. For longer steps the crontab job is moved forward
    to tomorrow; if that fails the script aborts with errmsg.
    """
    if timestep in ("hourly", "daily"):
        print(message)
    else:
        try:
            _delay_cron_by_a_day(cronid, timestep, now)
        except Exception:
            _abort(errmsg, cronid, email, sender, server)
    # sys.exit flushes stdout, unlike os._exit, so the message above is not lost
    sys.exit(0)


def _write_config(config, filename):
    """Write the configuration parser object config to the file filename."""
    with open(filename, "w") as fc:
        config.write(fc)


# main function
if __name__ == "__main__":
    description = """This script will setup and run a cron job for an automated known pulsar pipeline.
A configuration .ini file is required.
"""

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--version", action="version", version=__version__)
    parser.add_argument("inifile", help="The configuration (.ini) file")

    # parse input options
    opts = parser.parse_args()
    inifile = opts.inifile

    # check that inifile contains full absolute path (the path is written
    # verbatim into the cron wrapper script)
    if not os.path.isabs(inifile):
        print(
            "Error... must supply the full absolute path of the configuration file.",
            file=sys.stderr,
        )
        sys.exit(1)

    # open and parse config file (read() returns the list of files it parsed,
    # which is empty if the file could not be opened)
    cp = RawConfigParser()
    try:
        parsedfiles = cp.read(inifile)
    except Exception:
        parsedfiles = []
    if not parsedfiles:
        print(
            "Error... cannot parse configuration file '%s'" % inifile, file=sys.stderr
        )
        sys.exit(1)

    # get the crontab job ID first, so that every later error path removes the right job
    cronid = "knopeJob"  # default ID for the crontab job
    if cp.has_option("configuration", "cronid"):
        cronid = cp.get("configuration", "cronid")

    # if the configuration file has no previous_endtimes option then this is
    # the first run and the crontab job must be created
    startcron = not cp.has_option("times", "previous_endtimes")

    # ID of the crontab job to remove on a fatal error (None while no job exists)
    livecron = None if startcron else cronid

    # get the main configuration ini template for the run
    if not cp.has_option("configuration", "file"):
        _abort("Error... must specify a run configuration '.ini' file", livecron)
    runconfig = cp.get("configuration", "file")
    if not os.path.isfile(runconfig):
        _abort(
            "Error... run configuration file '%s' does not exist!" % runconfig,
            livecron,
        )

    # check for kerberos keytab and certificate
    keytab = None
    certificate = None
    authprinc = None
    if cp.has_option("kerberos", "keytab"):
        keytab = cp.get("kerberos", "keytab")

        if cp.has_option("kerberos", "certificate"):
            certificate = cp.get("kerberos", "certificate")

            if cp.has_option("kerberos", "auth_princ"):
                authprinc = cp.get("kerberos", "auth_princ")
            else:
                raise RuntimeError("No kerberos authentication principle")
        else:
            raise RuntimeError("Problem with kerberos certificate")

    # open and parse the run configuration file
    cprun = RawConfigParser()
    try:
        cprun.read(runconfig)
    except Exception:
        _abort("Error... could not read run configuration '.ini' file")

    # get the time increment for re-running the pipeline on (can be "hourly", "daily", "weekly" or "monthly")
    if not cp.has_option("times", "steps"):
        _abort("Error... must specify a time step 'steps' value in '[times]'")
    timestep = cp.get("times", "steps")
    if timestep not in ("hourly", "daily", "weekly", "monthly"):
        _abort(
            "Error... 'steps' value in '[times'] must be 'hourly', 'daily', 'weekly' or 'monthly'"
        )

    # try getting email address information (for notifications of e.g. job failures, re-running rescue DAGs, etc)
    email = None
    server = None
    if cp.has_option("notification", "email"):
        try:
            email = cp.get("notification", "email")
        except Exception:
            print(
                "Warning... could not get email address from configuration file. No notifications will be sent."
            )

        if email is not None and "@" not in email:
            print(
                "Warning... '%s' is not a valid email address. No notifications will be sent."
                % email
            )
            email = None

    # set email server
    if email is not None:
        try:
            server = smtplib.SMTP("localhost")
        except Exception:
            print(
                "Warning... could not get SMTP server. No notication emails will be sent."
            )
            email = None

    # set email sender (falling back to a hardcoded address if it cannot be constructed)
    FROM = None
    if email is not None:
        try:
            FROM = os.environ["USER"] + "@" + socket.getfqdn()
        except Exception:
            FROM = "matthew.pitkin@ligo.org"

    # arguments describing where notifications go, as taken by the helpers above
    notify = (email, FROM, server)

    # Get the current time
    now = Time.now()
    gpsnow = int(now.replicate("gps").value)

    # get run directory where the DAG will have been created (if 'run_dir' was
    # not set then the current working directory will have been used)
    if cprun.has_option("analysis", "run_dir"):
        rundir = cprun.get("analysis", "run_dir")
    else:
        rundir = os.getcwd()

    # check for DAG completion in previous analyses
    prevdags = None
    if not startcron:
        try:
            prevdags = list(ast.literal_eval(cp.get("configuration", "previous_dags")))
            lastdag = prevdags[-1]
        except Exception:
            _abort("Error... no previous DAG file(s) have been set.", cronid, *notify)

        # check if the last DAG has completed yet (a lock file exists while it is running)
        if os.path.isfile(lastdag + ".lock"):
            _wait_and_exit(
                "Previous DAG not finished. Re-running later",
                "Error... could not reset the crontab to wait for DAG completion.",
                timestep,
                cronid,
                now,
                *notify
            )

        # get any previous rescue DAGs - held in a dictionary and keyed to the original DAG name
        rescuedags = {}
        Nrescues = 0  # previous number of rescues of the last DAG
        if cp.has_option("configuration", "rescue_dags"):
            try:
                storedrescues = ast.literal_eval(cp.get("configuration", "rescue_dags"))
                Nrescues = int(storedrescues.get(lastdag, 0))
                rescuedags = dict(storedrescues)  # keep the counts of earlier DAGs
            except (ValueError, SyntaxError, TypeError, AttributeError):
                rescuedags = {}
                Nrescues = 0

        # check if there is a rescue DAG, and if so run it, and wait
        if os.path.exists("%s.rescue%03d" % (lastdag, Nrescues + 1)):
            # if the maximum number of rescue DAGs has already been run then
            # just abort as there's probably some problem
            maxrescues = 3
            if Nrescues >= maxrescues:
                _abort(
                    "Error... rescue DAG has been run %d times and there are still failures. Automation code is aborting. Fix the problem and then retry"
                    % Nrescues,
                    cronid,
                    *notify
                )

            # update number of previous rescues
            rescuedags[lastdag] = Nrescues + 1

            # run rescue DAG (Condor picks up the rescue file when the original DAG is resubmitted)
            try:
                status = sp.call(["condor_submit_dag", lastdag])
            except OSError:
                status = -1  # e.g. condor_submit_dag is not in the PATH
            if status != 0:
                _abort(
                    "Error... unable to submit rescue DAG for '%s'. Automation code is aborting."
                    % lastdag,
                    cronid,
                    *notify
                )

            # add number of rescue DAGs to configuration file and write it out
            cp.set("configuration", "rescue_dags", str(rescuedags))
            _write_config(cp, inifile)

            # wait until re-running
            _wait_and_exit(
                "Running rescue DAG",
                "Error... could not reset the crontab to wait for rescue DAG completion.",
                timestep,
                cronid,
                now,
                *notify
            )

    # Get the start time of the automated analysis (this is required)
    starttime = None
    if cp.has_option("times", "starttime"):
        try:
            starttime = cp.getint("times", "starttime")
        except ValueError:
            starttime = None
    if starttime is None:
        _abort(
            "Error... could not parse 'starttime' in '[times]'. A start time is required.",
            livecron,
            *notify
        )

    # check start time is in the past
    if starttime >= gpsnow:
        _abort(
            "Error... start time (%f) must be in the past!" % starttime,
            livecron,
            *notify
        )

    # Get the end time of the automated analysis - if not present default to infinity (never stop!)
    endtime = np.inf
    if cp.has_option("times", "endtime"):
        try:
            endtime = cp.getint("times", "endtime")
        except ValueError:
            print(
                "Warning... could not parse 'endtime' in '[times]'. Defaulting to Infinity."
            )

    # check end time is after start time
    if endtime <= starttime:
        _abort("Error... start time is after end time!", livecron, *notify)

    # get a lag time (this will add a lag to gpsnow - if there is a lag between data creation and replication on the various sites a lag may be required, so that the data exists)
    timelag = 0  # default to no lag
    if cp.has_option("times", "lag"):
        try:
            timelag = cp.getint("times", "lag")
        except ValueError:
            timelag = 0

    # work out the start time of this step
    if not startcron:
        try:
            prev_ends = list(ast.literal_eval(cp.get("times", "previous_endtimes")))
            newstart = prev_ends[-1]  # start from the previous end time
        except Exception:
            _abort("Error... cannot parse previous end times list", cronid, *notify)

        # if new start time is after end time stop the cronjob and exit
        if newstart >= endtime:
            remove_cron(cronid)
            sys.exit(0)  # end the script
    else:
        prev_ends = []
        newstart = starttime

    # work out the end time of this step
    newend = newstart + int(tsdic[timestep])
    if newend >= endtime:
        # do not go past the overall end time
        newend = endtime
    elif newend < gpsnow - timelag:
        # the current time is later than the new end time, so catch up to now
        newend = gpsnow - timelag

    prev_ends.append(newend)
    cp.set(
        "times",
        "previous_endtimes",
        "[" + ", ".join(str(z) for z in prev_ends) + "]",
    )  # output as list

    # Get the script for running the full pipeline
    if not cp.has_option("configuration", "exec"):
        _abort(
            "Error... a run script executable 'exec' is required in the '[configuration]' section.",
            livecron,
            *notify
        )
    runscript = cp.get("configuration", "exec")
    if not (os.path.isfile(runscript) and os.access(runscript, os.X_OK)):
        _abort(
            "Error... run script '%s' does not exist or is not executable" % runscript,
            livecron,
            *notify
        )

    # edit start and end times for the main run configuration script
    if not cprun.has_section("analysis"):
        _abort(
            "Error... run configuration file '%s' has no '[analysis]' section!"
            % runconfig,
            livecron,
            *notify
        )

    cprun.set("analysis", "starttime", str(newstart))  # set start time
    cprun.set("analysis", "endtime", str(newend))  # set end time
    cprun.set("analysis", "autonomous", "True")  # set 'autonomous' to true
    cprun.set("analysis", "submit_dag", "True")  # make sure Condor DAG is submitted

    # create file name for DAG
    dagname = "automated_run_%s-%s" % (str(newstart), str(newend))
    cprun.set("analysis", "dag_name", dagname)

    # add the new DAG file to the list held in the automation configuration
    # file (to be used to check for DAG completion on the next run)
    alldags = (prevdags or []) + [os.path.join(rundir, dagname + ".dag")]
    cp.set(
        "configuration",
        "previous_dags",
        "[" + ", ".join('"%s"' % z for z in alldags) + "]",
    )  # output as list

    # add the initial start time
    cprun.set("analysis", "autonomous_initial_start", str(starttime))

    # write updated parameters to the file
    _write_config(cprun, runconfig)

    # create crontab job
    if startcron:
        # check for a virtual environment to run code under
        wov = ""
        if cp.has_option("configuration", "virtualenv"):
            # assumes using virtualenvwrapper.sh
            virtualenv = cp.get("configuration", "virtualenv")
            woh = os.environ.get("WORKON_HOME")
            if woh is None:
                _abort(
                    "Error... if specifying a virtualenv the 'WORKON_HOME' environment must exist"
                )
            if not os.path.isdir(os.path.join(woh, virtualenv)):
                _abort("Error... if specifying a virtualenv the environment must exist")
            wov = "workon " + virtualenv
        elif cp.has_option("configuration", "conda"):  # assumes using conda
            wov = "conda activate {}".format(cp.get("configuration", "conda"))

        # check for .bash_profile, or similar file, to invoke
        if cp.has_option("configuration", "profile"):
            profile = cp.get("configuration", "profile")
        else:
            # default to .bash_profile in the home directory
            profile = os.path.join(os.path.expanduser("~"), ".bash_profile")
            if not os.path.isfile(profile):
                _abort("Error... no profile file is given")

        if keytab is not None:
            krbcert = "export KRB5CCNAME={}".format(certificate)
            kinit = "/usr/bin/kinit -a -P -F -k -t {} {}".format(keytab, authprinc)
            ligoproxyinit = "/usr/bin/ligo-proxy-init -k"
        else:
            krbcert = ""
            kinit = ""
            ligoproxyinit = ""

        # output the cron wrapper script (which will re-run this script)
        cronwrapperscript = os.path.splitext(inifile)[0] + ".sh"
        cronwrapper = """#!/bin/bash
source {0} # source profile
{1} # enable virtual environment (assumes you have virtualenvwrapper.sh/conda)
{2} # export kerberos certificate location (if required)
{3} # generate kerberos certificate (if required)
{4} # create proxy (if required)
{5} {6} # re-run this script
"""
        try:
            with open(cronwrapperscript, "w") as fp:
                fp.write(
                    cronwrapper.format(
                        profile,
                        wov,
                        krbcert,
                        kinit,
                        ligoproxyinit,
                        # cron does not start in the current directory, so use an absolute path
                        os.path.abspath(sys.argv[0]),
                        inifile,
                    )
                )
            os.chmod(
                cronwrapperscript, stat.S_IRWXU | stat.S_IRWXG | stat.S_IXOTH
            )  # make executable
        except (IOError, OSError):
            _abort(
                "Error... could not output cron wrapper script '%s'." % cronwrapperscript
            )

        try:
            cron = CronTab(user=True)
            job = cron.new(command=cronwrapperscript, comment=cronid)

            # set job time - this will start at the next time step (as we've just run the first step)
            job.minute.on(now.datetime.minute)  # required for all time steps
            if timestep != "hourly":
                job.hour.on(now.datetime.hour)
            if timestep == "weekly":
                job.dow.on(now.datetime.strftime("%a").upper())  # day of the week
            elif timestep == "monthly":
                job.day.on(now.datetime.day)

            cron.write()
        except Exception:
            _abort("Error... could not create crontab job", None, *notify)

        # the job now exists, so it must be removed if the analysis below fails
        # (otherwise every cron run would look like a first run and add another job)
        livecron = cronid

    ### RUN ANALYSIS SCRIPT ###
    # pass the arguments as a list, so that no shell is involved and paths
    # containing spaces or shell metacharacters are handled safely
    try:
        status = sp.call([runscript, runconfig])
    except OSError as e:
        status = e
    if status != 0:
        _abort(
            "Error... problem running main script '%s' (exit status: %s)"
            % (runscript, status),
            livecron,
            *notify
        )
    ###########################

    # Write out updated configuration file
    _write_config(cp, inifile)

    sys.exit(0)
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True)