Define new interfaces

This commit is contained in:
Brad Warren 2020-05-21 16:33:26 -07:00
parent d135e6140b
commit 1a8b0df2f5
2 changed files with 77 additions and 3 deletions

View file

@ -1,5 +1,6 @@
"""Certbot client interfaces."""
import abc
import enum
import six
import zope.interface
@ -589,6 +590,43 @@ class RenewableCert(object):
"""
class OCSPCertStatus(enum.Enum):
"""Values for the certificate status field in an OCSP response."""
GOOD = 1
REVOKED = 2
UNKNOWN = 3
@six.add_metaclass(abc.ABCMeta)
class OCSPResponse(object):
"""Interface for an OCSP response."""
@abc.abstractproperty
def certificate_status(self):
"""Certificate status
:rtype: OCSPCertStatus
"""
@abc.abstractproperty
def next_update(self):
"""Next update
:rtype: datetime.datetime
"""
@abc.abstractproperty
def bytes(self):
"""Raw bytes of the OCSP response
:rtype: bytes
"""
# Updater interfaces
#
# When "certbot renew" is run, Certbot will iterate over each lineage and check

View file

@ -1,4 +1,11 @@
"""Tools for checking certificate revocation."""
"""Tools for checking certificate revocation.
.. data:: CRYPTOGRAPHY_OCSP_AVAILABLE
Boolean that is true if cryptography can be used for making OCSP
queries instead of shelling out to openssl.
"""
from datetime import datetime
from datetime import timedelta
import logging
@ -20,6 +27,7 @@ from acme.magic_typing import Optional
from acme.magic_typing import Tuple
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.compat.os import getenv
from certbot.interfaces import RenewableCert # pylint: disable=unused-import
@ -29,8 +37,9 @@ try:
# and signature_hash_algorithm attribute in OCSPResponse class
from cryptography.x509 import ocsp # pylint: disable=ungrouped-imports
getattr(ocsp.OCSPResponse, 'signature_hash_algorithm')
CRYPTOGRAPHY_OCSP_AVAILABLE = True
except (ImportError, AttributeError): # pragma: no cover
ocsp = None # type: ignore
CRYPTOGRAPHY_OCSP_AVAILABLE = False
logger = logging.getLogger(__name__)
@ -41,7 +50,7 @@ class RevocationChecker(object):
def __init__(self, enforce_openssl_binary_usage=False):
self.broken = False
self.use_openssl_binary = enforce_openssl_binary_usage or not ocsp
self.use_openssl_binary = enforce_openssl_binary_usage or not CRYPTOGRAPHY_OCSP_AVAILABLE
if self.use_openssl_binary:
if not util.exe_exists("openssl"):
@ -58,6 +67,29 @@ class RevocationChecker(object):
else:
self.host_args = lambda host: ["Host", host]
def ocsp_response_by_paths(self, cert_path, chain_path, timeout=10):
"""Obtains a validated OCSP response.
The OCSP response could have any certificate status, however, if
an OCSP response is returned from this function, the caller
knows it is properly timestamped, signed, etc.
.. note:: This function currently only works when cryptography
is used for OCSP. Whether a new enough version of crypography
with OCSP support is available can be checked through
CRYPTOGRAPHY_OCSP_AVAILABLE. If it is not available, None is
always returned by this function for now.
:param str cert_path: Certificate filepath
:param str chain_path: Certificate chain
:param int timeout: Timeout (in seconds) for the OCSP query
:returns: The OCSP response if it could be obtained and
validated, otherwise, None
:rtype: interfaces.OCSPResponse or None
"""
def ocsp_revoked(self, cert):
# type: (RenewableCert) -> bool
"""Get revoked status for a particular cert version.
@ -139,6 +171,10 @@ class RevocationChecker(object):
return _translate_ocsp_query(cert_path, output, err)
class _CryptographyOCSPResponse(interfaces.OCSPResponse):
"""Cryptography implementation of OCSPResponse interface."""
def _determine_ocsp_server(cert_path):
# type: (str) -> Tuple[Optional[str], Optional[str]]
"""Extract the OCSP server host from a certificate.