From 16f66ffdf6a13ef7cd0cf9ddd2126ea426f34539 Mon Sep 17 00:00:00 2001
From: Tom
Date: Wed, 14 Sep 2016 14:14:07 -0500
Subject: Add TLS certificate expiration checker
---
jobs/TLSCertExpiration.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 58 insertions(+)
create mode 100755 jobs/TLSCertExpiration.py
(limited to 'jobs')
diff --git a/jobs/TLSCertExpiration.py b/jobs/TLSCertExpiration.py
new file mode 100755
index 0000000..873a53e
--- /dev/null
+++ b/jobs/TLSCertExpiration.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+
+import ssl
+import time
+import logging
+import httplib
+import OpenSSL
+import datetime
+
+import JobBase
+import JobSpawner
+
+class TLSCertExpiration(JobSpawner.JobSpawner):
+ servers = [
+
+ ]
+
+ class CertChecker(JobBase.JobBase):
+ def __init__(self, config, url, frequency, failureNotificationFrequency):
+ self.config = config
+ self.url = url
+ self.frequency = frequency
+ self.failureNotificationFrequency = failureNotificationFrequency
+ super(TLSCertExpiration.CertChecker, self).__init__(config, url, frequency, failureNotificationFrequency)
+
+ def getName(self):
+ return str(self.__class__) + " for " + self.url
+ def executeEvery(self):
+ return self.frequency
+ def notifyOnFailureEvery(self):
+ return self.failureNotificationFrequency
+ def execute(self):
+ try:
+ context = ssl._create_unverified_context()
+ c = httplib.HTTPSConnection(self.url, context=context)
+ c.request("GET", "/")
+ asn1 = c.sock.getpeercert(True)
+ x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, asn1)
+ na = time.mktime(time.strptime(x509.get_notAfter()[:-1], '%Y%m%d%H%M%S'))
+ now = time.time()
+ delta = datetime.timedelta(seconds=(na - now))
+ if delta < datetime.timedelta(days=30):
+ self.failuremsg = "Server Certificate for " + self.url + " expires in " + str(delta.days) + " days"
+ return False
+ return True
+ except Exception as e:
+ self.failuremsg = "Could not get server certificate " + self.url + "\n" + str(e)
+ logging.warn(self.failuremsg)
+ return False
+ def onFailure(self):
+ return self.sendEmail(self.failuremsg, "")
+ def onStateChangeSuccess(self):
+ return self.sendEmail("Successfully hit " + self.url, "")
+
+ def get_sub_jobs(self, config):
+ for s in self.servers:
+ yield self.CertChecker(config, s[0], s[1], s[2])
+
-- cgit v1.2.3
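Review bot: `execute` connects with `ssl._create_unverified_context()`, a private API that turns off chain and hostname validation, so a certificate issued for another name or by an untrusted CA is reported as healthy as long as its `notAfter` is more than 30 days out. If that is deliberate so that expired or self-signed certificates can still be read, say so in a comment, e.g. `# verification disabled on purpose: only notAfter is read; trust and hostname are not checked here`, and consider a separate verified handshake to alert on those conditions. The expiry arithmetic also mixes time zones: `time.mktime` interprets the parsed `notAfter` as local time although the field is expressed in UTC (the `[:-1]` presumably drops the trailing `Z`), so `delta = datetime.datetime.strptime(x509.get_notAfter()[:-1], '%Y%m%d%H%M%S') - datetime.datetime.utcnow()` would keep both sides in UTC with the already-imported `datetime`. `httplib.HTTPSConnection(self.url, context=context)` is created without a `timeout` and `c` is never closed, so a single unresponsive server can stall the job; passing `timeout=` and closing the connection in a `finally` would bound that.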