2 Script to set up and launch a cron job for the automated running of the known pulsar analysis.
4 The script requires an initial configuration file. Each time the script is re-run by cron it will
5 automatically update the times in the configuration file.
7 (C) Matthew Pitkin 2015
11 from __future__
import print_function, division
18 import subprocess
as sp
24 from six.moves.configparser
import RawConfigParser
26 from lalpulsar
import git_version
30 from astropy.time
import Time
33 "Could not import astropy: make sure astropy is installed (e.g. 'pip install astropy') and in the PYTHONPATH",
41 from crontab
import CronTab
44 "Could not import python-crontab: make sure it is installed (e.g. 'pip install python-crontab') and in the PYTHONPATH",
50 __author__ =
"Matthew Pitkin <matthew.pitkin@ligo.org>"
51 __version__ =
"git id %s" % git_version.id
52 __date__ = git_version.date
59 "weekly": 7.0 * 86400.0,
61 * calendar.monthrange(Time.now().datetime.year, Time.now().datetime.month)[1],
68 cron = CronTab(user=
True)
69 cron.remove_all(comment=cronid)
72 print(
"Warning... count not remove crontab job with comment ID '%s'" % cronid)
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True):
    """Send a plain-text notification email on a best-effort basis.

    The message is built from a fixed "From/To/Subject" header template
    followed by a blank line and the message body, and is handed to
    ``server.sendmail``. A failure to send is reported as a warning and is
    never raised to the caller, so a broken mail setup cannot abort the
    automation run.

    Parameters
    ----------
    FROM : str
        Sender address, used both in the header and as the envelope sender.
    TO : str
        Recipient address, used both in the header and as the envelope
        recipient.
    SUBJECT : str
        Text of the subject line.
    MESSAGE : str
        Body of the email.
    server : object
        Connection object providing ``sendmail(from, to, message)`` and
        ``quit()``; elsewhere in this file it is created with
        ``smtplib.SMTP("localhost")``.
    quitserver : bool, optional
        If true (the default) the connection is closed once the send has
        been attempted.

    Returns
    -------
    None
    """
    # fixed notification email template: headers, blank line, then the body
    emailtemplate = "From: {0}\nTo: {1}\nSubject: {2}\n\n{3}"
    message = emailtemplate.format(FROM, TO, SUBJECT, MESSAGE)

    try:
        server.sendmail(FROM, TO, message)
    except Exception:
        # notifications are optional: warn, but let the caller carry on
        print("Warning... unable to send email.")

    # NOTE(review): the statement that consumes 'quitserver' is not visible in
    # the extracted listing; closing the connection with server.quit() is
    # assumed from the parameter name - confirm against the full file.
    if quitserver:
        try:
            server.quit()
        except Exception:
            # a failed send usually leaves the connection already dropped, in
            # which case quit() raises as well; that must not abort the script
            print("Warning... unable to close connection to email server.")
