Remove revoker and associated code

This commit is contained in:
James Kasten 2015-09-25 22:35:43 -07:00
parent 5cc9061413
commit 022c5c3c24
6 changed files with 0 additions and 985 deletions

View file

@ -19,7 +19,6 @@ class NamespaceConfig(object):
- `accounts_dir`
- `cert_dir`
- `cert_key_backup`
- `in_progress_dir`
- `key_dir`
- `renewer_config_file`
@ -57,11 +56,6 @@ class NamespaceConfig(object):
def cert_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.config_dir, constants.CERT_DIR)
@property
def cert_key_backup(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.work_dir,
constants.CERT_KEY_BACKUP_DIR, self.server_path)
@property
def in_progress_dir(self): # pylint: disable=missing-docstring
return os.path.join(self.namespace.work_dir, constants.IN_PROGRESS_DIR)

View file

@ -71,10 +71,6 @@ BACKUP_DIR = "backups"
CERT_DIR = "certs"
"""See `.IConfig.cert_dir`."""
CERT_KEY_BACKUP_DIR = "keys-certs"
"""Directory where all certificates and keys are stored (relative to
`IConfig.work_dir`). Used for easy revocation."""
IN_PROGRESS_DIR = "IN_PROGRESS"
"""Directory used before a permanent checkpoint is finalized (relative to
`IConfig.work_dir`)."""

View file

@ -208,9 +208,6 @@ class IConfig(zope.interface.Interface):
cert_dir = zope.interface.Attribute(
"Directory where newly generated Certificate Signing Requests "
"(CSRs) and certificates not enrolled in the renewer are saved.")
cert_key_backup = zope.interface.Attribute(
"Directory where all certificates and keys are stored. "
"Used for easy revocation.")
in_progress_dir = zope.interface.Attribute(
"Directory used before a permanent checkpoint is finalized.")
key_dir = zope.interface.Attribute("Keys storage.")

View file

