Make the OCSP checker a class

(Since it contains a reasonable amount of system state)
This commit is contained in:
Peter Eckersley 2016-12-20 09:29:05 -08:00
parent 7a18a124ce
commit 840c584cbd
3 changed files with 73 additions and 46 deletions

View file

@ -169,11 +169,14 @@ def _report_lines(msgs):
def _report_human_readable(parsed_certs):
"""Format a results report for a parsed cert"""
certinfo = []
checker = ocsp.RevocationChecker()
for cert in parsed_certs:
now = pytz.UTC.fromutc(datetime.datetime.utcnow())
expiration_text = ""
revoked = ocsp.revoked_status(cert.cert, cert.chain)
if cert.is_test_cert:
revoked = checker.check_ocsp(cert.cert, cert.chain)
if revoked:
expiration_text = "INVALID: " + revoked
elif cert.is_test_cert:
expiration_text = "INVALID: TEST CERT"
elif cert.target_expiry <= now:
expiration_text = "INVALID: EXPIRED"

View file

@ -943,6 +943,10 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis
"testing", "--no-verify-ssl", action="store_true",
help=config_help("no_verify_ssl"),
default=flag_default("no_verify_ssl"))
helpful.add(
["testing", "certificates"], "--check-ocsp", default="if otherwise valid",
help='Whether to check OCSP for listed certs. Can be set to "never", "always",'
'or "if otherwise valid".')
helpful.add(
["testing", "standalone", "apache", "nginx"], "--tls-sni-01-port", type=int,
default=flag_default("tls_sni_01_port"),

View file

@ -13,50 +13,43 @@ REV_LABEL = "REVOKED"
INSTALL_LABEL = "(Installed)"
class RevocationChecker(object):
"This class figures out OCSP checking on this system, and performs it."
def revoked_status(cert_path, chain_path):
"""Get revoked status for a particular cert version.
def __init__(self):
self.broken = False
.. todo:: Make this a non-blocking call
if not util.exe_exists("openssl"):
logging.info("openssl not installed, can't check revocation")
self.broken = True
:param str cert_path: Path to certificate
:param str chain_path: Path to chain certificate
# New versions of openssl want -header var=val, old ones want -header var val
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
stdout=PIPE, stderr=PIPE)
_out, err = test_host_format.communicate()
if "Missing =" in err:
self.host_args = lambda host: ["Host=" + host]
else:
self.host_args = lambda host: ["Host", host]
"""
if revoked_status.broken:
return False
def check_ocsp(self, cert_path, chain_path):
"""Get revoked status for a particular cert version.
if not util.exe_exists("openssl"):
logging.info("openssl not installed, can't check revocation")
revoked_status.broken = True
return False
.. todo:: Make this a non-blocking call
try:
url, err = util.run_script(
["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"],
log=logging.debug)
except errors.SubprocessError:
logger.info("Cannot extract OCSP URI from %s", cert_path)
return False
:param str cert_path: Path to certificate
:param str chain_path: Path to intermediate cert
url = url.rstrip()
host = url.partition("://")[2].rstrip("/")
if not host:
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return False
"""
if self.broken:
return False
# New versions of openssl want -header var=val, old ones want -header var val
test_host_format = Popen(["openssl", "ocsp", "-header", "var", "val"],
stdout=PIPE, stderr=PIPE)
_out, err = test_host_format.communicate()
if "Missing =" in err:
host_args = ["Host=" + host]
else:
host_args = ["Host", host]
# jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this!
try:
logger.debug("Querying OCSP for %s", cert_path)
url, host = self.determine_ocsp_server(cert_path)
if not host:
return False
# jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this!
cmd = ["openssl", "ocsp",
"-no_nonce",
"-issuer", chain_path,
@ -64,16 +57,43 @@ def revoked_status(cert_path, chain_path):
"-url", url,
"-CAfile", chain_path,
"-verify_other", chain_path,
"-header"] + host_args
output, err = util.run_script(cmd, log=logging.debug)
except errors.SubprocessError:
logger.info("OCSP querying seems to be broken, assuming nothing is revoked...")
logger.debug("Command was:\n%s\nError was:\n%s", " ".join(cmd), err)
revoked_status.broken = True
return False
"-header"] + self.host_args(host)
try:
output, err = util.run_script(cmd, log=logging.debug)
except errors.SubprocessError, e:
logger.info("We're offline, or OCSP querying is broken. "
"Assuming nothing is revoked...")
logger.debug("Command was:\n%s\nError was:\n%s", " ".join(cmd), e)
self.broken = True
return False
return _translate_ocsp_query(cert_path, output, err)
revoked_status.broken = False
return _translate_ocsp_query(cert_path, output, err)
def determine_ocsp_server(self, cert_path):
"""Extract the OCSP server host from a certificate.
:param str cert_path: Path to the cert we're checking OCSP for
:rtype tuple:
:returns: (OCSP server URL or None, OCSP server host or None)
"""
try:
url, err = util.run_script(
["openssl", "x509", "-in", cert_path, "-noout", "-ocsp_uri"],
log=logging.debug)
except errors.SubprocessError:
logger.info("Cannot extract OCSP URI from %s", cert_path)
logger.debug("Error was:\n%s", err)
return None, None
url = url.rstrip()
host = url.partition("://")[2].rstrip("/")
if host:
return url, host
else:
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return None, None
def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):