LALPulsar  6.1.0.1-b72065a
lalpulsar_knope_automation_script.py
Go to the documentation of this file.
1 """
Script to set up and launch a cron job for the automated running of the known pulsar analysis.

The script requires an initial configuration file. Each time the script is re-run by cron it will
automatically update the times in the configuration file.
6 
7 (C) Matthew Pitkin 2015
8 """
9 
10 # make print statements python 3-proof
11 from __future__ import print_function, division
12 
13 import sys
14 import os
15 import ast
16 import calendar
17 import numpy as np
18 import subprocess as sp
19 import socket
20 import smtplib
21 import stat
22 
23 import argparse
24 from six.moves.configparser import RawConfigParser
25 
26 from lalpulsar import git_version
27 
28 # try importing astropy
29 try:
30  from astropy.time import Time
31 except ImportError:
32  print(
33  "Could not import astropy: make sure astropy is installed (e.g. 'pip install astropy') and in the PYTHONPATH",
34  file=sys.stderr,
35  )
36  sys.exit(1)
37 
38 
39 # try import python-crontab
40 try:
41  from crontab import CronTab
42 except ImportError:
43  print(
44  "Could not import python-crontab: make sure it is installed (e.g. 'pip install python-crontab') and in the PYTHONPATH",
45  file=sys.stderr,
46  )
47  sys.exit(1)
48 
49 
# module metadata; the version string and date come from lalpulsar's git_version
__author__ = "Matthew Pitkin <matthew.pitkin@ligo.org>"
__version__ = "git id {}".format(git_version.id)
__date__ = git_version.date
53 
54 
# number of seconds in each allowed time step value; "monthly" uses the length
# of the month in which the script is run (the current time is read only once
# so that the year and month are consistent with each other)
_tsnow = Time.now().datetime
tsdic = {
    "hourly": 3600.0,  # 60 * 60 seconds (previously 1440, which is minutes per day)
    "daily": 86400.0,
    "weekly": 7.0 * 86400.0,
    "monthly": 86400.0 * calendar.monthrange(_tsnow.year, _tsnow.month)[1],
}
63 
64 
# function to remove crontab job
def remove_cron(cronid):
    """Remove all of the user's crontab jobs whose comment is ``cronid``.

    This is best effort. If the crontab cannot be read, modified or written,
    a warning is printed and the function returns normally.

    Parameters
    ----------
    cronid : str
        The comment string that the job(s) were tagged with when created.
    """
    try:
        cron = CronTab(user=True)
        cron.remove_all(comment=cronid)
        cron.write()
    except Exception:
        # deliberately best effort, but do not swallow SystemExit/KeyboardInterrupt
        print("Warning... could not remove crontab job with comment ID '%s'" % cronid)

    return
75 
76 
# function to send an email
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True):
    """Send a plain-text notification email (best effort; never raises).

    Parameters
    ----------
    FROM : str
        Sender address.
    TO : str
        Recipient address.
    SUBJECT : str
        Subject line.
    MESSAGE : str
        Message body.
    server : smtplib.SMTP
        An open SMTP connection (the script opens one to "localhost").
    quitserver : bool
        If True (default), close the SMTP connection afterwards.

    A failure to send prints a warning. A failure while closing the
    connection (e.g. it already dropped) is ignored.
    """
    # notification email template: headers, a blank line, then the body
    emailtemplate = "From: {0}\nTo: {1}\nSubject: {2}\n\n{3}"
    message = emailtemplate.format(FROM, TO, SUBJECT, MESSAGE)

    try:
        server.sendmail(FROM, TO, message)
    except Exception:
        # best effort: a notification failure must not mask the original error
        # (this also covers e.g. UnicodeEncodeError for non-ASCII messages)
        print("Warning... unable to send email.")

    if quitserver:
        try:
            server.quit()
        except (smtplib.SMTPException, socket.error):
            # the connection may already be broken (e.g. if sending failed)
            pass
90 
91 
# main function helpers