93 if __name__ ==
"__main__":
94 description =
"""This script will setup and run a cron job for an automated known pulsar pipeline.
95 A configuration .ini file is required.
98 parser = argparse.ArgumentParser(description=description)
99 parser.add_argument(
"--version", action=
"version", version=__version__)
100 parser.add_argument(
"inifile", help=
"The configuration (.ini) file")
103 opts = parser.parse_args()
105 inifile = opts.inifile
108 if not os.path.isabs(inifile):
110 "Error... must supply the full absolute path of the configuration file.",
119 cp = RawConfigParser()
124 "Error... cannot parse configuration file '%s'" % inifile, file=sys.stderr
129 if not cp.has_option(
130 "times",
"previous_endtimes"
135 if cp.has_option(
"configuration",
"file"):
137 "configuration",
"file"
140 if not os.path.isfile(runconfig):
142 "Error... run configuration file '%s' does not exist!" % runconfig,
149 print(
"Error... must specify a run configuration '.ini' file", file=sys.stderr)
155 if cp.has_option(
"configuration",
"cronid"):
156 cronid = cp.get(
"configuration",
"cronid")
160 if cp.has_option(
"kerberos",
"keytab"):
161 keytab = cp.get(
"kerberos",
"keytab")
163 if cp.has_option(
"kerberos",
"certificate"):
164 certificate = cp.get(
"kerberos",
"certificate")
166 if cp.has_option(
"kerberos",
"auth_princ"):
167 authprinc = cp.get(
"kerberos",
"auth_princ")
169 raise RuntimeError(
"No kerberos authentication principle")
171 raise RuntimeError(
"Problem with kerberos certificate")
173 cprun = RawConfigParser()
175 cprun.read(runconfig)
177 print(
"Error... could not read run configuration '.ini' file", file=sys.stderr)
181 if cp.has_option(
"times",
"steps"):
182 timestep = cp.get(
"times",
"steps")
184 if timestep
not in [
"hourly",
"daily",
"weekly",
"monthly"]:
185 errmsg =
"Error... 'steps' value in '[times'] must be 'hourly', 'daily', 'weekly' or 'monthly'"
186 print(errmsg, file=sys.stderr)
189 errmsg =
"Error... must specify a time step 'steps' value in '[times]'"
190 print(errmsg, file=sys.stderr)
196 if cp.has_option(
"notification",
"email"):
198 email = cp.get(
"notification",
"email")
201 "Warning... could not get email address from configuration file. No notifications will be sent."
206 "Warning... '%s' is not a valid email address. No notifications will be sent."
213 server = smtplib.SMTP(
"localhost")
216 "Warning... could not get SMTP server. No notication emails will be sent."
224 HOST = socket.getfqdn()
225 USER = os.environ[
"USER"]
226 FROM = USER +
"@" + HOST
228 FROM =
"matthew.pitkin@ligo.org"
232 gpsnow =
int(now.replicate(
"gps").value)
239 "analysis",
"run_dir"
247 prevdags = ast.literal_eval(cp.get(
"configuration",
"previous_dags"))
249 errmsg =
"Error... no previous DAG file(s) have been set."
250 print(errmsg, file=sys.stderr)
253 subject = sys.argv[0] +
": Error message"
254 send_email(FROM, email, subject, errmsg, server)
258 lockfile = prevdags[-1] +
".lock"
259 if os.path.isfile(lockfile):
267 print(
"Previous DAG not finished. Re-running later")
272 cron = CronTab(user=
True)
273 for job
in cron.find_comment(cronid):
277 t1 = Time(
"2010-01-01 00:00:00")
278 t2 = Time(
"2010-01-02 00:00:00")
280 newcrontime = now + dt
281 if timestep ==
"weekly":
283 newcrontime.datetime.strftime(
"%a").upper()
287 newcrontime.datetime.day
295 "Error... could not reset the crontab to wait for DAG completion."
297 print(errmsg, file=sys.stderr)
300 subject = sys.argv[0] +
": Error message"
301 send_email(FROM, email, subject, errmsg, server)
306 rescuedags = ast.literal_eval(cp.get(
"configuration",
"rescue_dags"))
307 Nrescues = rescuedags[prevdags[-1]]
313 rescuefile = prevdags[-1] +
".rescue"
314 if os.path.basename(rescuefile +
"%03d" % (Nrescues + 1))
in os.listdir(
315 os.path.dirname(prevdags[-1])
319 errmsg =
"Error... rescue DAG has been run twice and there are still failures. Automation code is aborting. Fix the problem and then retry"
320 print(errmsg, file=sys.stderr)
323 subject = sys.argv[0] +
": Error message"
324 send_email(FROM, email, subject, errmsg, server)
330 rescuedags[prevdags[-1]] = Nrescues + 1
333 from subprocess
import Popen
335 x = Popen([
"condor_submit_dag", prevdags[-1]])
337 if x.returncode != 0:
339 "Error... unable to submit rescue DAG for '%s'. Automation code is aborting."
344 subject = sys.argv[0] +
": Error message"
345 send_email(FROM, email, subject, errmsg, server)
349 cp.set(
"configuration",
"rescue_dags",
str(rescuedags))
352 fc = open(inifile,
"w")
363 print(
"Running rescue DAG")
368 cron = CronTab(user=
True)
369 for job
in cron.find_comment(cronid):
373 t1 = Time(
"2010-01-01 00:00:00")
374 t2 = Time(
"2010-01-02 00:00:00")
376 newcrontime = now + dt
377 if timestep ==
"weekly":
379 newcrontime.datetime.strftime(
"%a").upper()
383 newcrontime.datetime.day
390 errmsg =
"Error... could not reset the crontab to wait for rescue DAG completion."
391 print(errmsg, file=sys.stderr)
394 subject = sys.argv[0] +
": Error message"
395 send_email(FROM, email, subject, errmsg, server)
399 if cp.has_option(
"times",
"starttime"):
401 starttime = cp.getint(
"times",
"starttime")
403 errmsg =
"Error... could not parse 'starttime' in '[times]'. A start time is required."
404 print(errmsg, file=sys.stderr)
406 subject = sys.argv[0] +
": Error message"
407 send_email(FROM, email, subject, errmsg, server)
413 if starttime >= gpsnow:
414 errmsg =
"Error... start time (%f) must be in the past!" % starttime
415 print(errmsg, file=sys.stderr)
417 subject = sys.argv[0] +
": Error message"
418 send_email(FROM, email, subject, errmsg, server)
424 if cp.has_option(
"times",
"endtime"):
426 endtime = cp.getint(
"times",
"endtime")
429 "Warning... could not parse 'endtime' in '[times]'. Defaulting to Infinity."
437 if endtime <= starttime:
438 errmsg =
"Error... start time is after end time!"
439 print(errmsg, file=sys.stderr)
443 subject = sys.argv[0] +
": Error message"
444 send_email(FROM, email, subject, errmsg, server)
448 if cp.has_option(
"times",
"lag"):
450 timelag = cp.getint(
"times",
"lag")
459 prev_ends = ast.literal_eval(cp.get(
"times",
"previous_endtimes"))
461 errmsg =
"Error... cannot parse previous end times list"
462 print(errmsg, file=sys.stderr)
466 subject = sys.argv[0] +
": Error message"
467 send_email(FROM, email, subject, errmsg, server)
471 newstart = prev_ends[-1]
474 if newstart >= endtime:
478 newend = newstart +
int(tsdic[timestep])
481 if newend >= endtime:
485 if newend < gpsnow - timelag:
486 newend = gpsnow - timelag
488 prev_ends.append(newend)
493 "[" +
", ".join(
str(z)
for z
in prev_ends) +
"]",
497 newend = newstart +
int(tsdic[timestep])
500 if newend >= endtime:
503 if newend < gpsnow - timelag:
504 newend = gpsnow - timelag
506 cp.set(
"times",
"previous_endtimes",
"[" +
str(newend) +
"]")
509 if cp.has_option(
"configuration",
"exec"):
510 runscript = cp.get(
"configuration",
"exec")
512 if not (os.path.isfile(runscript)
and os.access(runscript, os.X_OK)):
514 "Error... run script '%s' does not exist or is not executable"
517 print(errmsg, file=sys.stderr)
519 subject = sys.argv[0] +
": Error message"
520 send_email(FROM, email, subject, errmsg, server)
523 errmsg =
"Error... a run script executable 'exec' is required in the '[configuration]' section."
524 print(errmsg, file=sys.stderr)
528 subject = sys.argv[0] +
": Error message"
529 send_email(FROM, email, subject, errmsg, server)
533 if cprun.has_section(
"analysis"):
534 cprun.set(
"analysis",
"starttime",
str(newstart))
535 cprun.set(
"analysis",
"endtime",
str(newend))
536 cprun.set(
"analysis",
"autonomous",
"True")
538 "analysis",
"submit_dag",
"True"
542 dagname =
"automated_run_%s-%s" % (
str(newstart),
str(newend))
544 "analysis",
"dag_name", dagname
549 prevdags.append(os.path.join(rundir, dagname +
".dag"))
553 "[" +
", ".join([
'"%s"' % z
for z
in prevdags]) +
"]",
559 '["' + os.path.join(rundir, dagname +
".dag") +
'"]',
563 cprun.set(
"analysis",
"autonomous_initial_start",
str(starttime))
566 fc = open(runconfig,
"w")
571 "Error... run configuration file '%s' has no '[analysis]' section!"
574 print(errmsg, file=sys.stderr)
578 subject = sys.argv[0] +
": Error message"
579 send_email(FROM, email, subject, errmsg, server)
587 "configuration",
"virtualenv"
589 virtualenv = cp.get(
"configuration",
"virtualenv")
591 woh = os.environ[
"WORKON_HOME"]
592 if not os.path.isdir(os.path.join(woh, virtualenv)):
594 "Error... if specifying a virtualenv the environment must exist",
599 wov =
"workon " + virtualenv
602 "Error... if specifying a virtualenv the 'WORKON_HOME' environment must exist",
606 elif cp.has_option(
"configuration",
"conda"):
607 virtualenv = cp.get(
"configuration",
"conda")
608 wov =
"conda activate {}".format(virtualenv)
612 if cp.has_option(
"configuration",
"profile"):
613 profile = cp.get(
"configuration",
"profile")
616 profile = os.path.join(os.environ[
"HOME"],
".bash_profile")
617 if not os.path.isfile(profile):
618 print(
"Error... no profile file is given", file=sys.stderr)
621 if keytab
is not None:
622 krbcert =
"export KRB5CCNAME={}".format(certificate)
623 kinit =
"/usr/bin/kinit -a -P -F -k -t {} {}".format(keytab, authprinc)
624 ligoproxyinit =
"/usr/bin/ligo-proxy-init -k"
633 cronwrapperscript = os.path.splitext(inifile)[0] +
".sh"
636 source {0} # source profile
637 {1} # enable virtual environment (assumes you have virtualenvwrapper.sh/conda)
638 {2} # export kerberos certificate location (if required)
639 {3} # generate kerberos certificate (if required)
640 {4} # create proxy (if required)
641 %s {5} # re-run this script
646 fp = open(cronwrapperscript,
"w")
648 cronwrapper.format(profile, wov, krbcert, kinit, ligoproxyinit, inifile)
652 cronwrapperscript, stat.S_IRWXU | stat.S_IRWXG | stat.S_IXOTH
656 "Error... could not output cron wrapper script '%s'."
663 cron = CronTab(user=
True)
664 job = cron.new(command=cronwrapperscript, comment=cronid)
667 day = now.datetime.day
668 month = now.datetime.month
669 year = now.datetime.year
670 hour = now.datetime.hour
671 minute = now.datetime.minute
672 dow = now.datetime.strftime(
"%a").upper()
674 if timestep ==
"hourly":
675 job.minute.on(minute)
676 elif timestep ==
"daily":
677 job.minute.on(minute)
679 elif timestep ==
"weekly":
680 job.minute.on(minute)
683 elif timestep ==
"monthly":
684 job.minute.on(minute)
689 "Error... unrecognised 'timestep' option '%s'" % timestep,
696 errmsg =
"Error... could not create crontab job"
697 print(errmsg, file=sys.stderr)
699 subject = sys.argv[0] +
": Error message"
700 send_email(FROM, email, subject, errmsg, server)
704 p = sp.Popen(
"{0} {1}".format(runscript, runconfig), shell=
True)
705 out, err = p.communicate()
706 if p.returncode != 0:
707 errmsg =
"Error... problem running main script '%s'.: %s, %s" % (
712 print(errmsg, file=sys.stderr)
716 subject = sys.argv[0] +
": Error message"
717 send_email(FROM, email, subject, errmsg, server)
722 fc = open(inifile,
"w")
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True)