From f77307c28bb94df81464f8642f17f220b401501e Mon Sep 17 00:00:00 2001 From: James Kasten Date: Wed, 18 Feb 2015 04:01:49 -0800 Subject: [PATCH] Finish revoker implementation and unittests --- letsencrypt/client/display/enhancements.py | 2 +- letsencrypt/client/display/ops.py | 2 +- letsencrypt/client/display/revocation.py | 37 +- letsencrypt/client/display/util.py | 5 +- letsencrypt/client/log.py | 2 +- letsencrypt/client/reverter.py | 2 +- letsencrypt/client/revoker.py | 222 ++++++------ .../client/tests/apache/parser_test.py | 2 +- .../client/tests/configuration_test.py | 6 +- .../client/tests/display/enhancements_test.py | 2 +- letsencrypt/client/tests/display/ops_test.py | 2 +- .../client/tests/display/revocation_test.py | 61 +++- letsencrypt/client/tests/display/util_test.py | 36 +- letsencrypt/client/tests/revoker_test.py | 322 +++++++++++++++++- letsencrypt/scripts/main.py | 2 +- tox.ini | 2 +- 16 files changed, 539 insertions(+), 168 deletions(-) diff --git a/letsencrypt/client/display/enhancements.py b/letsencrypt/client/display/enhancements.py index ed73f24db..c9a81a4b1 100644 --- a/letsencrypt/client/display/enhancements.py +++ b/letsencrypt/client/display/enhancements.py @@ -5,7 +5,7 @@ import zope.component from letsencrypt.client import errors from letsencrypt.client import interfaces -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util def ask(enhancement): diff --git a/letsencrypt/client/display/ops.py b/letsencrypt/client/display/ops.py index c64d6a5e7..205f3fb08 100644 --- a/letsencrypt/client/display/ops.py +++ b/letsencrypt/client/display/ops.py @@ -5,7 +5,7 @@ import sys import zope.component from letsencrypt.client import interfaces -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util # Define a helper function to avoid verbose code util = zope.component.getUtility # pylint: disable=invalid-name diff --git a/letsencrypt/client/display/revocation.py b/letsencrypt/client/display/revocation.py index 0646adb10..76666ddf6 100644 --- a/letsencrypt/client/display/revocation.py +++ b/letsencrypt/client/display/revocation.py @@ -4,37 +4,33 @@ import os import zope.component from letsencrypt.client import interfaces -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util util = zope.component.getUtility # pylint: disable=invalid-name def choose_certs(certs): - """Display choose certificates menu. + """Choose a certificate from a menu. :param list certs: List of cert dicts. - :returns: cert to revoke - :rtype: :class:`letsencrypt.client.revoker.Cert` + :returns: selection (zero-based index) + :rtype: int """ - code, tag = display_certs(certs) + while True: + code, selection = _display_certs(certs) - if code == display_util.OK: - cert = certs[tag] - if confirm_revocation(cert): - return cert + if code == display_util.OK: + if confirm_revocation(certs[selection]): + return selection + elif code == display_util.HELP: + more_info_cert(certs[selection]) else: - choose_certs(certs) - elif code == display_util.HELP: - cert = certs[tag] - more_info_cert(cert) - choose_certs(certs) - else: - exit(0) + exit(0) -def display_certs(certs): +def _display_certs(certs): """Display the certificates in a menu for revocation. :param list certs: each is a :class:`letsencrypt.client.revoker.Cert` @@ -53,15 +49,12 @@ def display_certs(certs): else "") for cert in certs ] - print list_choices code, tag = util(interfaces.IDisplay).menu( "Which certificates would you like to revoke?", list_choices, help_label="More Info", ok_label="Revoke", cancel_label="Exit") - if not tag: - tag = -1 - return code, (int(tag) - 1) + return code, tag def confirm_revocation(cert): @@ -78,7 +71,7 @@ def confirm_revocation(cert): "certificate:{0}".format(os.linesep)) text += cert.pretty_print() text += "This action cannot be reversed!" - return display_util.OK == util(interfaces.IDisplay).yesno(text) + return util(interfaces.IDisplay).yesno(text) def more_info_cert(cert): diff --git a/letsencrypt/client/display/util.py b/letsencrypt/client/display/util.py index d0df0fa12..0b5e7c7d6 100644 --- a/letsencrypt/client/display/util.py +++ b/letsencrypt/client/display/util.py @@ -93,7 +93,10 @@ class NcursesDisplay(object): help_button=help_button, help_label=help_label, width=self.width, height=self.height) - return code, int(tag) - 1 + if code == OK: + return code, int(tag) - 1 + + return code, -1 def input(self, message): """Display an input box to the user. diff --git a/letsencrypt/client/log.py b/letsencrypt/client/log.py index 90d923f76..a267fa77e 100644 --- a/letsencrypt/client/log.py +++ b/letsencrypt/client/log.py @@ -3,7 +3,7 @@ import logging import dialog -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util class DialogHandler(logging.Handler): # pylint: disable=too-few-public-methods diff --git a/letsencrypt/client/reverter.py b/letsencrypt/client/reverter.py index a6ae4323d..75ff8b9f6 100644 --- a/letsencrypt/client/reverter.py +++ b/letsencrypt/client/reverter.py @@ -9,7 +9,7 @@ import zope.component from letsencrypt.client import errors from letsencrypt.client import interfaces from letsencrypt.client import le_util -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util class Reverter(object): diff --git a/letsencrypt/client/revoker.py b/letsencrypt/client/revoker.py index b9c72cc07..76898e533 100644 --- a/letsencrypt/client/revoker.py +++ b/letsencrypt/client/revoker.py @@ -22,7 +22,7 @@ from letsencrypt.client import errors from letsencrypt.client import le_util from letsencrypt.client import network -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util from letsencrypt.client.display import revocation @@ -47,33 +47,12 @@ class Revoker(object): self.config = config self.no_confirm = no_confirm - le_util.make_or_verify_dir(config.cert_key_backup, 0o700) + le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid()) # TODO: Find a better solution for this... self.list_path = os.path.join(config.cert_key_backup, "LIST") - - def safe_revoke(self, certs): - """Confirm and revoke certificates. - - :param certs: certs intended to be revoked - :type certs: :class:`letsencrypt.client.revoker.Cert` - - """ - success_list = [] - try: - for cert in certs: - if self.no_confirm or revocation.confirm_revocation(cert): - revoc = self._acme_revoke( - cert.backup_path, cert.backup_key_path) - - if revoc is not None: - success_list.append(cert) - revocation.success_revocation(cert) - else: - # TODO: Display a nice explanation - pass - finally: - self._remove_certs_keys(success_list) + # Make sure that the file is available for use for rest of class + open(self.list_path, "a").close() def revoke_from_key(self, authkey): """Revoke all certificates under an authorized key. @@ -83,7 +62,7 @@ class Revoker(object): """ certs = [] - with open(self.list_path, "r") as csvfile: + with open(self.list_path, "rb") as csvfile: csvreader = csv.reader(csvfile) for row in csvreader: # idx, cert, key @@ -94,9 +73,10 @@ class Revoker(object): # certificate. _, b_k = self._row_to_backup(row) if authkey.pem == open(b_k).read(): - certs.append(Cert.fromrow(row)) + certs.append(Cert.fromrow(row, self.config.cert_key_backup)) - self.safe_revoke(certs) + if certs: + self._safe_revoke(certs) def revoke_from_cert(self, cert_path): """Revoke a certificate by specifying a file path. @@ -107,59 +87,36 @@ class Revoker(object): # Locate the correct certificate (do not rely on filename) cert_to_revoke = Cert(cert_path) - with open(self.list_path, "r") as csvfile: + with open(self.list_path, "rb") as csvfile: csvreader = csv.reader(csvfile) for row in csvreader: - cert = Cert.fromrow(row) + cert = Cert.fromrow(row, self.config.cert_key_backup) - # This uses md5 but it doesn't matter and it is easier to read - if cert.get_fingerprint() == cert_to_revoke.get_fingerprint(): - self.safe_revoke([cert]) + if cert == cert_to_revoke: + self._safe_revoke([cert]) - def _acme_revoke(self, cert_path, key_path): - """Revoke the certificate with the ACME server. - - :param str cert_path: path to certificate file - :param str key_path: path to associated private key or authorized key - - :returns: TODO - - """ - try: - cert_der = M2Crypto.X509.load_cert(cert_path).as_der() - with open(key_path, "rU") as backup_key_file: - key = backup_key_file.read() - - # If either of the files don't exist... or are corrupted - except (OSError, IOError, M2Crypto.X509.X509Error): - return None - - # TODO: Catch error associated with already revoked and proceed. - return self.network.send_and_receive_expected( - messages.RevocationRequest.create( - certificate=certificate, key=key), - messages.Revocation) - - def display_menu(self): + def revoke_from_menu(self): """List trusted Let's Encrypt certificates.""" - if not os.path.isfile(self.list_path): - logging.info( - "You don't have any certificates saved from letsencrypt") - return - csha1_vhlist = self._get_installed_locations() certs = self._populate_saved_certs(csha1_vhlist) - if certs: - cert = revocation.choose_certs(certs) - self.revoke_from_interface(cert) - # Recursive... - self.display_menu() - else: - logging.info( - "There are not any trusted Let's Encrypt " - "certificates for this server.") + while True: + if certs: + selection = revocation.choose_certs(certs) + + self._safe_revoke([certs[selection]]) + # This is safer than using remove as Revoker.Certs only check + # the DER value of the cert. There could potentially be multiple + # backup certs with the same value. + del certs[selection] + else: + logging.info( + "There are not any trusted Let's Encrypt " + "certificates for this server.") + return + + def _populate_saved_certs(self, csha1_vhlist): # pylint: disable=no-self-use @@ -177,7 +134,7 @@ class Revoker(object): csvreader = csv.reader(csvfile) # idx, orig_cert, orig_key for row in csvreader: - cert = Cert.fromrow(row) + cert = Cert.fromrow(row, self.config.cert_key_backup) # If we were able to find the cert installed... update status cert.installed = csha1_vhlist.get(cert.get_fingerprint(), []) @@ -189,8 +146,8 @@ class Revoker(object): def _get_installed_locations(self): """Get installed locations of certificates - :returns: cert sha1 fingerprint -> :class:`list` of vhosts where - the certificate is installed. + :returns: map from cert sha1 fingerprint to :class:`list` of vhosts + where the certificate is installed. """ csha1_vhlist = {} @@ -211,10 +168,58 @@ class Revoker(object): return csha1_vhlist + def _safe_revoke(self, certs): + """Confirm and revoke certificates. + + :param certs: certs intended to be revoked + :type certs: :class:`list` of :class:`letsencrypt.client.revoker.Cert` + + """ + success_list = [] + try: + for cert in certs: + if self.no_confirm or revocation.confirm_revocation(cert): + try: + self._acme_revoke(cert) + + success_list.append(cert) + revocation.success_revocation(cert) + except errors.LetsEncryptClientError: + # TODO: Improve error handling when networking is set... + logging.error( + "Unable to revoke cert:%s%s", os.linesep, str(cert)) + finally: + if success_list: + self._remove_certs_keys(success_list) + + def _acme_revoke(self, cert): + """Revoke the certificate with the ACME server. + + :param cert: certificate to revoke + :type cert: :class:`letsencrypt.client.revoker.Cert` + + :returns: TODO + + """ + try: + certificate = acme_util.ComparableX509(cert.cert) + with open(cert.backup_key_path, "rU") as backup_key_file: + key = Crypto.PublicKey.RSA.importKey(backup_key_file.read()) + + # If the key file doesn't exist... or is corrupted + except (OSError, IOError): + raise errors.LetsEncryptRevokerError("Unable to read key file") + + # TODO: Catch error associated with already revoked and proceed. + return self.network.send_and_receive_expected( + messages.RevocationRequest.create( + certificate=certificate, key=key), + messages.Revocation) + def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use """Remove certificate and key. - :param list cert_list: each is of type + :param list cert_list: Must contain certs, each is of type :class:`letsencrypt.client.revoker.Cert` """ @@ -229,26 +234,29 @@ class Revoker(object): def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use """Remove a certificate from the LIST file. - :param list cert_list: each is of type + :param list cert_list: Must contain valid certs, each is of type :class:`letsencrypt.client.revoker.Cert` """ list_path2 = os.path.join(self.config.cert_key_backup, "LIST.tmp") idx = 0 + with open(self.list_path, "rb") as orgfile: csvreader = csv.reader(orgfile) with open(list_path2, "wb") as newfile: csvwriter = csv.writer(newfile) for row in csvreader: - if row != cert_list[idx].get_row(): + if idx >= len(cert_list) or row != cert_list[idx].get_row(): csvwriter.writerow(row) else: idx += 1 + # This should never happen... if idx != len(cert_list): - errors.LetsEncryptRevokerError("Did not find all cert_list items") + raise errors.LetsEncryptRevokerError( + "Did not find all cert_list items to remove from LIST") shutil.copy2(list_path2, self.list_path) os.remove(list_path2) @@ -266,35 +274,22 @@ class Revoker(object): self._get_backup(self.config.cert_key_backup, row[0], row[2])) @classmethod - def store_cert_key(cls, cert_path, key_path, config, encrypt=False): + def store_cert_key(cls, cert_path, key_path, config): """Store certificate key. (Used to allow quick revocation) :param str cert_path: Path to a certificate file. - :param key_path: Authorized key for certificate - :type key_path: :class:`letsencrypt.client.le_util.Key` + :param str key_path: Path to authorized key for certificate + :ivar config: Configuration. :type config: :class:`~letsencrypt.client.interfaces.IConfig` - :param bool encrypt: Should the certificate key be encrypted? - - :returns: True if key file was stored successfully, False otherwise. - :rtype: bool - """ - list_path = (config.cert_key_backup, "LIST") - le_util.make_or_verify_dir(config.cert_key_backup, 0o700) - - if encrypt: - logging.error( - "Unfortunately securely storing the certificates/" - "keys is not yet available. Stay tuned for the " - "next update!") - return False + list_path = os.path.join(config.cert_key_backup, "LIST") + le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid()) cls._catalog_files( config.cert_key_backup, cert_path, key_path, list_path) - return True @classmethod def _catalog_files(cls, backup_dir, cert_path, key_path, list_path): @@ -319,11 +314,13 @@ class Revoker(object): @classmethod def _copy_files(cls, backup_dir, idx, cert_path, key_path): + """Copies the files into the backup dir appropriately.""" shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path)) shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path)) @classmethod def _get_backup(cls, backup_dir, idx, orig_path): + """Returns the path to the backup.""" return os.path.join( backup_dir, "{name}_{idx}".format( name=os.path.basename(orig_path), idx=str(idx))) @@ -336,9 +333,9 @@ class Cert(object): :type cert: :class:`M2Crypto.X509` :ivar int idx: convenience index used for listing - :ivar orig: (`str` original certificate filepath, `str` status) - :type orig: PathStatus - :ivar orig_key: named tuple with(`str` original auth key path, `str` status) + :ivar orig: (`str` path - original certificate, `str` status) + :type orig: :class:`PathStatus` + :ivar orig_key: (`str` path - original auth key, `str` status) :type orig_key: :class:`PathStatus` :ivar str backup_path: backup filepath of the certificate :ivar str backup_key_path: backup filepath of the authorized key @@ -350,6 +347,9 @@ class Cert(object): PathStatus = collections.namedtuple("PathStatus", "path status") """Convenience container to hold path and status info""" + DELETED_MSG = "This file has been moved or deleted" + CHANGED_MSG = "This file has changed" + def __init__(self, cert_path): """Cert initialization @@ -373,7 +373,7 @@ class Cert(object): self.installed = ["Unknown"] @classmethod - def fromrow(cls, row, backup_dir): + def fromrow(cls, row, backup_dir): # pylint: disable=protected-access """Initialize Cert from a csv row.""" idx = int(row[0]) backup = Revoker._get_backup(backup_dir, idx, row[1]) @@ -381,10 +381,13 @@ class Cert(object): obj = cls(backup) obj.add_meta(idx, row[1], row[2], backup, backup_key) + return obj def get_row(self): - """Returns a list in CSV format.""" - return [str(self.idx), self.orig, self.orig_key] + """Returns a list in CSV format. If meta data is available.""" + if self.orig is not None and self.orig_key is not None: + return [str(self.idx), self.orig.path, self.orig_key.path] + return None def add_meta(self, idx, orig, orig_key, backup, backup_key): """Add meta data to cert @@ -396,29 +399,27 @@ class Cert(object): :param str backup_key: backup key filepath """ - deleted_msg = "This file has been moved or deleted" - changed_msg = "This file has changed" status = "" key_status = "" # Verify original cert path if not os.path.isfile(orig): - status = deleted_msg + status = Cert.DELETED_MSG else: o_cert = M2Crypto.X509.load_cert(orig) if self.get_fingerprint() != o_cert.get_fingerprint(md="sha1"): - status = changed_msg + status = Cert.CHANGED_MSG # Verify original key path if not os.path.isfile(orig_key): - key_status = deleted_msg + key_status = Cert.DELETED_MSG else: with open(orig_key, "r") as fd: key_pem = fd.read() with open(backup_key, "r") as fd: backup_key_pem = fd.read() if key_pem != backup_key_pem: - key_status = changed_msg + key_status = Cert.CHANGED_MSG self.idx = idx self.orig = Cert.PathStatus(orig, status) @@ -488,4 +489,5 @@ class Cert(object): frame = "-" * (display_util.WIDTH - 4) + os.linesep return "{frame}{cert}{frame}".format(frame=frame, cert=str(self)) - + def __eq__(self, other): + return self.cert.as_der() == other.cert.as_der() \ No newline at end of file diff --git a/letsencrypt/client/tests/apache/parser_test.py b/letsencrypt/client/tests/apache/parser_test.py index 3f5cc36ae..f30927886 100644 --- a/letsencrypt/client/tests/apache/parser_test.py +++ b/letsencrypt/client/tests/apache/parser_test.py @@ -9,7 +9,7 @@ import mock import zope.component from letsencrypt.client import errors -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util from letsencrypt.client.tests.apache import util diff --git a/letsencrypt/client/tests/configuration_test.py b/letsencrypt/client/tests/configuration_test.py index a07953396..dde1f44cb 100644 --- a/letsencrypt/client/tests/configuration_test.py +++ b/letsencrypt/client/tests/configuration_test.py @@ -9,7 +9,8 @@ class NamespaceConfigTest(unittest.TestCase): def setUp(self): from letsencrypt.client.configuration import NamespaceConfig - namespace = mock.MagicMock(work_dir='/tmp/foo', foo='bar') + namespace = mock.MagicMock( + work_dir='/tmp/foo', foo='bar', server='acme-server.org:443') self.config = NamespaceConfig(namespace) def test_proxy_getattr(self): @@ -24,7 +25,8 @@ class NamespaceConfigTest(unittest.TestCase): constants.REC_TOKEN_DIR = '/r' self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t') self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p') - self.assertEqual(self.config.cert_key_backup, '/tmp/foo/c/') + self.assertEqual( + self.config.cert_key_backup, '/tmp/foo/c/acme-server.org') self.assertEqual(self.config.rec_token_dir, '/r') diff --git a/letsencrypt/client/tests/display/enhancements_test.py b/letsencrypt/client/tests/display/enhancements_test.py index dfdac52e2..a7fb7f246 100644 --- a/letsencrypt/client/tests/display/enhancements_test.py +++ b/letsencrypt/client/tests/display/enhancements_test.py @@ -5,7 +5,7 @@ import unittest import mock from letsencrypt.client import errors -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util class AskTest(unittest.TestCase): diff --git a/letsencrypt/client/tests/display/ops_test.py b/letsencrypt/client/tests/display/ops_test.py index 10d8d9642..7b4a6d8b5 100644 --- a/letsencrypt/client/tests/display/ops_test.py +++ b/letsencrypt/client/tests/display/ops_test.py @@ -5,7 +5,7 @@ import unittest import mock import zope.component -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util class ChooseAuthenticatorTest(unittest.TestCase): diff --git a/letsencrypt/client/tests/display/revocation_test.py b/letsencrypt/client/tests/display/revocation_test.py index c78a726e1..359e80c5e 100644 --- a/letsencrypt/client/tests/display/revocation_test.py +++ b/letsencrypt/client/tests/display/revocation_test.py @@ -6,19 +6,19 @@ import unittest import mock import zope.component -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util class ChooseCertsTest(unittest.TestCase): def setUp(self): from letsencrypt.client.revoker import Cert base_package = "letsencrypt.client.tests" - self.cert1 = Cert(pkg_resources.resource_filename( + self.cert0 = Cert(pkg_resources.resource_filename( base_package, os.path.join("testdata", "cert.pem"))) - self.cert2 = Cert(pkg_resources.resource_filename( + self.cert1 = Cert(pkg_resources.resource_filename( base_package, os.path.join("testdata", "cert-san.pem"))) - self.certs = [self.cert1, self.cert2] + self.certs = [self.cert0, self.cert1] zope.component.provideUtility(display_util.FileDisplay(sys.stdout)) @@ -27,13 +27,56 @@ class ChooseCertsTest(unittest.TestCase): from letsencrypt.client.display.revocation import choose_certs return choose_certs(certs) - #@mock.patch("letsencrypt.client.display.revocation.util") - def test_confirm_revocation(self): - pass - #mock_util().yesno.return_value = True - self._call(self.certs) + @mock.patch("letsencrypt.client.display.revocation.util") + def test_confirm_revocation(self, mock_util): + mock_util().yesno.return_value = True + mock_util().menu.return_value = (display_util.OK, 0) + choice = self._call(self.certs) + self.assertTrue(self.certs[choice] == self.cert0) + + @mock.patch("letsencrypt.client.display.revocation.util") + def test_confirm_cancel(self, mock_util): + mock_util().yesno.return_value = False + mock_util().menu.side_effect = [ + (display_util.OK, 0), + (display_util.CANCEL, -1) + ] + + self.assertRaises(SystemExit, self._call, self.certs) + + @mock.patch("letsencrypt.client.display.revocation.util") + def test_more_info(self, mock_util): + mock_util().menu.side_effect = [ + (display_util.HELP, 1), + (display_util.OK, 1), + ] + mock_util().yesno.return_value = True + + choice = self._call(self.certs) + + self.assertTrue(self.certs[choice] == self.cert1) + self.assertEqual(mock_util().notification.call_count, 1) + +class SuccessRevocationTest(unittest.TestCase): + def setUp(self): + from letsencrypt.client.revoker import Cert + base_package = "letsencrypt.client.tests" + self.cert = Cert(pkg_resources.resource_filename( + base_package, os.path.join("testdata", "cert.pem"))) + + @classmethod + def _call(cls, cert): + from letsencrypt.client.display.revocation import success_revocation + success_revocation(cert) + + # Pretty trivial test... something is displayed... + @mock.patch("letsencrypt.client.display.revocation.util") + def test_success_revocation(self, mock_util): + self._call(self.cert) + + self.assertEqual(mock_util().notification.call_count, 1) if __name__ == "__main__": unittest.main() diff --git a/letsencrypt/client/tests/display/util_test.py b/letsencrypt/client/tests/display/util_test.py index 3c81a463f..3a0c78e94 100644 --- a/letsencrypt/client/tests/display/util_test.py +++ b/letsencrypt/client/tests/display/util_test.py @@ -4,7 +4,7 @@ import unittest import mock -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util class DisplayT(unittest.TestCase): @@ -42,13 +42,13 @@ class NcursesDisplayTest(DisplayT): super(NcursesDisplayTest, self).setUp() self.displayer = display_util.NcursesDisplay() - @mock.patch("letsencrypt.client.display.display_util.dialog.Dialog.msgbox") + @mock.patch("letsencrypt.client.display.util.dialog.Dialog.msgbox") def test_notification(self, mock_msgbox): """Kind of worthless... one liner.""" self.displayer.notification("message") self.assertEqual(mock_msgbox.call_count, 1) - @mock.patch("letsencrypt.client.display.display_util.dialog.Dialog.menu") + @mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu") def test_menu_tag_and_desc(self, mock_menu): mock_menu.return_value = (display_util.OK, "First") @@ -61,7 +61,7 @@ class NcursesDisplayTest(DisplayT): self.assertEqual(ret, (display_util.OK, 0)) - @mock.patch("letsencrypt.client.display.display_util.dialog.Dialog.menu") + @mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu") def test_menu_tag_and_desc_cancel(self, mock_menu): mock_menu.return_value = (display_util.CANCEL, "") @@ -76,7 +76,7 @@ class NcursesDisplayTest(DisplayT): self.assertEqual(ret, (display_util.CANCEL, -1)) - @mock.patch("letsencrypt.client.display.display_util.dialog.Dialog.menu") + @mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu") def test_menu_desc_only(self, mock_menu): mock_menu.return_value = (display_util.OK, "1") @@ -91,13 +91,21 @@ class NcursesDisplayTest(DisplayT): self.assertEqual(ret, (display_util.OK, 0)) - @mock.patch("letsencrypt.client.display.display_util." + @mock.patch("letsencrypt.client.display.util.dialog.Dialog.menu") + def test_menu_desc_only_cancel(self, mock_menu): + mock_menu.return_value = (display_util.CANCEL, "") + + ret = self.displayer.menu("Message", self.tags, help_label="More Info") + + self.assertEqual(ret, (display_util.CANCEL, -1)) + + @mock.patch("letsencrypt.client.display.util." "dialog.Dialog.inputbox") def test_input(self, mock_input): self.displayer.input("message") mock_input.assert_called_with("message") - @mock.patch("letsencrypt.client.display.display_util.dialog.Dialog.yesno") + @mock.patch("letsencrypt.client.display.util.dialog.Dialog.yesno") def test_yesno(self, mock_yesno): mock_yesno.return_value = display_util.OK @@ -107,7 +115,7 @@ class NcursesDisplayTest(DisplayT): "message", display_util.HEIGHT, display_util.WIDTH, yes_label="Yes", no_label="No") - @mock.patch("letsencrypt.client.display.display_util." + @mock.patch("letsencrypt.client.display.util." "dialog.Dialog.checklist") def test_checklist(self, mock_checklist): self.displayer.checklist("message", self.tags) @@ -149,7 +157,7 @@ class FileOutputDisplayTest(DisplayT): self.assertTrue("message" in self.mock_stdout.write.call_args[0][0]) - @mock.patch("letsencrypt.client.display.display_util." + @mock.patch("letsencrypt.client.display.util." "FileDisplay._get_valid_int_ans") def test_menu(self, mock_ans): mock_ans.return_value = (display_util.OK, 1) @@ -179,14 +187,14 @@ class FileOutputDisplayTest(DisplayT): with mock.patch("__builtin__.raw_input", return_value="a"): self.assertTrue(self.displayer.yesno("msg", yes_label="Agree")) - @mock.patch("letsencrypt.client.display.display_util.FileDisplay.input") + @mock.patch("letsencrypt.client.display.util.FileDisplay.input") def test_checklist_valid(self, mock_input): mock_input.return_value = (display_util.OK, "2 1") code, tag_list = self.displayer.checklist("msg", self.tags) self.assertEqual( (code, set(tag_list)), (display_util.OK, set(["tag1", "tag2"]))) - @mock.patch("letsencrypt.client.display.display_util.FileDisplay.input") + @mock.patch("letsencrypt.client.display.util.FileDisplay.input") def test_checklist_miss_valid(self, mock_input): mock_input.side_effect = [ (display_util.OK, "10"), @@ -197,7 +205,7 @@ class FileOutputDisplayTest(DisplayT): ret = self.displayer.checklist("msg", self.tags) self.assertEqual(ret, (display_util.OK, ["tag1"])) - @mock.patch("letsencrypt.client.display.display_util.FileDisplay.input") + @mock.patch("letsencrypt.client.display.util.FileDisplay.input") def test_checklist_miss_quit(self, mock_input): mock_input.side_effect = [ (display_util.OK, "10"), @@ -288,7 +296,7 @@ class SeparateListInputTest(unittest.TestCase): @classmethod def _call(cls, input_): - from letsencrypt.client.display.display_util import separate_list_input + from letsencrypt.client.display.util import separate_list_input return separate_list_input(input_) def test_commas(self): @@ -314,7 +322,7 @@ class SeparateListInputTest(unittest.TestCase): class PlaceParensTest(unittest.TestCase): @classmethod def _call(cls, label): # pylint: disable=protected-access - from letsencrypt.client.display.display_util import _parens_around_char + from letsencrypt.client.display.util import _parens_around_char return _parens_around_char(label) def test_single_letter(self): diff --git a/letsencrypt/client/tests/revoker_test.py b/letsencrypt/client/tests/revoker_test.py index b62b9e1b7..47b057220 100644 --- a/letsencrypt/client/tests/revoker_test.py +++ b/letsencrypt/client/tests/revoker_test.py @@ -1,6 +1,326 @@ +import csv +import os +import pkg_resources +import shutil +import tempfile import unittest -import package + +import mock + +from letsencrypt.client import errors +from letsencrypt.client import le_util + + +class RevokerBase(unittest.TestCase): + def setUp(self): + self.paths, self.certs, self.key_path = create_revoker_certs() + + self.backup_dir = tempfile.mkdtemp("cert_backup") + self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir) + + self.list_path = os.path.join(self.backup_dir, "LIST") + + def _store_certs(self): + from letsencrypt.client.revoker import Revoker + Revoker.store_cert_key(self.paths[0], self.key_path, self.mock_config) + Revoker.store_cert_key(self.paths[1], self.key_path, self.mock_config) + + # Set metadata + for i in xrange(2): + self.certs[i].add_meta( + i, self.paths[i], self.key_path, + Revoker._get_backup(self.backup_dir, i, self.paths[i]), + Revoker._get_backup(self.backup_dir, i, self.key_path)) + + def _get_rows(self): + with open(self.list_path, "rb") as csvfile: + return [row for row in csv.reader(csvfile)] + + def _write_rows(self, rows): + with open(self.list_path, "wb") as csvfile: + csvwriter = csv.writer(csvfile) + for row in rows: + csvwriter.writerow(row) + + +class RevokerTest(RevokerBase): + def setUp(self): + from letsencrypt.client.revoker import Revoker + super(RevokerTest, self).setUp() + + with open(self.key_path) as key_file: + self.key = le_util.Key(self.key_path, key_file.read()) + + self._store_certs() + + self.mock_installer = mock.MagicMock() + self.revoker = Revoker(self.mock_installer, self.mock_config) + + def tearDown(self): + shutil.rmtree(self.backup_dir) + + @mock.patch("letsencrypt.client.revoker.network." + "Network.send_and_receive_expected") + @mock.patch("letsencrypt.client.revoker.revocation") + def test_revoke_by_key_all(self, mock_display, mock_net): + mock_display().confirm_revocation.return_value = True + + self.revoker.revoke_from_key(self.key) + self.assertEqual(self._get_rows(), []) + + # Check to make sure backups were eliminated + for i in xrange(2): + self.assertFalse(self._backups_exist(self.certs[i].get_row())) + + self.assertEqual(mock_net.call_count, 2) + + @mock.patch("letsencrypt.client.revoker.network." + "Network.send_and_receive_expected") + @mock.patch("letsencrypt.client.revoker.revocation") + def test_revoke_by_cert(self, mock_display, mock_net): + mock_display().confirm_revocation.return_value = True + + self.revoker.revoke_from_cert(self.paths[1]) + + row0 = self.certs[0].get_row() + row1 = self.certs[1].get_row() + + self.assertEqual(self._get_rows(), [row0]) + + self.assertTrue(self._backups_exist(row0)) + self.assertFalse(self._backups_exist(row1)) + + self.assertEqual(mock_net.call_count, 1) + + @mock.patch("letsencrypt.client.revoker.network." + "Network.send_and_receive_expected") + @mock.patch("letsencrypt.client.revoker.revocation") + def test_revoke_by_menu(self, mock_display, mock_net): + mock_display().confirm_revocation.return_value = True + mock_display.choose_certs.side_effect = [0, SystemExit] + + self.assertRaises(SystemExit, self.revoker.revoke_from_menu) + + row0 = self.certs[0].get_row() + row1 = self.certs[1].get_row() + + self.assertEqual(self._get_rows(), [row1]) + + self.assertFalse(self._backups_exist(row0)) + self.assertTrue(self._backups_exist(row1)) + + self.assertEqual(mock_net.call_count, 1) + + @mock.patch("letsencrypt.client.revoker.logging") + @mock.patch("letsencrypt.client.revoker.network." + "Network.send_and_receive_expected") + @mock.patch("letsencrypt.client.revoker.revocation") + def test_revoke_by_menu_delete_all(self, mock_display, mock_net, mock_log): + mock_display().confirm_revocation.return_value = True + mock_display.choose_certs.return_value = 0 + + self.revoker.revoke_from_menu() + + self.assertEqual(self._get_rows(), []) + + # Everything should be deleted... + for i in xrange(2): + self.assertFalse(self._backups_exist(self.certs[i].get_row())) + + self.assertEqual(mock_net.call_count, 2) + # Info is called when there aren't any certs left... + self.assertTrue(mock_log.info.called) + + @mock.patch("letsencrypt.client.revoker.revocation") + @mock.patch("letsencrypt.client.revoker.Revoker._acme_revoke") + @mock.patch("letsencrypt.client.revoker.logging") + def test_safe_revoke_acme_fail(self, mock_log, mock_revoke, mock_display): + mock_revoke.side_effect = errors.LetsEncryptClientError + mock_display().confirm_revocation.return_value = True + + self.revoker._safe_revoke(self.certs) + self.assertTrue(mock_log.error.called) + + @mock.patch("letsencrypt.client.revoker.Crypto.PublicKey.RSA.importKey") + def test_acme_revoke_failure(self, mock_crypto): + mock_crypto.side_effect = IOError + self.assertRaises(errors.LetsEncryptClientError, + self.revoker._acme_revoke, + self.certs[0]) + + def test_remove_certs_from_list_bad_certs(self): + from letsencrypt.client.revoker import Cert + + new_cert = Cert(self.paths[0]) + + # This isn't stored in the db + new_cert.idx = 10 + new_cert.backup_path = self.paths[0] + new_cert.backup_key_path = self.key_path + new_cert.orig = Cert.PathStatus("false path", "not here") + new_cert.orig_key = Cert.PathStatus("false path", "not here") + + self.assertRaises(errors.LetsEncryptRevokerError, + self.revoker._remove_certs_from_list, + [new_cert]) + + def _backups_exist(self, row): + cert_path, key_path = self.revoker._row_to_backup(row) + return os.path.isfile(cert_path) and os.path.isfile(key_path) + + +class RevokerInstallerTest(RevokerBase): + def setUp(self): + super(RevokerInstallerTest, self).setUp() + + self.installs = [ + ["installation/path0a", "installation/path0b"], + ["installation/path1"], + ] + + self.certs_keys = [ + (self.paths[0], self.key_path, self.installs[0][0]), + (self.paths[0], self.key_path, self.installs[0][1]), + (self.paths[1], self.key_path, self.installs[1][0]), + ] + + self._store_certs() + + def _get_revoker(self, installer): + from letsencrypt.client.revoker import Revoker + return Revoker(installer, self.mock_config) + + def test_no_installer_get_installed_locations(self): + revoker = self._get_revoker(None) + self.assertEqual(revoker._get_installed_locations(), {}) + + def test_get_installed_locations(self): + mock_installer = mock.MagicMock() + mock_installer.get_all_certs_keys.return_value = self.certs_keys + + revoker = self._get_revoker(mock_installer) + sha_vh = revoker._get_installed_locations() + + self.assertEqual(len(sha_vh), 2) + for i, cert in enumerate(self.certs): + self.assertTrue(cert.get_fingerprint() in sha_vh) + self.assertEqual( + sha_vh[cert.get_fingerprint()], self.installs[i]) + + @mock.patch("letsencrypt.client.revoker.M2Crypto.X509.load_cert") + def test_get_installed_load_failure(self, mock_m2): + mock_installer = mock.MagicMock() + mock_installer.get_all_certs_keys.return_value = self.certs_keys + + mock_m2.side_effect = IOError + + revoker = self._get_revoker(mock_installer) + + self.assertEqual(revoker._get_installed_locations(), {}) + +class RevokerClassMethodsTest(RevokerBase): + def setUp(self): + super(RevokerClassMethodsTest, self).setUp() + self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir) + + def tearDown(self): + shutil.rmtree(self.backup_dir) + + def _call(self, cert_path, key_path): + from letsencrypt.client.revoker import Revoker + Revoker.store_cert_key(cert_path, key_path, self.mock_config) + + def test_store_two(self): + from letsencrypt.client.revoker import Revoker + self._call(self.paths[0], self.key_path) + self._call(self.paths[1], self.key_path) + + self.assertTrue(os.path.isfile(self.list_path)) + rows = self._get_rows() + i = 0 + for i, row in enumerate(rows): + self.assertTrue(os.path.isfile( + Revoker._get_backup(self.backup_dir, i, self.paths[i]))) + self.assertTrue(os.path.isfile( + Revoker._get_backup(self.backup_dir, i, self.key_path))) + self.assertEqual([str(i), self.paths[i], self.key_path], row) + + self.assertEqual(i, 1) + + def test_store_one_mixed(self): + from letsencrypt.client.revoker import Revoker + self._write_rows( + [["5", "blank", "blank"], ["18", "dc", "dc"], ["21", "b", "b"]]) + self._call(self.paths[0], self.key_path) + + self.assertEqual( + self._get_rows()[3], ["22", self.paths[0], self.key_path]) + + self.assertTrue(os.path.isfile( + Revoker._get_backup(self.backup_dir, 22, self.paths[0]))) + self.assertTrue(os.path.isfile( + Revoker._get_backup(self.backup_dir, 22, self.key_path))) class CertTest(unittest.TestCase): def setUp(self): + self.paths, self.certs, self.key_path = create_revoker_certs() + + def test_failed_load(self): + from letsencrypt.client.revoker import Cert + self.assertRaises(errors.LetsEncryptRevokerError, Cert, self.key_path) + + def test_no_row(self): + self.assertEqual(self.certs[0].get_row(), None) + + def test_meta_moved_files(self): + from letsencrypt.client.revoker import Cert + fake_path = "/not/a/real/path/r72d3t6" + self.certs[0].add_meta( + 0, fake_path, fake_path, self.paths[0], self.key_path) + + self.assertEqual(self.certs[0].orig.status, Cert.DELETED_MSG) + self.assertEqual(self.certs[0].orig_key.status, Cert.DELETED_MSG) + + def test_meta_changed_files(self): + from letsencrypt.client.revoker import Cert + self.certs[0].add_meta( + 0, self.paths[1], self.paths[1], self.paths[0], self.key_path) + + self.assertEqual(self.certs[0].orig.status, Cert.CHANGED_MSG) + self.assertEqual(self.certs[0].orig_key.status, Cert.CHANGED_MSG) + + def test_meta_no_status(self): + self.certs[0].add_meta( + 0, self.paths[0], self.key_path, self.paths[0], self.key_path) + + self.assertEqual(self.certs[0].orig.status, "") + self.assertEqual(self.certs[0].orig_key.status, "") + + def test_print(self): + """Just make sure there aren't any errors.""" + self.assertTrue(self.certs[0].pretty_print()) + self.assertTrue(self.certs[1].pretty_print()) + +def create_revoker_certs(): + from letsencrypt.client.revoker import Cert + + base_package = "letsencrypt.client.tests" + + cert0_path = pkg_resources.resource_filename( + base_package, os.path.join("testdata", "cert.pem")) + + cert1_path = pkg_resources.resource_filename( + base_package, os.path.join("testdata", "cert-san.pem")) + + cert0 = Cert(cert0_path) + cert1 = Cert(cert1_path) + + key_path = pkg_resources.resource_filename( + base_package, os.path.join("testdata", "rsa512_key.pem")) + + return [cert0_path, cert1_path], [cert0, cert1], key_path + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/letsencrypt/scripts/main.py b/letsencrypt/scripts/main.py index 2e3922b32..8e2589f15 100755 --- a/letsencrypt/scripts/main.py +++ b/letsencrypt/scripts/main.py @@ -20,7 +20,7 @@ from letsencrypt.client import errors from letsencrypt.client import interfaces from letsencrypt.client import le_util from letsencrypt.client import log -from letsencrypt.client.display import display_util +from letsencrypt.client.display import util as display_util from letsencrypt.client.display import ops diff --git a/tox.ini b/tox.ini index 7a5f3810d..d4af50fa5 100644 --- a/tox.ini +++ b/tox.ini @@ -17,7 +17,7 @@ setenv = basepython = python2.7 commands = pip install -e .[testing] - python setup.py nosetests --with-coverage --cover-min-percentage=73 + python setup.py nosetests --with-coverage --cover-min-percentage=83 [testenv:lint] # recent versions of pylint do not support Python 2.6 (#97, #187)