The KNOwn Pulsar pipelinE - lalpulsar_knope_automation_script

Script to set up and launch a cron job for the automated running of the known pulsar analysis.

The script requires an initial configuration file. Each time the script is re-run by cron, it
automatically updates the times in the configuration file.

(C) Matthew Pitkin 2015
16from __future__
import print_function, division
23import subprocess
as sp
29from configparser
import RawConfigParser
31from lalpulsar
import git_version
35 from astropy.time
import Time
38 "Could not import astropy: make sure astropy is installed (e.g. 'pip install astropy') and in the PYTHONPATH",
46 from crontab
import CronTab
49 "Could not import python-crontab: make sure it is installed (e.g. 'pip install python-crontab') and in the PYTHONPATH",
55__author__ =
"Matthew Pitkin <matthew.pitkin@ligo.org>"
56__version__ =
"git id %s" % git_version.id
57__date__ = git_version.date
64 "weekly": 7.0 * 86400.0,
66 * calendar.monthrange(Time.now().datetime.year, Time.now().datetime.month)[1],
73 cron = CronTab(user=
True)
74 cron.remove_all(comment=cronid)
77 print(
"Warning... count not remove crontab job with comment ID '%s'" % cronid)
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True):
    """Send a plain-text notification email through an open SMTP connection.

    Sending is best effort: a failure is reported as a warning on stdout and
    never raised, so a broken mail setup cannot abort the calling script.

    Args:
        FROM: Sender address, used for both the envelope and the "From" header.
        TO: Recipient address, used for both the envelope and the "To" header.
        SUBJECT: Text of the "Subject" header.
        MESSAGE: Body of the email.
        server: Object exposing ``sendmail(from, to, message)`` and ``quit()``
            (e.g. an ``smtplib.SMTP`` instance). ``None`` is tolerated and
            simply produces the "unable to send" warning.
        quitserver: If true (the default), close the connection after the
            send attempt, whether or not the send succeeded.

    Returns:
        None.
    """
    # NOTE(review): the guard around sendmail() and the handling of
    # `quitserver` were rebuilt from a damaged listing in which only the
    # statements themselves survived - confirm against the upstream file.

    # Headers and body are separated by a blank line, as a mail message requires.
    emailtemplate = "From: {0}\nTo: {1}\nSubject: {2}\n\n{3}"
    message = emailtemplate.format(FROM, TO, SUBJECT, MESSAGE)

    try:
        server.sendmail(FROM, TO, message)
    except Exception:
        # Covers SMTP errors as well as a missing (None) server object.
        print("Warning... unable to send email.")

    if quitserver and server is not None:
        try:
            server.quit()
        except Exception:
            # The connection may already be gone; closing is best effort too.
            print("Warning... unable to close connection to email server.")
