From 9a990ccfaf97426b8a4759f1da58f525647275ea Mon Sep 17 00:00:00 2001 From: James Kasten Date: Sun, 15 Feb 2015 23:17:53 -0800 Subject: [PATCH] revoker progress --- letsencrypt/client/auth_handler.py | 7 +- letsencrypt/client/client.py | 14 +- letsencrypt/client/constants.py | 2 +- letsencrypt/client/display/revocation.py | 14 +- letsencrypt/client/revoker.py | 195 ++++++++++++------ .../client/tests/display/revocation_test.py | 39 ++++ letsencrypt/scripts/main.py | 19 +- 7 files changed, 201 insertions(+), 89 deletions(-) create mode 100644 letsencrypt/client/tests/display/revocation_test.py diff --git a/letsencrypt/client/auth_handler.py b/letsencrypt/client/auth_handler.py index 6f0ece535..ed785a5f1 100644 --- a/letsencrypt/client/auth_handler.py +++ b/letsencrypt/client/auth_handler.py @@ -149,10 +149,11 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes for dom in self.domains: flat_client.extend(ichall.chall for ichall in self.client_c[dom]) flat_auth.extend(ichall.chall for ichall in self.dv_c[dom]) - try: - client_resp = self.client_auth.perform(flat_client) - dv_resp = self.dv_auth.perform(flat_auth) + if flat_client: + client_resp = self.client_auth.perform(flat_client) + if flat_auth: + dv_resp = self.dv_auth.perform(flat_auth) # This will catch both specific types of errors. except errors.LetsEncryptAuthHandlerError as err: logging.critical("Failure in setting up challenges:") diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index 1385e3a94..09a8aeaee 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -231,7 +231,7 @@ class Client(object): try: self.installer.enhance(dom, "redirect") except errors.LetsEncryptConfiguratorError: - logging.warn('Unable to perform redirect for %s', dom) + logging.warn("Unable to perform redirect for %s", dom) self.installer.save("Add Redirects") self.installer.restart() @@ -448,7 +448,7 @@ def _misconfigured_rollback(checkpoints, config): "Rollback was unable to solve the misconfiguration issues") -def revoke(config): +def revoke(config, no_confirm, cert, authkey): """Revoke certificates. :param config: Configuration. @@ -466,8 +466,14 @@ def revoke(config): "installed may not be available.") installer = None - revoc = revoker.Revoker(installer, config) - revoc.display_menu() + revoc = revoker.Revoker(installer, config, no_confirm) + # Cert is most selective, so it is chosen first. + if cert is not None: + revoc.revoke_from_cert(cert[0]) + elif authkey is not None: + revoc.revoke_from_key(le_util.Key(authkey[0], authkey[1])) + else: + revoc.display_menu() def view_config_changes(config): diff --git a/letsencrypt/client/constants.py b/letsencrypt/client/constants.py index e30a4b725..291506940 100644 --- a/letsencrypt/client/constants.py +++ b/letsencrypt/client/constants.py @@ -36,7 +36,7 @@ List of expected options parameters: APACHE_MOD_SSL_CONF = pkg_resources.resource_filename( - 'letsencrypt.client.apache', 'options-ssl.conf') + "letsencrypt.client.apache", "options-ssl.conf") """Path to the Apache mod_ssl config file found in the Let's Encrypt distribution.""" diff --git a/letsencrypt/client/display/revocation.py b/letsencrypt/client/display/revocation.py index 04394a11e..0646adb10 100644 --- a/letsencrypt/client/display/revocation.py +++ b/letsencrypt/client/display/revocation.py @@ -46,20 +46,17 @@ def display_certs(certs): """ list_choices = [ - ("%s | %s | %s" % ( + "%s | %s | %s" % ( str(cert.get_cn().ljust(display_util.WIDTH - 39)), cert.get_not_before().strftime("%m-%d-%y"), "Installed" if cert.installed and cert.installed != ["Unknown"] - else "") - for cert in enumerate(certs) - ) + else "") for cert in certs ] + print list_choices code, tag = util(interfaces.IDisplay).menu( "Which certificates would you like to revoke?", - "Revoke number (c to cancel): ", - choices=list_choices, help_button=True, - help_label="More Info", ok_label="Revoke", + list_choices, help_label="More Info", ok_label="Revoke", cancel_label="Exit") if not tag: tag = -1 @@ -81,8 +78,7 @@ def confirm_revocation(cert): "certificate:{0}".format(os.linesep)) text += cert.pretty_print() text += "This action cannot be reversed!" - return display_util.OK == util(interfaces.IDisplay).yesno( - text, width=display_util.WIDTH, height=display_util.HEIGHT) + return display_util.OK == util(interfaces.IDisplay).yesno(text) def more_info_cert(cert): diff --git a/letsencrypt/client/revoker.py b/letsencrypt/client/revoker.py index 1dd2c0975..53da5d028 100644 --- a/letsencrypt/client/revoker.py +++ b/letsencrypt/client/revoker.py @@ -1,4 +1,11 @@ -"""Revoker module to enable LE revocations.""" +"""Revoker module to enable LE revocations. + +The backend of this module would fit a database quite nicely, but in order to +minimize dependencies and maintain transparency, the class currently implements +its own storage system. The number of certs that will likely be stored on any +given client might not warrant requiring a database. + +""" import collections import csv import logging @@ -32,57 +39,89 @@ class Revoker(object): :type config: :class:`~letsencrypt.client.interfaces.IConfig` """ - def __init__(self, installer, config): + def __init__(self, installer, config, no_confirm=False): self.network = network.Network(config.server) self.installer = installer self.config = config + self.no_confirm = no_confirm le_util.make_or_verify_dir(config.cert_key_backup, 0o700) # TODO: Find a better solution for this... self.list_path = os.path.join(config.cert_key_backup, "LIST") - def revoke_from_interface(self, cert): - """Handle ACME "revocation" phase. + def safe_revoke(self, certs): + """Confirm and revoke certificates. - :param cert: cert intended to be revoked - :type cert: :class:`letsencrypt.client.revoker.Cert` + :param certs: certs intended to be revoked + :type certs: :class:`letsencrypt.client.revoker.Cert` """ - revoc = self.revoke(cert.backup_path, cert.backup_key_path) + success_list = [] + try: + for cert in certs: + if self.no_confirm or revocation.confirm_revocation(cert): + revoc = self._acme_revoke( + cert.backup_path, cert.backup_key_path) - self.remove_cert_key([cert.idx, cert.backup_path, cert.backup_key_path]) + if revoc is not None: + success_list.append(cert) + revocation.success_revocation(cert) + else: + # TODO: Display a nice explanation + pass + finally: + self._remove_certs_keys(success_list) - if revoc is not None: - revocation.success_revocation(cert) - else: - # TODO: Display a nice explanation - pass + def revoke_from_key(self, authkey): + """Revoke all certificates under an authorized key. - self.display_menu() + :param authkey: Authorized key used in previous transactions + :type authkey: :class:`letsencrypt.client.le_util.Key` - def revoke_from_key(self, auth_key): - marked = [] + """ + certs = [] with open(self.list_path, "r") as csvfile: csvreader = csv.reader(csvfile) for row in csvreader: # idx, cert, key # Add all keys that match to marked list - # TODO: This doesn't account for padding in file that might + # TODO: This doesn't account for padding in the file that might # differ. This should only consider the key material. # Note: The key can be different than the pub key found in the # certificate. - if auth_key.pem == open(row[2]).read(): - marked.append(row) + _, b_k = self._row_to_backup(row) + if authkey.pem == open(b_k).read(): + certs.append(Cert.fromrow(row)) - self.remove_certs_keys(marked) + self.safe_revoke(certs) - def revoke(self, cert_path, key_path): + def revoke_from_cert(self, cert_path): + """Revoke a certificate by specifying a file path. + + :param str cert_path: path to ACME certificate in pem form + + """ + # Locate the correct certificate (do not rely on filename) + cert_to_revoke = Cert(cert_path) + + with open(self.list_path, "r") as csvfile: + csvreader = csv.reader(csvfile) + for row in csvreader: + cert = Cert.fromrow(row) + + # This uses md5 but it doesn't matter and it is easier to read + if cert.get_fingerprint() == cert_to_revoke.get_fingerprint(): + self.safe_revoke([cert]) + + def _acme_revoke(self, cert_path, key_path): """Revoke the certificate with the ACME server. :param str cert_path: path to certificate file :param str key_path: path to associated private key or authorized key + :returns: TODO + """ try: cert_der = M2Crypto.X509.load_cert(cert_path).as_der() @@ -111,6 +150,8 @@ class Revoker(object): if certs: cert = revocation.choose_certs(certs) self.revoke_from_interface(cert) + # Recursive... + self.display_menu() else: logging.info( "There are not any trusted Let's Encrypt " @@ -132,15 +173,8 @@ class Revoker(object): csvreader = csv.reader(csvfile) # idx, orig_cert, orig_key for row in csvreader: - # Generate backup key/cert names - b_k = os.path.join(self.config.cert_key_backup, - os.path.basename(row[2]) + "_" + row[0]) - b_c = os.path.join(self.config.cert_key_backup, - os.path.basename(row[1]) + "_" + row[0]) + cert = Cert.fromrow(row) - cert = Cert(b_c) - # Set the meta data - cert.add_meta(int(row[0]), row[1], row[2], b_c, b_k) # If we were able to find the cert installed... update status cert.installed = csha1_vhlist.get(cert.get_fingerprint(), []) @@ -173,27 +207,26 @@ class Revoker(object): return csha1_vhlist - def remove_certs_keys(self, del_list): # pylint: disable=no-self-use + def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use """Remove certificate and key. - :param list del_list: each is a `list` in the form - [idx, cert_path, key_path] all entries must be in the original - LIST order + :param list cert_list: each is of type + :class:`letsencrypt.client.revoker.Cert` """ # This must occur first, LIST is the official key - self._remove_certs_from_list(del_list) + self._remove_certs_from_list(cert_list) # Remove files - for row in del_list: - os.remove(row[1]) - os.remove(row[2]) + for cert in cert_list: + os.remove(cert.backup_path) + os.remove(cert.backup_key_path) - def _remove_certs_from_list(self, del_list): # pylint: disable=no-self-use + def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use """Remove a certificate from the LIST file. - :param list del_list: each is a csv row, all items must be in the - proper file order. + :param list cert_list: each is of type + :class:`letsencrypt.client.revoker.Cert` """ list_path2 = os.path.join(self.config.cert_key_backup, "LIST.tmp") @@ -201,25 +234,33 @@ class Revoker(object): idx = 0 with open(self.list_path, "rb") as orgfile: csvreader = csv.reader(orgfile) - with open(list_path2, "wb") as newfile: csvwriter = csv.writer(newfile) for row in csvreader: - if not (row[0] == str(del_list[idx][0]) and - row[1] == del_list[idx][1] and - row[2] == del_list[idx][2]): + if row != cert_list[idx].get_row(): csvwriter.writerow(row) else: - # Found one of the marked rows... on to the next idx += 1 - if idx != len(del_list): - errors.LetsEncryptRevokerError("Did not find all items in del_list") + if idx != len(cert_list): + errors.LetsEncryptRevokerError("Did not find all cert_list items") shutil.copy2(list_path2, self.list_path) os.remove(list_path2) + def _row_to_backup(self, row): + """Convenience function + + :param list row: csv file row 'idx', 'cert_path', 'key_path' + + :returns: tuple of the form ('backup_cert_path', 'backup_key_path') + :rtype: tuple + + """ + return (self._get_backup(self.config.cert_key_backup, row[0], row[1]), + self._get_backup(self.config.cert_key_backup, row[0], row[2])) + @classmethod def store_cert_key(cls, cert_path, key_path, config, encrypt=False): """Store certificate key. (Used to allow quick revocation) @@ -238,7 +279,6 @@ class Revoker(object): """ list_path = (config.cert_key_backup, "LIST") le_util.make_or_verify_dir(config.cert_key_backup, 0o700) - idx = 0 if encrypt: logging.error( @@ -247,39 +287,46 @@ class Revoker(object): "next update!") return False - cls._append_index_file(cert_path, key_path, list_path) - - shutil.copy2(key_path, - os.path.join( - config.cert_key_backup, - os.path.basename(key_path) + "_" + str(idx))) - shutil.copy2(cert_path, - os.path.join( - config.cert_key_backup, - os.path.basename(cert_path) + "_" + str(idx))) + cls._catalog_files( + config.cert_key_backup, cert_path, key_path, list_path) return True @classmethod - def _append_index_file(cls, cert_path, key_path, list_path): + def _catalog_files(cls, backup_dir, cert_path, key_path, list_path): if os.path.isfile(list_path): - with open(list_path, 'r+b') as csvfile: + with open(list_path, "r+b") as csvfile: csvreader = csv.reader(csvfile) # Find the highest index in the file for row in csvreader: idx = int(row[0]) + 1 csvwriter = csv.writer(csvfile) + # You must move the files before appending the row + cls._copy_files(backup_dir, idx, cert_path, key_path) csvwriter.writerow([str(idx), cert_path, key_path]) else: - with open(list_path, 'wb') as csvfile: + with open(list_path, "wb") as csvfile: csvwriter = csv.writer(csvfile) + # You must move the files before appending the row + cls._copy_files(backup_dir, "0", cert_path, key_path) csvwriter.writerow(["0", cert_path, key_path]) + @classmethod + def _copy_files(cls, backup_dir, idx, cert_path, key_path): + shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path)) + shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path)) + + @classmethod + def _get_backup(cls, backup_dir, idx, orig_path): + return os.path.join( + backup_dir, "{name}_{idx}".format( + name=os.path.basename(orig_path), idx=str(idx))) + class Cert(object): - """Cert object used for convenience. + """Cert object used for Revocation convenience. :ivar cert: M2Crypto X509 cert :type cert: :class:`M2Crypto.X509` @@ -309,7 +356,8 @@ class Cert(object): try: self.cert = M2Crypto.X509.load_cert(cert_path) except (IOError, M2Crypto.X509.X509Error): - self.cert = None + raise errors.LetsEncryptRevokerError( + "Error loading certificate: %s" % cert_path) self.idx = -1 @@ -320,6 +368,19 @@ class Cert(object): self.installed = ["Unknown"] + @classmethod + def fromrow(cls, row, backup_dir): + """Initialize Cert from a csv row.""" + idx = int(row[0]) + backup = Revoker._get_backup(backup_dir, idx, row[1]) + backup_key = Revoker._get_backup(backup_dir, idx, row[2]) + + obj = cls(backup) + obj.add_meta(idx, row[1], row[2], backup, backup_key) + + def get_row(self): + """Returns a list in CSV format.""" + return [str(self.idx), self.orig, self.orig_key] def add_meta(self, idx, orig, orig_key, backup, backup_key): """Add meta data to cert @@ -420,7 +481,7 @@ class Cert(object): def pretty_print(self): """Nicely frames a cert str""" - text = "-" * (display_util.WIDTH - 4) + os.linesep - text += str(self) - text += "-" * (display_util.WIDTH - 4) - return text + frame = "-" * (display_util.WIDTH - 4) + os.linesep + return "{frame}{cert}{frame}".format(frame=frame, cert=str(self)) + + diff --git a/letsencrypt/client/tests/display/revocation_test.py b/letsencrypt/client/tests/display/revocation_test.py new file mode 100644 index 000000000..c78a726e1 --- /dev/null +++ b/letsencrypt/client/tests/display/revocation_test.py @@ -0,0 +1,39 @@ +import os +import pkg_resources +import sys +import unittest + +import mock +import zope.component + +from letsencrypt.client.display import display_util + + +class ChooseCertsTest(unittest.TestCase): + def setUp(self): + from letsencrypt.client.revoker import Cert + base_package = "letsencrypt.client.tests" + self.cert1 = Cert(pkg_resources.resource_filename( + base_package, os.path.join("testdata", "cert.pem"))) + self.cert2 = Cert(pkg_resources.resource_filename( + base_package, os.path.join("testdata", "cert-san.pem"))) + + self.certs = [self.cert1, self.cert2] + + zope.component.provideUtility(display_util.FileDisplay(sys.stdout)) + + @classmethod + def _call(cls, certs): + from letsencrypt.client.display.revocation import choose_certs + return choose_certs(certs) + + #@mock.patch("letsencrypt.client.display.revocation.util") + def test_confirm_revocation(self): + pass + #mock_util().yesno.return_value = True + self._call(self.certs) + + + +if __name__ == "__main__": + unittest.main() diff --git a/letsencrypt/scripts/main.py b/letsencrypt/scripts/main.py index 44394b03d..2e3922b32 100755 --- a/letsencrypt/scripts/main.py +++ b/letsencrypt/scripts/main.py @@ -36,12 +36,18 @@ def create_parser(): add("-s", "--server", default="letsencrypt-demo.org:443", help=config_help("server")) - add("-p", "--privkey", type=read_file, - help="Path to the private key file for certificate generation.") + add("-k", "--authkey", type=read_file, + help="Path to the authorized key file") add("-B", "--rsa-key-size", type=int, default=2048, metavar="N", help=config_help("rsa_key_size")) - add("-k", "--revoke", action="store_true", help="Revoke a certificate.") + add("-R", "--revoke", action="store_true", + help="Revoke a certificate from a menu.") + add("--revoke-certificate", dest="rev_cert", type=read_file, + help="Revoke a specific certificate.") + add("--revoke-key", dest="rev_key", type=read_file, + help="Revoke all certs generated by the provided authorized key.") + add("-b", "--rollback", type=int, default=0, metavar="N", help="Revert configuration N number of checkpoints.") add("-v", "--view-config-changes", action="store_true", @@ -52,6 +58,9 @@ def create_parser(): help="Automatically redirect all HTTP traffic to HTTPS for the newly " "authenticated vhost.") + add("--no-confirm", dest="no_confirm", action="store_true", + help="Turn off confirmation screens, currently used for --revoke") + add("-e", "--agree-tos", dest="eula", action="store_true", help="Skip the end user license agreement screen.") add("-t", "--text", dest="use_curses", action="store_false", @@ -114,7 +123,7 @@ def main(): # pylint: disable=too-many-branches sys.exit() if args.revoke: - client.revoke(config) + client.revoke(config, args.no_confirm, args.rev_cert, args.rev_key) sys.exit() if args.rollback > 0: @@ -146,7 +155,7 @@ def main(): # pylint: disable=too-many-branches if args.privkey is None: privkey = client.init_key(args.rsa_key_size, config.key_dir) else: - privkey = le_util.Key(args.privkey[0], args.privkey[1]) + privkey = le_util.Key(args.authkey[0], args.authkey[1]) acme = client.Client(config, privkey, auth, installer)