diff --git a/certbot/cert_manager.py b/certbot/cert_manager.py index 35b12e1bb..09798e3bc 100644 --- a/certbot/cert_manager.py +++ b/certbot/cert_manager.py @@ -8,6 +8,7 @@ import zope.component from certbot import errors from certbot import interfaces +from certbot import ocsp from certbot import storage from certbot import util @@ -80,7 +81,7 @@ def certificates(config): parse_failures.append(renewal_file) # Describe all the certs - _describe_certs(parsed_certs, parse_failures) + _describe_certs(config, parsed_certs, parse_failures) def delete(config): """Delete Certbot files associated with a certificate lineage.""" @@ -165,24 +166,37 @@ def _report_lines(msgs): """Format a results report for a category of single-line renewal outcomes""" return " " + "\n ".join(str(msg) for msg in msgs) -def _report_human_readable(parsed_certs): +def _report_human_readable(config, parsed_certs): """Format a results report for a parsed cert""" certinfo = [] + checker = ocsp.RevocationChecker() for cert in parsed_certs: + if config.certname and cert.lineagename != config.certname: + continue + if config.domains and not set(config.domains).issubset(cert.names()): + continue now = pytz.UTC.fromutc(datetime.datetime.utcnow()) + + reasons = [] if cert.is_test_cert: - expiration_text = "INVALID: TEST CERT" - elif cert.target_expiry <= now: - expiration_text = "INVALID: EXPIRED" + reasons.append('TEST_CERT') + if cert.target_expiry <= now: + reasons.append('EXPIRED') + if checker.ocsp_revoked(cert.cert, cert.chain): + reasons.append('REVOKED') + + if reasons: + status = "INVALID: " + ", ".join(reasons) else: diff = cert.target_expiry - now if diff.days == 1: - expiration_text = "VALID: 1 day" + status = "VALID: 1 day" elif diff.days < 1: - expiration_text = "VALID: {0} hour(s)".format(diff.seconds // 3600) + status = "VALID: {0} hour(s)".format(diff.seconds // 3600) else: - expiration_text = "VALID: {0} days".format(diff.days) - valid_string = "{0} ({1})".format(cert.target_expiry, expiration_text) + status = "VALID: {0} days".format(diff.days) + + valid_string = "{0} ({1})".format(cert.target_expiry, status) certinfo.append(" Certificate Name: {0}\n" " Domains: {1}\n" " Expiry Date: {2}\n" @@ -195,7 +209,7 @@ def _report_human_readable(parsed_certs): cert.privkey)) return "\n".join(certinfo) -def _describe_certs(parsed_certs, parse_failures): +def _describe_certs(config, parsed_certs, parse_failures): """Print information about the certs we know about""" out = [] @@ -205,8 +219,9 @@ def _describe_certs(parsed_certs, parse_failures): notify("No certs found.") else: if parsed_certs: - notify("Found the following certs:") - notify(_report_human_readable(parsed_certs)) + match = "matching " if config.certname or config.domains else "" + notify("Found the following {0}certs:".format(match)) + notify(_report_human_readable(config, parsed_certs)) if parse_failures: notify("\nThe following renewal configuration files " "were invalid:") diff --git a/certbot/cli.py b/certbot/cli.py index 8b26568c6..d0964e4a2 100644 --- a/certbot/cli.py +++ b/certbot/cli.py @@ -350,8 +350,10 @@ VERB_HELP = [ "usage": "\n\n certbot renew [--cert-name NAME] [options]\n\n" }), ("certificates", { - "short": "List all certificates managed by Certbot", - "opts": "List all certificates managed by Certbot" + "short": "List certificates managed by Certbot", + "opts": "List certificates managed by Certbot", + "usage": ("\n\n certbot certificates [options] ...\n\n" + "Print information about the status of certificates managed by Certbot.") }), ("delete", { "short": "Clean up all files related to a certificate", @@ -824,14 +826,14 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis "being run in a terminal. This flag cannot be used with the " "renew subcommand.") helpful.add( - [None, "run", "certonly"], + [None, "run", "certonly", "certificates"], "-d", "--domains", "--domain", dest="domains", metavar="DOMAIN", action=_DomainsAction, default=[], help="Domain names to apply. For multiple domains you can use " "multiple -d flags or enter a comma separated list of domains " "as a parameter. (default: Ask)") helpful.add( - [None, "run", "certonly", "manage", "rename", "delete"], + [None, "run", "certonly", "manage", "rename", "delete", "certificates"], "--cert-name", dest="certname", metavar="CERTNAME", default=None, help="Certificate name to apply. Only one certificate name can be used " diff --git a/certbot/ocsp.py b/certbot/ocsp.py new file mode 100644 index 000000000..2e0514a44 --- /dev/null +++ b/certbot/ocsp.py @@ -0,0 +1,109 @@ +"""Tools for checking certificate revocation.""" +import logging + +from subprocess import Popen, PIPE + +from certbot import errors +from certbot import util + +logger = logging.getLogger(__name__) + +class RevocationChecker(object): + "This class figures out OCSP checking on this system, and performs it." + + def __init__(self): + self.broken = False + + if not util.exe_exists("openssl"): + logging.info("openssl not installed, can't check revocation") + self.broken = True + return + + # New versions of openssl want -header var=val, old ones want -header var val + test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"], + stdout=PIPE, stderr=PIPE, universal_newlines=True) + _out, err = test_host_format.communicate() + if "Missing =" in err: + self.host_args = lambda host: ["Host=" + host] + else: + self.host_args = lambda host: ["Host", host] + + + def ocsp_revoked(self, cert_path, chain_path): + """Get revoked status for a particular cert version. + + .. todo:: Make this a non-blocking call + + :param str cert_path: Path to certificate + :param str chain_path: Path to intermediate cert + :rtype bool or None: + :returns: True if revoked; False if valid or the check failed + + """ + if self.broken: + return False + + + logger.debug("Querying OCSP for %s", cert_path) + url, host = self.determine_ocsp_server(cert_path) + if not host: + return False + # jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this! + cmd = ["openssl", "ocsp", + "-no_nonce", + "-issuer", chain_path, + "-cert", cert_path, + "-url", url, + "-CAfile", chain_path, + "-verify_other", chain_path, + "-header"] + self.host_args(host) + try: + output, err = util.run_script(cmd, log=logging.debug) + except errors.SubprocessError as e: + logger.info("OCSP check failed for %s (are we offline?)", cert_path) + logger.debug("Command was:\n%s\nError was:\n%s", " ".join(cmd), e) + return False + + return _translate_ocsp_query(cert_path, output, err) + + + def determine_ocsp_server(self, cert_path): + """Extract the OCSP server host from a certificate. + + :param str cert_path: Path to the cert we're checking OCSP for + :rtype tuple: + :returns: (OCSP server URL or None, OCSP server host or None) + + """ + try: + url, _err = util.run_script( + ["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"], + log=logging.debug) + except errors.SubprocessError as e: + logger.info("Cannot extract OCSP URI from %s", cert_path) + logger.debug("Error was:\n%s", e) + return None, None + + url = url.rstrip() + host = url.partition("://")[2].rstrip("/") + if host: + return url, host + else: + logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path) + return None, None + +def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors): + """Parse openssl's weird output to work out what it means.""" + + if not "Response verify OK" in ocsp_errors: + logger.info("Revocation status for %s is unknown", cert_path) + logger.debug("Uncertain ouput:\n%s\nstderr:\n%s", ocsp_output, ocsp_errors) + return False + if cert_path + ": good" in ocsp_output: + return False + elif cert_path + ": revoked" in ocsp_output: + return True + else: + logger.warn("Unable to properly parse OCSP output: %s", ocsp_output) + return False + diff --git a/certbot/tests/cert_manager_test.py b/certbot/tests/cert_manager_test.py index ae673fba1..07f7cedaa 100644 --- a/certbot/tests/cert_manager_test.py +++ b/certbot/tests/cert_manager_test.py @@ -1,6 +1,7 @@ """Tests for certbot.cert_manager.""" -# pylint disable=protected-access +# pylint: disable=protected-access import os +import re import shutil import tempfile import unittest @@ -179,7 +180,9 @@ class CertificatesTest(BaseCertManagerTest): self.assertTrue(mock_utility.called) shutil.rmtree(tempdir) - def test_report_human_readable(self): + @mock.patch('certbot.cert_manager.ocsp.RevocationChecker.ocsp_revoked') + def test_report_human_readable(self, mock_revoked): + mock_revoked.return_value = None from certbot import cert_manager import datetime, pytz expiry = pytz.UTC.fromutc(datetime.datetime.utcnow()) @@ -189,32 +192,59 @@ class CertificatesTest(BaseCertManagerTest): cert.names.return_value = ["nameone", "nametwo"] cert.is_test_cert = False parsed_certs = [cert] + # pylint: disable=protected-access - out = cert_manager._report_human_readable(parsed_certs) + get_report = lambda: cert_manager._report_human_readable(mock_config, parsed_certs) + + mock_config = mock.MagicMock(certname=None, lineagename=None) + # pylint: disable=protected-access + out = get_report() self.assertTrue("INVALID: EXPIRED" in out) cert.target_expiry += datetime.timedelta(hours=2) # pylint: disable=protected-access - out = cert_manager._report_human_readable(parsed_certs) + out = get_report() self.assertTrue('1 hour(s)' in out) self.assertTrue('VALID' in out and not 'INVALID' in out) cert.target_expiry += datetime.timedelta(days=1) # pylint: disable=protected-access - out = cert_manager._report_human_readable(parsed_certs) + out = get_report() self.assertTrue('1 day' in out) self.assertFalse('under' in out) self.assertTrue('VALID' in out and not 'INVALID' in out) cert.target_expiry += datetime.timedelta(days=2) # pylint: disable=protected-access - out = cert_manager._report_human_readable(parsed_certs) + out = get_report() self.assertTrue('3 days' in out) self.assertTrue('VALID' in out and not 'INVALID' in out) cert.is_test_cert = True - out = cert_manager._report_human_readable(parsed_certs) - self.assertTrue('INVALID: TEST CERT' in out) + mock_revoked.return_value = True + out = get_report() + self.assertTrue('INVALID: TEST_CERT, REVOKED' in out) + + cert = mock.MagicMock(lineagename="indescribable") + cert.target_expiry = expiry + cert.names.return_value = ["nameone", "thrice.named"] + cert.is_test_cert = True + parsed_certs.append(cert) + + out = get_report() + self.assertEqual(len(re.findall("INVALID:", out)), 2) + mock_config.domains = ["thrice.named"] + out = get_report() + self.assertEqual(len(re.findall("INVALID:", out)), 1) + mock_config.domains = ["nameone"] + out = get_report() + self.assertEqual(len(re.findall("INVALID:", out)), 2) + mock_config.certname = "indescribable" + out = get_report() + self.assertEqual(len(re.findall("INVALID:", out)), 1) + mock_config.certname = "horror" + out = get_report() + self.assertEqual(len(re.findall("INVALID:", out)), 0) class SearchLineagesTest(BaseCertManagerTest): diff --git a/certbot/tests/ocsp_test.py b/certbot/tests/ocsp_test.py new file mode 100644 index 000000000..ff79bb01e --- /dev/null +++ b/certbot/tests/ocsp_test.py @@ -0,0 +1,137 @@ +"""Tests for ocsp.py""" +# pylint: disable=protected-access + +import unittest + +import mock + +from certbot import errors + +out = """Missing = in header key=value +ocsp: Use -help for summary. +""" + +class OCSPTest(unittest.TestCase): + + _multiprocess_can_split_ = True + + def setUp(self): + from certbot import ocsp + with mock.patch('certbot.ocsp.Popen') as mock_popen: + with mock.patch('certbot.util.exe_exists') as mock_exists: + mock_communicate = mock.MagicMock() + mock_communicate.communicate.return_value = (None, out) + mock_popen.return_value = mock_communicate + mock_exists.return_value = True + self.checker = ocsp.RevocationChecker() + + def tearDown(self): + pass + + @mock.patch('certbot.ocsp.logging.info') + @mock.patch('certbot.ocsp.Popen') + @mock.patch('certbot.util.exe_exists') + def test_init(self, mock_exists, mock_popen, mock_log): + mock_communicate = mock.MagicMock() + mock_communicate.communicate.return_value = (None, out) + mock_popen.return_value = mock_communicate + mock_exists.return_value = True + + from certbot import ocsp + checker = ocsp.RevocationChecker() + self.assertEqual(mock_popen.call_count, 1) + self.assertEqual(checker.host_args("x"), ["Host=x"]) + + mock_communicate.communicate.return_value = (None, out.partition("\n")[2]) + checker = ocsp.RevocationChecker() + self.assertEqual(checker.host_args("x"), ["Host", "x"]) + self.assertEqual(checker.broken, False) + + mock_exists.return_value = False + mock_popen.call_count = 0 + checker = ocsp.RevocationChecker() + self.assertEqual(mock_popen.call_count, 0) + self.assertEqual(mock_log.call_count, 1) + self.assertEqual(checker.broken, True) + + @mock.patch('certbot.ocsp.RevocationChecker.determine_ocsp_server') + @mock.patch('certbot.util.run_script') + def test_ocsp_revoked(self, mock_run, mock_determine): + self.checker.broken = True + mock_determine.return_value = ("", "") + self.assertEqual(self.checker.ocsp_revoked("x", "y"), False) + + self.checker.broken = False + mock_run.return_value = tuple(openssl_happy[1:]) + self.assertEqual(self.checker.ocsp_revoked("x", "y"), False) + self.assertEqual(mock_run.call_count, 0) + + mock_determine.return_value = ("http://x.co", "x.co") + self.assertEqual(self.checker.ocsp_revoked("blah.pem", "chain.pem"), False) + mock_run.side_effect = errors.SubprocessError("Unable to load certificate launcher") + self.assertEqual(self.checker.ocsp_revoked("x", "y"), False) + self.assertEqual(mock_run.call_count, 2) + + + @mock.patch('certbot.ocsp.logger.debug') + @mock.patch('certbot.ocsp.logger.info') + @mock.patch('certbot.util.run_script') + def test_determine_ocsp_server(self, mock_run, mock_info, mock_debug): + uri = "http://ocsp.stg-int-x1.letsencrypt.org/" + host = "ocsp.stg-int-x1.letsencrypt.org" + mock_run.return_value = uri, "" + self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host)) + mock_run.return_value = "ftp:/" + host + "/", "" + self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None)) + self.assertEqual(mock_info.call_count, 1) + + c = "confusion" + mock_run.side_effect = errors.SubprocessError(c) + self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None)) + self.assertTrue(c in repr(mock_debug.call_args[0][1])) + + @mock.patch('certbot.ocsp.logger') + @mock.patch('certbot.util.run_script') + def test_translate_ocsp(self, mock_run, mock_log): + # pylint: disable=protected-access,star-args + mock_run.return_value = openssl_confused + from certbot import ocsp + self.assertEqual(ocsp._translate_ocsp_query(*openssl_happy), False) + self.assertEqual(ocsp._translate_ocsp_query(*openssl_confused), False) + self.assertEqual(mock_log.debug.call_count, 1) + self.assertEqual(mock_log.warn.call_count, 0) + self.assertEqual(ocsp._translate_ocsp_query(*openssl_broken), False) + self.assertEqual(mock_log.warn.call_count, 1) + self.assertEqual(ocsp._translate_ocsp_query(*openssl_revoked), True) + + +# pylint: disable=line-too-long +openssl_confused = ("", """ +/etc/letsencrypt/live/example.org/cert.pem: good + This Update: Dec 17 00:00:00 2016 GMT + Next Update: Dec 24 00:00:00 2016 GMT +""", +""" +Response Verify Failure +139903674214048:error:27069065:OCSP routines:OCSP_basic_verify:certificate verify error:ocsp_vfy.c:138:Verify error:unable to get local issuer certificate +""") + +openssl_happy = ("blah.pem", """ +blah.pem: good + This Update: Dec 20 18:00:00 2016 GMT + Next Update: Dec 27 18:00:00 2016 GMT +""", +"Response verify OK") + +openssl_revoked = ("blah.pem", """ +blah.pem: revoked + This Update: Dec 20 01:00:00 2016 GMT + Next Update: Dec 27 01:00:00 2016 GMT + Revocation Time: Dec 20 01:46:34 2016 GMT +""", +"""Response verify OK""") + +openssl_broken = ("", "tentacles", "Response verify OK") + +if __name__ == '__main__': + unittest.main() # pragma: no cover diff --git a/certbot/util.py b/certbot/util.py index cbcfa3314..cc0a74bd2 100644 --- a/certbot/util.py +++ b/certbot/util.py @@ -38,20 +38,22 @@ ANSI_SGR_RED = "\033[31m" ANSI_SGR_RESET = "\033[0m" -def run_script(params): +def run_script(params, log=logger.error): """Run the script with the given params. :param list params: List of parameters to pass to Popen + :param logging.Logger log: Logger to use for errors """ try: proc = subprocess.Popen(params, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + stderr=subprocess.PIPE, + universal_newlines=True) except (OSError, ValueError): msg = "Unable to run the command: %s" % " ".join(params) - logger.error(msg) + log(msg) raise errors.SubprocessError(msg) stdout, stderr = proc.communicate() @@ -60,7 +62,7 @@ def run_script(params): msg = "Error while running %s.\n%s\n%s" % ( " ".join(params), stdout, stderr) # Enter recovery routine... - logger.error(msg) + log(msg) raise errors.SubprocessError(msg) return stdout, stderr