mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
* Do not call deprecated datetime.utcnow() and datetime.utcfromtimestamp()
* Ignore DeprecationWarnings from importing dependencies
$ python3 -Wdefault
Python 3.12.0b4 (main, Jul 12 2023, 00:00:00) [GCC 13.1.1 20230614 (Red Hat 13.1.1-4)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pkg_resources
/usr/lib/python3.12/site-packages/pkg_resources/__init__.py:121: DeprecationWarning: pkg_resources is deprecated as an API
warnings.warn("pkg_resources is deprecated as an API", DeprecationWarning)
>>> import pytz
/usr/lib/python3.12/site-packages/pytz/tzinfo.py:27: DeprecationWarning: datetime.utcfromtimestamp() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.fromtimestamp(timestamp, datetime.UTC).
_epoch = datetime.utcfromtimestamp(0)
* Used pytz.UTC consistently for clarity
89 lines
3.7 KiB
Python
Executable file
89 lines
3.7 KiB
Python
Executable file
#!/usr/bin/env python
|
|
"""
|
|
This runnable module interfaces itself with the Pebble management interface in order
|
|
to serve a mock OCSP responder during integration tests against Pebble.
|
|
"""
|
|
import datetime
|
|
import http.server as BaseHTTPServer
|
|
import pytz
|
|
import re
|
|
from typing import cast
|
|
from typing import Union
|
|
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
|
|
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
|
from cryptography.x509 import ocsp
|
|
from dateutil import parser
|
|
import requests
|
|
|
|
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT
|
|
from certbot_integration_tests.utils.constants import PEBBLE_MANAGEMENT_URL
|
|
from certbot_integration_tests.utils.misc import GracefulTCPServer
|
|
|
|
|
|
class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Mock OCSP responder backed by the Pebble management interface.

    Each POST is expected to carry a DER-encoded OCSP request. The handler
    looks up the certificate status at ``PEBBLE_MANAGEMENT_URL`` and replies
    with an OCSP response signed by the key served at ``/intermediate-keys/0``.
    """

    # pylint: disable=missing-function-docstring
    def do_POST(self) -> None:
        # Validate and read the request body before contacting Pebble, so a
        # malformed request is rejected without any upstream call.
        request_body = self._read_request_body()
        if request_body is None:
            return  # An error response has already been sent.

        # NOTE(review): TLS verification is disabled for every management
        # call, presumably because Pebble serves a test-only certificate.
        key_response = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0',
                                    verify=False, timeout=10)
        issuer_key = cast(
            Union[RSAPrivateKey, EllipticCurvePrivateKey],
            serialization.load_pem_private_key(key_response.content, None, default_backend()))

        cert_response = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediates/0',
                                     verify=False, timeout=10)
        issuer_cert = x509.load_pem_x509_certificate(cert_response.content, default_backend())

        ocsp_request = ocsp.load_der_ocsp_request(request_body)
        # The status endpoint is queried with the serial in bare hexadecimal
        # (no "0x" prefix).
        serial_hex = format(ocsp_request.serial_number, 'x')
        response = requests.get(
            '{0}/cert-status-by-serial/{1}'.format(PEBBLE_MANAGEMENT_URL, serial_hex),
            verify=False, timeout=10
        )

        if not response.ok:
            # Any failed status lookup is reported as an unsuccessful
            # (UNAUTHORIZED) OCSP response.
            ocsp_response = ocsp.OCSPResponseBuilder.build_unsuccessful(
                ocsp.OCSPResponseStatus.UNAUTHORIZED
            )
        else:
            data = response.json()

            now = datetime.datetime.now(pytz.UTC)
            cert = x509.load_pem_x509_certificate(data['Certificate'].encode(), default_backend())
            if data['Status'] != 'Revoked':
                ocsp_status = ocsp.OCSPCertStatus.GOOD
                revocation_time = None
                revocation_reason = None
            else:
                ocsp_status = ocsp.OCSPCertStatus.REVOKED
                revocation_reason = x509.ReasonFlags.unspecified
                # "... +0000 UTC" => "+0000"
                revoked_at = re.sub(r'( \+\d{4}).*$', r'\1', data['RevokedAt'])
                revocation_time = parser.parse(revoked_at)

            # The response is valid for one hour from now and is signed with
            # the issuer key using SHA256.
            ocsp_response = ocsp.OCSPResponseBuilder().add_response(
                cert=cert, issuer=issuer_cert, algorithm=hashes.SHA1(),
                cert_status=ocsp_status,
                this_update=now, next_update=now + datetime.timedelta(hours=1),
                revocation_time=revocation_time, revocation_reason=revocation_reason
            ).responder_id(
                ocsp.OCSPResponderEncoding.NAME, issuer_cert
            ).sign(issuer_key, hashes.SHA256())

        payload = ocsp_response.public_bytes(serialization.Encoding.DER)
        self.send_response(200)
        # RFC 6960 Appendix A: OCSP over HTTP responses declare this media
        # type and should state their length.
        self.send_header('Content-Type', 'application/ocsp-response')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _read_request_body(self) -> Union[bytes, None]:
        """Read the POST body as announced by the Content-Length header.

        :returns: the raw body bytes, or None when the header is missing or
            invalid (in which case a 411 or 400 error response has been sent).
        """
        raw_content_len = self.headers.get('Content-Length')
        if raw_content_len is None:
            self.send_error(411, 'Content-Length header is required')
            return None

        try:
            content_len = int(raw_content_len)
        except ValueError:
            content_len = -1
        # A negative length would make rfile.read() block until EOF.
        if content_len < 0:
            self.send_error(400, 'Invalid Content-Length header')
            return None

        return self.rfile.read(content_len)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: serve the mock OCSP responder on
    # MOCK_OCSP_SERVER_PORT (all interfaces) until interrupted.
    try:
        server = GracefulTCPServer(('', MOCK_OCSP_SERVER_PORT), _ProxyHandler)
        server.serve_forever()
    except KeyboardInterrupt:
        # Swallow Ctrl-C so the process exits without a traceback.
        pass
|