def _error_exit(errmsg, cronid=None, email=None, FROM=None, server=None):
    """Report a fatal error and terminate the script with exit status 1.

    Parameters
    ----------
    errmsg : str
        The error message; always printed to stderr.
    cronid : str or None
        If given, the crontab job(s) tagged with this comment are removed
        (via ``remove_cron``) so that a broken analysis is not re-run.
    email, FROM, server :
        If ``email`` is given the message is also emailed to it from ``FROM``
        over the SMTP connection ``server`` (via ``send_email``).
    """
    print(errmsg, file=sys.stderr)
    if cronid is not None:
        remove_cron(cronid)
    if email is not None:
        subject = sys.argv[0] + ": Error message"
        send_email(FROM, email, subject, errmsg, server)
    sys.exit(1)


def _read_config(filename):
    """Return a ``RawConfigParser`` for ``filename``, or None on failure.

    ``RawConfigParser.read`` silently skips files it cannot open, so an empty
    list of successfully read files is treated as a failure too.
    """
    parser = RawConfigParser()
    try:
        readok = parser.read(filename)
    except Exception:  # any parsing error (configparser.Error, decoding errors, ...)
        return None
    return parser if readok else None


def _defer_cron(cronid, timestep, now):
    """Postpone the next check of the crontab job(s) tagged ``cronid``.

    For "hourly" and "daily" steps nothing is changed, and the job simply
    runs again at its next scheduled time. For "weekly" and "monthly" steps
    the job's day of the week / day of the month is moved to one day after
    ``now`` (an astropy ``Time``) so that the check is repeated tomorrow.

    Raises an exception if the crontab cannot be read or written, or if no
    job with the given comment exists.
    """
    if timestep in ("hourly", "daily"):
        return

    import datetime

    cron = CronTab(user=True)
    jobs = list(cron.find_comment(cronid))
    if not jobs:
        raise RuntimeError("no crontab job with comment '%s'" % cronid)

    tomorrow = now.datetime + datetime.timedelta(days=1)
    for job in jobs:
        if timestep == "weekly":
            job.dow.on(tomorrow.strftime("%a").upper())  # new day of the week
        else:
            job.day.on(tomorrow.day)  # new day of the month
    cron.write()


