2019-08-02 14:46:12 -04:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
"""
|
|
|
|
|
This runnable module interfaces itself with the Pebble management interface in order
|
|
|
|
|
to serve a mock OCSP responder during integration tests against Pebble.
|
|
|
|
|
"""
|
|
|
|
|
import datetime
|
2021-02-09 14:43:15 -05:00
|
|
|
import http.server as BaseHTTPServer
|
2023-07-18 18:44:25 -04:00
|
|
|
import pytz
|
2019-08-02 14:46:12 -04:00
|
|
|
import re
|
2023-02-10 13:51:20 -05:00
|
|
|
from typing import cast
|
|
|
|
|
from typing import Union
|
2019-08-02 14:46:12 -04:00
|
|
|
|
|
|
|
|
from cryptography import x509
|
2019-12-09 15:50:20 -05:00
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
from cryptography.hazmat.primitives import hashes
|
|
|
|
|
from cryptography.hazmat.primitives import serialization
|
2023-02-08 14:17:44 -05:00
|
|
|
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
|
|
|
|
|
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
2019-08-02 14:46:12 -04:00
|
|
|
from cryptography.x509 import ocsp
|
2019-12-09 15:50:20 -05:00
|
|
|
from dateutil import parser
|
|
|
|
|
import requests
|
2019-08-02 14:46:12 -04:00
|
|
|
|
2019-12-09 15:50:20 -05:00
|
|
|
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT
|
|
|
|
|
from certbot_integration_tests.utils.constants import PEBBLE_MANAGEMENT_URL
|
2019-08-02 14:46:12 -04:00
|
|
|
from certbot_integration_tests.utils.misc import GracefulTCPServer
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
2020-12-16 14:34:12 -05:00
|
|
|
# pylint: disable=missing-function-docstring
|
2021-11-29 16:24:39 -05:00
|
|
|
def do_POST(self) -> None:
|
2022-11-17 02:21:14 -05:00
|
|
|
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0',
|
|
|
|
|
verify=False, timeout=10)
|
2023-02-08 14:17:44 -05:00
|
|
|
issuer_key = cast(
|
|
|
|
|
Union[RSAPrivateKey, EllipticCurvePrivateKey],
|
|
|
|
|
serialization.load_pem_private_key(request.content, None, default_backend()))
|
2019-08-02 14:46:12 -04:00
|
|
|
|
2022-11-17 02:21:14 -05:00
|
|
|
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediates/0',
|
|
|
|
|
verify=False, timeout=10)
|
2019-08-02 14:46:12 -04:00
|
|
|
issuer_cert = x509.load_pem_x509_certificate(request.content, default_backend())
|
|
|
|
|
|
2023-04-08 21:49:08 -04:00
|
|
|
raw_content_len = self.headers.get('Content-Length')
|
|
|
|
|
assert isinstance(raw_content_len, str)
|
|
|
|
|
content_len = int(raw_content_len)
|
2019-08-02 14:46:12 -04:00
|
|
|
|
|
|
|
|
ocsp_request = ocsp.load_der_ocsp_request(self.rfile.read(content_len))
|
|
|
|
|
response = requests.get('{0}/cert-status-by-serial/{1}'.format(
|
2020-12-16 14:34:12 -05:00
|
|
|
PEBBLE_MANAGEMENT_URL, str(hex(ocsp_request.serial_number)).replace('0x', '')),
|
2022-11-17 02:21:14 -05:00
|
|
|
verify=False, timeout=10
|
2020-12-16 14:34:12 -05:00
|
|
|
)
|
2019-08-02 14:46:12 -04:00
|
|
|
|
|
|
|
|
if not response.ok:
|
2020-12-16 14:34:12 -05:00
|
|
|
ocsp_response = ocsp.OCSPResponseBuilder.build_unsuccessful(
|
|
|
|
|
ocsp.OCSPResponseStatus.UNAUTHORIZED
|
|
|
|
|
)
|
2019-08-02 14:46:12 -04:00
|
|
|
else:
|
|
|
|
|
data = response.json()
|
|
|
|
|
|
2023-07-18 18:44:25 -04:00
|
|
|
now = datetime.datetime.now(pytz.UTC)
|
2019-08-02 14:46:12 -04:00
|
|
|
cert = x509.load_pem_x509_certificate(data['Certificate'].encode(), default_backend())
|
|
|
|
|
if data['Status'] != 'Revoked':
|
2020-12-16 14:34:12 -05:00
|
|
|
ocsp_status = ocsp.OCSPCertStatus.GOOD
|
|
|
|
|
revocation_time = None
|
|
|
|
|
revocation_reason = None
|
2019-08-02 14:46:12 -04:00
|
|
|
else:
|
2020-12-16 14:34:12 -05:00
|
|
|
ocsp_status = ocsp.OCSPCertStatus.REVOKED
|
|
|
|
|
revocation_reason = x509.ReasonFlags.unspecified
|
|
|
|
|
# "... +0000 UTC" => "+0000"
|
|
|
|
|
revoked_at = re.sub(r'( \+\d{4}).*$', r'\1', data['RevokedAt'])
|
2019-08-02 14:46:12 -04:00
|
|
|
revocation_time = parser.parse(revoked_at)
|
|
|
|
|
|
|
|
|
|
ocsp_response = ocsp.OCSPResponseBuilder().add_response(
|
|
|
|
|
cert=cert, issuer=issuer_cert, algorithm=hashes.SHA1(),
|
|
|
|
|
cert_status=ocsp_status,
|
|
|
|
|
this_update=now, next_update=now + datetime.timedelta(hours=1),
|
|
|
|
|
revocation_time=revocation_time, revocation_reason=revocation_reason
|
|
|
|
|
).responder_id(
|
|
|
|
|
ocsp.OCSPResponderEncoding.NAME, issuer_cert
|
|
|
|
|
).sign(issuer_key, hashes.SHA256())
|
|
|
|
|
|
|
|
|
|
self.send_response(200)
|
|
|
|
|
self.end_headers()
|
|
|
|
|
self.wfile.write(ocsp_response.public_bytes(serialization.Encoding.DER))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
try:
|
|
|
|
|
GracefulTCPServer(('', MOCK_OCSP_SERVER_PORT), _ProxyHandler).serve_forever()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
pass
|