@ -1,560 +0,0 @@
"""Revoker module to enable LE revocations.
The backend of this module would fit a database quite nicely, but in order to
minimize dependencies and maintain transparency, the class currently implements
its own storage system. The number of certs that will likely be stored on any
given client might not warrant requiring a database.
"""
import collections
import csv
import logging
import os
import shutil
import tempfile
import OpenSSL
from acme import client as acme_client
from acme import crypto_util as acme_crypto_util
from acme.jose import util as jose_util
from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt.display import util as display_util
from letsencrypt.display import revocation
logger = logging.getLogger(__name__)
class Revoker(object):
"""A revocation class for LE.
.. todo:: Add a method to specify your own certificate for revocation - CLI
:ivar .acme.client.Client acme: ACME client
:ivar installer: Installer object
:type installer: :class:`~letsencrypt.interfaces.IInstaller`
:ivar config: Configuration.
:type config: :class:`~letsencrypt.interfaces.IConfig`
:ivar bool no_confirm: Whether or not to ask for confirmation for revocation
"""
def __init__(self, installer, config, no_confirm=False):
# XXX
self.acme = acme_client.Client(directory=None, key=None, alg=None)
self.installer = installer
self.config = config
self.no_confirm = no_confirm
le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid(),
self.config.strict_permissions)
# TODO: Find a better solution for this...
self.list_path = os.path.join(config.cert_key_backup, "LIST")
# Make sure that the file is available for use for rest of class
open(self.list_path, "a").close()
def revoke_from_key(self, authkey):
"""Revoke all certificates under an authorized key.
:param authkey: Authorized key used in previous transactions
:type authkey: :class:`letsencrypt.le_util.Key`
"""
certs = []
try:
clean_pem = OpenSSL.crypto.dump_privatekey(
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, authkey.pem))
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
raise errors.RevokerError(
"Invalid key file specified to revoke_from_key")
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
# idx, cert, key
# Add all keys that match to marked list
# Note: The key can be different than the pub key found in the
# certificate.
_, b_k = self._row_to_backup(row)
try:
test_pem = OpenSSL.crypto.dump_privatekey(
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, open(b_k).read()))
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
# This should never happen given the assumptions of the
# module. If it does, it is probably best to delete the
# the offending key/cert. For now... just raise an exception
raise errors.RevokerError("%s - backup file is corrupted.")
if clean_pem == test_pem:
certs.append(
Cert.fromrow(row, self.config.cert_key_backup))
if certs:
self._safe_revoke(certs)
else:
logger.info("No certificates using the authorized key were found.")
def revoke_from_cert(self, cert_path):
"""Revoke a certificate by specifying a file path.
.. todo:: Add the ability to revoke the certificate even if the cert
is not stored locally. A path to the auth key will need to be
attained from the user.
:param str cert_path: path to ACME certificate in pem form
"""
# Locate the correct certificate (do not rely on filename)
cert_to_revoke = Cert(cert_path)
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
for row in csvreader:
cert = Cert.fromrow(row, self.config.cert_key_backup)
if cert.get_der() == cert_to_revoke.get_der():
self._safe_revoke([cert])
return
logger.info("Associated ACME certificate was not found.")
def revoke_from_menu(self):
"""List trusted Let's Encrypt certificates."""
csha1_vhlist = self._get_installed_locations()
certs = self._populate_saved_certs(csha1_vhlist)
while True:
if certs:
code, selection = revocation.display_certs(certs)
if code == display_util.OK:
revoked_certs = self._safe_revoke([certs[selection]])
# Since we are currently only revoking one cert at a time...
if revoked_certs:
del certs[selection]
elif code == display_util.HELP:
revocation.more_info_cert(certs[selection])
else:
return
else:
logger.info(
"There are not any trusted Let's Encrypt "
"certificates for this server.")
return
def _populate_saved_certs(self, csha1_vhlist):
# pylint: disable=no-self-use
"""Populate a list of all the saved certs.
It is important to read from the file rather than the directory.
We assume that the LIST file is the master record and depending on
program crashes, this may differ from what is actually in the directory.
Namely, additional certs/keys may exist. There should never be any
certs/keys in the LIST that don't exist in the directory however.
:param dict csha1_vhlist: map from cert sha1 fingerprints to a list
of it's installed location paths.
"""
certs = []
with open(self.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
# idx, orig_cert, orig_key
for row in csvreader:
cert = Cert.fromrow(row, self.config.cert_key_backup)
# If we were able to find the cert installed... update status
cert.installed = csha1_vhlist.get(cert.get_fingerprint(), [])
certs.append(cert)
return certs
def _get_installed_locations(self):
"""Get installed locations of certificates.
:returns: map from cert sha1 fingerprint to :class:`list` of vhosts
where the certificate is installed.
"""
csha1_vhlist = {}
if self.installer is None:
return csha1_vhlist
for (cert_path, _, path) in self.installer.get_all_certs_keys():
try:
with open(cert_path) as cert_file:
cert_data = cert_file.read()
except IOError:
continue
try:
cert_obj, _ = crypto_util.pyopenssl_load_certificate(cert_data)
except errors.Error:
continue
cert_sha1 = cert_obj.digest("sha1")
if cert_sha1 in csha1_vhlist:
csha1_vhlist[cert_sha1].append(path)
else:
csha1_vhlist[cert_sha1] = [path]
return csha1_vhlist
def _safe_revoke(self, certs):
"""Confirm and revoke certificates.
:param certs: certs intended to be revoked
:type certs: :class:`list` of :class:`letsencrypt.revoker.Cert`
:returns: certs successfully revoked
:rtype: :class:`list` of :class:`letsencrypt.revoker.Cert`
"""
success_list = []
try:
for cert in certs:
if self.no_confirm or revocation.confirm_revocation(cert):
try:
self._acme_revoke(cert)
except errors.Error:
# TODO: Improve error handling when networking is set...
logger.error(
"Unable to revoke cert:%s%s", os.linesep, str(cert))
success_list.append(cert)
revocation.success_revocation(cert)
finally:
if success_list:
self._remove_certs_keys(success_list)
return success_list
def _acme_revoke(self, cert):
"""Revoke the certificate with the ACME server.
:param cert: certificate to revoke
:type cert: :class:`letsencrypt.revoker.Cert`
:returns: TODO
"""
# XXX | pylint: disable=unused-variable
# pylint: disable=protected-access
certificate = jose_util.ComparableX509(cert._cert)
try:
with open(cert.backup_key_path, "rU") as backup_key_file:
key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, backup_key_file.read())
# If the key file doesn't exist... or is corrupted
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
raise errors.RevokerError(
"Corrupted backup key file: %s" % cert.backup_key_path)
return self.acme.revoke(cert=None) # XXX
def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use
"""Remove certificate and key.
:param list cert_list: Must contain certs, each is of type
:class:`letsencrypt.revoker.Cert`
"""
# This must occur first, LIST is the official key
self._remove_certs_from_list(cert_list)
# Remove files
for cert in cert_list:
os.remove(cert.backup_path)
os.remove(cert.backup_key_path)
def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use
"""Remove a certificate from the LIST file.
:param list cert_list: Must contain valid certs, each is of type
:class:`letsencrypt.revoker.Cert`
"""
newfile_handle, list_path2 = tempfile.mkstemp(".tmp", "LIST")
idx = 0
with open(self.list_path, "rb") as orgfile:
csvreader = csv.reader(orgfile)
with os.fdopen(newfile_handle, "wb") as newfile:
csvwriter = csv.writer(newfile)
for row in csvreader:
if idx >= len(cert_list) or row != cert_list[idx].get_row():
csvwriter.writerow(row)
else:
idx += 1
# This should never happen...
if idx != len(cert_list):
raise errors.RevokerError(
"Did not find all cert_list items to remove from LIST")
shutil.copy2(list_path2, self.list_path)
os.remove(list_path2)
def _row_to_backup(self, row):
"""Convenience function
:param list row: csv file row 'idx', 'cert_path', 'key_path'
:returns: tuple of the form ('backup_cert_path', 'backup_key_path')
:rtype: tuple
"""
return (self._get_backup(self.config.cert_key_backup, row[0], row[1]),
self._get_backup(self.config.cert_key_backup, row[0], row[2]))
@classmethod
def store_cert_key(cls, cert_path, key_path, config):
"""Store certificate key. (Used to allow quick revocation)
:param str cert_path: Path to a certificate file.
:param str key_path: Path to authorized key for certificate
:ivar config: Configuration.
:type config: :class:`~letsencrypt.interfaces.IConfig`
"""
list_path = os.path.join(config.cert_key_backup, "LIST")
le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid(),
config.strict_permissions)
cls._catalog_files(
config.cert_key_backup, cert_path, key_path, list_path)
@classmethod
def _catalog_files(cls, backup_dir, cert_path, key_path, list_path):
idx = 0
if os.path.isfile(list_path):
with open(list_path, "r+b") as csvfile:
csvreader = csv.reader(csvfile)
# Find the highest index in the file
for row in csvreader:
idx = int(row[0]) + 1
csvwriter = csv.writer(csvfile)
# You must move the files before appending the row
cls._copy_files(backup_dir, idx, cert_path, key_path)
csvwriter.writerow([str(idx), cert_path, key_path])
else:
with open(list_path, "wb") as csvfile:
csvwriter = csv.writer(csvfile)
# You must move the files before appending the row
cls._copy_files(backup_dir, idx, cert_path, key_path)
csvwriter.writerow([str(idx), cert_path, key_path])
@classmethod
def _copy_files(cls, backup_dir, idx, cert_path, key_path):
"""Copies the files into the backup dir appropriately."""
shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path))
shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path))
@classmethod
def _get_backup(cls, backup_dir, idx, orig_path):
"""Returns the path to the backup."""
return os.path.join(
backup_dir, "{name}_{idx}".format(
name=os.path.basename(orig_path), idx=str(idx)))
class Cert(object):
"""Cert object used for Revocation convenience.
:ivar _cert: Certificate
:type _cert: :class:`OpenSSL.crypto.X509`
:ivar int idx: convenience index used for listing
:ivar orig: (`str` path - original certificate, `str` status)
:type orig: :class:`PathStatus`
:ivar orig_key: (`str` path - original auth key, `str` status)
:type orig_key: :class:`PathStatus`
:ivar str backup_path: backup filepath of the certificate
:ivar str backup_key_path: backup filepath of the authorized key
:ivar list installed: `list` of `str` describing all locations the cert
is installed
"""
PathStatus = collections.namedtuple("PathStatus", "path status")
"""Convenience container to hold path and status info"""
DELETED_MSG = "This file has been moved or deleted"
CHANGED_MSG = "This file has changed"
def __init__(self, cert_path):
"""Cert initialization
:param str cert_filepath: Name of file containing certificate in
PEM format.
"""
try:
with open(cert_path) as cert_file:
cert_data = cert_file.read()
except IOError:
raise errors.RevokerError(
"Error loading certificate: %s" % cert_path)
try:
self._cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert_data)
except OpenSSL.crypto.Error:
raise errors.RevokerError(
"Error loading certificate: %s" % cert_path)
self.idx = -1
self.orig = None
self.orig_key = None
self.backup_path = ""
self.backup_key_path = ""
self.installed = ["Unknown"]
@classmethod
def fromrow(cls, row, backup_dir):
# pylint: disable=protected-access
"""Initialize Cert from a csv row."""
idx = int(row[0])
backup = Revoker._get_backup(backup_dir, idx, row[1])
backup_key = Revoker._get_backup(backup_dir, idx, row[2])
obj = cls(backup)
obj.add_meta(idx, row[1], row[2], backup, backup_key)
return obj
def get_row(self):
"""Returns a list in CSV format. If meta data is available."""
if self.orig is not None and self.orig_key is not None:
return [str(self.idx), self.orig.path, self.orig_key.path]
return None
def add_meta(self, idx, orig, orig_key, backup, backup_key):
"""Add meta data to cert
:param int idx: convenience index for revoker
:param tuple orig: (`str` original certificate filepath, `str` status)
:param tuple orig_key: (`str` original auth key path, `str` status)
:param str backup: backup certificate filepath
:param str backup_key: backup key filepath
"""
status = ""
key_status = ""
# Verify original cert path
if not os.path.isfile(orig):
status = Cert.DELETED_MSG
else:
with open(orig) as orig_file:
orig_data = orig_file.read()
o_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, orig_data)
if self.get_fingerprint() != o_cert.digest("sha1"):
status = Cert.CHANGED_MSG
# Verify original key path
if not os.path.isfile(orig_key):
key_status = Cert.DELETED_MSG
else:
with open(orig_key, "r") as fd:
key_pem = fd.read()
with open(backup_key, "r") as fd:
backup_key_pem = fd.read()
if key_pem != backup_key_pem:
key_status = Cert.CHANGED_MSG
self.idx = idx
self.orig = Cert.PathStatus(orig, status)
self.orig_key = Cert.PathStatus(orig_key, key_status)
self.backup_path = backup
self.backup_key_path = backup_key
def get_cn(self):
"""Get common name."""
return self._cert.get_subject().CN
def get_fingerprint(self):
"""Get SHA1 fingerprint."""
return self._cert.digest("sha1")
def get_not_before(self):
"""Get not_valid_before field."""
return crypto_util.asn1_generalizedtime_to_dt(
self._cert.get_notBefore())
def get_not_after(self):
"""Get not_valid_after field."""
return crypto_util.asn1_generalizedtime_to_dt(
self._cert.get_notAfter())
def get_der(self):
"""Get certificate in der format."""
return OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_ASN1, self._cert)
def get_pub_key(self):
"""Get public key size.
.. todo:: Support for ECC
"""
return "RSA {0}".format(self._cert.get_pubkey().bits)
def get_san(self):
"""Get subject alternative name if available."""
# pylint: disable=protected-access
return ", ".join(acme_crypto_util._pyopenssl_cert_or_req_san(self._cert))
def __str__(self):
text = [
"Subject: %s" % crypto_util.pyopenssl_x509_name_as_text(
self._cert.get_subject()),
"SAN: %s" % self.get_san(),
"Issuer: %s" % crypto_util.pyopenssl_x509_name_as_text(
self._cert.get_issuer()),
"Public Key: %s" % self.get_pub_key(),
"Not Before: %s" % str(self.get_not_before()),
"Not After: %s" % str(self.get_not_after()),
"Serial Number: %s" % self._cert.get_serial_number(),
"SHA1: %s%s" % (self.get_fingerprint(), os.linesep),
"Installed: %s" % ", ".join(self.installed),
]
if self.orig is not None:
if self.orig.status == "":
text.append("Path: %s" % self.orig.path)
else:
text.append("Orig Path: %s (%s)" % self.orig)
if self.orig_key is not None:
if self.orig_key.status == "":
text.append("Auth Key Path: %s" % self.orig_key.path)
else:
text.append("Orig Auth Key Path: %s (%s)" % self.orig_key)
text.append("")
return os.linesep.join(text)
def pretty_print(self):
"""Nicely frames a cert str"""
frame = "-" * (display_util.WIDTH - 4) + os.linesep
return "{frame}{cert}{frame}".format(frame=frame, cert=str(self))