def _main():
    """Run one step of the automated known pulsar analysis.

    On the first call (no ``previous_endtimes`` in ``[times]`` of the
    configuration file) a crontab job that re-runs this script every time
    step is created. On later calls the script first checks whether the
    previous DAG has finished, re-submitting it if a rescue DAG exists. It
    then works out the next analysis time span, updates the run
    configuration file, and launches the analysis script.
    """
    description = (
        "This script will setup and run a cron job for an automated known pulsar pipeline.\n"
        "A configuration .ini file is required.\n"
    )

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--version", action="version", version=__version__)
    parser.add_argument("inifile", help="The configuration (.ini) file")
    opts = parser.parse_args()
    inifile = opts.inifile

    # the configuration file path is embedded in the cron wrapper script
    if not os.path.isabs(inifile):
        _error_exit(
            "Error... must supply the full absolute path of the configuration file."
        )

    cp = _read_config(inifile)
    if cp is None:
        _error_exit("Error... cannot parse configuration file '%s'" % inifile)

    # previous runs record 'previous_endtimes'; if it is absent this is the
    # first run and the crontab job still has to be created
    startcron = not cp.has_option("times", "previous_endtimes")

    # read the cron ID before any error path that may need to remove the job
    cronid = "knopeJob"  # default ID for the crontab job
    if cp.has_option("configuration", "cronid"):
        cronid = cp.get("configuration", "cronid")

    # on non-first runs the crontab job exists and is removed on fatal errors
    oldcron = None if startcron else cronid

    # main configuration ini file for the analysis run
    if not cp.has_option("configuration", "file"):
        _error_exit(
            "Error... must specify a run configuration '.ini' file", cronid=oldcron
        )
    runconfig = cp.get("configuration", "file")
    if not os.path.isfile(runconfig):
        _error_exit(
            "Error... run configuration file '%s' does not exist!" % runconfig,
            cronid=oldcron,
        )

    # optional kerberos keytab; if given, a certificate and principal are required
    keytab = certificate = authprinc = None
    if cp.has_option("kerberos", "keytab"):
        keytab = cp.get("kerberos", "keytab")
        if not cp.has_option("kerberos", "certificate"):
            raise RuntimeError("Problem with kerberos certificate")
        certificate = cp.get("kerberos", "certificate")
        if not cp.has_option("kerberos", "auth_princ"):
            raise RuntimeError("No kerberos authentication principle")
        authprinc = cp.get("kerberos", "auth_princ")

    cprun = _read_config(runconfig)
    if cprun is None:
        _error_exit(
            "Error... could not read run configuration '.ini' file", cronid=oldcron
        )

    # time increment for re-running the pipeline
    if not cp.has_option("times", "steps"):
        _error_exit(
            "Error... must specify a time step 'steps' value in '[times]'",
            cronid=oldcron,
        )
    timestep = cp.get("times", "steps")
    if timestep not in ("hourly", "daily", "weekly", "monthly"):
        _error_exit(
            "Error... 'steps' value in '[times]' must be 'hourly', 'daily', 'weekly' or 'monthly'",
            cronid=oldcron,
        )

    # optional email address for notifications (e.g. job failures, rescue DAGs)
    email = None
    server = None
    FROM = None
    if cp.has_option("notification", "email"):
        email = cp.get("notification", "email")
        if "@" not in email:
            print(
                "Warning... '%s' is not a valid email address. No notifications will be sent."
                % email
            )
            email = None

    if email is not None:
        try:
            server = smtplib.SMTP("localhost")
        except (smtplib.SMTPException, socket.error):
            print(
                "Warning... could not get SMTP server. No notification emails will be sent."
            )
            email = None

    if email is not None:
        # sender is USER@host; fall back to a fixed address if USER is unset
        try:
            FROM = os.environ["USER"] + "@" + socket.getfqdn()
        except KeyError:
            FROM = "matthew.pitkin@ligo.org"

    def fail(errmsg, removecron=not startcron):
        """Report a fatal error (stderr + email) and exit; optionally remove the cron job."""
        _error_exit(
            errmsg,
            cronid=cronid if removecron else None,
            email=email,
            FROM=FROM,
            server=server,
        )

    # the current time
    now = Time.now()
    gpsnow = int(now.replicate("gps").value)

    # directory where the DAGs are created ('run_dir' in '[analysis]',
    # otherwise the current working directory will have been used)
    if cprun.has_option("analysis", "run_dir"):
        rundir = cprun.get("analysis", "run_dir")
    else:
        rundir = os.getcwd()

    # check for DAG completion in previous analyses
    prevdags = None
    if not startcron:
        try:
            prevdags = list(ast.literal_eval(cp.get("configuration", "previous_dags")))
            lastdag = prevdags[-1]
        except Exception:
            fail("Error... no previous DAG file(s) have been set.")

        # a lock file means the previous DAG has not completed yet
        if os.path.isfile(lastdag + ".lock"):
            try:
                _defer_cron(cronid, timestep, now)
            except Exception:
                fail("Error... could not reset the crontab to wait for DAG completion.")
            print("Previous DAG not finished. Re-running later")
            sys.exit(0)

        # number of rescue DAG submissions, keyed by the original DAG file name
        try:
            rescuedags = ast.literal_eval(cp.get("configuration", "rescue_dags"))
        except Exception:
            rescuedags = None
        if not isinstance(rescuedags, dict):
            rescuedags = {}
        Nrescues = rescuedags.get(lastdag, 0)

        maxrescues = 3  # give up once this many rescue DAGs have been submitted

        # if a new rescue DAG file exists, re-submit and wait for it to finish
        if os.path.isfile(lastdag + ".rescue%03d" % (Nrescues + 1)):
            if Nrescues >= maxrescues:
                fail(
                    "Error... rescue DAG has been run %d times and there are still failures. Automation code is aborting. Fix the problem and then retry"
                    % Nrescues
                )

            rescuedags[lastdag] = Nrescues + 1

            # NOTE(review): the original DAG file is re-submitted; presumably
            # condor_submit_dag picks up the rescue file itself - confirm
            try:
                retcode = sp.call(["condor_submit_dag", lastdag])
            except OSError:  # e.g. condor_submit_dag not found
                retcode = None
            if retcode != 0:
                fail(
                    "Error... unable to submit rescue DAG for '%s'. Automation code is aborting."
                    % lastdag
                )

            # record the number of rescue DAGs in the configuration file
            cp.set("configuration", "rescue_dags", str(rescuedags))
            with open(inifile, "w") as fc:
                cp.write(fc)

            try:
                _defer_cron(cronid, timestep, now)
            except Exception:
                fail(
                    "Error... could not reset the crontab to wait for rescue DAG completion."
                )
            print("Running rescue DAG")
            sys.exit(0)

    # start time of the automated analysis - if not present default to the current time
    if cp.has_option("times", "starttime"):
        try:
            starttime = cp.getint("times", "starttime")
        except ValueError:
            fail(
                "Error... could not parse 'starttime' in '[times]'. A start time is required."
            )
        if starttime >= gpsnow:
            fail("Error... start time (%d) must be in the past!" % starttime)
    else:
        # store the default so that later runs see the same initial start time
        starttime = gpsnow
        cp.set("times", "starttime", str(starttime))

    # end time of the automated analysis - if not present default to infinity (never stop!)
    endtime = np.inf
    if cp.has_option("times", "endtime"):
        try:
            endtime = cp.getint("times", "endtime")
        except ValueError:
            print(
                "Warning... could not parse 'endtime' in '[times]'. Defaulting to Infinity."
            )
    if endtime <= starttime:
        fail("Error... start time is after end time!")

    # optional lag subtracted from the current time (to allow for delays in
    # data creation/replication so that the data exists)
    timelag = 0
    if cp.has_option("times", "lag"):
        try:
            timelag = cp.getint("times", "lag")
        except ValueError:
            timelag = 0

    # work out the time span for this run
    if startcron:
        prev_ends = []
        newstart = starttime
    else:
        try:
            prev_ends = list(ast.literal_eval(cp.get("times", "previous_endtimes")))
            newstart = prev_ends[-1]  # start where the previous run ended
        except Exception:
            fail("Error... cannot parse previous end times list")

        # the whole analysis period has been covered: stop the cron job
        if newstart >= endtime:
            remove_cron(cronid)
            sys.exit(0)

    newend = newstart + int(tsdic[timestep])
    if newend >= endtime:
        newend = endtime  # do not go past the overall end time
    elif newend < gpsnow - timelag:
        newend = gpsnow - timelag  # catch up to the current (lagged) time

    prev_ends.append(newend)
    cp.set(
        "times",
        "previous_endtimes",
        "[" + ", ".join(str(z) for z in prev_ends) + "]",
    )

    # the script for running the full pipeline
    if not cp.has_option("configuration", "exec"):
        fail(
            "Error... a run script executable 'exec' is required in the '[configuration]' section."
        )
    runscript = cp.get("configuration", "exec")
    if not (os.path.isfile(runscript) and os.access(runscript, os.X_OK)):
        fail(
            "Error... run script '%s' does not exist or is not executable" % runscript
        )

    # edit start and end times for the main run configuration file
    if not cprun.has_section("analysis"):
        fail(
            "Error... run configuration file '%s' has no '[analysis]' section!"
            % runconfig
        )

    dagname = "automated_run_%s-%s" % (str(newstart), str(newend))
    cprun.set("analysis", "starttime", str(newstart))
    cprun.set("analysis", "endtime", str(newend))
    cprun.set("analysis", "autonomous", "True")
    cprun.set("analysis", "submit_dag", "True")  # make sure the Condor DAG is submitted
    cprun.set("analysis", "dag_name", dagname)
    cprun.set("analysis", "autonomous_initial_start", str(starttime))

    # remember the new DAG file so the next run can check for its completion
    prevdags = (prevdags or []) + [os.path.join(rundir, dagname + ".dag")]
    cp.set(
        "configuration",
        "previous_dags",
        "[" + ", ".join('"%s"' % z for z in prevdags) + "]",
    )

    with open(runconfig, "w") as fc:
        cprun.write(fc)

    # create the crontab job on the first run
    if startcron:
        # optional virtual environment to run the code under
        wov = ""
        if cp.has_option("configuration", "virtualenv"):  # assumes virtualenvwrapper.sh
            virtualenv = cp.get("configuration", "virtualenv")
            woh = os.environ.get("WORKON_HOME")
            if woh is None:
                fail(
                    "Error... if specifying a virtualenv the 'WORKON_HOME' environment must exist"
                )
            if not os.path.isdir(os.path.join(woh, virtualenv)):
                fail("Error... if specifying a virtualenv the environment must exist")
            wov = "workon " + virtualenv
        elif cp.has_option("configuration", "conda"):  # assumes conda
            wov = "conda activate {}".format(cp.get("configuration", "conda"))

        # shell profile to source (default: ~/.bash_profile)
        if cp.has_option("configuration", "profile"):
            profile = cp.get("configuration", "profile")
        else:
            profile = os.path.expanduser(os.path.join("~", ".bash_profile"))
        if not os.path.isfile(profile):
            fail("Error... profile file '%s' does not exist" % profile)

        if keytab is not None:
            krbcert = "export KRB5CCNAME={}".format(certificate)
            kinit = "/usr/bin/kinit -a -P -F -k -t {} {}".format(keytab, authprinc)
            ligoproxyinit = "/usr/bin/ligo-proxy-init -k"
        else:
            krbcert = kinit = ligoproxyinit = ""

        # cron wrapper script that re-runs this script (use an absolute path to
        # this script, as cron will not run it from the current directory)
        cronwrapperscript = os.path.splitext(inifile)[0] + ".sh"
        cronwrapper = (
            "#!/bin/bash\n"
            "source {0} # source profile\n"
            "{1} # enable virtual environment (assumes you have virtualenvwrapper.sh/conda)\n"
            "{2} # export kerberos certificate location (if required)\n"
            "{3} # generate kerberos certificate (if required)\n"
            "{4} # create proxy (if required)\n"
            "{6} {5} # re-run this script\n"
        ).format(
            profile,
            wov,
            krbcert,
            kinit,
            ligoproxyinit,
            inifile,
            os.path.abspath(sys.argv[0]),
        )
        try:
            with open(cronwrapperscript, "w") as fp:
                fp.write(cronwrapper)
            os.chmod(
                cronwrapperscript, stat.S_IRWXU | stat.S_IRWXG | stat.S_IXOTH
            )  # make executable
        except (IOError, OSError):
            fail(
                "Error... could not output cron wrapper script '%s'."
                % cronwrapperscript
            )

        # the job starts at the next time step (this run is the first step)
        try:
            cron = CronTab(user=True)
            job = cron.new(command=cronwrapperscript, comment=cronid)
            nowdt = now.datetime
            job.minute.on(nowdt.minute)
            if timestep != "hourly":
                job.hour.on(nowdt.hour)
            if timestep == "weekly":
                job.dow.on(nowdt.strftime("%a").upper())  # day of the week
            elif timestep == "monthly":
                # NOTE(review): a job pinned to day 29-31 will not fire in shorter months
                job.day.on(nowdt.day)
            cron.write()
        except Exception:
            fail("Error... could not create crontab job")

    ### RUN ANALYSIS SCRIPT ###
    try:
        retcode = sp.call([runscript, runconfig])
    except OSError:
        retcode = None
    if retcode != 0:
        # the crontab job exists at this point (it was created above on a first
        # run), so always remove it; otherwise a failed first run would leave a
        # job behind while the next run creates another one
        fail(
            "Error... problem running main script '%s' (return code: %s)"
            % (runscript, retcode),
            removecron=True,
        )
    ###########################

    # write out the updated configuration file
    with open(inifile, "w") as fc:
        cp.write(fc)

    if server is not None:
        try:
            server.quit()
        except (smtplib.SMTPException, socket.error):
            pass  # best effort clean-up of the notification connection

    sys.exit(0)


# main function
if __name__ == "__main__":
    _main()
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True)