diff --git a/letsencrypt/client/display/revocation.py b/letsencrypt/client/display/revocation.py index 812b298b5..2f162626d 100644 --- a/letsencrypt/client/display/revocation.py +++ b/letsencrypt/client/display/revocation.py @@ -65,7 +65,7 @@ def display_certs(certs): return code, (int(tag) - 1) -def confirm_revocation(self, cert): +def confirm_revocation(cert): """Confirm revocation screen. :param cert: certificate object diff --git a/letsencrypt/client/errors.py b/letsencrypt/client/errors.py index d49611ce7..086af84d4 100644 --- a/letsencrypt/client/errors.py +++ b/letsencrypt/client/errors.py @@ -39,3 +39,6 @@ class LetsEncryptNoInstallationError(LetsEncryptConfiguratorError): class LetsEncryptMisconfigurationError(LetsEncryptConfiguratorError): """Let's Encrypt Misconfiguration error.""" + +class LetsEncryptRevokerError(LetsEncryptClientError): + """Let's Encrypt Revoker error.""" \ No newline at end of file diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index 9087ff7a3..f78d734a1 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -76,6 +76,14 @@ def unique_file(path, mode=0o777): count += 1 +def safely_remove(path): + """Remove a file that may not exist.""" + try: + os.remove(path) + except OSError as err: + if err.errno != errno.ENOENT: + raise + # https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C # # Jose Base64: diff --git a/letsencrypt/client/recovery_token.py b/letsencrypt/client/recovery_token.py index 2c328a46d..84e91e891 100644 --- a/letsencrypt/client/recovery_token.py +++ b/letsencrypt/client/recovery_token.py @@ -50,7 +50,7 @@ class RecoveryToken(object): """ try: - os.remove(os.path.join(self.token_dir, chall.domain)) + le_util.safely_remove(os.path.join(self.token_dir, chall.domain)) except OSError as err: if err.errno != errno.ENOENT: raise diff --git a/letsencrypt/client/revoker.py b/letsencrypt/client/revoker.py index e49b8f652..d2c4a3b93 100644 --- a/letsencrypt/client/revoker.py +++ b/letsencrypt/client/revoker.py @@ -9,6 +9,7 @@ import M2Crypto from letsencrypt.client import acme from letsencrypt.client import CONFIG +from letsencrypt.client import errors from letsencrypt.client import le_util from letsencrypt.client import network @@ -17,37 +18,95 @@ from letsencrypt.client.display import revocation class Revoker(object): - """A revocation class for LE.""" + """A revocation class for LE. + + ..todo:: Add a method to specify your own certificate for revocation - CLI + + :ivar network: Network object + :type network: :class:`letsencrypt.client.network` + + :ivar installer: Installer object + :type installer: :class:`letsencrypt.client.interfaces.IInstaller` + + """ list_path = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST") + marked_path = os.path.join(CONFIG.CERT_KEY_BACKUP, "MARKED") def __init__(self, server, installer): self.network = network.Network(server) self.installer = installer + # This will go through and make sure that nothing almost got revoked... + # but didn't quite make it... also, guarantees no orphan cert/key files + self.recovery_routine() - def acme_revocation(self, cert): + def revoke_from_interface(self, cert): """Handle ACME "revocation" phase. :param cert: cert intended to be revoked :type cert: :class:`letsencrypt.client.revoker.Cert` - :returns: ACME "revocation" message. - :rtype: dict - """ - cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der() - with open(cert.backup_key_path, "rU") as backup_key_file: - key = backup_key_file.read() + self._mark_for_revocation(cert) - revoc = self.network.send_and_receive_expected( - acme.revocation_request(cert_der, key), "revocation") - - revocation.success_revocation(cert) + revoc = self.revoke(cert.backup_path, cert.backup_key_path) self.remove_cert_key(cert) + self._remove_mark() + + if revoc is not None: + revocation.success_revocation(cert) + else: + # TODO: Display a nice explanation + pass + self.display_menu() - return revoc + def revoke(self, cert_path, key_path): + """Revoke the certificate with the ACME server. + + :param str cert_path: path to certificate file + :param str key_path: path to associated private key or authorized key + + """ + try: + cert_der = M2Crypto.X509.load_cert(cert_path).as_der() + with open(key_path, "rU") as backup_key_file: + key = backup_key_file.read() + + # If either of the files don't exist... or are corrupted + except (OSError, IOError, M2Crypto.X509.X509Error): + return None + + # TODO: Catch error associated with already revoked and proceed. + return self.network.send_and_receive_expected( + acme.revocation_request(cert_der, key), "revocation") + + def recovery_routine(self): + """Intended to make sure files aren't orphaned.""" + if not os.path.isfile(Revoker.marked_path): + return + with open(Revoker.marked_path, "r") as marked_file: + csvreader = csv.reader(marked_file) + for row in csvreader: + self.revoke(row[0], row[1]) + le_util.safely_remove(row[0]) + le_util.safely_remove(row[1]) + + self._remove_mark() + + def _mark_for_revocation(self, cert): + """Marks a cert for revocation.""" + if os.path.isfile(Revoker.marked_path): + raise errors.LetsEncryptRevokerError( + "MARKED file was never cleaned.") + with open(Revoker.marked_path, "w") as marked_file: + csvwriter = csv.writer(marked_file) + csvwriter.writerow([cert.backup_path, cert.backup_key_path]) + + def _remove_mark(self): + """Remove the marked file.""" + os.remove(Revoker.marked_path) def display_menu(self): """List trusted Let's Encrypt certificates.""" @@ -62,7 +121,7 @@ class Revoker(object): if certs: cert = revocation.choose_certs(certs) - self.acme_revocation(cert) + self.revoke_from_interface(cert) else: logging.info( "There are not any trusted Let's Encrypt " @@ -70,7 +129,6 @@ class Revoker(object): def _populate_saved_certs(self, csha1_vhlist): """Populate a list of all the saved certs.""" - certs = [] with open(Revoker.list_path, "rb") as csvfile: csvreader = csv.reader(csvfile) @@ -86,9 +144,7 @@ class Revoker(object): # Set the meta data cert.add_meta(int(row[0]), row[1], row[2], b_c, b_k) # If we were able to find the cert installed... update status - if self.installer is not None: - cert.installed = csha1_vhlist.get( - cert.get_fingerprint, []) + cert.installed = csha1_vhlist.get(cert.get_fingerprint(), []) certs.append(cert) @@ -129,8 +185,8 @@ class Revoker(object): self._remove_cert_from_list(cert) # Remove files - os.remove(cert["backup_cert_file"]) - os.remove(cert["backup_key_file"]) + os.remove(cert.backup_path) + os.remove(cert.backup_key_path) def _remove_cert_from_list(self, cert): """Remove a certificate from the LIST file.""" @@ -259,29 +315,29 @@ class Cert(object): :param str backup_key: backup key filepath """ - DELETED_MSG = "This file has been moved or deleted" - CHANGED_MSG = "This file has changed" + deleted_msg = "This file has been moved or deleted" + changed_msg = "This file has changed" status = "" key_status = "" # Verify original cert path if not os.path.isfile(orig): - status = DELETED_MSG + status = deleted_msg else: o_cert = M2Crypto.X509.load_cert(orig) if self.get_fingerprint() != o_cert.get_fingerprint(md="sha1"): - status = CHANGED_MSG + status = changed_msg # Verify original key path if not os.path.isfile(orig_key): - key_status = DELETED_MSG + key_status = deleted_msg else: with open(orig_key, "r") as fd: key_pem = fd.read() with open(backup_key, "r") as fd: backup_key_pem = fd.read() if key_pem != backup_key_pem: - key_status = CHANGED_MSG + key_status = changed_msg self.idx = idx self.orig = Cert.PathStatus(orig, status) @@ -342,4 +398,3 @@ class Cert(object): text += str(self) text += "-" * (display_util.WIDTH - 4) return text -