View file

@ -32,7 +32,6 @@ class NamespaceConfigTest(unittest.TestCase):
def test_dynamic_dirs(self, constants):
constants.ACCOUNTS_DIR = 'acc'
constants.BACKUP_DIR = 'backups'
constants.CERT_KEY_BACKUP_DIR = 'c/'
constants.CERT_DIR = 'certs'
constants.IN_PROGRESS_DIR = '../p'
constants.KEY_DIR = 'keys'
@ -42,8 +41,6 @@ class NamespaceConfigTest(unittest.TestCase):
self.config.accounts_dir, '/tmp/config/acc/acme-server.org:443/new')
self.assertEqual(self.config.backup_dir, '/tmp/foo/backups')
self.assertEqual(self.config.cert_dir, '/tmp/config/certs')
self.assertEqual(
self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new')
self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p')
self.assertEqual(self.config.key_dir, '/tmp/config/keys')
self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t')

View file

@ -1,409 +0,0 @@
"""Test letsencrypt.revoker."""
import csv
import os
import shutil
import tempfile
import unittest
import mock
import OpenSSL
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt.display import util as display_util
from letsencrypt.tests import test_util
KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, test_util.load_vector("rsa512_key.pem"))
class RevokerBase(unittest.TestCase): # pylint: disable=too-few-public-methods
"""Base Class for Revoker Tests."""
def setUp(self):
self.paths, self.certs, self.key_path = create_revoker_certs()
self.backup_dir = tempfile.mkdtemp("cert_backup")
self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir)
self.list_path = os.path.join(self.backup_dir, "LIST")
def _store_certs(self):
# pylint: disable=protected-access
from letsencrypt.revoker import Revoker
Revoker.store_cert_key(self.paths[0], self.key_path, self.mock_config)
Revoker.store_cert_key(self.paths[1], self.key_path, self.mock_config)
# Set metadata
for i in xrange(2):
self.certs[i].add_meta(
i, self.paths[i], self.key_path,
Revoker._get_backup(self.backup_dir, i, self.paths[i]),
Revoker._get_backup(self.backup_dir, i, self.key_path))
def _get_rows(self):
with open(self.list_path, "rb") as csvfile:
return [row for row in csv.reader(csvfile)]
def _write_rows(self, rows):
with open(self.list_path, "wb") as csvfile:
csvwriter = csv.writer(csvfile)
for row in rows:
csvwriter.writerow(row)
class RevokerTest(RevokerBase):
def setUp(self):
from letsencrypt.revoker import Revoker
super(RevokerTest, self).setUp()
with open(self.key_path) as key_file:
self.key = le_util.Key(self.key_path, key_file.read())
self._store_certs()
self.revoker = Revoker(
installer=mock.MagicMock(), config=self.mock_config)
def tearDown(self):
shutil.rmtree(self.backup_dir)
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_key_all(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_key(self.key)
self.assertEqual(self._get_rows(), [])
# Check to make sure backups were eliminated
for i in xrange(2):
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
self.assertEqual(mock_acme.call_count, 2)
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
def test_revoke_by_invalid_keys(self, mock_load_privatekey):
mock_load_privatekey.side_effect = OpenSSL.crypto.Error
self.assertRaises(
errors.RevokerError, self.revoker.revoke_from_key, self.key)
mock_load_privatekey.side_effect = [KEY, OpenSSL.crypto.Error]
self.assertRaises(
errors.RevokerError, self.revoker.revoke_from_key, self.key)
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_wrong_key(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
key_path = test_util.vector_path("rsa256_key.pem")
wrong_key = le_util.Key(key_path, open(key_path).read())
self.revoker.revoke_from_key(wrong_key)
# Nothing was removed
self.assertEqual(len(self._get_rows()), 2)
# No revocation went through
self.assertEqual(mock_acme.call_count, 0)
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_cert(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_cert(self.paths[1])
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
self.assertEqual(self._get_rows(), [row0])
self.assertTrue(self._backups_exist(row0))
self.assertFalse(self._backups_exist(row1))
self.assertEqual(mock_acme.call_count, 1)
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_cert_not_found(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
self.revoker.revoke_from_cert(self.paths[0])
self.revoker.revoke_from_cert(self.paths[0])
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
# Same check as last time... just reversed.
self.assertEqual(self._get_rows(), [row1])
self.assertTrue(self._backups_exist(row1))
self.assertFalse(self._backups_exist(row0))
self.assertEqual(mock_acme.call_count, 1)
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_menu(self, mock_display, mock_acme):
mock_display().confirm_revocation.return_value = True
mock_display.display_certs.side_effect = [
(display_util.HELP, 0),
(display_util.OK, 0),
(display_util.CANCEL, -1),
]
self.revoker.revoke_from_menu()
row0 = self.certs[0].get_row()
row1 = self.certs[1].get_row()
self.assertEqual(self._get_rows(), [row1])
self.assertFalse(self._backups_exist(row0))
self.assertTrue(self._backups_exist(row1))
self.assertEqual(mock_acme.call_count, 1)
self.assertEqual(mock_display.more_info_cert.call_count, 1)
@mock.patch("letsencrypt.revoker.logger")
@mock.patch("acme.client.Client.revoke")
@mock.patch("letsencrypt.revoker.revocation")
def test_revoke_by_menu_delete_all(self, mock_display, mock_acme, mock_log):
mock_display().confirm_revocation.return_value = True
mock_display.display_certs.return_value = (display_util.OK, 0)
self.revoker.revoke_from_menu()
self.assertEqual(self._get_rows(), [])
# Everything should be deleted...
for i in xrange(2):
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
self.assertEqual(mock_acme.call_count, 2)
# Info is called when there aren't any certs left...
self.assertTrue(mock_log.info.called)
@mock.patch("letsencrypt.revoker.revocation")
@mock.patch("letsencrypt.revoker.Revoker._acme_revoke")
@mock.patch("letsencrypt.revoker.logger")
def test_safe_revoke_acme_fail(self, mock_log, mock_revoke, mock_display):
# pylint: disable=protected-access
mock_revoke.side_effect = errors.Error
mock_display().confirm_revocation.return_value = True
self.revoker._safe_revoke(self.certs)
self.assertTrue(mock_log.error.called)
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
def test_acme_revoke_failure(self, mock_load_privatekey):
# pylint: disable=protected-access
mock_load_privatekey.side_effect = OpenSSL.crypto.Error
self.assertRaises(
errors.Error, self.revoker._acme_revoke, self.certs[0])
def test_remove_certs_from_list_bad_certs(self):
# pylint: disable=protected-access
from letsencrypt.revoker import Cert
new_cert = Cert(self.paths[0])
# This isn't stored in the db
new_cert.idx = 10
new_cert.backup_path = self.paths[0]
new_cert.backup_key_path = self.key_path
new_cert.orig = Cert.PathStatus("false path", "not here")
new_cert.orig_key = Cert.PathStatus("false path", "not here")
self.assertRaises(errors.RevokerError,
self.revoker._remove_certs_from_list, [new_cert])
def _backups_exist(self, row):
# pylint: disable=protected-access
cert_path, key_path = self.revoker._row_to_backup(row)
return os.path.isfile(cert_path) and os.path.isfile(key_path)
class RevokerInstallerTest(RevokerBase):
def setUp(self):
super(RevokerInstallerTest, self).setUp()
self.installs = [
["installation/path0a", "installation/path0b"],
["installation/path1"],
]
self.certs_keys = [
(self.paths[0], self.key_path, self.installs[0][0]),
(self.paths[0], self.key_path, self.installs[0][1]),
(self.paths[1], self.key_path, self.installs[1][0]),
]
self._store_certs()
def _get_revoker(self, installer):
from letsencrypt.revoker import Revoker
return Revoker(installer, self.mock_config)
def test_no_installer_get_installed_locations(self):
# pylint: disable=protected-access
revoker = self._get_revoker(None)
self.assertEqual(revoker._get_installed_locations(), {})
def test_get_installed_locations(self):
# pylint: disable=protected-access
mock_installer = mock.MagicMock()
mock_installer.get_all_certs_keys.return_value = self.certs_keys
revoker = self._get_revoker(mock_installer)
sha_vh = revoker._get_installed_locations()
self.assertEqual(len(sha_vh), 2)
for i, cert in enumerate(self.certs):
self.assertTrue(cert.get_fingerprint() in sha_vh)
self.assertEqual(
sha_vh[cert.get_fingerprint()], self.installs[i])
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_certificate")
def test_get_installed_load_failure(self, mock_load_certificate):
mock_installer = mock.MagicMock()
mock_installer.get_all_certs_keys.return_value = self.certs_keys
mock_load_certificate.side_effect = OpenSSL.crypto.Error
revoker = self._get_revoker(mock_installer)
# pylint: disable=protected-access
self.assertEqual(revoker._get_installed_locations(), {})
def test_get_installed_load_failure_open(self):
tmp = tempfile.mkdtemp()
mock_installer = mock.MagicMock()
mock_installer.get_all_certs_keys.return_value = [(
os.path.join(tmp, 'missing'), None, None)]
revoker = self._get_revoker(mock_installer)
# pylint: disable=protected-access
self.assertEqual(revoker._get_installed_locations(), {})
os.rmdir(tmp)
class RevokerClassMethodsTest(RevokerBase):
def setUp(self):
super(RevokerClassMethodsTest, self).setUp()
self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir)
def tearDown(self):
shutil.rmtree(self.backup_dir)
def _call(self, cert_path, key_path):
from letsencrypt.revoker import Revoker
Revoker.store_cert_key(cert_path, key_path, self.mock_config)
def test_store_two(self):
from letsencrypt.revoker import Revoker
self._call(self.paths[0], self.key_path)
self._call(self.paths[1], self.key_path)
self.assertTrue(os.path.isfile(self.list_path))
rows = self._get_rows()
for i, row in enumerate(rows):
# pylint: disable=protected-access
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, i, self.paths[i])))
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, i, self.key_path)))
self.assertEqual([str(i), self.paths[i], self.key_path], row)
self.assertEqual(len(rows), 2)
def test_store_one_mixed(self):
from letsencrypt.revoker import Revoker
self._write_rows(
[["5", "blank", "blank"], ["18", "dc", "dc"], ["21", "b", "b"]])
self._call(self.paths[0], self.key_path)
self.assertEqual(
self._get_rows()[3], ["22", self.paths[0], self.key_path])
# pylint: disable=protected-access
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, 22, self.paths[0])))
self.assertTrue(os.path.isfile(
Revoker._get_backup(self.backup_dir, 22, self.key_path)))
class CertTest(unittest.TestCase):
def setUp(self):
self.paths, self.certs, self.key_path = create_revoker_certs()
def test_failed_load(self):
from letsencrypt.revoker import Cert
self.assertRaises(errors.RevokerError, Cert, self.key_path)
def test_failed_load_open(self):
tmp = tempfile.mkdtemp()
from letsencrypt.revoker import Cert
self.assertRaises(
errors.RevokerError, Cert, os.path.join(tmp, 'missing'))
os.rmdir(tmp)
def test_no_row(self):
self.assertEqual(self.certs[0].get_row(), None)
def test_meta_moved_files(self):
from letsencrypt.revoker import Cert
fake_path = "/not/a/real/path/r72d3t6"
self.certs[0].add_meta(
0, fake_path, fake_path, self.paths[0], self.key_path)
self.assertEqual(self.certs[0].orig.status, Cert.DELETED_MSG)
self.assertEqual(self.certs[0].orig_key.status, Cert.DELETED_MSG)
def test_meta_changed_files(self):
from letsencrypt.revoker import Cert
self.certs[0].add_meta(
0, self.paths[1], self.paths[1], self.paths[0], self.key_path)
self.assertEqual(self.certs[0].orig.status, Cert.CHANGED_MSG)
self.assertEqual(self.certs[0].orig_key.status, Cert.CHANGED_MSG)
def test_meta_no_status(self):
self.certs[0].add_meta(
0, self.paths[0], self.key_path, self.paths[0], self.key_path)
self.assertEqual(self.certs[0].orig.status, "")
self.assertEqual(self.certs[0].orig_key.status, "")
def test_print_meta(self):
"""Just make sure there aren't any major errors."""
self.certs[0].add_meta(
0, self.paths[0], self.key_path, self.paths[0], self.key_path)
# Changed path and deleted file
self.certs[1].add_meta(
1, self.paths[0], "/not/a/path", self.paths[1], self.key_path)
self.assertTrue(self.certs[0].pretty_print())
self.assertTrue(self.certs[1].pretty_print())
def test_print_no_meta(self):
self.assertTrue(self.certs[0].pretty_print())
self.assertTrue(self.certs[1].pretty_print())
def create_revoker_certs():
"""Create a few revoker.Cert objects."""
cert0_path = test_util.vector_path("cert.pem")
cert1_path = test_util.vector_path("cert-san.pem")
key_path = test_util.vector_path("rsa512_key.pem")
from letsencrypt.revoker import Cert
cert0 = Cert(cert0_path)
cert1 = Cert(cert1_path)
return [cert0_path, cert1_path], [cert0, cert1], key_path
if __name__ == "__main__":
unittest.main() # pragma: no cover