From 840c584cbdf8128c3e8715324bf27c59bd7f5bda Mon Sep 17 00:00:00 2001 From: Peter Eckersley Date: Tue, 20 Dec 2016 09:29:05 -0800 Subject: [PATCH] Make the OCSP checker a class (Since it contains a reasonable amount of system state) --- certbot/cert_manager.py | 7 ++- certbot/cli.py | 4 ++ certbot/ocsp.py | 108 ++++++++++++++++++++++++---------------- 3 files changed, 73 insertions(+), 46 deletions(-) diff --git a/certbot/cert_manager.py b/certbot/cert_manager.py index 4b2fc069f..59b46fe07 100644 --- a/certbot/cert_manager.py +++ b/certbot/cert_manager.py @@ -169,11 +169,14 @@ def _report_lines(msgs): def _report_human_readable(parsed_certs): """Format a results report for a parsed cert""" certinfo = [] + checker = ocsp.RevocationChecker() for cert in parsed_certs: now = pytz.UTC.fromutc(datetime.datetime.utcnow()) expiration_text = "" - revoked = ocsp.revoked_status(cert.cert, cert.chain) - if cert.is_test_cert: + revoked = checker.check_ocsp(cert.cert, cert.chain) + if revoked: + expiration_text = "INVALID: " + revoked + elif cert.is_test_cert: expiration_text = "INVALID: TEST CERT" elif cert.target_expiry <= now: expiration_text = "INVALID: EXPIRED" diff --git a/certbot/cli.py b/certbot/cli.py index 0adb7a4b5..fbcc8ff42 100644 --- a/certbot/cli.py +++ b/certbot/cli.py @@ -943,6 +943,10 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis "testing", "--no-verify-ssl", action="store_true", help=config_help("no_verify_ssl"), default=flag_default("no_verify_ssl")) + helpful.add( + ["testing", "certificates"], "--check-ocsp", default="if otherwise valid", + help='Whether to check OCSP for listed certs. Can be set to "never", "always",' + 'or "if otherwise valid".') helpful.add( ["testing", "standalone", "apache", "nginx"], "--tls-sni-01-port", type=int, default=flag_default("tls_sni_01_port"), diff --git a/certbot/ocsp.py b/certbot/ocsp.py index f1b2d7d32..f2fe1188f 100644 --- a/certbot/ocsp.py +++ b/certbot/ocsp.py @@ -13,50 +13,43 @@ REV_LABEL = "REVOKED" INSTALL_LABEL = "(Installed)" +class RevocationChecker(object): + "This class figures out OCSP checking on this system, and performs it." -def revoked_status(cert_path, chain_path): - """Get revoked status for a particular cert version. + def __init__(self): + self.broken = False - .. todo:: Make this a non-blocking call + if not util.exe_exists("openssl"): + logging.info("openssl not installed, can't check revocation") + self.broken = True - :param str cert_path: Path to certificate - :param str chain_path: Path to chain certificate + # New versions of openssl want -header var=val, old ones want -header var val + test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"], + stdout=PIPE, stderr=PIPE) + _out, err = test_host_format.communicate() + if "Missing =" in err: + self.host_args = lambda host: ["Host=" + host] + else: + self.host_args = lambda host: ["Host", host] - """ - if revoked_status.broken: - return False + def check_ocsp(self, cert_path, chain_path): + """Get revoked status for a particular cert version. - if not util.exe_exists("openssl"): - logging.info("openssl not installed, can't check revocation") - revoked_status.broken = True - return False + .. todo:: Make this a non-blocking call - try: - url, err = util.run_script( - ["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"], - log=logging.debug) - except errors.SubprocessError: - logger.info("Cannot extract OCSP URI from %s", cert_path) - return False + :param str cert_path: Path to certificate + :param str chain_path: Path to intermediate cert - url = url.rstrip() - host = url.partition("://")[2].rstrip("/") - if not host: - logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path) - return False + """ + if self.broken: + return False - # New versions of openssl want -header var=val, old ones want -header var val - test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"], - stdout=PIPE, stderr=PIPE) - _out, err = test_host_format.communicate() - if "Missing =" in err: - host_args = ["Host=" + host] - else: - host_args = ["Host", host] - - # jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this! - try: + logger.debug("Querying OCSP for %s", cert_path) + url, host = self.determine_ocsp_server(cert_path) + if not host: + return False + # jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this! cmd = ["openssl", "ocsp", "-no_nonce", "-issuer", chain_path, @@ -64,16 +57,43 @@ def revoked_status(cert_path, chain_path): "-url", url, "-CAfile", chain_path, "-verify_other", chain_path, - "-header"] + host_args - output, err = util.run_script(cmd, log=logging.debug) - except errors.SubprocessError: - logger.info("OCSP querying seems to be broken, assuming nothing is revoked...") - logger.debug("Command was:\n%s\nError was:\n%s", " ".join(cmd), err) - revoked_status.broken = True - return False + "-header"] + self.host_args(host) + try: + output, err = util.run_script(cmd, log=logging.debug) + except errors.SubprocessError, e: + logger.info("We're offline, or OCSP querying is broken. " + "Assuming nothing is revoked...") + logger.debug("Command was:\n%s\nError was:\n%s", " ".join(cmd), e) + self.broken = True + return False - return _translate_ocsp_query(cert_path, output, err) -revoked_status.broken = False + return _translate_ocsp_query(cert_path, output, err) + + + def determine_ocsp_server(self, cert_path): + """Extract the OCSP server host from a certificate. + + :param str cert_path: Path to the cert we're checking OCSP for + :rtype tuple: + :returns: (OCSP server URL or None, OCSP server host or None) + + """ + try: + url, err = util.run_script( + ["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"], + log=logging.debug) + except errors.SubprocessError: + logger.info("Cannot extract OCSP URI from %s", cert_path) + logger.debug("Error was:\n%s", err) + return None, None + + url = url.rstrip() + host = url.partition("://")[2].rstrip("/") + if host: + return url, host + else: + logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path) + return None, None def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):