From c066a4d7966cda28b33229dbff4eb14cc61705bc Mon Sep 17 00:00:00 2001 From: Erica Portnoy Date: Wed, 21 Feb 2018 14:04:44 -0800 Subject: [PATCH] util methods and imports for finalize_order shim refactor --- acme/acme/client.py | 2 +- acme/acme/crypto_util.py | 22 ++++++++++++++++++++++ certbot/client.py | 20 +++++++++----------- certbot/crypto_util.py | 25 ++++++++++++++----------- 4 files changed, 46 insertions(+), 23 deletions(-) diff --git a/acme/acme/client.py b/acme/acme/client.py index d409ad858..03480c5da 100644 --- a/acme/acme/client.py +++ b/acme/acme/client.py @@ -749,7 +749,7 @@ class BackwardsCompatibleClientV2(object): try: chain = self.client.fetch_chain(certr) break - except acme_errors.Error: + except errors.Error: time.sleep(1) if chain is None: diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index a986721f0..f13c5109c 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -8,6 +8,8 @@ import socket import sys import OpenSSL +import josepy as jose + from acme import errors @@ -280,3 +282,23 @@ def gen_ss_cert(key, domains, not_before=None, cert.set_pubkey(key) cert.sign(key, "sha256") return cert + +def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM): + """Dump certificate chain into a bundle. + + :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in + :class:`josepy.util.ComparableX509`). + + """ + # XXX: returns empty string when no chain is available, which + # shuts up RenewableCert, but might not be the best solution... + + def _dump_cert(cert): + if isinstance(cert, jose.ComparableX509): + # pylint: disable=protected-access + cert = cert.wrapped + return OpenSSL.crypto.dump_certificate(filetype, cert) + + # assumes that OpenSSL.crypto.dump_certificate includes ending + # newline character + return b"".join(_dump_cert(cert) for cert in chain) diff --git a/certbot/client.py b/certbot/client.py index b9bcf05df..fc3848a5c 100644 --- a/certbot/client.py +++ b/certbot/client.py @@ -1,4 +1,5 @@ """Certbot client API.""" +import datetime import logging import os import platform @@ -11,7 +12,6 @@ import zope.component from acme import client as acme_client from acme import crypto_util as acme_crypto_util -from acme import errors as acme_errors from acme import messages import certbot @@ -274,10 +274,9 @@ class Client(object): :param list domains: domains to get a certificate - :returns: `.CertificateResource`, certificate chain (as - returned by `.fetch_chain`), and newly generated private key - (`.util.Key`) and DER-encoded Certificate Signing Request - (`.util.CSR`). + :returns: :returns: certificate as PEM string, chain as PEM string, + newly generated private key (`.util.Key`), and DER-encoded + Certificate Signing Request (`.util.CSR`). :rtype: tuple """ @@ -305,9 +304,9 @@ class Client(object): os.remove(csr.file) return self.obtain_certificate(successful_domains) else: - certr, chain = self.obtain_certificate_from_csr(csr, orderr) + cert, chain = self.obtain_certificate_from_csr(csr, orderr) - return certr, chain, key, csr + return cert, chain, key, csr # pylint: disable=no-member def obtain_and_enroll_certificate(self, domains, certname): @@ -326,7 +325,7 @@ class Client(object): be obtained, or None if doing a successful dry run. """ - certr, chain, key, _ = self.obtain_certificate(domains) + cert, chain, key, _ = self.obtain_certificate(domains) if (self.config.config_dir != constants.CLI_DEFAULTS["config_dir"] or self.config.work_dir != constants.CLI_DEFAULTS["work_dir"]): @@ -341,9 +340,8 @@ class Client(object): return None else: return storage.RenewableCert.new_lineage( - new_name, OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped), - key.pem, crypto_util.dump_pyopenssl_chain(chain), + new_name, cert, + key.pem, chain, self.config) def save_certificate(self, cert_pem, chain_pem, diff --git a/certbot/crypto_util.py b/certbot/crypto_util.py index 8368855cd..508467409 100644 --- a/certbot/crypto_util.py +++ b/certbot/crypto_util.py @@ -14,7 +14,6 @@ import six import zope.component from cryptography.hazmat.backends import default_backend from cryptography import x509 -import josepy as jose from acme import crypto_util as acme_crypto_util @@ -367,16 +366,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM): """ # XXX: returns empty string when no chain is available, which # shuts up RenewableCert, but might not be the best solution... - - def _dump_cert(cert): - if isinstance(cert, jose.ComparableX509): - # pylint: disable=protected-access - cert = cert.wrapped - return OpenSSL.crypto.dump_certificate(filetype, cert) - - # assumes that OpenSSL.crypto.dump_certificate includes ending - # newline character - return b"".join(_dump_cert(cert) for cert in chain) + return acme_crypto_util.dump_pyopenssl_chain(chain, filetype) def notBefore(cert_path): @@ -443,3 +433,16 @@ def sha256sum(filename): with open(filename, 'rb') as f: sha256.update(f.read()) return sha256.hexdigest() + +def cert_and_chain_from_fullchain(fullchain_pem): + """Split fullchain_pem into cert_pem and chain_pem + + :param str fullchain_pem: concatenated cert + chain + + :returns: tuple of string cert_pem and chain_pem + :rytpe: tuple + """ + cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, + OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem)) + chain = fullchain_pem[len(cert):] + return (cert, chain)