add failsafe, further revise

This commit is contained in:
James Kasten 2015-02-09 01:43:54 -08:00
parent 0cf4329936
commit 8b184ca82c
5 changed files with 95 additions and 29 deletions

View file

@ -65,7 +65,7 @@ def display_certs(certs):
return code, (int(tag) - 1)
def confirm_revocation(self, cert):
def confirm_revocation(cert):
"""Confirm revocation screen.
:param cert: certificate object

View file

@ -39,3 +39,6 @@ class LetsEncryptNoInstallationError(LetsEncryptConfiguratorError):
class LetsEncryptMisconfigurationError(LetsEncryptConfiguratorError):
"""Let's Encrypt Misconfiguration error."""
class LetsEncryptRevokerError(LetsEncryptClientError):
"""Let's Encrypt Revoker error."""

View file

@ -76,6 +76,14 @@ def unique_file(path, mode=0o777):
count += 1
def safely_remove(path):
"""Remove a file that may not exist."""
try:
os.remove(path)
except OSError as err:
if err.errno != errno.ENOENT:
raise
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
#
# Jose Base64:

View file

@ -50,7 +50,7 @@ class RecoveryToken(object):
"""
try:
os.remove(os.path.join(self.token_dir, chall.domain))
le_util.safely_remove(os.path.join(self.token_dir, chall.domain))
except OSError as err:
if err.errno != errno.ENOENT:
raise

View file

@ -9,6 +9,7 @@ import M2Crypto
from letsencrypt.client import acme
from letsencrypt.client import CONFIG
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client import network
@ -17,37 +18,95 @@ from letsencrypt.client.display import revocation
class Revoker(object):
"""A revocation class for LE."""
"""A revocation class for LE.
..todo:: Add a method to specify your own certificate for revocation - CLI
:ivar network: Network object
:type network: :class:`letsencrypt.client.network`
:ivar installer: Installer object
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
list_path = os.path.join(CONFIG.CERT_KEY_BACKUP, "LIST")
marked_path = os.path.join(CONFIG.CERT_KEY_BACKUP, "MARKED")
def __init__(self, server, installer):
self.network = network.Network(server)
self.installer = installer
# This will go through and make sure that nothing almost got revoked...
# but didn't quite make it... also, guarantees no orphan cert/key files
self.recovery_routine()
def acme_revocation(self, cert):
def revoke_from_interface(self, cert):
"""Handle ACME "revocation" phase.
:param cert: cert intended to be revoked
:type cert: :class:`letsencrypt.client.revoker.Cert`
:returns: ACME "revocation" message.
:rtype: dict
"""
cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der()
with open(cert.backup_key_path, "rU") as backup_key_file:
key = backup_key_file.read()
self._mark_for_revocation(cert)
revoc = self.network.send_and_receive_expected(
acme.revocation_request(cert_der, key), "revocation")
revocation.success_revocation(cert)
revoc = self.revoke(cert.backup_path, cert.backup_key_path)
self.remove_cert_key(cert)
self._remove_mark()
if revoc is not None:
revocation.success_revocation(cert)
else:
# TODO: Display a nice explanation
pass
self.display_menu()
return revoc
def revoke(self, cert_path, key_path):
"""Revoke the certificate with the ACME server.
:param str cert_path: path to certificate file
:param str key_path: path to associated private key or authorized key
"""
try:
cert_der = M2Crypto.X509.load_cert(cert_path).as_der()
with open(key_path, "rU") as backup_key_file:
key = backup_key_file.read()
# If either of the files don't exist... or are corrupted
except (OSError, IOError, M2Crypto.X509.X509Error):
return None
# TODO: Catch error associated with already revoked and proceed.
return self.network.send_and_receive_expected(
acme.revocation_request(cert_der, key), "revocation")
def recovery_routine(self):
"""Intended to make sure files aren't orphaned."""
if not os.path.isfile(Revoker.marked_path):
return
with open(Revoker.marked_path, "r") as marked_file:
csvreader = csv.reader(marked_file)
for row in csvreader:
self.revoke(row[0], row[1])
le_util.safely_remove(row[0])
le_util.safely_remove(row[1])
self._remove_mark()
def _mark_for_revocation(self, cert):
"""Marks a cert for revocation."""
if os.path.isfile(Revoker.marked_path):
raise errors.LetsEncryptRevokerError(
"MARKED file was never cleaned.")
with open(Revoker.marked_path, "w") as marked_file:
csvwriter = csv.writer(marked_file)
csvwriter.writerow([cert.backup_path, cert.backup_key_path])
def _remove_mark(self):
"""Remove the marked file."""
os.remove(Revoker.marked_path)
def display_menu(self):
"""List trusted Let's Encrypt certificates."""
@ -62,7 +121,7 @@ class Revoker(object):
if certs:
cert = revocation.choose_certs(certs)
self.acme_revocation(cert)
self.revoke_from_interface(cert)
else:
logging.info(
"There are not any trusted Let's Encrypt "
@ -70,7 +129,6 @@ class Revoker(object):
def _populate_saved_certs(self, csha1_vhlist):
"""Populate a list of all the saved certs."""
certs = []
with open(Revoker.list_path, "rb") as csvfile:
csvreader = csv.reader(csvfile)
@ -86,9 +144,7 @@ class Revoker(object):
# Set the meta data
cert.add_meta(int(row[0]), row[1], row[2], b_c, b_k)
# If we were able to find the cert installed... update status
if self.installer is not None:
cert.installed = csha1_vhlist.get(
cert.get_fingerprint, [])
cert.installed = csha1_vhlist.get(cert.get_fingerprint(), [])
certs.append(cert)
@ -129,8 +185,8 @@ class Revoker(object):
self._remove_cert_from_list(cert)
# Remove files
os.remove(cert["backup_cert_file"])
os.remove(cert["backup_key_file"])
os.remove(cert.backup_path)
os.remove(cert.backup_key_path)
def _remove_cert_from_list(self, cert):
"""Remove a certificate from the LIST file."""
@ -259,29 +315,29 @@ class Cert(object):
:param str backup_key: backup key filepath
"""
DELETED_MSG = "This file has been moved or deleted"
CHANGED_MSG = "This file has changed"
deleted_msg = "This file has been moved or deleted"
changed_msg = "This file has changed"
status = ""
key_status = ""
# Verify original cert path
if not os.path.isfile(orig):
status = DELETED_MSG
status = deleted_msg
else:
o_cert = M2Crypto.X509.load_cert(orig)
if self.get_fingerprint() != o_cert.get_fingerprint(md="sha1"):
status = CHANGED_MSG
status = changed_msg
# Verify original key path
if not os.path.isfile(orig_key):
key_status = DELETED_MSG
key_status = deleted_msg
else:
with open(orig_key, "r") as fd:
key_pem = fd.read()
with open(backup_key, "r") as fd:
backup_key_pem = fd.read()
if key_pem != backup_key_pem:
key_status = CHANGED_MSG
key_status = changed_msg
self.idx = idx
self.orig = Cert.PathStatus(orig, status)
@ -342,4 +398,3 @@ class Cert(object):
text += str(self)
text += "-" * (display_util.WIDTH - 4)
return text