mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Merge pull request #3941 from certbot/ocsp
OCSP checking and other cool "certbot certificates" features
This commit is contained in:
commit
839ff7a265
6 changed files with 323 additions and 28 deletions
|
|
@ -8,6 +8,7 @@ import zope.component
|
|||
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import ocsp
|
||||
from certbot import storage
|
||||
from certbot import util
|
||||
|
||||
|
|
@ -80,7 +81,7 @@ def certificates(config):
|
|||
parse_failures.append(renewal_file)
|
||||
|
||||
# Describe all the certs
|
||||
_describe_certs(parsed_certs, parse_failures)
|
||||
_describe_certs(config, parsed_certs, parse_failures)
|
||||
|
||||
def delete(config):
|
||||
"""Delete Certbot files associated with a certificate lineage."""
|
||||
|
|
@ -165,24 +166,37 @@ def _report_lines(msgs):
|
|||
"""Format a results report for a category of single-line renewal outcomes"""
|
||||
return " " + "\n ".join(str(msg) for msg in msgs)
|
||||
|
||||
def _report_human_readable(parsed_certs):
|
||||
def _report_human_readable(config, parsed_certs):
|
||||
"""Format a results report for a parsed cert"""
|
||||
certinfo = []
|
||||
checker = ocsp.RevocationChecker()
|
||||
for cert in parsed_certs:
|
||||
if config.certname and cert.lineagename != config.certname:
|
||||
continue
|
||||
if config.domains and not set(config.domains).issubset(cert.names()):
|
||||
continue
|
||||
now = pytz.UTC.fromutc(datetime.datetime.utcnow())
|
||||
|
||||
reasons = []
|
||||
if cert.is_test_cert:
|
||||
expiration_text = "INVALID: TEST CERT"
|
||||
elif cert.target_expiry <= now:
|
||||
expiration_text = "INVALID: EXPIRED"
|
||||
reasons.append('TEST_CERT')
|
||||
if cert.target_expiry <= now:
|
||||
reasons.append('EXPIRED')
|
||||
if checker.ocsp_revoked(cert.cert, cert.chain):
|
||||
reasons.append('REVOKED')
|
||||
|
||||
if reasons:
|
||||
status = "INVALID: " + ", ".join(reasons)
|
||||
else:
|
||||
diff = cert.target_expiry - now
|
||||
if diff.days == 1:
|
||||
expiration_text = "VALID: 1 day"
|
||||
status = "VALID: 1 day"
|
||||
elif diff.days < 1:
|
||||
expiration_text = "VALID: {0} hour(s)".format(diff.seconds // 3600)
|
||||
status = "VALID: {0} hour(s)".format(diff.seconds // 3600)
|
||||
else:
|
||||
expiration_text = "VALID: {0} days".format(diff.days)
|
||||
valid_string = "{0} ({1})".format(cert.target_expiry, expiration_text)
|
||||
status = "VALID: {0} days".format(diff.days)
|
||||
|
||||
valid_string = "{0} ({1})".format(cert.target_expiry, status)
|
||||
certinfo.append(" Certificate Name: {0}\n"
|
||||
" Domains: {1}\n"
|
||||
" Expiry Date: {2}\n"
|
||||
|
|
@ -195,7 +209,7 @@ def _report_human_readable(parsed_certs):
|
|||
cert.privkey))
|
||||
return "\n".join(certinfo)
|
||||
|
||||
def _describe_certs(parsed_certs, parse_failures):
|
||||
def _describe_certs(config, parsed_certs, parse_failures):
|
||||
"""Print information about the certs we know about"""
|
||||
out = []
|
||||
|
||||
|
|
@ -205,8 +219,9 @@ def _describe_certs(parsed_certs, parse_failures):
|
|||
notify("No certs found.")
|
||||
else:
|
||||
if parsed_certs:
|
||||
notify("Found the following certs:")
|
||||
notify(_report_human_readable(parsed_certs))
|
||||
match = "matching " if config.certname or config.domains else ""
|
||||
notify("Found the following {0}certs:".format(match))
|
||||
notify(_report_human_readable(config, parsed_certs))
|
||||
if parse_failures:
|
||||
notify("\nThe following renewal configuration files "
|
||||
"were invalid:")
|
||||
|
|
|
|||
|
|
@ -350,8 +350,10 @@ VERB_HELP = [
|
|||
"usage": "\n\n certbot renew [--cert-name NAME] [options]\n\n"
|
||||
}),
|
||||
("certificates", {
|
||||
"short": "List all certificates managed by Certbot",
|
||||
"opts": "List all certificates managed by Certbot"
|
||||
"short": "List certificates managed by Certbot",
|
||||
"opts": "List certificates managed by Certbot",
|
||||
"usage": ("\n\n certbot certificates [options] ...\n\n"
|
||||
"Print information about the status of certificates managed by Certbot.")
|
||||
}),
|
||||
("delete", {
|
||||
"short": "Clean up all files related to a certificate",
|
||||
|
|
@ -824,14 +826,14 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis
|
|||
"being run in a terminal. This flag cannot be used with the "
|
||||
"renew subcommand.")
|
||||
helpful.add(
|
||||
[None, "run", "certonly"],
|
||||
[None, "run", "certonly", "certificates"],
|
||||
"-d", "--domains", "--domain", dest="domains",
|
||||
metavar="DOMAIN", action=_DomainsAction, default=[],
|
||||
help="Domain names to apply. For multiple domains you can use "
|
||||
"multiple -d flags or enter a comma separated list of domains "
|
||||
"as a parameter. (default: Ask)")
|
||||
helpful.add(
|
||||
[None, "run", "certonly", "manage", "rename", "delete"],
|
||||
[None, "run", "certonly", "manage", "rename", "delete", "certificates"],
|
||||
"--cert-name", dest="certname",
|
||||
metavar="CERTNAME", default=None,
|
||||
help="Certificate name to apply. Only one certificate name can be used "
|
||||
|
|
|
|||
109
certbot/ocsp.py
Normal file
109
certbot/ocsp.py
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
"""Tools for checking certificate revocation."""
|
||||
import logging
|
||||
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RevocationChecker(object):
|
||||
"This class figures out OCSP checking on this system, and performs it."
|
||||
|
||||
def __init__(self):
|
||||
self.broken = False
|
||||
|
||||
if not util.exe_exists("openssl"):
|
||||
logging.info("openssl not installed, can't check revocation")
|
||||
self.broken = True
|
||||
return
|
||||
|
||||
# New versions of openssl want -header var=val, old ones want -header var val
|
||||
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
|
||||
stdout=PIPE, stderr=PIPE, universal_newlines=True)
|
||||
_out, err = test_host_format.communicate()
|
||||
if "Missing =" in err:
|
||||
self.host_args = lambda host: ["Host=" + host]
|
||||
else:
|
||||
self.host_args = lambda host: ["Host", host]
|
||||
|
||||
|
||||
def ocsp_revoked(self, cert_path, chain_path):
|
||||
"""Get revoked status for a particular cert version.
|
||||
|
||||
.. todo:: Make this a non-blocking call
|
||||
|
||||
:param str cert_path: Path to certificate
|
||||
:param str chain_path: Path to intermediate cert
|
||||
:rtype bool or None:
|
||||
:returns: True if revoked; False if valid or the check failed
|
||||
|
||||
"""
|
||||
if self.broken:
|
||||
return False
|
||||
|
||||
|
||||
logger.debug("Querying OCSP for %s", cert_path)
|
||||
url, host = self.determine_ocsp_server(cert_path)
|
||||
if not host:
|
||||
return False
|
||||
# jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this!
|
||||
cmd = ["openssl", "ocsp",
|
||||
"-no_nonce",
|
||||
"-issuer", chain_path,
|
||||
"-cert", cert_path,
|
||||
"-url", url,
|
||||
"-CAfile", chain_path,
|
||||
"-verify_other", chain_path,
|
||||
"-header"] + self.host_args(host)
|
||||
try:
|
||||
output, err = util.run_script(cmd, log=logging.debug)
|
||||
except errors.SubprocessError as e:
|
||||
logger.info("OCSP check failed for %s (are we offline?)", cert_path)
|
||||
logger.debug("Command was:\n%s\nError was:\n%s", " ".join(cmd), e)
|
||||
return False
|
||||
|
||||
return _translate_ocsp_query(cert_path, output, err)
|
||||
|
||||
|
||||
def determine_ocsp_server(self, cert_path):
|
||||
"""Extract the OCSP server host from a certificate.
|
||||
|
||||
:param str cert_path: Path to the cert we're checking OCSP for
|
||||
:rtype tuple:
|
||||
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||
|
||||
"""
|
||||
try:
|
||||
url, _err = util.run_script(
|
||||
["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"],
|
||||
log=logging.debug)
|
||||
except errors.SubprocessError as e:
|
||||
logger.info("Cannot extract OCSP URI from %s", cert_path)
|
||||
logger.debug("Error was:\n%s", e)
|
||||
return None, None
|
||||
|
||||
url = url.rstrip()
|
||||
host = url.partition("://")[2].rstrip("/")
|
||||
if host:
|
||||
return url, host
|
||||
else:
|
||||
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
|
||||
return None, None
|
||||
|
||||
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
|
||||
"""Parse openssl's weird output to work out what it means."""
|
||||
|
||||
if not "Response verify OK" in ocsp_errors:
|
||||
logger.info("Revocation status for %s is unknown", cert_path)
|
||||
logger.debug("Uncertain ouput:\n%s\nstderr:\n%s", ocsp_output, ocsp_errors)
|
||||
return False
|
||||
if cert_path + ": good" in ocsp_output:
|
||||
return False
|
||||
elif cert_path + ": revoked" in ocsp_output:
|
||||
return True
|
||||
else:
|
||||
logger.warn("Unable to properly parse OCSP output: %s", ocsp_output)
|
||||
return False
|
||||
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
"""Tests for certbot.cert_manager."""
|
||||
# pylint disable=protected-access
|
||||
# pylint: disable=protected-access
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
|
@ -179,7 +180,9 @@ class CertificatesTest(BaseCertManagerTest):
|
|||
self.assertTrue(mock_utility.called)
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
def test_report_human_readable(self):
|
||||
@mock.patch('certbot.cert_manager.ocsp.RevocationChecker.ocsp_revoked')
|
||||
def test_report_human_readable(self, mock_revoked):
|
||||
mock_revoked.return_value = None
|
||||
from certbot import cert_manager
|
||||
import datetime, pytz
|
||||
expiry = pytz.UTC.fromutc(datetime.datetime.utcnow())
|
||||
|
|
@ -189,32 +192,59 @@ class CertificatesTest(BaseCertManagerTest):
|
|||
cert.names.return_value = ["nameone", "nametwo"]
|
||||
cert.is_test_cert = False
|
||||
parsed_certs = [cert]
|
||||
|
||||
# pylint: disable=protected-access
|
||||
out = cert_manager._report_human_readable(parsed_certs)
|
||||
get_report = lambda: cert_manager._report_human_readable(mock_config, parsed_certs)
|
||||
|
||||
mock_config = mock.MagicMock(certname=None, lineagename=None)
|
||||
# pylint: disable=protected-access
|
||||
out = get_report()
|
||||
self.assertTrue("INVALID: EXPIRED" in out)
|
||||
|
||||
cert.target_expiry += datetime.timedelta(hours=2)
|
||||
# pylint: disable=protected-access
|
||||
out = cert_manager._report_human_readable(parsed_certs)
|
||||
out = get_report()
|
||||
self.assertTrue('1 hour(s)' in out)
|
||||
self.assertTrue('VALID' in out and not 'INVALID' in out)
|
||||
|
||||
cert.target_expiry += datetime.timedelta(days=1)
|
||||
# pylint: disable=protected-access
|
||||
out = cert_manager._report_human_readable(parsed_certs)
|
||||
out = get_report()
|
||||
self.assertTrue('1 day' in out)
|
||||
self.assertFalse('under' in out)
|
||||
self.assertTrue('VALID' in out and not 'INVALID' in out)
|
||||
|
||||
cert.target_expiry += datetime.timedelta(days=2)
|
||||
# pylint: disable=protected-access
|
||||
out = cert_manager._report_human_readable(parsed_certs)
|
||||
out = get_report()
|
||||
self.assertTrue('3 days' in out)
|
||||
self.assertTrue('VALID' in out and not 'INVALID' in out)
|
||||
|
||||
cert.is_test_cert = True
|
||||
out = cert_manager._report_human_readable(parsed_certs)
|
||||
self.assertTrue('INVALID: TEST CERT' in out)
|
||||
mock_revoked.return_value = True
|
||||
out = get_report()
|
||||
self.assertTrue('INVALID: TEST_CERT, REVOKED' in out)
|
||||
|
||||
cert = mock.MagicMock(lineagename="indescribable")
|
||||
cert.target_expiry = expiry
|
||||
cert.names.return_value = ["nameone", "thrice.named"]
|
||||
cert.is_test_cert = True
|
||||
parsed_certs.append(cert)
|
||||
|
||||
out = get_report()
|
||||
self.assertEqual(len(re.findall("INVALID:", out)), 2)
|
||||
mock_config.domains = ["thrice.named"]
|
||||
out = get_report()
|
||||
self.assertEqual(len(re.findall("INVALID:", out)), 1)
|
||||
mock_config.domains = ["nameone"]
|
||||
out = get_report()
|
||||
self.assertEqual(len(re.findall("INVALID:", out)), 2)
|
||||
mock_config.certname = "indescribable"
|
||||
out = get_report()
|
||||
self.assertEqual(len(re.findall("INVALID:", out)), 1)
|
||||
mock_config.certname = "horror"
|
||||
out = get_report()
|
||||
self.assertEqual(len(re.findall("INVALID:", out)), 0)
|
||||
|
||||
|
||||
class SearchLineagesTest(BaseCertManagerTest):
|
||||
|
|
|
|||
137
certbot/tests/ocsp_test.py
Normal file
137
certbot/tests/ocsp_test.py
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
"""Tests for ocsp.py"""
|
||||
# pylint: disable=protected-access
|
||||
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from certbot import errors
|
||||
|
||||
out = """Missing = in header key=value
|
||||
ocsp: Use -help for summary.
|
||||
"""
|
||||
|
||||
class OCSPTest(unittest.TestCase):
|
||||
|
||||
_multiprocess_can_split_ = True
|
||||
|
||||
def setUp(self):
|
||||
from certbot import ocsp
|
||||
with mock.patch('certbot.ocsp.Popen') as mock_popen:
|
||||
with mock.patch('certbot.util.exe_exists') as mock_exists:
|
||||
mock_communicate = mock.MagicMock()
|
||||
mock_communicate.communicate.return_value = (None, out)
|
||||
mock_popen.return_value = mock_communicate
|
||||
mock_exists.return_value = True
|
||||
self.checker = ocsp.RevocationChecker()
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
@mock.patch('certbot.ocsp.logging.info')
|
||||
@mock.patch('certbot.ocsp.Popen')
|
||||
@mock.patch('certbot.util.exe_exists')
|
||||
def test_init(self, mock_exists, mock_popen, mock_log):
|
||||
mock_communicate = mock.MagicMock()
|
||||
mock_communicate.communicate.return_value = (None, out)
|
||||
mock_popen.return_value = mock_communicate
|
||||
mock_exists.return_value = True
|
||||
|
||||
from certbot import ocsp
|
||||
checker = ocsp.RevocationChecker()
|
||||
self.assertEqual(mock_popen.call_count, 1)
|
||||
self.assertEqual(checker.host_args("x"), ["Host=x"])
|
||||
|
||||
mock_communicate.communicate.return_value = (None, out.partition("\n")[2])
|
||||
checker = ocsp.RevocationChecker()
|
||||
self.assertEqual(checker.host_args("x"), ["Host", "x"])
|
||||
self.assertEqual(checker.broken, False)
|
||||
|
||||
mock_exists.return_value = False
|
||||
mock_popen.call_count = 0
|
||||
checker = ocsp.RevocationChecker()
|
||||
self.assertEqual(mock_popen.call_count, 0)
|
||||
self.assertEqual(mock_log.call_count, 1)
|
||||
self.assertEqual(checker.broken, True)
|
||||
|
||||
@mock.patch('certbot.ocsp.RevocationChecker.determine_ocsp_server')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_revoked(self, mock_run, mock_determine):
|
||||
self.checker.broken = True
|
||||
mock_determine.return_value = ("", "")
|
||||
self.assertEqual(self.checker.ocsp_revoked("x", "y"), False)
|
||||
|
||||
self.checker.broken = False
|
||||
mock_run.return_value = tuple(openssl_happy[1:])
|
||||
self.assertEqual(self.checker.ocsp_revoked("x", "y"), False)
|
||||
self.assertEqual(mock_run.call_count, 0)
|
||||
|
||||
mock_determine.return_value = ("http://x.co", "x.co")
|
||||
self.assertEqual(self.checker.ocsp_revoked("blah.pem", "chain.pem"), False)
|
||||
mock_run.side_effect = errors.SubprocessError("Unable to load certificate launcher")
|
||||
self.assertEqual(self.checker.ocsp_revoked("x", "y"), False)
|
||||
self.assertEqual(mock_run.call_count, 2)
|
||||
|
||||
|
||||
@mock.patch('certbot.ocsp.logger.debug')
|
||||
@mock.patch('certbot.ocsp.logger.info')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_determine_ocsp_server(self, mock_run, mock_info, mock_debug):
|
||||
uri = "http://ocsp.stg-int-x1.letsencrypt.org/"
|
||||
host = "ocsp.stg-int-x1.letsencrypt.org"
|
||||
mock_run.return_value = uri, ""
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host))
|
||||
mock_run.return_value = "ftp:/" + host + "/", ""
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||
self.assertEqual(mock_info.call_count, 1)
|
||||
|
||||
c = "confusion"
|
||||
mock_run.side_effect = errors.SubprocessError(c)
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||
self.assertTrue(c in repr(mock_debug.call_args[0][1]))
|
||||
|
||||
@mock.patch('certbot.ocsp.logger')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_translate_ocsp(self, mock_run, mock_log):
|
||||
# pylint: disable=protected-access,star-args
|
||||
mock_run.return_value = openssl_confused
|
||||
from certbot import ocsp
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_happy), False)
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_confused), False)
|
||||
self.assertEqual(mock_log.debug.call_count, 1)
|
||||
self.assertEqual(mock_log.warn.call_count, 0)
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_broken), False)
|
||||
self.assertEqual(mock_log.warn.call_count, 1)
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_revoked), True)
|
||||
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
openssl_confused = ("", """
|
||||
/etc/letsencrypt/live/example.org/cert.pem: good
|
||||
This Update: Dec 17 00:00:00 2016 GMT
|
||||
Next Update: Dec 24 00:00:00 2016 GMT
|
||||
""",
|
||||
"""
|
||||
Response Verify Failure
|
||||
139903674214048:error:27069065:OCSP routines:OCSP_basic_verify:certificate verify error:ocsp_vfy.c:138:Verify error:unable to get local issuer certificate
|
||||
""")
|
||||
|
||||
openssl_happy = ("blah.pem", """
|
||||
blah.pem: good
|
||||
This Update: Dec 20 18:00:00 2016 GMT
|
||||
Next Update: Dec 27 18:00:00 2016 GMT
|
||||
""",
|
||||
"Response verify OK")
|
||||
|
||||
openssl_revoked = ("blah.pem", """
|
||||
blah.pem: revoked
|
||||
This Update: Dec 20 01:00:00 2016 GMT
|
||||
Next Update: Dec 27 01:00:00 2016 GMT
|
||||
Revocation Time: Dec 20 01:46:34 2016 GMT
|
||||
""",
|
||||
"""Response verify OK""")
|
||||
|
||||
openssl_broken = ("", "tentacles", "Response verify OK")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -38,20 +38,22 @@ ANSI_SGR_RED = "\033[31m"
|
|||
ANSI_SGR_RESET = "\033[0m"
|
||||
|
||||
|
||||
def run_script(params):
|
||||
def run_script(params, log=logger.error):
|
||||
"""Run the script with the given params.
|
||||
|
||||
:param list params: List of parameters to pass to Popen
|
||||
:param logging.Logger log: Logger to use for errors
|
||||
|
||||
"""
|
||||
try:
|
||||
proc = subprocess.Popen(params,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
stderr=subprocess.PIPE,
|
||||
universal_newlines=True)
|
||||
|
||||
except (OSError, ValueError):
|
||||
msg = "Unable to run the command: %s" % " ".join(params)
|
||||
logger.error(msg)
|
||||
log(msg)
|
||||
raise errors.SubprocessError(msg)
|
||||
|
||||
stdout, stderr = proc.communicate()
|
||||
|
|
@ -60,7 +62,7 @@ def run_script(params):
|
|||
msg = "Error while running %s.\n%s\n%s" % (
|
||||
" ".join(params), stdout, stderr)
|
||||
# Enter recovery routine...
|
||||
logger.error(msg)
|
||||
log(msg)
|
||||
raise errors.SubprocessError(msg)
|
||||
|
||||
return stdout, stderr
|
||||
|
|
|
|||
Loading…
Reference in a new issue