mirror of
https://github.com/certbot/certbot.git
synced 2026-06-06 15:22:38 -04:00
Use new OCSP interface.
This commit is contained in:
parent
e7462fa2c5
commit
b8211f81ba
2 changed files with 39 additions and 63 deletions
|
|
@ -37,6 +37,7 @@ import time
|
|||
from acme.magic_typing import Dict, Union # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import ocsp
|
||||
from certbot.plugins.enhancements import OCSPPrefetchEnhancement
|
||||
|
||||
|
|
@ -153,25 +154,19 @@ class OCSPPrefetchMixin(object):
|
|||
# Ensure cert_path exists before trying to use it.
|
||||
if not os.path.isfile(cert_path):
|
||||
raise OCSPCertificateError("Certificate has been removed from the system.")
|
||||
ocsp_workfile = os.path.join(
|
||||
os.path.dirname(self._ocsp_work),
|
||||
apache_util.certid_sha1_hex(cert_path))
|
||||
handler = ocsp.RevocationChecker()
|
||||
try:
|
||||
if handler.ocsp_revoked_by_paths(cert_path, chain_path, response_file=ocsp_workfile):
|
||||
raise OCSPCertificateError("Certificate has been revoked.")
|
||||
else:
|
||||
# Guaranteed good response
|
||||
cert_sha = apache_util.certid_sha1(cert_path)
|
||||
# dbm.open automatically adds the file extension
|
||||
self._write_to_dbm(self._ocsp_store, cert_sha,
|
||||
self._ocsp_response_dbm(ocsp_workfile))
|
||||
finally:
|
||||
try:
|
||||
os.remove(ocsp_workfile)
|
||||
except OSError:
|
||||
# The OCSP workfile did not exist because of an OCSP response fetching error
|
||||
pass
|
||||
ocsp_response = handler.ocsp_response_by_paths(cert_path, chain_path)
|
||||
if ocsp_response is None:
|
||||
raise errors.PluginError("Unable to obtain an OCSP response.")
|
||||
elif ocsp_response.certificate_status == interfaces.OCSPCertStatus.REVOKED:
|
||||
raise OCSPCertificateError("Certificate has been revoked.")
|
||||
elif ocsp_response.certificate_status == interfaces.OCSPCertStatus.UNKNOWN:
|
||||
raise OCSPCertificateError("Certificate is unknown to the OCSP responder.")
|
||||
else:
|
||||
# Guaranteed good response
|
||||
cert_sha = apache_util.certid_sha1(cert_path)
|
||||
# dbm.open automatically adds the file extension
|
||||
self._write_to_dbm(self._ocsp_store, cert_sha, self._ocsp_response_dbm(ocsp_response))
|
||||
|
||||
def _write_to_dbm(self, filename, key, value):
|
||||
"""Helper method to write an OCSP response cache value to DBM.
|
||||
|
|
@ -237,10 +232,10 @@ class OCSPPrefetchMixin(object):
|
|||
"Staple should not be prefetched.")
|
||||
raise errors.PluginError(suberror)
|
||||
|
||||
def _ocsp_response_dbm(self, workfile):
|
||||
def _ocsp_response_dbm(self, ocsp_response):
|
||||
"""Creates a dbm entry for OCSP response data
|
||||
|
||||
:param str workfile: File path for raw OCSP response
|
||||
:param interfaces.OCSPResponse ocsp_response: Good OCSP response
|
||||
|
||||
:raises .errors.PluginError: If the OCSP response should not be
|
||||
configured for use with Apache
|
||||
|
|
@ -249,13 +244,8 @@ class OCSPPrefetchMixin(object):
|
|||
:rtype: string
|
||||
|
||||
"""
|
||||
|
||||
handler = ocsp.RevocationChecker()
|
||||
_, _, next_update = handler.ocsp_times(workfile)
|
||||
ttl = self._ocsp_ttl(next_update)
|
||||
|
||||
with open(workfile, 'rb') as fh:
|
||||
response = fh.read()
|
||||
ttl = self._ocsp_ttl(ocsp_response.next_update)
|
||||
response = ocsp_response.bytes
|
||||
return apache_util.get_apache_ocsp_struct(ttl, response)
|
||||
|
||||
def _ocsp_prefetch_save(self, cert_path, chain_path):
|
||||
|
|
|
|||
|
|
@ -220,34 +220,23 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
|
||||
@mock.patch("certbot_apache._internal.constants.OCSP_INTERNAL_TTL", 0)
|
||||
def test_ocsp_prefetch_refresh(self):
|
||||
def ocsp_req_mock(_cert, _chain, response_file):
|
||||
"""Method to mock the OCSP request and write response to file"""
|
||||
with open(response_file, 'w') as fh:
|
||||
fh.write("MOCKRESPONSE")
|
||||
return False
|
||||
|
||||
ocsp_path = "certbot.ocsp.RevocationChecker.ocsp_revoked_by_paths"
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||
with mock.patch('certbot.ocsp.RevocationChecker.ocsp_times') as mock_times:
|
||||
produced_at = datetime.today() - timedelta(days=1)
|
||||
this_update = datetime.today() - timedelta(days=2)
|
||||
next_update = datetime.today() + timedelta(days=2)
|
||||
mock_times.return_value = produced_at, this_update, next_update
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
ocsp_bytes = b'MOCKRESPONSE'
|
||||
ocsp_path = "certbot.ocsp.RevocationChecker.ocsp_response_by_paths"
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value.next_update = datetime.today() + timedelta(days=2)
|
||||
mock_ocsp.return_value.bytes = b'MOCKRESPONSE'
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
odbm = _read_dbm(self.db_path)
|
||||
self.assertEqual(len(odbm.keys()), 1)
|
||||
# The actual response data is prepended by Apache timestamp
|
||||
self.assertTrue(odbm[list(odbm.keys())[0]].endswith(b'MOCKRESPONSE'))
|
||||
self.assertTrue(odbm[list(odbm.keys())[0]].endswith(ocsp_bytes))
|
||||
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock) as mock_ocsp:
|
||||
with mock.patch('certbot.ocsp.RevocationChecker.ocsp_times') as mock_times:
|
||||
produced_at = datetime.today() - timedelta(days=1)
|
||||
this_update = datetime.today() - timedelta(days=2)
|
||||
next_update = datetime.today() + timedelta(days=2)
|
||||
mock_times.return_value = produced_at, this_update, next_update
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertTrue(mock_ocsp.called)
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value.next_update = datetime.today() + timedelta(days=2)
|
||||
mock_ocsp.return_value.bytes = b'MOCKRESPONSE'
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertTrue(mock_ocsp.called)
|
||||
|
||||
def test_ocsp_prefetch_refresh_cert_deleted(self):
|
||||
self.config._ocsp_prefetch_save("nonexistent_cert_path", "irrelevant")
|
||||
|
|
@ -318,19 +307,16 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin.restart")
|
||||
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
|
||||
ocsp_path = "certbot.ocsp.RevocationChecker.ocsp_revoked_by_paths"
|
||||
log_path = "certbot_apache._internal.prefetch_ocsp.logger.warning"
|
||||
ocsp_path = "certbot.ocsp.RevocationChecker.ocsp_response_by_paths"
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value = True
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.call_mocked_py2,
|
||||
self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"]
|
||||
)
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertTrue(
|
||||
"trying to prefetch OCSP" in mock_log.call_args[0][0])
|
||||
mock_ocsp.return_value = None
|
||||
try:
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
except errors.PluginError as e:
|
||||
self.assertIn("Unable to obtain an OCSP response.", str(e))
|
||||
else: # pragma: no cover
|
||||
self.fail("errors.PluginError should have been raised")
|
||||
|
||||
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_needed")
|
||||
def test_ocsp_prefetch_update_noop(self, mock_refresh):
|
||||
|
|
|
|||
Loading…
Reference in a new issue