mirror of
https://github.com/certbot/certbot.git
synced 2026-06-09 00:32:12 -04:00
revoker progress
This commit is contained in:
parent
0bc5c8a162
commit
9a990ccfaf
7 changed files with 201 additions and 89 deletions
|
|
@ -149,10 +149,11 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
|
|||
for dom in self.domains:
|
||||
flat_client.extend(ichall.chall for ichall in self.client_c[dom])
|
||||
flat_auth.extend(ichall.chall for ichall in self.dv_c[dom])
|
||||
|
||||
try:
|
||||
client_resp = self.client_auth.perform(flat_client)
|
||||
dv_resp = self.dv_auth.perform(flat_auth)
|
||||
if flat_client:
|
||||
client_resp = self.client_auth.perform(flat_client)
|
||||
if flat_auth:
|
||||
dv_resp = self.dv_auth.perform(flat_auth)
|
||||
# This will catch both specific types of errors.
|
||||
except errors.LetsEncryptAuthHandlerError as err:
|
||||
logging.critical("Failure in setting up challenges:")
|
||||
|
|
|
|||
|
|
@ -231,7 +231,7 @@ class Client(object):
|
|||
try:
|
||||
self.installer.enhance(dom, "redirect")
|
||||
except errors.LetsEncryptConfiguratorError:
|
||||
logging.warn('Unable to perform redirect for %s', dom)
|
||||
logging.warn("Unable to perform redirect for %s", dom)
|
||||
|
||||
self.installer.save("Add Redirects")
|
||||
self.installer.restart()
|
||||
|
|
@ -448,7 +448,7 @@ def _misconfigured_rollback(checkpoints, config):
|
|||
"Rollback was unable to solve the misconfiguration issues")
|
||||
|
||||
|
||||
def revoke(config):
|
||||
def revoke(config, no_confirm, cert, authkey):
|
||||
"""Revoke certificates.
|
||||
|
||||
:param config: Configuration.
|
||||
|
|
@ -466,8 +466,14 @@ def revoke(config):
|
|||
"installed may not be available.")
|
||||
installer = None
|
||||
|
||||
revoc = revoker.Revoker(installer, config)
|
||||
revoc.display_menu()
|
||||
revoc = revoker.Revoker(installer, config, no_confirm)
|
||||
# Cert is most selective, so it is chosen first.
|
||||
if cert is not None:
|
||||
revoc.revoke_from_cert(cert[0])
|
||||
elif authkey is not None:
|
||||
revoc.revoke_from_key(le_util.Key(authkey[0], authkey[1]))
|
||||
else:
|
||||
revoc.display_menu()
|
||||
|
||||
|
||||
def view_config_changes(config):
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ List of expected options parameters:
|
|||
|
||||
|
||||
APACHE_MOD_SSL_CONF = pkg_resources.resource_filename(
|
||||
'letsencrypt.client.apache', 'options-ssl.conf')
|
||||
"letsencrypt.client.apache", "options-ssl.conf")
|
||||
"""Path to the Apache mod_ssl config file found in the Let's Encrypt
|
||||
distribution."""
|
||||
|
||||
|
|
|
|||
|
|
@ -46,20 +46,17 @@ def display_certs(certs):
|
|||
|
||||
"""
|
||||
list_choices = [
|
||||
("%s | %s | %s" % (
|
||||
"%s | %s | %s" % (
|
||||
str(cert.get_cn().ljust(display_util.WIDTH - 39)),
|
||||
cert.get_not_before().strftime("%m-%d-%y"),
|
||||
"Installed" if cert.installed and cert.installed != ["Unknown"]
|
||||
else "")
|
||||
for cert in enumerate(certs)
|
||||
)
|
||||
else "") for cert in certs
|
||||
]
|
||||
|
||||
print list_choices
|
||||
code, tag = util(interfaces.IDisplay).menu(
|
||||
"Which certificates would you like to revoke?",
|
||||
"Revoke number (c to cancel): ",
|
||||
choices=list_choices, help_button=True,
|
||||
help_label="More Info", ok_label="Revoke",
|
||||
list_choices, help_label="More Info", ok_label="Revoke",
|
||||
cancel_label="Exit")
|
||||
if not tag:
|
||||
tag = -1
|
||||
|
|
@ -81,8 +78,7 @@ def confirm_revocation(cert):
|
|||
"certificate:{0}".format(os.linesep))
|
||||
text += cert.pretty_print()
|
||||
text += "This action cannot be reversed!"
|
||||
return display_util.OK == util(interfaces.IDisplay).yesno(
|
||||
text, width=display_util.WIDTH, height=display_util.HEIGHT)
|
||||
return display_util.OK == util(interfaces.IDisplay).yesno(text)
|
||||
|
||||
|
||||
def more_info_cert(cert):
|
||||
|
|
|
|||
|
|
@ -1,4 +1,11 @@
|
|||
"""Revoker module to enable LE revocations."""
|
||||
"""Revoker module to enable LE revocations.
|
||||
|
||||
The backend of this module would fit a database quite nicely, but in order to
|
||||
minimize dependencies and maintain transparency, the class currently implements
|
||||
its own storage system. The number of certs that will likely be stored on any
|
||||
given client might not warrant requiring a database.
|
||||
|
||||
"""
|
||||
import collections
|
||||
import csv
|
||||
import logging
|
||||
|
|
@ -32,57 +39,89 @@ class Revoker(object):
|
|||
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
|
||||
|
||||
"""
|
||||
def __init__(self, installer, config):
|
||||
def __init__(self, installer, config, no_confirm=False):
|
||||
self.network = network.Network(config.server)
|
||||
self.installer = installer
|
||||
self.config = config
|
||||
self.no_confirm = no_confirm
|
||||
|
||||
le_util.make_or_verify_dir(config.cert_key_backup, 0o700)
|
||||
|
||||
# TODO: Find a better solution for this...
|
||||
self.list_path = os.path.join(config.cert_key_backup, "LIST")
|
||||
|
||||
def revoke_from_interface(self, cert):
|
||||
"""Handle ACME "revocation" phase.
|
||||
def safe_revoke(self, certs):
|
||||
"""Confirm and revoke certificates.
|
||||
|
||||
:param cert: cert intended to be revoked
|
||||
:type cert: :class:`letsencrypt.client.revoker.Cert`
|
||||
:param certs: certs intended to be revoked
|
||||
:type certs: :class:`letsencrypt.client.revoker.Cert`
|
||||
|
||||
"""
|
||||
revoc = self.revoke(cert.backup_path, cert.backup_key_path)
|
||||
success_list = []
|
||||
try:
|
||||
for cert in certs:
|
||||
if self.no_confirm or revocation.confirm_revocation(cert):
|
||||
revoc = self._acme_revoke(
|
||||
cert.backup_path, cert.backup_key_path)
|
||||
|
||||
self.remove_cert_key([cert.idx, cert.backup_path, cert.backup_key_path])
|
||||
if revoc is not None:
|
||||
success_list.append(cert)
|
||||
revocation.success_revocation(cert)
|
||||
else:
|
||||
# TODO: Display a nice explanation
|
||||
pass
|
||||
finally:
|
||||
self._remove_certs_keys(success_list)
|
||||
|
||||
if revoc is not None:
|
||||
revocation.success_revocation(cert)
|
||||
else:
|
||||
# TODO: Display a nice explanation
|
||||
pass
|
||||
def revoke_from_key(self, authkey):
|
||||
"""Revoke all certificates under an authorized key.
|
||||
|
||||
self.display_menu()
|
||||
:param authkey: Authorized key used in previous transactions
|
||||
:type authkey: :class:`letsencrypt.client.le_util.Key`
|
||||
|
||||
def revoke_from_key(self, auth_key):
|
||||
marked = []
|
||||
"""
|
||||
certs = []
|
||||
with open(self.list_path, "r") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
for row in csvreader:
|
||||
# idx, cert, key
|
||||
# Add all keys that match to marked list
|
||||
# TODO: This doesn't account for padding in file that might
|
||||
# TODO: This doesn't account for padding in the file that might
|
||||
# differ. This should only consider the key material.
|
||||
# Note: The key can be different than the pub key found in the
|
||||
# certificate.
|
||||
if auth_key.pem == open(row[2]).read():
|
||||
marked.append(row)
|
||||
_, b_k = self._row_to_backup(row)
|
||||
if authkey.pem == open(b_k).read():
|
||||
certs.append(Cert.fromrow(row))
|
||||
|
||||
self.remove_certs_keys(marked)
|
||||
self.safe_revoke(certs)
|
||||
|
||||
def revoke(self, cert_path, key_path):
|
||||
def revoke_from_cert(self, cert_path):
|
||||
"""Revoke a certificate by specifying a file path.
|
||||
|
||||
:param str cert_path: path to ACME certificate in pem form
|
||||
|
||||
"""
|
||||
# Locate the correct certificate (do not rely on filename)
|
||||
cert_to_revoke = Cert(cert_path)
|
||||
|
||||
with open(self.list_path, "r") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
for row in csvreader:
|
||||
cert = Cert.fromrow(row)
|
||||
|
||||
# This uses md5 but it doesn't matter and it is easier to read
|
||||
if cert.get_fingerprint() == cert_to_revoke.get_fingerprint():
|
||||
self.safe_revoke([cert])
|
||||
|
||||
def _acme_revoke(self, cert_path, key_path):
|
||||
"""Revoke the certificate with the ACME server.
|
||||
|
||||
:param str cert_path: path to certificate file
|
||||
:param str key_path: path to associated private key or authorized key
|
||||
|
||||
:returns: TODO
|
||||
|
||||
"""
|
||||
try:
|
||||
cert_der = M2Crypto.X509.load_cert(cert_path).as_der()
|
||||
|
|
@ -111,6 +150,8 @@ class Revoker(object):
|
|||
if certs:
|
||||
cert = revocation.choose_certs(certs)
|
||||
self.revoke_from_interface(cert)
|
||||
# Recursive...
|
||||
self.display_menu()
|
||||
else:
|
||||
logging.info(
|
||||
"There are not any trusted Let's Encrypt "
|
||||
|
|
@ -132,15 +173,8 @@ class Revoker(object):
|
|||
csvreader = csv.reader(csvfile)
|
||||
# idx, orig_cert, orig_key
|
||||
for row in csvreader:
|
||||
# Generate backup key/cert names
|
||||
b_k = os.path.join(self.config.cert_key_backup,
|
||||
os.path.basename(row[2]) + "_" + row[0])
|
||||
b_c = os.path.join(self.config.cert_key_backup,
|
||||
os.path.basename(row[1]) + "_" + row[0])
|
||||
cert = Cert.fromrow(row)
|
||||
|
||||
cert = Cert(b_c)
|
||||
# Set the meta data
|
||||
cert.add_meta(int(row[0]), row[1], row[2], b_c, b_k)
|
||||
# If we were able to find the cert installed... update status
|
||||
cert.installed = csha1_vhlist.get(cert.get_fingerprint(), [])
|
||||
|
||||
|
|
@ -173,27 +207,26 @@ class Revoker(object):
|
|||
|
||||
return csha1_vhlist
|
||||
|
||||
def remove_certs_keys(self, del_list): # pylint: disable=no-self-use
|
||||
def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use
|
||||
"""Remove certificate and key.
|
||||
|
||||
:param list del_list: each is a `list` in the form
|
||||
[idx, cert_path, key_path] all entries must be in the original
|
||||
LIST order
|
||||
:param list cert_list: each is of type
|
||||
:class:`letsencrypt.client.revoker.Cert`
|
||||
|
||||
"""
|
||||
# This must occur first, LIST is the official key
|
||||
self._remove_certs_from_list(del_list)
|
||||
self._remove_certs_from_list(cert_list)
|
||||
|
||||
# Remove files
|
||||
for row in del_list:
|
||||
os.remove(row[1])
|
||||
os.remove(row[2])
|
||||
for cert in cert_list:
|
||||
os.remove(cert.backup_path)
|
||||
os.remove(cert.backup_key_path)
|
||||
|
||||
def _remove_certs_from_list(self, del_list): # pylint: disable=no-self-use
|
||||
def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use
|
||||
"""Remove a certificate from the LIST file.
|
||||
|
||||
:param list del_list: each is a csv row, all items must be in the
|
||||
proper file order.
|
||||
:param list cert_list: each is of type
|
||||
:class:`letsencrypt.client.revoker.Cert`
|
||||
|
||||
"""
|
||||
list_path2 = os.path.join(self.config.cert_key_backup, "LIST.tmp")
|
||||
|
|
@ -201,25 +234,33 @@ class Revoker(object):
|
|||
idx = 0
|
||||
with open(self.list_path, "rb") as orgfile:
|
||||
csvreader = csv.reader(orgfile)
|
||||
|
||||
with open(list_path2, "wb") as newfile:
|
||||
csvwriter = csv.writer(newfile)
|
||||
|
||||
for row in csvreader:
|
||||
if not (row[0] == str(del_list[idx][0]) and
|
||||
row[1] == del_list[idx][1] and
|
||||
row[2] == del_list[idx][2]):
|
||||
if row != cert_list[idx].get_row():
|
||||
csvwriter.writerow(row)
|
||||
else:
|
||||
# Found one of the marked rows... on to the next
|
||||
idx += 1
|
||||
|
||||
if idx != len(del_list):
|
||||
errors.LetsEncryptRevokerError("Did not find all items in del_list")
|
||||
if idx != len(cert_list):
|
||||
errors.LetsEncryptRevokerError("Did not find all cert_list items")
|
||||
|
||||
shutil.copy2(list_path2, self.list_path)
|
||||
os.remove(list_path2)
|
||||
|
||||
def _row_to_backup(self, row):
|
||||
"""Convenience function
|
||||
|
||||
:param list row: csv file row 'idx', 'cert_path', 'key_path'
|
||||
|
||||
:returns: tuple of the form ('backup_cert_path', 'backup_key_path')
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
return (self._get_backup(self.config.cert_key_backup, row[0], row[1]),
|
||||
self._get_backup(self.config.cert_key_backup, row[0], row[2]))
|
||||
|
||||
@classmethod
|
||||
def store_cert_key(cls, cert_path, key_path, config, encrypt=False):
|
||||
"""Store certificate key. (Used to allow quick revocation)
|
||||
|
|
@ -238,7 +279,6 @@ class Revoker(object):
|
|||
"""
|
||||
list_path = (config.cert_key_backup, "LIST")
|
||||
le_util.make_or_verify_dir(config.cert_key_backup, 0o700)
|
||||
idx = 0
|
||||
|
||||
if encrypt:
|
||||
logging.error(
|
||||
|
|
@ -247,39 +287,46 @@ class Revoker(object):
|
|||
"next update!")
|
||||
return False
|
||||
|
||||
cls._append_index_file(cert_path, key_path, list_path)
|
||||
|
||||
shutil.copy2(key_path,
|
||||
os.path.join(
|
||||
config.cert_key_backup,
|
||||
os.path.basename(key_path) + "_" + str(idx)))
|
||||
shutil.copy2(cert_path,
|
||||
os.path.join(
|
||||
config.cert_key_backup,
|
||||
os.path.basename(cert_path) + "_" + str(idx)))
|
||||
cls._catalog_files(
|
||||
config.cert_key_backup, cert_path, key_path, list_path)
|
||||
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def _append_index_file(cls, cert_path, key_path, list_path):
|
||||
def _catalog_files(cls, backup_dir, cert_path, key_path, list_path):
|
||||
if os.path.isfile(list_path):
|
||||
with open(list_path, 'r+b') as csvfile:
|
||||
with open(list_path, "r+b") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
|
||||
# Find the highest index in the file
|
||||
for row in csvreader:
|
||||
idx = int(row[0]) + 1
|
||||
csvwriter = csv.writer(csvfile)
|
||||
# You must move the files before appending the row
|
||||
cls._copy_files(backup_dir, idx, cert_path, key_path)
|
||||
csvwriter.writerow([str(idx), cert_path, key_path])
|
||||
|
||||
else:
|
||||
with open(list_path, 'wb') as csvfile:
|
||||
with open(list_path, "wb") as csvfile:
|
||||
csvwriter = csv.writer(csvfile)
|
||||
# You must move the files before appending the row
|
||||
cls._copy_files(backup_dir, "0", cert_path, key_path)
|
||||
csvwriter.writerow(["0", cert_path, key_path])
|
||||
|
||||
@classmethod
|
||||
def _copy_files(cls, backup_dir, idx, cert_path, key_path):
|
||||
shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path))
|
||||
shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path))
|
||||
|
||||
@classmethod
|
||||
def _get_backup(cls, backup_dir, idx, orig_path):
|
||||
return os.path.join(
|
||||
backup_dir, "{name}_{idx}".format(
|
||||
name=os.path.basename(orig_path), idx=str(idx)))
|
||||
|
||||
|
||||
class Cert(object):
|
||||
"""Cert object used for convenience.
|
||||
"""Cert object used for Revocation convenience.
|
||||
|
||||
:ivar cert: M2Crypto X509 cert
|
||||
:type cert: :class:`M2Crypto.X509`
|
||||
|
|
@ -309,7 +356,8 @@ class Cert(object):
|
|||
try:
|
||||
self.cert = M2Crypto.X509.load_cert(cert_path)
|
||||
except (IOError, M2Crypto.X509.X509Error):
|
||||
self.cert = None
|
||||
raise errors.LetsEncryptRevokerError(
|
||||
"Error loading certificate: %s" % cert_path)
|
||||
|
||||
self.idx = -1
|
||||
|
||||
|
|
@ -320,6 +368,19 @@ class Cert(object):
|
|||
|
||||
self.installed = ["Unknown"]
|
||||
|
||||
@classmethod
|
||||
def fromrow(cls, row, backup_dir):
|
||||
"""Initialize Cert from a csv row."""
|
||||
idx = int(row[0])
|
||||
backup = Revoker._get_backup(backup_dir, idx, row[1])
|
||||
backup_key = Revoker._get_backup(backup_dir, idx, row[2])
|
||||
|
||||
obj = cls(backup)
|
||||
obj.add_meta(idx, row[1], row[2], backup, backup_key)
|
||||
|
||||
def get_row(self):
|
||||
"""Returns a list in CSV format."""
|
||||
return [str(self.idx), self.orig, self.orig_key]
|
||||
|
||||
def add_meta(self, idx, orig, orig_key, backup, backup_key):
|
||||
"""Add meta data to cert
|
||||
|
|
@ -420,7 +481,7 @@ class Cert(object):
|
|||
|
||||
def pretty_print(self):
|
||||
"""Nicely frames a cert str"""
|
||||
text = "-" * (display_util.WIDTH - 4) + os.linesep
|
||||
text += str(self)
|
||||
text += "-" * (display_util.WIDTH - 4)
|
||||
return text
|
||||
frame = "-" * (display_util.WIDTH - 4) + os.linesep
|
||||
return "{frame}{cert}{frame}".format(frame=frame, cert=str(self))
|
||||
|
||||
|
||||
|
|
|
|||
39
letsencrypt/client/tests/display/revocation_test.py
Normal file
39
letsencrypt/client/tests/display/revocation_test.py
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import os
|
||||
import pkg_resources
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import zope.component
|
||||
|
||||
from letsencrypt.client.display import display_util
|
||||
|
||||
|
||||
class ChooseCertsTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.client.revoker import Cert
|
||||
base_package = "letsencrypt.client.tests"
|
||||
self.cert1 = Cert(pkg_resources.resource_filename(
|
||||
base_package, os.path.join("testdata", "cert.pem")))
|
||||
self.cert2 = Cert(pkg_resources.resource_filename(
|
||||
base_package, os.path.join("testdata", "cert-san.pem")))
|
||||
|
||||
self.certs = [self.cert1, self.cert2]
|
||||
|
||||
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
|
||||
|
||||
@classmethod
|
||||
def _call(cls, certs):
|
||||
from letsencrypt.client.display.revocation import choose_certs
|
||||
return choose_certs(certs)
|
||||
|
||||
#@mock.patch("letsencrypt.client.display.revocation.util")
|
||||
def test_confirm_revocation(self):
|
||||
pass
|
||||
#mock_util().yesno.return_value = True
|
||||
self._call(self.certs)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
@ -36,12 +36,18 @@ def create_parser():
|
|||
add("-s", "--server", default="letsencrypt-demo.org:443",
|
||||
help=config_help("server"))
|
||||
|
||||
add("-p", "--privkey", type=read_file,
|
||||
help="Path to the private key file for certificate generation.")
|
||||
add("-k", "--authkey", type=read_file,
|
||||
help="Path to the authorized key file")
|
||||
add("-B", "--rsa-key-size", type=int, default=2048, metavar="N",
|
||||
help=config_help("rsa_key_size"))
|
||||
|
||||
add("-k", "--revoke", action="store_true", help="Revoke a certificate.")
|
||||
add("-R", "--revoke", action="store_true",
|
||||
help="Revoke a certificate from a menu.")
|
||||
add("--revoke-certificate", dest="rev_cert", type=read_file,
|
||||
help="Revoke a specific certificate.")
|
||||
add("--revoke-key", dest="rev_key", type=read_file,
|
||||
help="Revoke all certs generated by the provided authorized key.")
|
||||
|
||||
add("-b", "--rollback", type=int, default=0, metavar="N",
|
||||
help="Revert configuration N number of checkpoints.")
|
||||
add("-v", "--view-config-changes", action="store_true",
|
||||
|
|
@ -52,6 +58,9 @@ def create_parser():
|
|||
help="Automatically redirect all HTTP traffic to HTTPS for the newly "
|
||||
"authenticated vhost.")
|
||||
|
||||
add("--no-confirm", dest="no_confirm", action="store_true",
|
||||
help="Turn off confirmation screens, currently used for --revoke")
|
||||
|
||||
add("-e", "--agree-tos", dest="eula", action="store_true",
|
||||
help="Skip the end user license agreement screen.")
|
||||
add("-t", "--text", dest="use_curses", action="store_false",
|
||||
|
|
@ -114,7 +123,7 @@ def main(): # pylint: disable=too-many-branches
|
|||
sys.exit()
|
||||
|
||||
if args.revoke:
|
||||
client.revoke(config)
|
||||
client.revoke(config, args.no_confirm, args.rev_cert, args.rev_key)
|
||||
sys.exit()
|
||||
|
||||
if args.rollback > 0:
|
||||
|
|
@ -146,7 +155,7 @@ def main(): # pylint: disable=too-many-branches
|
|||
if args.privkey is None:
|
||||
privkey = client.init_key(args.rsa_key_size, config.key_dir)
|
||||
else:
|
||||
privkey = le_util.Key(args.privkey[0], args.privkey[1])
|
||||
privkey = le_util.Key(args.authkey[0], args.authkey[1])
|
||||
|
||||
acme = client.Client(config, privkey, auth, installer)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue