diff --git a/letsencrypt/configuration.py b/letsencrypt/configuration.py index 6f3ece9fd..ec8ddb14e 100644 --- a/letsencrypt/configuration.py +++ b/letsencrypt/configuration.py @@ -19,7 +19,6 @@ class NamespaceConfig(object): - `accounts_dir` - `cert_dir` - - `cert_key_backup` - `in_progress_dir` - `key_dir` - `renewer_config_file` @@ -57,11 +56,6 @@ class NamespaceConfig(object): def cert_dir(self): # pylint: disable=missing-docstring return os.path.join(self.namespace.config_dir, constants.CERT_DIR) - @property - def cert_key_backup(self): # pylint: disable=missing-docstring - return os.path.join(self.namespace.work_dir, - constants.CERT_KEY_BACKUP_DIR, self.server_path) - @property def in_progress_dir(self): # pylint: disable=missing-docstring return os.path.join(self.namespace.work_dir, constants.IN_PROGRESS_DIR) diff --git a/letsencrypt/constants.py b/letsencrypt/constants.py index 6c67ce445..adca4ed02 100644 --- a/letsencrypt/constants.py +++ b/letsencrypt/constants.py @@ -71,10 +71,6 @@ BACKUP_DIR = "backups" CERT_DIR = "certs" """See `.IConfig.cert_dir`.""" -CERT_KEY_BACKUP_DIR = "keys-certs" -"""Directory where all certificates and keys are stored (relative to -`IConfig.work_dir`). Used for easy revocation.""" - IN_PROGRESS_DIR = "IN_PROGRESS" """Directory used before a permanent checkpoint is finalized (relative to `IConfig.work_dir`).""" diff --git a/letsencrypt/interfaces.py b/letsencrypt/interfaces.py index 5db92b368..345a0d779 100644 --- a/letsencrypt/interfaces.py +++ b/letsencrypt/interfaces.py @@ -208,9 +208,6 @@ class IConfig(zope.interface.Interface): cert_dir = zope.interface.Attribute( "Directory where newly generated Certificate Signing Requests " "(CSRs) and certificates not enrolled in the renewer are saved.") - cert_key_backup = zope.interface.Attribute( - "Directory where all certificates and keys are stored. " - "Used for easy revocation.") in_progress_dir = zope.interface.Attribute( "Directory used before a permanent checkpoint is finalized.") key_dir = zope.interface.Attribute("Keys storage.") diff --git a/letsencrypt/revoker.py b/letsencrypt/revoker.py deleted file mode 100644 index 32c6f003d..000000000 --- a/letsencrypt/revoker.py +++ /dev/null @@ -1,560 +0,0 @@ -"""Revoker module to enable LE revocations. - -The backend of this module would fit a database quite nicely, but in order to -minimize dependencies and maintain transparency, the class currently implements -its own storage system. The number of certs that will likely be stored on any -given client might not warrant requiring a database. - -""" -import collections -import csv -import logging -import os -import shutil -import tempfile - -import OpenSSL - -from acme import client as acme_client -from acme import crypto_util as acme_crypto_util -from acme.jose import util as jose_util - -from letsencrypt import crypto_util -from letsencrypt import errors -from letsencrypt import le_util - -from letsencrypt.display import util as display_util -from letsencrypt.display import revocation - - -logger = logging.getLogger(__name__) - - -class Revoker(object): - """A revocation class for LE. - - .. todo:: Add a method to specify your own certificate for revocation - CLI - - :ivar .acme.client.Client acme: ACME client - - :ivar installer: Installer object - :type installer: :class:`~letsencrypt.interfaces.IInstaller` - - :ivar config: Configuration. - :type config: :class:`~letsencrypt.interfaces.IConfig` - - :ivar bool no_confirm: Whether or not to ask for confirmation for revocation - - """ - def __init__(self, installer, config, no_confirm=False): - # XXX - self.acme = acme_client.Client(directory=None, key=None, alg=None) - - self.installer = installer - self.config = config - self.no_confirm = no_confirm - - le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid(), - self.config.strict_permissions) - - # TODO: Find a better solution for this... - self.list_path = os.path.join(config.cert_key_backup, "LIST") - # Make sure that the file is available for use for rest of class - open(self.list_path, "a").close() - - def revoke_from_key(self, authkey): - """Revoke all certificates under an authorized key. - - :param authkey: Authorized key used in previous transactions - :type authkey: :class:`letsencrypt.le_util.Key` - - """ - certs = [] - try: - clean_pem = OpenSSL.crypto.dump_privatekey( - OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, authkey.pem)) - except OpenSSL.crypto.Error as error: - logger.debug(error, exc_info=True) - raise errors.RevokerError( - "Invalid key file specified to revoke_from_key") - - with open(self.list_path, "rb") as csvfile: - csvreader = csv.reader(csvfile) - for row in csvreader: - # idx, cert, key - # Add all keys that match to marked list - # Note: The key can be different than the pub key found in the - # certificate. - _, b_k = self._row_to_backup(row) - try: - test_pem = OpenSSL.crypto.dump_privatekey( - OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, open(b_k).read())) - except OpenSSL.crypto.Error as error: - logger.debug(error, exc_info=True) - # This should never happen given the assumptions of the - # module. If it does, it is probably best to delete the - # the offending key/cert. For now... just raise an exception - raise errors.RevokerError("%s - backup file is corrupted.") - - if clean_pem == test_pem: - certs.append( - Cert.fromrow(row, self.config.cert_key_backup)) - if certs: - self._safe_revoke(certs) - else: - logger.info("No certificates using the authorized key were found.") - - def revoke_from_cert(self, cert_path): - """Revoke a certificate by specifying a file path. - - .. todo:: Add the ability to revoke the certificate even if the cert - is not stored locally. A path to the auth key will need to be - attained from the user. - - :param str cert_path: path to ACME certificate in pem form - - """ - # Locate the correct certificate (do not rely on filename) - cert_to_revoke = Cert(cert_path) - - with open(self.list_path, "rb") as csvfile: - csvreader = csv.reader(csvfile) - for row in csvreader: - cert = Cert.fromrow(row, self.config.cert_key_backup) - - if cert.get_der() == cert_to_revoke.get_der(): - self._safe_revoke([cert]) - return - - logger.info("Associated ACME certificate was not found.") - - def revoke_from_menu(self): - """List trusted Let's Encrypt certificates.""" - - csha1_vhlist = self._get_installed_locations() - certs = self._populate_saved_certs(csha1_vhlist) - - while True: - if certs: - code, selection = revocation.display_certs(certs) - - if code == display_util.OK: - revoked_certs = self._safe_revoke([certs[selection]]) - # Since we are currently only revoking one cert at a time... - if revoked_certs: - del certs[selection] - elif code == display_util.HELP: - revocation.more_info_cert(certs[selection]) - else: - return - else: - logger.info( - "There are not any trusted Let's Encrypt " - "certificates for this server.") - return - - def _populate_saved_certs(self, csha1_vhlist): - # pylint: disable=no-self-use - """Populate a list of all the saved certs. - - It is important to read from the file rather than the directory. - We assume that the LIST file is the master record and depending on - program crashes, this may differ from what is actually in the directory. - Namely, additional certs/keys may exist. There should never be any - certs/keys in the LIST that don't exist in the directory however. - - :param dict csha1_vhlist: map from cert sha1 fingerprints to a list - of it's installed location paths. - - """ - certs = [] - with open(self.list_path, "rb") as csvfile: - csvreader = csv.reader(csvfile) - # idx, orig_cert, orig_key - for row in csvreader: - cert = Cert.fromrow(row, self.config.cert_key_backup) - - # If we were able to find the cert installed... update status - cert.installed = csha1_vhlist.get(cert.get_fingerprint(), []) - - certs.append(cert) - - return certs - - def _get_installed_locations(self): - """Get installed locations of certificates. - - :returns: map from cert sha1 fingerprint to :class:`list` of vhosts - where the certificate is installed. - - """ - csha1_vhlist = {} - - if self.installer is None: - return csha1_vhlist - - for (cert_path, _, path) in self.installer.get_all_certs_keys(): - try: - with open(cert_path) as cert_file: - cert_data = cert_file.read() - except IOError: - continue - try: - cert_obj, _ = crypto_util.pyopenssl_load_certificate(cert_data) - except errors.Error: - continue - cert_sha1 = cert_obj.digest("sha1") - if cert_sha1 in csha1_vhlist: - csha1_vhlist[cert_sha1].append(path) - else: - csha1_vhlist[cert_sha1] = [path] - - return csha1_vhlist - - def _safe_revoke(self, certs): - """Confirm and revoke certificates. - - :param certs: certs intended to be revoked - :type certs: :class:`list` of :class:`letsencrypt.revoker.Cert` - - :returns: certs successfully revoked - :rtype: :class:`list` of :class:`letsencrypt.revoker.Cert` - - """ - success_list = [] - try: - for cert in certs: - if self.no_confirm or revocation.confirm_revocation(cert): - try: - self._acme_revoke(cert) - except errors.Error: - # TODO: Improve error handling when networking is set... - logger.error( - "Unable to revoke cert:%s%s", os.linesep, str(cert)) - success_list.append(cert) - revocation.success_revocation(cert) - finally: - if success_list: - self._remove_certs_keys(success_list) - - return success_list - - def _acme_revoke(self, cert): - """Revoke the certificate with the ACME server. - - :param cert: certificate to revoke - :type cert: :class:`letsencrypt.revoker.Cert` - - :returns: TODO - - """ - # XXX | pylint: disable=unused-variable - - # pylint: disable=protected-access - certificate = jose_util.ComparableX509(cert._cert) - try: - with open(cert.backup_key_path, "rU") as backup_key_file: - key = OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, backup_key_file.read()) - # If the key file doesn't exist... or is corrupted - except OpenSSL.crypto.Error as error: - logger.debug(error, exc_info=True) - raise errors.RevokerError( - "Corrupted backup key file: %s" % cert.backup_key_path) - - return self.acme.revoke(cert=None) # XXX - - def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use - """Remove certificate and key. - - :param list cert_list: Must contain certs, each is of type - :class:`letsencrypt.revoker.Cert` - - """ - # This must occur first, LIST is the official key - self._remove_certs_from_list(cert_list) - - # Remove files - for cert in cert_list: - os.remove(cert.backup_path) - os.remove(cert.backup_key_path) - - def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use - """Remove a certificate from the LIST file. - - :param list cert_list: Must contain valid certs, each is of type - :class:`letsencrypt.revoker.Cert` - - """ - newfile_handle, list_path2 = tempfile.mkstemp(".tmp", "LIST") - idx = 0 - - with open(self.list_path, "rb") as orgfile: - csvreader = csv.reader(orgfile) - with os.fdopen(newfile_handle, "wb") as newfile: - csvwriter = csv.writer(newfile) - - for row in csvreader: - if idx >= len(cert_list) or row != cert_list[idx].get_row(): - csvwriter.writerow(row) - else: - idx += 1 - - # This should never happen... - if idx != len(cert_list): - raise errors.RevokerError( - "Did not find all cert_list items to remove from LIST") - - shutil.copy2(list_path2, self.list_path) - os.remove(list_path2) - - def _row_to_backup(self, row): - """Convenience function - - :param list row: csv file row 'idx', 'cert_path', 'key_path' - - :returns: tuple of the form ('backup_cert_path', 'backup_key_path') - :rtype: tuple - - """ - return (self._get_backup(self.config.cert_key_backup, row[0], row[1]), - self._get_backup(self.config.cert_key_backup, row[0], row[2])) - - @classmethod - def store_cert_key(cls, cert_path, key_path, config): - """Store certificate key. (Used to allow quick revocation) - - :param str cert_path: Path to a certificate file. - :param str key_path: Path to authorized key for certificate - - :ivar config: Configuration. - :type config: :class:`~letsencrypt.interfaces.IConfig` - - """ - list_path = os.path.join(config.cert_key_backup, "LIST") - le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid(), - config.strict_permissions) - - cls._catalog_files( - config.cert_key_backup, cert_path, key_path, list_path) - - @classmethod - def _catalog_files(cls, backup_dir, cert_path, key_path, list_path): - idx = 0 - if os.path.isfile(list_path): - with open(list_path, "r+b") as csvfile: - csvreader = csv.reader(csvfile) - - # Find the highest index in the file - for row in csvreader: - idx = int(row[0]) + 1 - csvwriter = csv.writer(csvfile) - # You must move the files before appending the row - cls._copy_files(backup_dir, idx, cert_path, key_path) - csvwriter.writerow([str(idx), cert_path, key_path]) - - else: - with open(list_path, "wb") as csvfile: - csvwriter = csv.writer(csvfile) - # You must move the files before appending the row - cls._copy_files(backup_dir, idx, cert_path, key_path) - csvwriter.writerow([str(idx), cert_path, key_path]) - - @classmethod - def _copy_files(cls, backup_dir, idx, cert_path, key_path): - """Copies the files into the backup dir appropriately.""" - shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path)) - shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path)) - - @classmethod - def _get_backup(cls, backup_dir, idx, orig_path): - """Returns the path to the backup.""" - return os.path.join( - backup_dir, "{name}_{idx}".format( - name=os.path.basename(orig_path), idx=str(idx))) - - -class Cert(object): - """Cert object used for Revocation convenience. - - :ivar _cert: Certificate - :type _cert: :class:`OpenSSL.crypto.X509` - - :ivar int idx: convenience index used for listing - :ivar orig: (`str` path - original certificate, `str` status) - :type orig: :class:`PathStatus` - :ivar orig_key: (`str` path - original auth key, `str` status) - :type orig_key: :class:`PathStatus` - :ivar str backup_path: backup filepath of the certificate - :ivar str backup_key_path: backup filepath of the authorized key - - :ivar list installed: `list` of `str` describing all locations the cert - is installed - - """ - PathStatus = collections.namedtuple("PathStatus", "path status") - """Convenience container to hold path and status info""" - - DELETED_MSG = "This file has been moved or deleted" - CHANGED_MSG = "This file has changed" - - def __init__(self, cert_path): - """Cert initialization - - :param str cert_filepath: Name of file containing certificate in - PEM format. - - """ - try: - with open(cert_path) as cert_file: - cert_data = cert_file.read() - except IOError: - raise errors.RevokerError( - "Error loading certificate: %s" % cert_path) - - try: - self._cert = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, cert_data) - except OpenSSL.crypto.Error: - raise errors.RevokerError( - "Error loading certificate: %s" % cert_path) - - self.idx = -1 - - self.orig = None - self.orig_key = None - self.backup_path = "" - self.backup_key_path = "" - - self.installed = ["Unknown"] - - @classmethod - def fromrow(cls, row, backup_dir): - # pylint: disable=protected-access - """Initialize Cert from a csv row.""" - idx = int(row[0]) - backup = Revoker._get_backup(backup_dir, idx, row[1]) - backup_key = Revoker._get_backup(backup_dir, idx, row[2]) - - obj = cls(backup) - obj.add_meta(idx, row[1], row[2], backup, backup_key) - return obj - - def get_row(self): - """Returns a list in CSV format. If meta data is available.""" - if self.orig is not None and self.orig_key is not None: - return [str(self.idx), self.orig.path, self.orig_key.path] - return None - - def add_meta(self, idx, orig, orig_key, backup, backup_key): - """Add meta data to cert - - :param int idx: convenience index for revoker - :param tuple orig: (`str` original certificate filepath, `str` status) - :param tuple orig_key: (`str` original auth key path, `str` status) - :param str backup: backup certificate filepath - :param str backup_key: backup key filepath - - """ - status = "" - key_status = "" - - # Verify original cert path - if not os.path.isfile(orig): - status = Cert.DELETED_MSG - else: - with open(orig) as orig_file: - orig_data = orig_file.read() - o_cert = OpenSSL.crypto.load_certificate( - OpenSSL.crypto.FILETYPE_PEM, orig_data) - if self.get_fingerprint() != o_cert.digest("sha1"): - status = Cert.CHANGED_MSG - - # Verify original key path - if not os.path.isfile(orig_key): - key_status = Cert.DELETED_MSG - else: - with open(orig_key, "r") as fd: - key_pem = fd.read() - with open(backup_key, "r") as fd: - backup_key_pem = fd.read() - if key_pem != backup_key_pem: - key_status = Cert.CHANGED_MSG - - self.idx = idx - self.orig = Cert.PathStatus(orig, status) - self.orig_key = Cert.PathStatus(orig_key, key_status) - self.backup_path = backup - self.backup_key_path = backup_key - - def get_cn(self): - """Get common name.""" - return self._cert.get_subject().CN - - def get_fingerprint(self): - """Get SHA1 fingerprint.""" - return self._cert.digest("sha1") - - def get_not_before(self): - """Get not_valid_before field.""" - return crypto_util.asn1_generalizedtime_to_dt( - self._cert.get_notBefore()) - - def get_not_after(self): - """Get not_valid_after field.""" - return crypto_util.asn1_generalizedtime_to_dt( - self._cert.get_notAfter()) - - def get_der(self): - """Get certificate in der format.""" - return OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_ASN1, self._cert) - - def get_pub_key(self): - """Get public key size. - - .. todo:: Support for ECC - - """ - return "RSA {0}".format(self._cert.get_pubkey().bits) - - def get_san(self): - """Get subject alternative name if available.""" - # pylint: disable=protected-access - return ", ".join(acme_crypto_util._pyopenssl_cert_or_req_san(self._cert)) - - def __str__(self): - text = [ - "Subject: %s" % crypto_util.pyopenssl_x509_name_as_text( - self._cert.get_subject()), - "SAN: %s" % self.get_san(), - "Issuer: %s" % crypto_util.pyopenssl_x509_name_as_text( - self._cert.get_issuer()), - "Public Key: %s" % self.get_pub_key(), - "Not Before: %s" % str(self.get_not_before()), - "Not After: %s" % str(self.get_not_after()), - "Serial Number: %s" % self._cert.get_serial_number(), - "SHA1: %s%s" % (self.get_fingerprint(), os.linesep), - "Installed: %s" % ", ".join(self.installed), - ] - - if self.orig is not None: - if self.orig.status == "": - text.append("Path: %s" % self.orig.path) - else: - text.append("Orig Path: %s (%s)" % self.orig) - if self.orig_key is not None: - if self.orig_key.status == "": - text.append("Auth Key Path: %s" % self.orig_key.path) - else: - text.append("Orig Auth Key Path: %s (%s)" % self.orig_key) - - text.append("") - return os.linesep.join(text) - - def pretty_print(self): - """Nicely frames a cert str""" - frame = "-" * (display_util.WIDTH - 4) + os.linesep - return "{frame}{cert}{frame}".format(frame=frame, cert=str(self)) diff --git a/letsencrypt/tests/configuration_test.py b/letsencrypt/tests/configuration_test.py index 498147c6d..110bfe223 100644 --- a/letsencrypt/tests/configuration_test.py +++ b/letsencrypt/tests/configuration_test.py @@ -32,7 +32,6 @@ class NamespaceConfigTest(unittest.TestCase): def test_dynamic_dirs(self, constants): constants.ACCOUNTS_DIR = 'acc' constants.BACKUP_DIR = 'backups' - constants.CERT_KEY_BACKUP_DIR = 'c/' constants.CERT_DIR = 'certs' constants.IN_PROGRESS_DIR = '../p' constants.KEY_DIR = 'keys' @@ -42,8 +41,6 @@ class NamespaceConfigTest(unittest.TestCase): self.config.accounts_dir, '/tmp/config/acc/acme-server.org:443/new') self.assertEqual(self.config.backup_dir, '/tmp/foo/backups') self.assertEqual(self.config.cert_dir, '/tmp/config/certs') - self.assertEqual( - self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new') self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p') self.assertEqual(self.config.key_dir, '/tmp/config/keys') self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t') diff --git a/letsencrypt/tests/revoker_test.py b/letsencrypt/tests/revoker_test.py deleted file mode 100644 index 87dab4eb8..000000000 --- a/letsencrypt/tests/revoker_test.py +++ /dev/null @@ -1,409 +0,0 @@ -"""Test letsencrypt.revoker.""" -import csv -import os -import shutil -import tempfile -import unittest - -import mock -import OpenSSL - -from letsencrypt import errors -from letsencrypt import le_util -from letsencrypt.display import util as display_util - -from letsencrypt.tests import test_util - - -KEY = OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, test_util.load_vector("rsa512_key.pem")) - - -class RevokerBase(unittest.TestCase): # pylint: disable=too-few-public-methods - """Base Class for Revoker Tests.""" - def setUp(self): - self.paths, self.certs, self.key_path = create_revoker_certs() - - self.backup_dir = tempfile.mkdtemp("cert_backup") - self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir) - - self.list_path = os.path.join(self.backup_dir, "LIST") - - def _store_certs(self): - # pylint: disable=protected-access - from letsencrypt.revoker import Revoker - Revoker.store_cert_key(self.paths[0], self.key_path, self.mock_config) - Revoker.store_cert_key(self.paths[1], self.key_path, self.mock_config) - - # Set metadata - for i in xrange(2): - self.certs[i].add_meta( - i, self.paths[i], self.key_path, - Revoker._get_backup(self.backup_dir, i, self.paths[i]), - Revoker._get_backup(self.backup_dir, i, self.key_path)) - - def _get_rows(self): - with open(self.list_path, "rb") as csvfile: - return [row for row in csv.reader(csvfile)] - - def _write_rows(self, rows): - with open(self.list_path, "wb") as csvfile: - csvwriter = csv.writer(csvfile) - for row in rows: - csvwriter.writerow(row) - - -class RevokerTest(RevokerBase): - def setUp(self): - from letsencrypt.revoker import Revoker - super(RevokerTest, self).setUp() - - with open(self.key_path) as key_file: - self.key = le_util.Key(self.key_path, key_file.read()) - - self._store_certs() - - self.revoker = Revoker( - installer=mock.MagicMock(), config=self.mock_config) - - def tearDown(self): - shutil.rmtree(self.backup_dir) - - @mock.patch("acme.client.Client.revoke") - @mock.patch("letsencrypt.revoker.revocation") - def test_revoke_by_key_all(self, mock_display, mock_acme): - mock_display().confirm_revocation.return_value = True - - self.revoker.revoke_from_key(self.key) - self.assertEqual(self._get_rows(), []) - - # Check to make sure backups were eliminated - for i in xrange(2): - self.assertFalse(self._backups_exist(self.certs[i].get_row())) - - self.assertEqual(mock_acme.call_count, 2) - - @mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey") - def test_revoke_by_invalid_keys(self, mock_load_privatekey): - mock_load_privatekey.side_effect = OpenSSL.crypto.Error - self.assertRaises( - errors.RevokerError, self.revoker.revoke_from_key, self.key) - - mock_load_privatekey.side_effect = [KEY, OpenSSL.crypto.Error] - self.assertRaises( - errors.RevokerError, self.revoker.revoke_from_key, self.key) - - @mock.patch("acme.client.Client.revoke") - @mock.patch("letsencrypt.revoker.revocation") - def test_revoke_by_wrong_key(self, mock_display, mock_acme): - mock_display().confirm_revocation.return_value = True - - key_path = test_util.vector_path("rsa256_key.pem") - - wrong_key = le_util.Key(key_path, open(key_path).read()) - self.revoker.revoke_from_key(wrong_key) - - # Nothing was removed - self.assertEqual(len(self._get_rows()), 2) - # No revocation went through - self.assertEqual(mock_acme.call_count, 0) - - @mock.patch("acme.client.Client.revoke") - @mock.patch("letsencrypt.revoker.revocation") - def test_revoke_by_cert(self, mock_display, mock_acme): - mock_display().confirm_revocation.return_value = True - - self.revoker.revoke_from_cert(self.paths[1]) - - row0 = self.certs[0].get_row() - row1 = self.certs[1].get_row() - - self.assertEqual(self._get_rows(), [row0]) - - self.assertTrue(self._backups_exist(row0)) - self.assertFalse(self._backups_exist(row1)) - - self.assertEqual(mock_acme.call_count, 1) - - @mock.patch("acme.client.Client.revoke") - @mock.patch("letsencrypt.revoker.revocation") - def test_revoke_by_cert_not_found(self, mock_display, mock_acme): - mock_display().confirm_revocation.return_value = True - - self.revoker.revoke_from_cert(self.paths[0]) - self.revoker.revoke_from_cert(self.paths[0]) - - row0 = self.certs[0].get_row() - row1 = self.certs[1].get_row() - - # Same check as last time... just reversed. - self.assertEqual(self._get_rows(), [row1]) - - self.assertTrue(self._backups_exist(row1)) - self.assertFalse(self._backups_exist(row0)) - - self.assertEqual(mock_acme.call_count, 1) - - @mock.patch("acme.client.Client.revoke") - @mock.patch("letsencrypt.revoker.revocation") - def test_revoke_by_menu(self, mock_display, mock_acme): - mock_display().confirm_revocation.return_value = True - mock_display.display_certs.side_effect = [ - (display_util.HELP, 0), - (display_util.OK, 0), - (display_util.CANCEL, -1), - ] - - self.revoker.revoke_from_menu() - - row0 = self.certs[0].get_row() - row1 = self.certs[1].get_row() - - self.assertEqual(self._get_rows(), [row1]) - - self.assertFalse(self._backups_exist(row0)) - self.assertTrue(self._backups_exist(row1)) - - self.assertEqual(mock_acme.call_count, 1) - self.assertEqual(mock_display.more_info_cert.call_count, 1) - - @mock.patch("letsencrypt.revoker.logger") - @mock.patch("acme.client.Client.revoke") - @mock.patch("letsencrypt.revoker.revocation") - def test_revoke_by_menu_delete_all(self, mock_display, mock_acme, mock_log): - mock_display().confirm_revocation.return_value = True - mock_display.display_certs.return_value = (display_util.OK, 0) - - self.revoker.revoke_from_menu() - - self.assertEqual(self._get_rows(), []) - - # Everything should be deleted... - for i in xrange(2): - self.assertFalse(self._backups_exist(self.certs[i].get_row())) - - self.assertEqual(mock_acme.call_count, 2) - # Info is called when there aren't any certs left... - self.assertTrue(mock_log.info.called) - - @mock.patch("letsencrypt.revoker.revocation") - @mock.patch("letsencrypt.revoker.Revoker._acme_revoke") - @mock.patch("letsencrypt.revoker.logger") - def test_safe_revoke_acme_fail(self, mock_log, mock_revoke, mock_display): - # pylint: disable=protected-access - mock_revoke.side_effect = errors.Error - mock_display().confirm_revocation.return_value = True - - self.revoker._safe_revoke(self.certs) - self.assertTrue(mock_log.error.called) - - @mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey") - def test_acme_revoke_failure(self, mock_load_privatekey): - # pylint: disable=protected-access - mock_load_privatekey.side_effect = OpenSSL.crypto.Error - self.assertRaises( - errors.Error, self.revoker._acme_revoke, self.certs[0]) - - def test_remove_certs_from_list_bad_certs(self): - # pylint: disable=protected-access - from letsencrypt.revoker import Cert - - new_cert = Cert(self.paths[0]) - - # This isn't stored in the db - new_cert.idx = 10 - new_cert.backup_path = self.paths[0] - new_cert.backup_key_path = self.key_path - new_cert.orig = Cert.PathStatus("false path", "not here") - new_cert.orig_key = Cert.PathStatus("false path", "not here") - - self.assertRaises(errors.RevokerError, - self.revoker._remove_certs_from_list, [new_cert]) - - def _backups_exist(self, row): - # pylint: disable=protected-access - cert_path, key_path = self.revoker._row_to_backup(row) - return os.path.isfile(cert_path) and os.path.isfile(key_path) - - -class RevokerInstallerTest(RevokerBase): - def setUp(self): - super(RevokerInstallerTest, self).setUp() - - self.installs = [ - ["installation/path0a", "installation/path0b"], - ["installation/path1"], - ] - - self.certs_keys = [ - (self.paths[0], self.key_path, self.installs[0][0]), - (self.paths[0], self.key_path, self.installs[0][1]), - (self.paths[1], self.key_path, self.installs[1][0]), - ] - - self._store_certs() - - def _get_revoker(self, installer): - from letsencrypt.revoker import Revoker - return Revoker(installer, self.mock_config) - - def test_no_installer_get_installed_locations(self): - # pylint: disable=protected-access - revoker = self._get_revoker(None) - self.assertEqual(revoker._get_installed_locations(), {}) - - def test_get_installed_locations(self): - # pylint: disable=protected-access - mock_installer = mock.MagicMock() - mock_installer.get_all_certs_keys.return_value = self.certs_keys - - revoker = self._get_revoker(mock_installer) - sha_vh = revoker._get_installed_locations() - - self.assertEqual(len(sha_vh), 2) - for i, cert in enumerate(self.certs): - self.assertTrue(cert.get_fingerprint() in sha_vh) - self.assertEqual( - sha_vh[cert.get_fingerprint()], self.installs[i]) - - @mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_certificate") - def test_get_installed_load_failure(self, mock_load_certificate): - mock_installer = mock.MagicMock() - mock_installer.get_all_certs_keys.return_value = self.certs_keys - - mock_load_certificate.side_effect = OpenSSL.crypto.Error - - revoker = self._get_revoker(mock_installer) - - # pylint: disable=protected-access - self.assertEqual(revoker._get_installed_locations(), {}) - - def test_get_installed_load_failure_open(self): - tmp = tempfile.mkdtemp() - mock_installer = mock.MagicMock() - mock_installer.get_all_certs_keys.return_value = [( - os.path.join(tmp, 'missing'), None, None)] - revoker = self._get_revoker(mock_installer) - # pylint: disable=protected-access - self.assertEqual(revoker._get_installed_locations(), {}) - os.rmdir(tmp) - - -class RevokerClassMethodsTest(RevokerBase): - def setUp(self): - super(RevokerClassMethodsTest, self).setUp() - self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir) - - def tearDown(self): - shutil.rmtree(self.backup_dir) - - def _call(self, cert_path, key_path): - from letsencrypt.revoker import Revoker - Revoker.store_cert_key(cert_path, key_path, self.mock_config) - - def test_store_two(self): - from letsencrypt.revoker import Revoker - self._call(self.paths[0], self.key_path) - self._call(self.paths[1], self.key_path) - - self.assertTrue(os.path.isfile(self.list_path)) - rows = self._get_rows() - - for i, row in enumerate(rows): - # pylint: disable=protected-access - self.assertTrue(os.path.isfile( - Revoker._get_backup(self.backup_dir, i, self.paths[i]))) - self.assertTrue(os.path.isfile( - Revoker._get_backup(self.backup_dir, i, self.key_path))) - self.assertEqual([str(i), self.paths[i], self.key_path], row) - - self.assertEqual(len(rows), 2) - - def test_store_one_mixed(self): - from letsencrypt.revoker import Revoker - self._write_rows( - [["5", "blank", "blank"], ["18", "dc", "dc"], ["21", "b", "b"]]) - self._call(self.paths[0], self.key_path) - - self.assertEqual( - self._get_rows()[3], ["22", self.paths[0], self.key_path]) - - # pylint: disable=protected-access - self.assertTrue(os.path.isfile( - Revoker._get_backup(self.backup_dir, 22, self.paths[0]))) - self.assertTrue(os.path.isfile( - Revoker._get_backup(self.backup_dir, 22, self.key_path))) - - -class CertTest(unittest.TestCase): - def setUp(self): - self.paths, self.certs, self.key_path = create_revoker_certs() - - def test_failed_load(self): - from letsencrypt.revoker import Cert - self.assertRaises(errors.RevokerError, Cert, self.key_path) - - def test_failed_load_open(self): - tmp = tempfile.mkdtemp() - from letsencrypt.revoker import Cert - self.assertRaises( - errors.RevokerError, Cert, os.path.join(tmp, 'missing')) - os.rmdir(tmp) - - def test_no_row(self): - self.assertEqual(self.certs[0].get_row(), None) - - def test_meta_moved_files(self): - from letsencrypt.revoker import Cert - fake_path = "/not/a/real/path/r72d3t6" - self.certs[0].add_meta( - 0, fake_path, fake_path, self.paths[0], self.key_path) - - self.assertEqual(self.certs[0].orig.status, Cert.DELETED_MSG) - self.assertEqual(self.certs[0].orig_key.status, Cert.DELETED_MSG) - - def test_meta_changed_files(self): - from letsencrypt.revoker import Cert - self.certs[0].add_meta( - 0, self.paths[1], self.paths[1], self.paths[0], self.key_path) - - self.assertEqual(self.certs[0].orig.status, Cert.CHANGED_MSG) - self.assertEqual(self.certs[0].orig_key.status, Cert.CHANGED_MSG) - - def test_meta_no_status(self): - self.certs[0].add_meta( - 0, self.paths[0], self.key_path, self.paths[0], self.key_path) - - self.assertEqual(self.certs[0].orig.status, "") - self.assertEqual(self.certs[0].orig_key.status, "") - - def test_print_meta(self): - """Just make sure there aren't any major errors.""" - self.certs[0].add_meta( - 0, self.paths[0], self.key_path, self.paths[0], self.key_path) - # Changed path and deleted file - self.certs[1].add_meta( - 1, self.paths[0], "/not/a/path", self.paths[1], self.key_path) - self.assertTrue(self.certs[0].pretty_print()) - self.assertTrue(self.certs[1].pretty_print()) - - def test_print_no_meta(self): - self.assertTrue(self.certs[0].pretty_print()) - self.assertTrue(self.certs[1].pretty_print()) - - -def create_revoker_certs(): - """Create a few revoker.Cert objects.""" - cert0_path = test_util.vector_path("cert.pem") - cert1_path = test_util.vector_path("cert-san.pem") - key_path = test_util.vector_path("rsa512_key.pem") - - from letsencrypt.revoker import Cert - cert0 = Cert(cert0_path) - cert1 = Cert(cert1_path) - - return [cert0_path, cert1_path], [cert0, cert1], key_path - - -if __name__ == "__main__": - unittest.main() # pragma: no cover