Reorder new ocsp functions and add tests

This commit is contained in:
Joona Hoikkala 2020-01-27 14:03:24 +02:00
parent 549061249f
commit dd9f76c60c
No known key found for this signature in database
GPG key ID: D5AA86BBF9B29A5C
2 changed files with 87 additions and 32 deletions

View file

@ -21,7 +21,6 @@ from acme.magic_typing import Tuple # pylint: disable=unused-import, no-name-in
from certbot import crypto_util
from certbot import errors
from certbot import util
from certbot._internal.storage import RenewableCert # pylint: disable=unused-import
try:
# Only cryptography>=2.5 has ocsp module
@ -32,7 +31,6 @@ except (ImportError, AttributeError): # pragma: no cover
ocsp = None # type: ignore
logger = logging.getLogger(__name__)
@ -112,34 +110,9 @@ class RevocationChecker(object):
"""
if self.use_openssl_binary:
return self._ocsp_times_openssl_bin(response_file)
return _ocsp_times_openssl_bin(response_file)
return _ocsp_times_cryptography(response_file)
def _ocsp_times_openssl_bin(self, response_file):
"""
Reads OCSP response file using OpenSSL binary and returns
producedAt, thisUpdate and nextUpdate values in datetime format.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime
"""
cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file]
logger.debug("Reading OCSP response from temp file: %s", response_file)
logger.debug(" ".join(cmd))
try:
output, err = util.run_script(cmd, log=logger.debug)
except errors.SubprocessError:
logger.info("Reading OCSP response from file failed.")
return None, None, None
prod_str, this_str, next_str = _translate_ocsp_response_times(output)
prod_dt = util.parse_datetime(prod_str)
this_dt = util.parse_datetime(this_str)
next_dt = util.parse_datetime(next_str)
return prod_dt, this_dt, next_dt
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, response_file=None):
# type: (str, str, str, str, Optional[str]) -> bool
# jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this!
@ -195,10 +168,37 @@ def _determine_ocsp_server(cert_path):
return None, None
def _ocsp_times_openssl_bin(response_file):
"""
Reads OCSP response file using OpenSSL binary and returns
producedAt, thisUpdate and nextUpdate values in datetime format.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime
"""
cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file]
logger.debug("Reading OCSP response from temp file: %s", response_file)
logger.debug(" ".join(cmd))
try:
output, err = util.run_script(cmd, log=logger.debug)
except errors.SubprocessError:
logger.info("Reading OCSP response from file failed.")
return None, None, None
prod_str, this_str, next_str = _translate_ocsp_response_times(output)
prod_dt = util.parse_datetime(prod_str)
this_dt = util.parse_datetime(this_str)
next_dt = util.parse_datetime(next_str)
return prod_dt, this_dt, next_dt
def _ocsp_times_cryptography(response_file):
"""
Reads OCSP response using cryptography and returns producedAt,
thisUpdate and nextUpdate values in datetime format.
thisUpdate and nextUpdate values in datetime format, or None
if the file cannot be opened.
:param str response_file: File path to OCSP response
@ -206,8 +206,11 @@ def _ocsp_times_cryptography(response_file):
:rtype: tuple of datetime
"""
with open(response_file, 'rb') as fh:
raw_response = fh.read()
try:
with open(response_file, 'rb') as fh:
raw_response = fh.read()
except OSError:
return None, None, None
response = ocsp.load_der_ocsp_response(raw_response)
return response.produced_at, response.this_update, response.next_update
@ -371,7 +374,7 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
return True
else:
logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s",
ocsp_output, ocsp_errors)
ocsp_output, ocsp_errors)
return False

View file

@ -137,6 +137,32 @@ class OCSPTestOpenSSL(unittest.TestCase):
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
self.assertEqual(mock_log.info.call_count, 1)
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times(self, mock_run):
mock_run.return_value = ocsp_times_example
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
self.assertEqual(nextUpdate, datetime(2020, 1, 31, 11, 0))
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_badoutput(self, mock_run):
mock_run.return_value = ("Something unparsable", "")
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, None)
self.assertEqual(thisUpdate, None)
self.assertEqual(nextUpdate, None)
@mock.patch('certbot._internal.ocsp.logger')
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_error(self, mock_run, mock_log):
mock_run.side_effect = errors.SubprocessError
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, None)
self.assertEqual(thisUpdate, None)
self.assertEqual(nextUpdate, None)
self.assertEqual(mock_log.info.call_count, 1)
@unittest.skipIf(not ocsp_lib,
reason='This class tests functionalities available only on cryptography>=2.5.0')
@ -278,6 +304,26 @@ class OSCPTestCryptography(unittest.TestCase):
revoked = self.checker.ocsp_revoked(self.cert_obj)
self.assertFalse(revoked)
def test_ocsp_times_cryptography(self):
with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")):
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
resp = mock.MagicMock()
resp.produced_at = datetime(2020, 1, 2, 9, 9)
resp.this_update = datetime(2020, 1, 2, 8, 8)
resp.next_update = datetime(2020, 1, 3, 4, 4)
mock_load.return_value = resp
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
self.assertEqual(next_update, datetime(2020, 1, 3, 4, 4))
def test_ocsp_times_cryptography_error(self):
with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")) as mock_open:
mock_open.side_effect = OSError
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual([produced_at, this_update, next_update], [None, None, None])
@contextlib.contextmanager
def _ocsp_mock(certificate_status, response_status,
@ -377,6 +423,12 @@ revoked
""",
"""Response verify OK""")
ocsp_times_example = ("""
Produced At: Jan 24 11:10:00 2020 GMT
This Update: Jan 24 11:00:00 2020 GMT
Next Update: Jan 31 11:00:00 2020 GMT
""", "")
if __name__ == '__main__':
unittest.main() # pragma: no cover