certbot/certbot-ci/certbot_integration_tests/utils/pebble_ocsp_server.py
Adrien Ferrand 1c7105a940 Create a mock OCSP server for Pebble integration tests (#7281)
* Implement a logic, miss the private key of pebble

* Complete process

* Fix nginx cert path

* Check conditionnally docker

* Update gitignore, fix apacheconftest

* Full object

* Carriage return

* Work in progress

* Move to official v2.1.0 of pebble

* Fix name

* Update acme_server.py

* Link things together with new version of pebble

* Plug the logic to tests

* Update config

* Reinitiate config

* Add OCSP config to pebble

* Working.

* Simplify logic

* Clean code

* Use forked pebble for now

# Conflicts:
#	certbot-ci/certbot_integration_tests/utils/pebble_artifacts.py

* Move full logic of mock at the acme server config

* Continue work

* Finish fixing the date parsing

* Update module name

* Use again official pebble

* Activate mock OCSP server

* Clean code

* Update pebble_artifacts.py

* Remove OCSP stale test

* Add executable permissions

* Clean code

* Update setup.py

* Simplify code

* On-demand import of pebble_ocsp_server

* Revert "Remove OCSP stale test"

This reverts commit 2e4c985b42.

# Conflicts:
#	certbot-ci/certbot_integration_tests/utils/misc.py

* Fix for virtualenv on Python 3.7.4 for Windows

* Update acme_server.py
2019-08-02 11:46:12 -07:00

71 lines
3 KiB
Python
Executable file

#!/usr/bin/env python
"""
This runnable module interfaces itself with the Pebble management interface in order
to serve a mock OCSP responder during integration tests against Pebble.
"""
import datetime
import re
import requests
from dateutil import parser
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography import x509
from cryptography.x509 import ocsp
from six.moves import BaseHTTPServer
from certbot_integration_tests.utils.misc import GracefulTCPServer
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT, PEBBLE_MANAGEMENT_URL
class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0', verify=False)
issuer_key = serialization.load_pem_private_key(request.content, None, default_backend())
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediates/0', verify=False)
issuer_cert = x509.load_pem_x509_certificate(request.content, default_backend())
try:
content_len = int(self.headers.getheader('content-length', 0))
except AttributeError:
content_len = int(self.headers.get('Content-Length'))
ocsp_request = ocsp.load_der_ocsp_request(self.rfile.read(content_len))
response = requests.get('{0}/cert-status-by-serial/{1}'.format(
PEBBLE_MANAGEMENT_URL, str(hex(ocsp_request.serial_number)).replace('0x', '')), verify=False)
if not response.ok:
ocsp_response = ocsp.OCSPResponseBuilder.build_unsuccessful(ocsp.OCSPResponseStatus.UNAUTHORIZED)
else:
data = response.json()
now = datetime.datetime.utcnow()
cert = x509.load_pem_x509_certificate(data['Certificate'].encode(), default_backend())
if data['Status'] != 'Revoked':
ocsp_status, revocation_time, revocation_reason = ocsp.OCSPCertStatus.GOOD, None, None
else:
ocsp_status, revocation_reason = ocsp.OCSPCertStatus.REVOKED, x509.ReasonFlags.unspecified
revoked_at = re.sub(r'( \+\d{4}).*$', r'\1', data['RevokedAt']) # "... +0000 UTC" => "+0000"
revocation_time = parser.parse(revoked_at)
ocsp_response = ocsp.OCSPResponseBuilder().add_response(
cert=cert, issuer=issuer_cert, algorithm=hashes.SHA1(),
cert_status=ocsp_status,
this_update=now, next_update=now + datetime.timedelta(hours=1),
revocation_time=revocation_time, revocation_reason=revocation_reason
).responder_id(
ocsp.OCSPResponderEncoding.NAME, issuer_cert
).sign(issuer_key, hashes.SHA256())
self.send_response(200)
self.end_headers()
self.wfile.write(ocsp_response.public_bytes(serialization.Encoding.DER))
if __name__ == '__main__':
try:
GracefulTCPServer(('', MOCK_OCSP_SERVER_PORT), _ProxyHandler).serve_forever()
except KeyboardInterrupt:
pass