mirror of
https://github.com/certbot/certbot.git
synced 2026-06-05 23:04:39 -04:00
* Implement a logic, miss the private key of pebble
* Complete process
* Fix nginx cert path
* Check conditionnally docker
* Update gitignore, fix apacheconftest
* Full object
* Carriage return
* Work in progress
* Move to official v2.1.0 of pebble
* Fix name
* Update acme_server.py
* Link things together with new version of pebble
* Plug the logic to tests
* Update config
* Reinitiate config
* Add OCSP config to pebble
* Working.
* Simplify logic
* Clean code
* Use forked pebble for now
# Conflicts:
# certbot-ci/certbot_integration_tests/utils/pebble_artifacts.py
* Move full logic of mock at the acme server config
* Continue work
* Finish fixing the date parsing
* Update module name
* Use again official pebble
* Activate mock OCSP server
* Clean code
* Update pebble_artifacts.py
* Remove OCSP stale test
* Add executable permissions
* Clean code
* Update setup.py
* Simplify code
* On-demand import of pebble_ocsp_server
* Revert "Remove OCSP stale test"
This reverts commit 2e4c985b42.
# Conflicts:
# certbot-ci/certbot_integration_tests/utils/misc.py
* Fix for virtualenv on Python 3.7.4 for Windows
* Update acme_server.py
71 lines
3 KiB
Python
Executable file
71 lines
3 KiB
Python
Executable file
#!/usr/bin/env python
|
|
"""
|
|
This runnable module interfaces itself with the Pebble management interface in order
|
|
to serve a mock OCSP responder during integration tests against Pebble.
|
|
"""
|
|
import datetime
|
|
import re
|
|
|
|
import requests
|
|
from dateutil import parser
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import serialization, hashes
|
|
from cryptography import x509
|
|
from cryptography.x509 import ocsp
|
|
from six.moves import BaseHTTPServer
|
|
|
|
from certbot_integration_tests.utils.misc import GracefulTCPServer
|
|
from certbot_integration_tests.utils.constants import MOCK_OCSP_SERVER_PORT, PEBBLE_MANAGEMENT_URL
|
|
|
|
|
|
class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
|
def do_POST(self):
|
|
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0', verify=False)
|
|
issuer_key = serialization.load_pem_private_key(request.content, None, default_backend())
|
|
|
|
request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediates/0', verify=False)
|
|
issuer_cert = x509.load_pem_x509_certificate(request.content, default_backend())
|
|
|
|
try:
|
|
content_len = int(self.headers.getheader('content-length', 0))
|
|
except AttributeError:
|
|
content_len = int(self.headers.get('Content-Length'))
|
|
|
|
ocsp_request = ocsp.load_der_ocsp_request(self.rfile.read(content_len))
|
|
response = requests.get('{0}/cert-status-by-serial/{1}'.format(
|
|
PEBBLE_MANAGEMENT_URL, str(hex(ocsp_request.serial_number)).replace('0x', '')), verify=False)
|
|
|
|
if not response.ok:
|
|
ocsp_response = ocsp.OCSPResponseBuilder.build_unsuccessful(ocsp.OCSPResponseStatus.UNAUTHORIZED)
|
|
else:
|
|
data = response.json()
|
|
|
|
now = datetime.datetime.utcnow()
|
|
cert = x509.load_pem_x509_certificate(data['Certificate'].encode(), default_backend())
|
|
if data['Status'] != 'Revoked':
|
|
ocsp_status, revocation_time, revocation_reason = ocsp.OCSPCertStatus.GOOD, None, None
|
|
else:
|
|
ocsp_status, revocation_reason = ocsp.OCSPCertStatus.REVOKED, x509.ReasonFlags.unspecified
|
|
revoked_at = re.sub(r'( \+\d{4}).*$', r'\1', data['RevokedAt']) # "... +0000 UTC" => "+0000"
|
|
revocation_time = parser.parse(revoked_at)
|
|
|
|
ocsp_response = ocsp.OCSPResponseBuilder().add_response(
|
|
cert=cert, issuer=issuer_cert, algorithm=hashes.SHA1(),
|
|
cert_status=ocsp_status,
|
|
this_update=now, next_update=now + datetime.timedelta(hours=1),
|
|
revocation_time=revocation_time, revocation_reason=revocation_reason
|
|
).responder_id(
|
|
ocsp.OCSPResponderEncoding.NAME, issuer_cert
|
|
).sign(issuer_key, hashes.SHA256())
|
|
|
|
self.send_response(200)
|
|
self.end_headers()
|
|
self.wfile.write(ocsp_response.public_bytes(serialization.Encoding.DER))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
GracefulTCPServer(('', MOCK_OCSP_SERVER_PORT), _ProxyHandler).serve_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|