98if __name__ ==
"__main__":
99 description =
"""This script will setup and run a cron job for an automated known pulsar pipeline.
100A configuration .ini file is required.
103 parser = argparse.ArgumentParser(description=description)
104 parser.add_argument(
"--version", action=
"version", version=__version__)
105 parser.add_argument(
"inifile", help=
"The configuration (.ini) file")
108 opts = parser.parse_args()
110 inifile = opts.inifile
113 if not os.path.isabs(inifile):
115 "Error... must supply the full absolute path of the configuration file.",
124 cp = RawConfigParser()
129 "Error... cannot parse configuration file '%s'" % inifile, file=sys.stderr
134 if not cp.has_option(
135 "times",
"previous_endtimes"
140 if cp.has_option(
"configuration",
"file"):
142 "configuration",
"file"
145 if not os.path.isfile(runconfig):
147 "Error... run configuration file '%s' does not exist!" % runconfig,
154 print(
"Error... must specify a run configuration '.ini' file", file=sys.stderr)
160 if cp.has_option(
"configuration",
"cronid"):
161 cronid = cp.get(
"configuration",
"cronid")
165 if cp.has_option(
"kerberos",
"keytab"):
166 keytab = cp.get(
"kerberos",
"keytab")
168 if cp.has_option(
"kerberos",
"certificate"):
169 certificate = cp.get(
"kerberos",
"certificate")
171 if cp.has_option(
"kerberos",
"auth_princ"):
172 authprinc = cp.get(
"kerberos",
"auth_princ")
174 raise RuntimeError(
"No kerberos authentication principle")
176 raise RuntimeError(
"Problem with kerberos certificate")
178 cprun = RawConfigParser()
180 cprun.read(runconfig)
182 print(
"Error... could not read run configuration '.ini' file", file=sys.stderr)
186 if cp.has_option(
"times",
"steps"):
187 timestep = cp.get(
"times",
"steps")
189 if timestep
not in [
"hourly",
"daily",
"weekly",
"monthly"]:
190 errmsg =
"Error... 'steps' value in '[times'] must be 'hourly', 'daily', 'weekly' or 'monthly'"
191 print(errmsg, file=sys.stderr)
194 errmsg =
"Error... must specify a time step 'steps' value in '[times]'"
195 print(errmsg, file=sys.stderr)
201 if cp.has_option(
"notification",
"email"):
203 email = cp.get(
"notification",
"email")
206 "Warning... could not get email address from configuration file. No notifications will be sent."
211 "Warning... '%s' is not a valid email address. No notifications will be sent."
218 server = smtplib.SMTP(
"localhost")
221 "Warning... could not get SMTP server. No notication emails will be sent."
229 HOST = socket.getfqdn()
230 USER = os.environ[
"USER"]
231 FROM = USER +
"@" + HOST
233 FROM =
"matthew.pitkin@ligo.org"
237 gpsnow =
int(now.replicate(
"gps").value)
244 "analysis",
"run_dir"
252 prevdags = ast.literal_eval(cp.get(
"configuration",
"previous_dags"))
254 errmsg =
"Error... no previous DAG file(s) have been set."
255 print(errmsg, file=sys.stderr)
258 subject = sys.argv[0] +
": Error message"
259 send_email(FROM, email, subject, errmsg, server)
263 lockfile = prevdags[-1] +
".lock"
264 if os.path.isfile(lockfile):
272 print(
"Previous DAG not finished. Re-running later")
277 cron = CronTab(user=
True)
278 for job
in cron.find_comment(cronid):
282 t1 = Time(
"2010-01-01 00:00:00")
283 t2 = Time(
"2010-01-02 00:00:00")
285 newcrontime = now + dt
286 if timestep ==
"weekly":
288 newcrontime.datetime.strftime(
"%a").upper()
292 newcrontime.datetime.day
300 "Error... could not reset the crontab to wait for DAG completion."
302 print(errmsg, file=sys.stderr)
305 subject = sys.argv[0] +
": Error message"
306 send_email(FROM, email, subject, errmsg, server)
311 rescuedags = ast.literal_eval(cp.get(
"configuration",
"rescue_dags"))
312 Nrescues = rescuedags[prevdags[-1]]
318 rescuefile = prevdags[-1] +
".rescue"
319 if os.path.basename(rescuefile +
"%03d" % (Nrescues + 1))
in os.listdir(
320 os.path.dirname(prevdags[-1])
324 errmsg =
"Error... rescue DAG has been run twice and there are still failures. Automation code is aborting. Fix the problem and then retry"
325 print(errmsg, file=sys.stderr)
328 subject = sys.argv[0] +
": Error message"
329 send_email(FROM, email, subject, errmsg, server)
335 rescuedags[prevdags[-1]] = Nrescues + 1
338 from subprocess
import Popen
340 x = Popen([
"condor_submit_dag", prevdags[-1]])
342 if x.returncode != 0:
344 "Error... unable to submit rescue DAG for '%s'. Automation code is aborting."
349 subject = sys.argv[0] +
": Error message"
350 send_email(FROM, email, subject, errmsg, server)
354 cp.set(
"configuration",
"rescue_dags",
str(rescuedags))
357 fc = open(inifile,
"w")
368 print(
"Running rescue DAG")
373 cron = CronTab(user=
True)
374 for job
in cron.find_comment(cronid):
378 t1 = Time(
"2010-01-01 00:00:00")
379 t2 = Time(
"2010-01-02 00:00:00")
381 newcrontime = now + dt
382 if timestep ==
"weekly":
384 newcrontime.datetime.strftime(
"%a").upper()
388 newcrontime.datetime.day
395 errmsg =
"Error... could not reset the crontab to wait for rescue DAG completion."
396 print(errmsg, file=sys.stderr)
399 subject = sys.argv[0] +
": Error message"
400 send_email(FROM, email, subject, errmsg, server)
404 if cp.has_option(
"times",
"starttime"):
406 starttime = cp.getint(
"times",
"starttime")
408 errmsg =
"Error... could not parse 'starttime' in '[times]'. A start time is required."
409 print(errmsg, file=sys.stderr)
411 subject = sys.argv[0] +
": Error message"
412 send_email(FROM, email, subject, errmsg, server)
418 if starttime >= gpsnow:
419 errmsg =
"Error... start time (%f) must be in the past!" % starttime
420 print(errmsg, file=sys.stderr)
422 subject = sys.argv[0] +
": Error message"
423 send_email(FROM, email, subject, errmsg, server)
429 if cp.has_option(
"times",
"endtime"):
431 endtime = cp.getint(
"times",
"endtime")
434 "Warning... could not parse 'endtime' in '[times]'. Defaulting to Infinity."
442 if endtime <= starttime:
443 errmsg =
"Error... start time is after end time!"
444 print(errmsg, file=sys.stderr)
448 subject = sys.argv[0] +
": Error message"
449 send_email(FROM, email, subject, errmsg, server)
453 if cp.has_option(
"times",
"lag"):
455 timelag = cp.getint(
"times",
"lag")
464 prev_ends = ast.literal_eval(cp.get(
"times",
"previous_endtimes"))
466 errmsg =
"Error... cannot parse previous end times list"
467 print(errmsg, file=sys.stderr)
471 subject = sys.argv[0] +
": Error message"
472 send_email(FROM, email, subject, errmsg, server)
476 newstart = prev_ends[-1]
479 if newstart >= endtime:
483 newend = newstart +
int(tsdic[timestep])
486 if newend >= endtime:
490 if newend < gpsnow - timelag:
491 newend = gpsnow - timelag
493 prev_ends.append(newend)
498 "[" +
", ".join(
str(z)
for z
in prev_ends) +
"]",
502 newend = newstart +
int(tsdic[timestep])
505 if newend >= endtime:
508 if newend < gpsnow - timelag:
509 newend = gpsnow - timelag
511 cp.set(
"times",
"previous_endtimes",
"[" +
str(newend) +
"]")
514 if cp.has_option(
"configuration",
"exec"):
515 runscript = cp.get(
"configuration",
"exec")
517 if not (os.path.isfile(runscript)
and os.access(runscript, os.X_OK)):
519 "Error... run script '%s' does not exist or is not executable"
522 print(errmsg, file=sys.stderr)
524 subject = sys.argv[0] +
": Error message"
525 send_email(FROM, email, subject, errmsg, server)
528 errmsg =
"Error... a run script executable 'exec' is required in the '[configuration]' section."
529 print(errmsg, file=sys.stderr)
533 subject = sys.argv[0] +
": Error message"
534 send_email(FROM, email, subject, errmsg, server)
538 if cprun.has_section(
"analysis"):
539 cprun.set(
"analysis",
"starttime",
str(newstart))
540 cprun.set(
"analysis",
"endtime",
str(newend))
541 cprun.set(
"analysis",
"autonomous",
"True")
543 "analysis",
"submit_dag",
"True"
547 dagname =
"automated_run_%s-%s" % (
str(newstart),
str(newend))
549 "analysis",
"dag_name", dagname
554 prevdags.append(os.path.join(rundir, dagname +
".dag"))
558 "[" +
", ".join([
'"%s"' % z
for z
in prevdags]) +
"]",
564 '["' + os.path.join(rundir, dagname +
".dag") +
'"]',
568 cprun.set(
"analysis",
"autonomous_initial_start",
str(starttime))
571 fc = open(runconfig,
"w")
576 "Error... run configuration file '%s' has no '[analysis]' section!"
579 print(errmsg, file=sys.stderr)
583 subject = sys.argv[0] +
": Error message"
584 send_email(FROM, email, subject, errmsg, server)
592 "configuration",
"virtualenv"
594 virtualenv = cp.get(
"configuration",
"virtualenv")
596 woh = os.environ[
"WORKON_HOME"]
597 if not os.path.isdir(os.path.join(woh, virtualenv)):
599 "Error... if specifying a virtualenv the environment must exist",
604 wov =
"workon " + virtualenv
607 "Error... if specifying a virtualenv the 'WORKON_HOME' environment must exist",
611 elif cp.has_option(
"configuration",
"conda"):
612 virtualenv = cp.get(
"configuration",
"conda")
613 wov =
"conda activate {}".
format(virtualenv)
617 if cp.has_option(
"configuration",
"profile"):
618 profile = cp.get(
"configuration",
"profile")
621 profile = os.path.join(os.environ[
"HOME"],
".bash_profile")
622 if not os.path.isfile(profile):
623 print(
"Error... no profile file is given", file=sys.stderr)
626 if keytab
is not None:
627 krbcert =
"export KRB5CCNAME={}".
format(certificate)
628 kinit =
"/usr/bin/kinit -a -P -F -k -t {} {}".
format(keytab, authprinc)
629 ligoproxyinit =
"/usr/bin/ligo-proxy-init -k"
638 cronwrapperscript = os.path.splitext(inifile)[0] +
".sh"
641source {0} # source profile
651 fp = open(cronwrapperscript, "w")
653 cronwrapper.format(profile, wov, krbcert, kinit, ligoproxyinit, inifile)
657 cronwrapperscript, stat.S_IRWXU | stat.S_IRWXG | stat.S_IXOTH
661 "Error... could not output cron wrapper script '%s'."
668 cron = CronTab(user=
True)
669 job = cron.new(command=cronwrapperscript, comment=cronid)
672 day = now.datetime.day
673 month = now.datetime.month
674 year = now.datetime.year
675 hour = now.datetime.hour
676 minute = now.datetime.minute
677 dow = now.datetime.strftime(
"%a").upper()
679 if timestep ==
"hourly":
680 job.minute.on(minute)
681 elif timestep ==
"daily":
682 job.minute.on(minute)
684 elif timestep ==
"weekly":
685 job.minute.on(minute)
688 elif timestep ==
"monthly":
689 job.minute.on(minute)
694 "Error... unrecognised 'timestep' option '%s'" % timestep,
701 errmsg =
"Error... could not create crontab job"
702 print(errmsg, file=sys.stderr)
704 subject = sys.argv[0] +
": Error message"
705 send_email(FROM, email, subject, errmsg, server)
709 p = sp.Popen(
"{0} {1}".
format(runscript, runconfig), shell=
True)
710 out, err = p.communicate()
711 if p.returncode != 0:
712 errmsg =
"Error... problem running main script '%s'.: %s, %s" % (
717 print(errmsg, file=sys.stderr)
721 subject = sys.argv[0] +
": Error message"
722 send_email(FROM, email, subject, errmsg, server)
727 fc = open(inifile,
"w")
def send_email(FROM, TO, SUBJECT, MESSAGE, server, quitserver=True)