mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Merge branch 'master' into red_errors
This commit is contained in:
commit
d2a64166c6
15 changed files with 246 additions and 1220 deletions
|
|
@ -11,6 +11,7 @@ from acme import messages
|
|||
from letsencrypt import achallenges
|
||||
from letsencrypt import constants
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import error_handler
|
||||
from letsencrypt import interfaces
|
||||
|
||||
|
||||
|
|
@ -106,17 +107,16 @@ class AuthHandler(object):
|
|||
"""Get Responses for challenges from authenticators."""
|
||||
cont_resp = []
|
||||
dv_resp = []
|
||||
try:
|
||||
if self.cont_c:
|
||||
cont_resp = self.cont_auth.perform(self.cont_c)
|
||||
if self.dv_c:
|
||||
dv_resp = self.dv_auth.perform(self.dv_c)
|
||||
# This will catch both specific types of errors.
|
||||
except errors.AuthorizationError:
|
||||
logger.critical("Failure in setting up challenges.")
|
||||
logger.info("Attempting to clean up outstanding challenges...")
|
||||
self._cleanup_challenges()
|
||||
raise
|
||||
with error_handler.ErrorHandler(self._cleanup_challenges):
|
||||
try:
|
||||
if self.cont_c:
|
||||
cont_resp = self.cont_auth.perform(self.cont_c)
|
||||
if self.dv_c:
|
||||
dv_resp = self.dv_auth.perform(self.dv_c)
|
||||
except errors.AuthorizationError:
|
||||
logger.critical("Failure in setting up challenges.")
|
||||
logger.info("Attempting to clean up outstanding challenges...")
|
||||
raise
|
||||
|
||||
assert len(cont_resp) == len(self.cont_c)
|
||||
assert len(dv_resp) == len(self.dv_c)
|
||||
|
|
|
|||
|
|
@ -18,10 +18,10 @@ from letsencrypt import constants
|
|||
from letsencrypt import continuity_auth
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import error_handler
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt import reverter
|
||||
from letsencrypt import revoker
|
||||
from letsencrypt import storage
|
||||
|
||||
from letsencrypt.display import ops as display_ops
|
||||
|
|
@ -211,7 +211,7 @@ class Client(object):
|
|||
# Create CSR from names
|
||||
key = crypto_util.init_save_key(
|
||||
self.config.rsa_key_size, self.config.key_dir)
|
||||
csr = crypto_util.init_save_csr(key, domains, self.config.cert_dir)
|
||||
csr = crypto_util.init_save_csr(key, domains, self.config.csr_dir)
|
||||
|
||||
return self._obtain_certificate(domains, csr) + (key, csr)
|
||||
|
||||
|
|
@ -364,16 +364,17 @@ class Client(object):
|
|||
|
||||
chain_path = None if chain_path is None else os.path.abspath(chain_path)
|
||||
|
||||
for dom in domains:
|
||||
# TODO: Provide a fullchain reference for installers like
|
||||
# nginx that want it
|
||||
self.installer.deploy_cert(
|
||||
dom, os.path.abspath(cert_path),
|
||||
os.path.abspath(privkey_path), chain_path)
|
||||
with error_handler.ErrorHandler(self.installer.recovery_routine):
|
||||
for dom in domains:
|
||||
# TODO: Provide a fullchain reference for installers like
|
||||
# nginx that want it
|
||||
self.installer.deploy_cert(
|
||||
dom, os.path.abspath(cert_path),
|
||||
os.path.abspath(privkey_path), chain_path)
|
||||
|
||||
self.installer.save("Deployed Let's Encrypt Certificate")
|
||||
# sites may have been enabled / final cleanup
|
||||
self.installer.restart()
|
||||
self.installer.save("Deployed Let's Encrypt Certificate")
|
||||
# sites may have been enabled / final cleanup
|
||||
self.installer.restart()
|
||||
|
||||
def enhance_config(self, domains, redirect=None):
|
||||
"""Enhance the configuration.
|
||||
|
|
@ -399,6 +400,8 @@ class Client(object):
|
|||
if redirect is None:
|
||||
redirect = enhancements.ask("redirect")
|
||||
|
||||
# When support for more enhancements are added, the call to the
|
||||
# plugin's `enhance` function should be wrapped by an ErrorHandler
|
||||
if redirect:
|
||||
self.redirect_to_ssl(domains)
|
||||
|
||||
|
|
@ -409,14 +412,16 @@ class Client(object):
|
|||
:type vhost: :class:`letsencrypt.interfaces.IInstaller`
|
||||
|
||||
"""
|
||||
for dom in domains:
|
||||
try:
|
||||
self.installer.enhance(dom, "redirect")
|
||||
except errors.PluginError:
|
||||
logger.warn("Unable to perform redirect for %s", dom)
|
||||
with error_handler.ErrorHandler(self.installer.recovery_routine):
|
||||
for dom in domains:
|
||||
try:
|
||||
self.installer.enhance(dom, "redirect")
|
||||
except errors.PluginError:
|
||||
logger.warn("Unable to perform redirect for %s", dom)
|
||||
raise
|
||||
|
||||
self.installer.save("Add Redirects")
|
||||
self.installer.restart()
|
||||
self.installer.save("Add Redirects")
|
||||
self.installer.restart()
|
||||
|
||||
|
||||
def validate_key_csr(privkey, csr=None):
|
||||
|
|
@ -485,27 +490,6 @@ def rollback(default_installer, checkpoints, config, plugins):
|
|||
installer.restart()
|
||||
|
||||
|
||||
def revoke(default_installer, config, plugins, no_confirm, cert, authkey):
|
||||
"""Revoke certificates.
|
||||
|
||||
:param config: Configuration.
|
||||
:type config: :class:`letsencrypt.interfaces.IConfig`
|
||||
|
||||
"""
|
||||
installer = display_ops.pick_installer(
|
||||
config, default_installer, plugins, question="Which installer "
|
||||
"should be used for certificate revocation?")
|
||||
|
||||
revoc = revoker.Revoker(installer, config, no_confirm)
|
||||
# Cert is most selective, so it is chosen first.
|
||||
if cert is not None:
|
||||
revoc.revoke_from_cert(cert[0])
|
||||
elif authkey is not None:
|
||||
revoc.revoke_from_key(le_util.Key(authkey[0], authkey[1]))
|
||||
else:
|
||||
revoc.revoke_from_menu()
|
||||
|
||||
|
||||
def view_config_changes(config):
|
||||
"""View checkpoints and associated configuration changes.
|
||||
|
||||
|
|
|
|||
|
|
@ -18,8 +18,7 @@ class NamespaceConfig(object):
|
|||
paths defined in :py:mod:`letsencrypt.constants`:
|
||||
|
||||
- `accounts_dir`
|
||||
- `cert_dir`
|
||||
- `cert_key_backup`
|
||||
- `csr_dir`
|
||||
- `in_progress_dir`
|
||||
- `key_dir`
|
||||
- `renewer_config_file`
|
||||
|
|
@ -54,13 +53,8 @@ class NamespaceConfig(object):
|
|||
return os.path.join(self.namespace.work_dir, constants.BACKUP_DIR)
|
||||
|
||||
@property
|
||||
def cert_dir(self): # pylint: disable=missing-docstring
|
||||
return os.path.join(self.namespace.config_dir, constants.CERT_DIR)
|
||||
|
||||
@property
|
||||
def cert_key_backup(self): # pylint: disable=missing-docstring
|
||||
return os.path.join(self.namespace.work_dir,
|
||||
constants.CERT_KEY_BACKUP_DIR, self.server_path)
|
||||
def csr_dir(self): # pylint: disable=missing-docstring
|
||||
return os.path.join(self.namespace.config_dir, constants.CSR_DIR)
|
||||
|
||||
@property
|
||||
def in_progress_dir(self): # pylint: disable=missing-docstring
|
||||
|
|
|
|||
|
|
@ -68,12 +68,8 @@ ACCOUNTS_DIR = "accounts"
|
|||
BACKUP_DIR = "backups"
|
||||
"""Directory (relative to `IConfig.work_dir`) where backups are kept."""
|
||||
|
||||
CERT_DIR = "certs"
|
||||
"""See `.IConfig.cert_dir`."""
|
||||
|
||||
CERT_KEY_BACKUP_DIR = "keys-certs"
|
||||
"""Directory where all certificates and keys are stored (relative to
|
||||
`IConfig.work_dir`). Used for easy revocation."""
|
||||
CSR_DIR = "csr"
|
||||
"""See `.IConfig.csr_dir`."""
|
||||
|
||||
IN_PROGRESS_DIR = "IN_PROGRESS"
|
||||
"""Directory used before a permanent checkpoint is finalized (relative to
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
"""Revocation UI class."""
|
||||
import os
|
||||
|
||||
import zope.component
|
||||
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt.display import util as display_util
|
||||
|
||||
# Define a helper function to avoid verbose code
|
||||
util = zope.component.getUtility # pylint: disable=invalid-name
|
||||
|
||||
|
||||
def display_certs(certs):
|
||||
"""Display the certificates in a menu for revocation.
|
||||
|
||||
:param list certs: each is a :class:`letsencrypt.revoker.Cert`
|
||||
|
||||
:returns: tuple of the form (code, selection) where
|
||||
code is a display exit code
|
||||
selection is the user's int selection
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
list_choices = [
|
||||
"%s | %s | %s" % (
|
||||
str(cert.get_cn().ljust(display_util.WIDTH - 39)),
|
||||
cert.get_not_before().strftime("%m-%d-%y"),
|
||||
"Installed" if cert.installed and cert.installed != ["Unknown"]
|
||||
else "") for cert in certs
|
||||
]
|
||||
|
||||
code, tag = util(interfaces.IDisplay).menu(
|
||||
"Which certificates would you like to revoke?",
|
||||
list_choices, help_label="More Info", ok_label="Revoke",
|
||||
cancel_label="Exit")
|
||||
|
||||
return code, tag
|
||||
|
||||
|
||||
def confirm_revocation(cert):
|
||||
"""Confirm revocation screen.
|
||||
|
||||
:param cert: certificate object
|
||||
:type cert: :class:
|
||||
|
||||
:returns: True if user would like to revoke, False otherwise
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
return util(interfaces.IDisplay).yesno(
|
||||
"Are you sure you would like to revoke the following "
|
||||
"certificate:{0}{cert}This action cannot be reversed!".format(
|
||||
os.linesep, cert=cert.pretty_print()))
|
||||
|
||||
|
||||
def more_info_cert(cert):
|
||||
"""Displays more info about the cert.
|
||||
|
||||
:param dict cert: cert dict used throughout revoker.py
|
||||
|
||||
"""
|
||||
util(interfaces.IDisplay).notification(
|
||||
"Certificate Information:{0}{1}".format(
|
||||
os.linesep, cert.pretty_print()),
|
||||
height=display_util.HEIGHT)
|
||||
|
||||
|
||||
def success_revocation(cert):
|
||||
"""Display a success message.
|
||||
|
||||
:param cert: cert that was revoked
|
||||
:type cert: :class:`letsencrypt.revoker.Cert`
|
||||
|
||||
"""
|
||||
util(interfaces.IDisplay).notification(
|
||||
"You have successfully revoked the certificate for "
|
||||
"%s" % cert.get_cn())
|
||||
98
letsencrypt/error_handler.py
Normal file
98
letsencrypt/error_handler.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
"""Registers functions to be called if an exception or signal occurs."""
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import traceback
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# _SIGNALS stores the signals that will be handled by the ErrorHandler. These
|
||||
# signals were chosen as their default handler terminates the process and could
|
||||
# potentially occur from inside Python. Signals such as SIGILL were not
|
||||
# included as they could be a sign of something devious and we should terminate
|
||||
# immediately.
|
||||
_SIGNALS = ([signal.SIGTERM] if os.name == "nt" else
|
||||
[signal.SIGTERM, signal.SIGHUP, signal.SIGQUIT,
|
||||
signal.SIGXCPU, signal.SIGXFSZ, signal.SIGPWR])
|
||||
|
||||
|
||||
class ErrorHandler(object):
|
||||
"""Registers functions to be called if an exception or signal occurs.
|
||||
|
||||
This class allows you to register functions that will be called when
|
||||
an exception or signal is encountered. The class works best as a
|
||||
context manager. For example:
|
||||
|
||||
with ErrorHandler(cleanup_func):
|
||||
do_something()
|
||||
|
||||
If an exception is raised out of do_something, cleanup_func will be
|
||||
called. The exception is not caught by the ErrorHandler. Similarly,
|
||||
if a signal is encountered, cleanup_func is called followed by the
|
||||
previously registered signal handler.
|
||||
|
||||
Every registered function is attempted to be run to completion
|
||||
exactly once. If a registered function raises an exception, it is
|
||||
logged and the next function is called. If a (different) handled
|
||||
signal occurs while calling a registered function, it is attempted
|
||||
to be called again by the next signal handler.
|
||||
|
||||
"""
|
||||
def __init__(self, func=None):
|
||||
self.funcs = []
|
||||
self.prev_handlers = {}
|
||||
if func is not None:
|
||||
self.register(func)
|
||||
|
||||
def __enter__(self):
|
||||
self.set_signal_handlers()
|
||||
|
||||
def __exit__(self, exec_type, exec_value, trace):
|
||||
if exec_value is not None:
|
||||
logger.debug("Encountered exception:\n%s", "".join(
|
||||
traceback.format_exception(exec_type, exec_value, trace)))
|
||||
self.call_registered()
|
||||
self.reset_signal_handlers()
|
||||
|
||||
def register(self, func):
|
||||
"""Registers func to be called if an error occurs."""
|
||||
self.funcs.append(func)
|
||||
|
||||
def call_registered(self):
|
||||
"""Calls all registered functions"""
|
||||
logger.debug("Calling registered functions")
|
||||
while self.funcs:
|
||||
try:
|
||||
self.funcs[-1]()
|
||||
except Exception as error: # pylint: disable=broad-except
|
||||
logger.error("Encountered exception during recovery")
|
||||
logger.exception(error)
|
||||
self.funcs.pop()
|
||||
|
||||
def set_signal_handlers(self):
|
||||
"""Sets signal handlers for signals in _SIGNALS."""
|
||||
for signum in _SIGNALS:
|
||||
prev_handler = signal.getsignal(signum)
|
||||
# If prev_handler is None, the handler was set outside of Python
|
||||
if prev_handler is not None:
|
||||
self.prev_handlers[signum] = prev_handler
|
||||
signal.signal(signum, self._signal_handler)
|
||||
|
||||
def reset_signal_handlers(self):
|
||||
"""Resets signal handlers for signals in _SIGNALS."""
|
||||
for signum in self.prev_handlers:
|
||||
signal.signal(signum, self.prev_handlers[signum])
|
||||
self.prev_handlers.clear()
|
||||
|
||||
def _signal_handler(self, signum, unused_frame):
|
||||
"""Calls registered functions and the previous signal handler.
|
||||
|
||||
:param int signum: number of current signal
|
||||
|
||||
"""
|
||||
logger.debug("Singal %s encountered", signum)
|
||||
self.call_registered()
|
||||
signal.signal(signum, self.prev_handlers[signum])
|
||||
os.kill(os.getpid(), signum)
|
||||
|
|
@ -205,12 +205,9 @@ class IConfig(zope.interface.Interface):
|
|||
accounts_dir = zope.interface.Attribute(
|
||||
"Directory where all account information is stored.")
|
||||
backup_dir = zope.interface.Attribute("Configuration backups directory.")
|
||||
cert_dir = zope.interface.Attribute(
|
||||
csr_dir = zope.interface.Attribute(
|
||||
"Directory where newly generated Certificate Signing Requests "
|
||||
"(CSRs) and certificates not enrolled in the renewer are saved.")
|
||||
cert_key_backup = zope.interface.Attribute(
|
||||
"Directory where all certificates and keys are stored. "
|
||||
"Used for easy revocation.")
|
||||
"(CSRs) are saved.")
|
||||
in_progress_dir = zope.interface.Attribute(
|
||||
"Directory used before a permanent checkpoint is finalized.")
|
||||
key_dir = zope.interface.Attribute("Keys storage.")
|
||||
|
|
@ -321,6 +318,17 @@ class IInstaller(IPlugin):
|
|||
|
||||
"""
|
||||
|
||||
def recovery_routine():
|
||||
"""Revert configuration to most recent finalized checkpoint.
|
||||
|
||||
Remove all changes (temporary and permanent) that have not been
|
||||
finalized. This is useful to protect against crashes and other
|
||||
execution interruptions.
|
||||
|
||||
:raises .errors.PluginError: If unable to recover the configuration
|
||||
|
||||
"""
|
||||
|
||||
def view_config_changes():
|
||||
"""Display all of the LE config changes.
|
||||
|
||||
|
|
|
|||
|
|
@ -47,6 +47,9 @@ class Installer(common.Plugin):
|
|||
def rollback_checkpoints(self, rollback=1):
|
||||
pass # pragma: no cover
|
||||
|
||||
def recovery_routine(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
def view_config_changes(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
|
|
|
|||
|
|
@ -1,560 +0,0 @@
|
|||
"""Revoker module to enable LE revocations.
|
||||
|
||||
The backend of this module would fit a database quite nicely, but in order to
|
||||
minimize dependencies and maintain transparency, the class currently implements
|
||||
its own storage system. The number of certs that will likely be stored on any
|
||||
given client might not warrant requiring a database.
|
||||
|
||||
"""
|
||||
import collections
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import OpenSSL
|
||||
|
||||
from acme import client as acme_client
|
||||
from acme import crypto_util as acme_crypto_util
|
||||
from acme.jose import util as jose_util
|
||||
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import le_util
|
||||
|
||||
from letsencrypt.display import util as display_util
|
||||
from letsencrypt.display import revocation
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Revoker(object):
|
||||
"""A revocation class for LE.
|
||||
|
||||
.. todo:: Add a method to specify your own certificate for revocation - CLI
|
||||
|
||||
:ivar .acme.client.Client acme: ACME client
|
||||
|
||||
:ivar installer: Installer object
|
||||
:type installer: :class:`~letsencrypt.interfaces.IInstaller`
|
||||
|
||||
:ivar config: Configuration.
|
||||
:type config: :class:`~letsencrypt.interfaces.IConfig`
|
||||
|
||||
:ivar bool no_confirm: Whether or not to ask for confirmation for revocation
|
||||
|
||||
"""
|
||||
def __init__(self, installer, config, no_confirm=False):
|
||||
# XXX
|
||||
self.acme = acme_client.Client(directory=None, key=None, alg=None)
|
||||
|
||||
self.installer = installer
|
||||
self.config = config
|
||||
self.no_confirm = no_confirm
|
||||
|
||||
le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid(),
|
||||
self.config.strict_permissions)
|
||||
|
||||
# TODO: Find a better solution for this...
|
||||
self.list_path = os.path.join(config.cert_key_backup, "LIST")
|
||||
# Make sure that the file is available for use for rest of class
|
||||
open(self.list_path, "a").close()
|
||||
|
||||
def revoke_from_key(self, authkey):
|
||||
"""Revoke all certificates under an authorized key.
|
||||
|
||||
:param authkey: Authorized key used in previous transactions
|
||||
:type authkey: :class:`letsencrypt.le_util.Key`
|
||||
|
||||
"""
|
||||
certs = []
|
||||
try:
|
||||
clean_pem = OpenSSL.crypto.dump_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, authkey.pem))
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.RevokerError(
|
||||
"Invalid key file specified to revoke_from_key")
|
||||
|
||||
with open(self.list_path, "rb") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
for row in csvreader:
|
||||
# idx, cert, key
|
||||
# Add all keys that match to marked list
|
||||
# Note: The key can be different than the pub key found in the
|
||||
# certificate.
|
||||
_, b_k = self._row_to_backup(row)
|
||||
try:
|
||||
test_pem = OpenSSL.crypto.dump_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, open(b_k).read()))
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
# This should never happen given the assumptions of the
|
||||
# module. If it does, it is probably best to delete the
|
||||
# the offending key/cert. For now... just raise an exception
|
||||
raise errors.RevokerError("%s - backup file is corrupted.")
|
||||
|
||||
if clean_pem == test_pem:
|
||||
certs.append(
|
||||
Cert.fromrow(row, self.config.cert_key_backup))
|
||||
if certs:
|
||||
self._safe_revoke(certs)
|
||||
else:
|
||||
logger.info("No certificates using the authorized key were found.")
|
||||
|
||||
def revoke_from_cert(self, cert_path):
|
||||
"""Revoke a certificate by specifying a file path.
|
||||
|
||||
.. todo:: Add the ability to revoke the certificate even if the cert
|
||||
is not stored locally. A path to the auth key will need to be
|
||||
attained from the user.
|
||||
|
||||
:param str cert_path: path to ACME certificate in pem form
|
||||
|
||||
"""
|
||||
# Locate the correct certificate (do not rely on filename)
|
||||
cert_to_revoke = Cert(cert_path)
|
||||
|
||||
with open(self.list_path, "rb") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
for row in csvreader:
|
||||
cert = Cert.fromrow(row, self.config.cert_key_backup)
|
||||
|
||||
if cert.get_der() == cert_to_revoke.get_der():
|
||||
self._safe_revoke([cert])
|
||||
return
|
||||
|
||||
logger.info("Associated ACME certificate was not found.")
|
||||
|
||||
def revoke_from_menu(self):
|
||||
"""List trusted Let's Encrypt certificates."""
|
||||
|
||||
csha1_vhlist = self._get_installed_locations()
|
||||
certs = self._populate_saved_certs(csha1_vhlist)
|
||||
|
||||
while True:
|
||||
if certs:
|
||||
code, selection = revocation.display_certs(certs)
|
||||
|
||||
if code == display_util.OK:
|
||||
revoked_certs = self._safe_revoke([certs[selection]])
|
||||
# Since we are currently only revoking one cert at a time...
|
||||
if revoked_certs:
|
||||
del certs[selection]
|
||||
elif code == display_util.HELP:
|
||||
revocation.more_info_cert(certs[selection])
|
||||
else:
|
||||
return
|
||||
else:
|
||||
logger.info(
|
||||
"There are not any trusted Let's Encrypt "
|
||||
"certificates for this server.")
|
||||
return
|
||||
|
||||
def _populate_saved_certs(self, csha1_vhlist):
|
||||
# pylint: disable=no-self-use
|
||||
"""Populate a list of all the saved certs.
|
||||
|
||||
It is important to read from the file rather than the directory.
|
||||
We assume that the LIST file is the master record and depending on
|
||||
program crashes, this may differ from what is actually in the directory.
|
||||
Namely, additional certs/keys may exist. There should never be any
|
||||
certs/keys in the LIST that don't exist in the directory however.
|
||||
|
||||
:param dict csha1_vhlist: map from cert sha1 fingerprints to a list
|
||||
of it's installed location paths.
|
||||
|
||||
"""
|
||||
certs = []
|
||||
with open(self.list_path, "rb") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
# idx, orig_cert, orig_key
|
||||
for row in csvreader:
|
||||
cert = Cert.fromrow(row, self.config.cert_key_backup)
|
||||
|
||||
# If we were able to find the cert installed... update status
|
||||
cert.installed = csha1_vhlist.get(cert.get_fingerprint(), [])
|
||||
|
||||
certs.append(cert)
|
||||
|
||||
return certs
|
||||
|
||||
def _get_installed_locations(self):
|
||||
"""Get installed locations of certificates.
|
||||
|
||||
:returns: map from cert sha1 fingerprint to :class:`list` of vhosts
|
||||
where the certificate is installed.
|
||||
|
||||
"""
|
||||
csha1_vhlist = {}
|
||||
|
||||
if self.installer is None:
|
||||
return csha1_vhlist
|
||||
|
||||
for (cert_path, _, path) in self.installer.get_all_certs_keys():
|
||||
try:
|
||||
with open(cert_path) as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
except IOError:
|
||||
continue
|
||||
try:
|
||||
cert_obj, _ = crypto_util.pyopenssl_load_certificate(cert_data)
|
||||
except errors.Error:
|
||||
continue
|
||||
cert_sha1 = cert_obj.digest("sha1")
|
||||
if cert_sha1 in csha1_vhlist:
|
||||
csha1_vhlist[cert_sha1].append(path)
|
||||
else:
|
||||
csha1_vhlist[cert_sha1] = [path]
|
||||
|
||||
return csha1_vhlist
|
||||
|
||||
def _safe_revoke(self, certs):
|
||||
"""Confirm and revoke certificates.
|
||||
|
||||
:param certs: certs intended to be revoked
|
||||
:type certs: :class:`list` of :class:`letsencrypt.revoker.Cert`
|
||||
|
||||
:returns: certs successfully revoked
|
||||
:rtype: :class:`list` of :class:`letsencrypt.revoker.Cert`
|
||||
|
||||
"""
|
||||
success_list = []
|
||||
try:
|
||||
for cert in certs:
|
||||
if self.no_confirm or revocation.confirm_revocation(cert):
|
||||
try:
|
||||
self._acme_revoke(cert)
|
||||
except errors.Error:
|
||||
# TODO: Improve error handling when networking is set...
|
||||
logger.error(
|
||||
"Unable to revoke cert:%s%s", os.linesep, str(cert))
|
||||
success_list.append(cert)
|
||||
revocation.success_revocation(cert)
|
||||
finally:
|
||||
if success_list:
|
||||
self._remove_certs_keys(success_list)
|
||||
|
||||
return success_list
|
||||
|
||||
def _acme_revoke(self, cert):
|
||||
"""Revoke the certificate with the ACME server.
|
||||
|
||||
:param cert: certificate to revoke
|
||||
:type cert: :class:`letsencrypt.revoker.Cert`
|
||||
|
||||
:returns: TODO
|
||||
|
||||
"""
|
||||
# XXX | pylint: disable=unused-variable
|
||||
|
||||
# pylint: disable=protected-access
|
||||
certificate = jose_util.ComparableX509(cert._cert)
|
||||
try:
|
||||
with open(cert.backup_key_path, "rU") as backup_key_file:
|
||||
key = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, backup_key_file.read())
|
||||
# If the key file doesn't exist... or is corrupted
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.RevokerError(
|
||||
"Corrupted backup key file: %s" % cert.backup_key_path)
|
||||
|
||||
return self.acme.revoke(cert=None) # XXX
|
||||
|
||||
def _remove_certs_keys(self, cert_list): # pylint: disable=no-self-use
|
||||
"""Remove certificate and key.
|
||||
|
||||
:param list cert_list: Must contain certs, each is of type
|
||||
:class:`letsencrypt.revoker.Cert`
|
||||
|
||||
"""
|
||||
# This must occur first, LIST is the official key
|
||||
self._remove_certs_from_list(cert_list)
|
||||
|
||||
# Remove files
|
||||
for cert in cert_list:
|
||||
os.remove(cert.backup_path)
|
||||
os.remove(cert.backup_key_path)
|
||||
|
||||
def _remove_certs_from_list(self, cert_list): # pylint: disable=no-self-use
|
||||
"""Remove a certificate from the LIST file.
|
||||
|
||||
:param list cert_list: Must contain valid certs, each is of type
|
||||
:class:`letsencrypt.revoker.Cert`
|
||||
|
||||
"""
|
||||
newfile_handle, list_path2 = tempfile.mkstemp(".tmp", "LIST")
|
||||
idx = 0
|
||||
|
||||
with open(self.list_path, "rb") as orgfile:
|
||||
csvreader = csv.reader(orgfile)
|
||||
with os.fdopen(newfile_handle, "wb") as newfile:
|
||||
csvwriter = csv.writer(newfile)
|
||||
|
||||
for row in csvreader:
|
||||
if idx >= len(cert_list) or row != cert_list[idx].get_row():
|
||||
csvwriter.writerow(row)
|
||||
else:
|
||||
idx += 1
|
||||
|
||||
# This should never happen...
|
||||
if idx != len(cert_list):
|
||||
raise errors.RevokerError(
|
||||
"Did not find all cert_list items to remove from LIST")
|
||||
|
||||
shutil.copy2(list_path2, self.list_path)
|
||||
os.remove(list_path2)
|
||||
|
||||
def _row_to_backup(self, row):
|
||||
"""Convenience function
|
||||
|
||||
:param list row: csv file row 'idx', 'cert_path', 'key_path'
|
||||
|
||||
:returns: tuple of the form ('backup_cert_path', 'backup_key_path')
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
return (self._get_backup(self.config.cert_key_backup, row[0], row[1]),
|
||||
self._get_backup(self.config.cert_key_backup, row[0], row[2]))
|
||||
|
||||
@classmethod
|
||||
def store_cert_key(cls, cert_path, key_path, config):
|
||||
"""Store certificate key. (Used to allow quick revocation)
|
||||
|
||||
:param str cert_path: Path to a certificate file.
|
||||
:param str key_path: Path to authorized key for certificate
|
||||
|
||||
:ivar config: Configuration.
|
||||
:type config: :class:`~letsencrypt.interfaces.IConfig`
|
||||
|
||||
"""
|
||||
list_path = os.path.join(config.cert_key_backup, "LIST")
|
||||
le_util.make_or_verify_dir(config.cert_key_backup, 0o700, os.geteuid(),
|
||||
config.strict_permissions)
|
||||
|
||||
cls._catalog_files(
|
||||
config.cert_key_backup, cert_path, key_path, list_path)
|
||||
|
||||
@classmethod
|
||||
def _catalog_files(cls, backup_dir, cert_path, key_path, list_path):
|
||||
idx = 0
|
||||
if os.path.isfile(list_path):
|
||||
with open(list_path, "r+b") as csvfile:
|
||||
csvreader = csv.reader(csvfile)
|
||||
|
||||
# Find the highest index in the file
|
||||
for row in csvreader:
|
||||
idx = int(row[0]) + 1
|
||||
csvwriter = csv.writer(csvfile)
|
||||
# You must move the files before appending the row
|
||||
cls._copy_files(backup_dir, idx, cert_path, key_path)
|
||||
csvwriter.writerow([str(idx), cert_path, key_path])
|
||||
|
||||
else:
|
||||
with open(list_path, "wb") as csvfile:
|
||||
csvwriter = csv.writer(csvfile)
|
||||
# You must move the files before appending the row
|
||||
cls._copy_files(backup_dir, idx, cert_path, key_path)
|
||||
csvwriter.writerow([str(idx), cert_path, key_path])
|
||||
|
||||
@classmethod
|
||||
def _copy_files(cls, backup_dir, idx, cert_path, key_path):
|
||||
"""Copies the files into the backup dir appropriately."""
|
||||
shutil.copy2(cert_path, cls._get_backup(backup_dir, idx, cert_path))
|
||||
shutil.copy2(key_path, cls._get_backup(backup_dir, idx, key_path))
|
||||
|
||||
@classmethod
|
||||
def _get_backup(cls, backup_dir, idx, orig_path):
|
||||
"""Returns the path to the backup."""
|
||||
return os.path.join(
|
||||
backup_dir, "{name}_{idx}".format(
|
||||
name=os.path.basename(orig_path), idx=str(idx)))
|
||||
|
||||
|
||||
class Cert(object):
|
||||
"""Cert object used for Revocation convenience.
|
||||
|
||||
:ivar _cert: Certificate
|
||||
:type _cert: :class:`OpenSSL.crypto.X509`
|
||||
|
||||
:ivar int idx: convenience index used for listing
|
||||
:ivar orig: (`str` path - original certificate, `str` status)
|
||||
:type orig: :class:`PathStatus`
|
||||
:ivar orig_key: (`str` path - original auth key, `str` status)
|
||||
:type orig_key: :class:`PathStatus`
|
||||
:ivar str backup_path: backup filepath of the certificate
|
||||
:ivar str backup_key_path: backup filepath of the authorized key
|
||||
|
||||
:ivar list installed: `list` of `str` describing all locations the cert
|
||||
is installed
|
||||
|
||||
"""
|
||||
PathStatus = collections.namedtuple("PathStatus", "path status")
|
||||
"""Convenience container to hold path and status info"""
|
||||
|
||||
DELETED_MSG = "This file has been moved or deleted"
|
||||
CHANGED_MSG = "This file has changed"
|
||||
|
||||
def __init__(self, cert_path):
|
||||
"""Cert initialization
|
||||
|
||||
:param str cert_filepath: Name of file containing certificate in
|
||||
PEM format.
|
||||
|
||||
"""
|
||||
try:
|
||||
with open(cert_path) as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
except IOError:
|
||||
raise errors.RevokerError(
|
||||
"Error loading certificate: %s" % cert_path)
|
||||
|
||||
try:
|
||||
self._cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert_data)
|
||||
except OpenSSL.crypto.Error:
|
||||
raise errors.RevokerError(
|
||||
"Error loading certificate: %s" % cert_path)
|
||||
|
||||
self.idx = -1
|
||||
|
||||
self.orig = None
|
||||
self.orig_key = None
|
||||
self.backup_path = ""
|
||||
self.backup_key_path = ""
|
||||
|
||||
self.installed = ["Unknown"]
|
||||
|
||||
@classmethod
|
||||
def fromrow(cls, row, backup_dir):
|
||||
# pylint: disable=protected-access
|
||||
"""Initialize Cert from a csv row."""
|
||||
idx = int(row[0])
|
||||
backup = Revoker._get_backup(backup_dir, idx, row[1])
|
||||
backup_key = Revoker._get_backup(backup_dir, idx, row[2])
|
||||
|
||||
obj = cls(backup)
|
||||
obj.add_meta(idx, row[1], row[2], backup, backup_key)
|
||||
return obj
|
||||
|
||||
def get_row(self):
|
||||
"""Returns a list in CSV format. If meta data is available."""
|
||||
if self.orig is not None and self.orig_key is not None:
|
||||
return [str(self.idx), self.orig.path, self.orig_key.path]
|
||||
return None
|
||||
|
||||
def add_meta(self, idx, orig, orig_key, backup, backup_key):
|
||||
"""Add meta data to cert
|
||||
|
||||
:param int idx: convenience index for revoker
|
||||
:param tuple orig: (`str` original certificate filepath, `str` status)
|
||||
:param tuple orig_key: (`str` original auth key path, `str` status)
|
||||
:param str backup: backup certificate filepath
|
||||
:param str backup_key: backup key filepath
|
||||
|
||||
"""
|
||||
status = ""
|
||||
key_status = ""
|
||||
|
||||
# Verify original cert path
|
||||
if not os.path.isfile(orig):
|
||||
status = Cert.DELETED_MSG
|
||||
else:
|
||||
with open(orig) as orig_file:
|
||||
orig_data = orig_file.read()
|
||||
o_cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, orig_data)
|
||||
if self.get_fingerprint() != o_cert.digest("sha1"):
|
||||
status = Cert.CHANGED_MSG
|
||||
|
||||
# Verify original key path
|
||||
if not os.path.isfile(orig_key):
|
||||
key_status = Cert.DELETED_MSG
|
||||
else:
|
||||
with open(orig_key, "r") as fd:
|
||||
key_pem = fd.read()
|
||||
with open(backup_key, "r") as fd:
|
||||
backup_key_pem = fd.read()
|
||||
if key_pem != backup_key_pem:
|
||||
key_status = Cert.CHANGED_MSG
|
||||
|
||||
self.idx = idx
|
||||
self.orig = Cert.PathStatus(orig, status)
|
||||
self.orig_key = Cert.PathStatus(orig_key, key_status)
|
||||
self.backup_path = backup
|
||||
self.backup_key_path = backup_key
|
||||
|
||||
def get_cn(self):
|
||||
"""Get common name."""
|
||||
return self._cert.get_subject().CN
|
||||
|
||||
def get_fingerprint(self):
|
||||
"""Get SHA1 fingerprint."""
|
||||
return self._cert.digest("sha1")
|
||||
|
||||
def get_not_before(self):
|
||||
"""Get not_valid_before field."""
|
||||
return crypto_util.asn1_generalizedtime_to_dt(
|
||||
self._cert.get_notBefore())
|
||||
|
||||
def get_not_after(self):
|
||||
"""Get not_valid_after field."""
|
||||
return crypto_util.asn1_generalizedtime_to_dt(
|
||||
self._cert.get_notAfter())
|
||||
|
||||
def get_der(self):
|
||||
"""Get certificate in der format."""
|
||||
return OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, self._cert)
|
||||
|
||||
def get_pub_key(self):
|
||||
"""Get public key size.
|
||||
|
||||
.. todo:: Support for ECC
|
||||
|
||||
"""
|
||||
return "RSA {0}".format(self._cert.get_pubkey().bits)
|
||||
|
||||
def get_san(self):
|
||||
"""Get subject alternative name if available."""
|
||||
# pylint: disable=protected-access
|
||||
return ", ".join(acme_crypto_util._pyopenssl_cert_or_req_san(self._cert))
|
||||
|
||||
def __str__(self):
|
||||
text = [
|
||||
"Subject: %s" % crypto_util.pyopenssl_x509_name_as_text(
|
||||
self._cert.get_subject()),
|
||||
"SAN: %s" % self.get_san(),
|
||||
"Issuer: %s" % crypto_util.pyopenssl_x509_name_as_text(
|
||||
self._cert.get_issuer()),
|
||||
"Public Key: %s" % self.get_pub_key(),
|
||||
"Not Before: %s" % str(self.get_not_before()),
|
||||
"Not After: %s" % str(self.get_not_after()),
|
||||
"Serial Number: %s" % self._cert.get_serial_number(),
|
||||
"SHA1: %s%s" % (self.get_fingerprint(), os.linesep),
|
||||
"Installed: %s" % ", ".join(self.installed),
|
||||
]
|
||||
|
||||
if self.orig is not None:
|
||||
if self.orig.status == "":
|
||||
text.append("Path: %s" % self.orig.path)
|
||||
else:
|
||||
text.append("Orig Path: %s (%s)" % self.orig)
|
||||
if self.orig_key is not None:
|
||||
if self.orig_key.status == "":
|
||||
text.append("Auth Key Path: %s" % self.orig_key.path)
|
||||
else:
|
||||
text.append("Orig Auth Key Path: %s (%s)" % self.orig_key)
|
||||
|
||||
text.append("")
|
||||
return os.linesep.join(text)
|
||||
|
||||
def pretty_print(self):
|
||||
"""Nicely frames a cert str"""
|
||||
frame = "-" * (display_util.WIDTH - 4) + os.linesep
|
||||
return "{frame}{cert}{frame}".format(frame=frame, cert=str(self))
|
||||
|
|
@ -113,7 +113,7 @@ class ClientTest(unittest.TestCase):
|
|||
mock_crypto_util.init_save_key.assert_called_once_with(
|
||||
self.config.rsa_key_size, self.config.key_dir)
|
||||
mock_crypto_util.init_save_csr.assert_called_once_with(
|
||||
mock.sentinel.key, domains, self.config.cert_dir)
|
||||
mock.sentinel.key, domains, self.config.csr_dir)
|
||||
self._check_obtain_certificate()
|
||||
|
||||
@mock.patch("letsencrypt.client.zope.component.getUtility")
|
||||
|
|
@ -178,6 +178,39 @@ class ClientTest(unittest.TestCase):
|
|||
|
||||
shutil.rmtree(tmp_path)
|
||||
|
||||
def test_deploy_certificate(self):
|
||||
self.assertRaises(errors.Error, self.client.deploy_certificate,
|
||||
["foo.bar"], "key", "cert", "chain")
|
||||
|
||||
installer = mock.MagicMock()
|
||||
self.client.installer = installer
|
||||
|
||||
self.client.deploy_certificate(["foo.bar"], "key", "cert", "chain")
|
||||
installer.deploy_cert.assert_called_once_with(
|
||||
"foo.bar", os.path.abspath("cert"),
|
||||
os.path.abspath("key"), os.path.abspath("chain"))
|
||||
self.assertEqual(installer.save.call_count, 1)
|
||||
installer.restart.assert_called_once_with()
|
||||
|
||||
@mock.patch("letsencrypt.client.enhancements")
|
||||
def test_enhance_config(self, mock_enhancements):
|
||||
self.assertRaises(errors.Error,
|
||||
self.client.enhance_config, ["foo.bar"])
|
||||
|
||||
mock_enhancements.ask.return_value = True
|
||||
installer = mock.MagicMock()
|
||||
self.client.installer = installer
|
||||
|
||||
self.client.enhance_config(["foo.bar"])
|
||||
installer.enhance.assert_called_once_with("foo.bar", "redirect")
|
||||
self.assertEqual(installer.save.call_count, 1)
|
||||
installer.restart.assert_called_once_with()
|
||||
|
||||
installer.enhance.side_effect = errors.PluginError
|
||||
self.assertRaises(errors.PluginError,
|
||||
self.client.enhance_config, ["foo.bar"], True)
|
||||
installer.recovery_routine.assert_called_once_with()
|
||||
|
||||
|
||||
class RollbackTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.client.rollback."""
|
||||
|
|
|
|||
|
|
@ -32,8 +32,8 @@ class NamespaceConfigTest(unittest.TestCase):
|
|||
def test_dynamic_dirs(self, constants):
|
||||
constants.ACCOUNTS_DIR = 'acc'
|
||||
constants.BACKUP_DIR = 'backups'
|
||||
constants.CERT_KEY_BACKUP_DIR = 'c/'
|
||||
constants.CERT_DIR = 'certs'
|
||||
constants.CSR_DIR = 'csr'
|
||||
|
||||
constants.IN_PROGRESS_DIR = '../p'
|
||||
constants.KEY_DIR = 'keys'
|
||||
constants.TEMP_CHECKPOINT_DIR = 't'
|
||||
|
|
@ -41,9 +41,7 @@ class NamespaceConfigTest(unittest.TestCase):
|
|||
self.assertEqual(
|
||||
self.config.accounts_dir, '/tmp/config/acc/acme-server.org:443/new')
|
||||
self.assertEqual(self.config.backup_dir, '/tmp/foo/backups')
|
||||
self.assertEqual(self.config.cert_dir, '/tmp/config/certs')
|
||||
self.assertEqual(
|
||||
self.config.cert_key_backup, '/tmp/foo/c/acme-server.org:443/new')
|
||||
self.assertEqual(self.config.csr_dir, '/tmp/config/csr')
|
||||
self.assertEqual(self.config.in_progress_dir, '/tmp/foo/../p')
|
||||
self.assertEqual(self.config.key_dir, '/tmp/config/keys')
|
||||
self.assertEqual(self.config.temp_checkpoint_dir, '/tmp/foo/t')
|
||||
|
|
|
|||
|
|
@ -1,97 +0,0 @@
|
|||
"""Test :mod:`letsencrypt.display.revocation`."""
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import zope.component
|
||||
|
||||
from letsencrypt.display import util as display_util
|
||||
|
||||
from letsencrypt.tests import test_util
|
||||
|
||||
|
||||
class DisplayCertsTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.revoker import Cert
|
||||
self.cert0 = Cert(test_util.vector_path("cert.pem"))
|
||||
self.cert1 = Cert(test_util.vector_path("cert-san.pem"))
|
||||
|
||||
self.certs = [self.cert0, self.cert1]
|
||||
|
||||
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
|
||||
|
||||
@classmethod
|
||||
def _call(cls, certs):
|
||||
from letsencrypt.display.revocation import display_certs
|
||||
return display_certs(certs)
|
||||
|
||||
@mock.patch("letsencrypt.display.revocation.util")
|
||||
def test_revocation(self, mock_util):
|
||||
mock_util().menu.return_value = (display_util.OK, 0)
|
||||
|
||||
code, choice = self._call(self.certs)
|
||||
|
||||
self.assertEqual(display_util.OK, code)
|
||||
self.assertEqual(self.certs[choice], self.cert0)
|
||||
|
||||
@mock.patch("letsencrypt.display.revocation.util")
|
||||
def test_cancel(self, mock_util):
|
||||
mock_util().menu.return_value = (display_util.CANCEL, -1)
|
||||
|
||||
code, _ = self._call(self.certs)
|
||||
self.assertEqual(display_util.CANCEL, code)
|
||||
|
||||
|
||||
class MoreInfoCertTest(unittest.TestCase):
|
||||
# pylint: disable=too-few-public-methods
|
||||
@classmethod
|
||||
def _call(cls, cert):
|
||||
from letsencrypt.display.revocation import more_info_cert
|
||||
more_info_cert(cert)
|
||||
|
||||
@mock.patch("letsencrypt.display.revocation.util")
|
||||
def test_more_info(self, mock_util):
|
||||
self._call(mock.MagicMock())
|
||||
|
||||
self.assertEqual(mock_util().notification.call_count, 1)
|
||||
|
||||
|
||||
class SuccessRevocationTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.revoker import Cert
|
||||
self.cert = Cert(test_util.vector_path("cert.pem"))
|
||||
|
||||
@classmethod
|
||||
def _call(cls, cert):
|
||||
from letsencrypt.display.revocation import success_revocation
|
||||
success_revocation(cert)
|
||||
|
||||
# Pretty trivial test... something is displayed...
|
||||
@mock.patch("letsencrypt.display.revocation.util")
|
||||
def test_success_revocation(self, mock_util):
|
||||
self._call(self.cert)
|
||||
|
||||
self.assertEqual(mock_util().notification.call_count, 1)
|
||||
|
||||
|
||||
class ConfirmRevocationTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.revoker import Cert
|
||||
self.cert = Cert(test_util.vector_path("cert.pem"))
|
||||
|
||||
@classmethod
|
||||
def _call(cls, cert):
|
||||
from letsencrypt.display.revocation import confirm_revocation
|
||||
return confirm_revocation(cert)
|
||||
|
||||
@mock.patch("letsencrypt.display.revocation.util")
|
||||
def test_confirm_revocation(self, mock_util):
|
||||
mock_util().yesno.return_value = True
|
||||
self.assertTrue(self._call(self.cert))
|
||||
|
||||
mock_util().yesno.return_value = False
|
||||
self.assertFalse(self._call(self.cert))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
55
letsencrypt/tests/error_handler_test.py
Normal file
55
letsencrypt/tests/error_handler_test.py
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
"""Tests for letsencrypt.error_handler."""
|
||||
import signal
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
|
||||
class ErrorHandlerTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.error_handler."""
|
||||
|
||||
def setUp(self):
|
||||
from letsencrypt import error_handler
|
||||
|
||||
self.init_func = mock.MagicMock()
|
||||
self.handler = error_handler.ErrorHandler(self.init_func)
|
||||
# pylint: disable=protected-access
|
||||
self.signals = error_handler._SIGNALS
|
||||
|
||||
def test_context_manager(self):
|
||||
try:
|
||||
with self.handler:
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
pass
|
||||
self.init_func.assert_called_once_with()
|
||||
|
||||
@mock.patch('letsencrypt.error_handler.os')
|
||||
@mock.patch('letsencrypt.error_handler.signal')
|
||||
def test_signal_handler(self, mock_signal, mock_os):
|
||||
# pylint: disable=protected-access
|
||||
mock_signal.getsignal.return_value = signal.SIG_DFL
|
||||
self.handler.set_signal_handlers()
|
||||
signal_handler = self.handler._signal_handler
|
||||
for signum in self.signals:
|
||||
mock_signal.signal.assert_any_call(signum, signal_handler)
|
||||
|
||||
signum = self.signals[0]
|
||||
signal_handler(signum, None)
|
||||
self.init_func.assert_called_once_with()
|
||||
mock_os.kill.assert_called_once_with(mock_os.getpid(), signum)
|
||||
|
||||
self.handler.reset_signal_handlers()
|
||||
for signum in self.signals:
|
||||
mock_signal.signal.assert_any_call(signum, signal.SIG_DFL)
|
||||
|
||||
def test_bad_recovery(self):
|
||||
bad_func = mock.MagicMock(side_effect=[ValueError])
|
||||
self.handler.register(bad_func)
|
||||
self.handler.call_registered()
|
||||
self.init_func.assert_called_once_with()
|
||||
bad_func.assert_called_once_with()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -1,409 +0,0 @@
|
|||
"""Test letsencrypt.revoker."""
|
||||
import csv
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt.display import util as display_util
|
||||
|
||||
from letsencrypt.tests import test_util
|
||||
|
||||
|
||||
KEY = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, test_util.load_vector("rsa512_key.pem"))
|
||||
|
||||
|
||||
class RevokerBase(unittest.TestCase): # pylint: disable=too-few-public-methods
|
||||
"""Base Class for Revoker Tests."""
|
||||
def setUp(self):
|
||||
self.paths, self.certs, self.key_path = create_revoker_certs()
|
||||
|
||||
self.backup_dir = tempfile.mkdtemp("cert_backup")
|
||||
self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir)
|
||||
|
||||
self.list_path = os.path.join(self.backup_dir, "LIST")
|
||||
|
||||
def _store_certs(self):
|
||||
# pylint: disable=protected-access
|
||||
from letsencrypt.revoker import Revoker
|
||||
Revoker.store_cert_key(self.paths[0], self.key_path, self.mock_config)
|
||||
Revoker.store_cert_key(self.paths[1], self.key_path, self.mock_config)
|
||||
|
||||
# Set metadata
|
||||
for i in xrange(2):
|
||||
self.certs[i].add_meta(
|
||||
i, self.paths[i], self.key_path,
|
||||
Revoker._get_backup(self.backup_dir, i, self.paths[i]),
|
||||
Revoker._get_backup(self.backup_dir, i, self.key_path))
|
||||
|
||||
def _get_rows(self):
|
||||
with open(self.list_path, "rb") as csvfile:
|
||||
return [row for row in csv.reader(csvfile)]
|
||||
|
||||
def _write_rows(self, rows):
|
||||
with open(self.list_path, "wb") as csvfile:
|
||||
csvwriter = csv.writer(csvfile)
|
||||
for row in rows:
|
||||
csvwriter.writerow(row)
|
||||
|
||||
|
||||
class RevokerTest(RevokerBase):
|
||||
def setUp(self):
|
||||
from letsencrypt.revoker import Revoker
|
||||
super(RevokerTest, self).setUp()
|
||||
|
||||
with open(self.key_path) as key_file:
|
||||
self.key = le_util.Key(self.key_path, key_file.read())
|
||||
|
||||
self._store_certs()
|
||||
|
||||
self.revoker = Revoker(
|
||||
installer=mock.MagicMock(), config=self.mock_config)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.backup_dir)
|
||||
|
||||
@mock.patch("acme.client.Client.revoke")
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
def test_revoke_by_key_all(self, mock_display, mock_acme):
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
|
||||
self.revoker.revoke_from_key(self.key)
|
||||
self.assertEqual(self._get_rows(), [])
|
||||
|
||||
# Check to make sure backups were eliminated
|
||||
for i in xrange(2):
|
||||
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
|
||||
|
||||
self.assertEqual(mock_acme.call_count, 2)
|
||||
|
||||
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
|
||||
def test_revoke_by_invalid_keys(self, mock_load_privatekey):
|
||||
mock_load_privatekey.side_effect = OpenSSL.crypto.Error
|
||||
self.assertRaises(
|
||||
errors.RevokerError, self.revoker.revoke_from_key, self.key)
|
||||
|
||||
mock_load_privatekey.side_effect = [KEY, OpenSSL.crypto.Error]
|
||||
self.assertRaises(
|
||||
errors.RevokerError, self.revoker.revoke_from_key, self.key)
|
||||
|
||||
@mock.patch("acme.client.Client.revoke")
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
def test_revoke_by_wrong_key(self, mock_display, mock_acme):
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
|
||||
key_path = test_util.vector_path("rsa256_key.pem")
|
||||
|
||||
wrong_key = le_util.Key(key_path, open(key_path).read())
|
||||
self.revoker.revoke_from_key(wrong_key)
|
||||
|
||||
# Nothing was removed
|
||||
self.assertEqual(len(self._get_rows()), 2)
|
||||
# No revocation went through
|
||||
self.assertEqual(mock_acme.call_count, 0)
|
||||
|
||||
@mock.patch("acme.client.Client.revoke")
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
def test_revoke_by_cert(self, mock_display, mock_acme):
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
|
||||
self.revoker.revoke_from_cert(self.paths[1])
|
||||
|
||||
row0 = self.certs[0].get_row()
|
||||
row1 = self.certs[1].get_row()
|
||||
|
||||
self.assertEqual(self._get_rows(), [row0])
|
||||
|
||||
self.assertTrue(self._backups_exist(row0))
|
||||
self.assertFalse(self._backups_exist(row1))
|
||||
|
||||
self.assertEqual(mock_acme.call_count, 1)
|
||||
|
||||
@mock.patch("acme.client.Client.revoke")
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
def test_revoke_by_cert_not_found(self, mock_display, mock_acme):
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
|
||||
self.revoker.revoke_from_cert(self.paths[0])
|
||||
self.revoker.revoke_from_cert(self.paths[0])
|
||||
|
||||
row0 = self.certs[0].get_row()
|
||||
row1 = self.certs[1].get_row()
|
||||
|
||||
# Same check as last time... just reversed.
|
||||
self.assertEqual(self._get_rows(), [row1])
|
||||
|
||||
self.assertTrue(self._backups_exist(row1))
|
||||
self.assertFalse(self._backups_exist(row0))
|
||||
|
||||
self.assertEqual(mock_acme.call_count, 1)
|
||||
|
||||
@mock.patch("acme.client.Client.revoke")
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
def test_revoke_by_menu(self, mock_display, mock_acme):
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
mock_display.display_certs.side_effect = [
|
||||
(display_util.HELP, 0),
|
||||
(display_util.OK, 0),
|
||||
(display_util.CANCEL, -1),
|
||||
]
|
||||
|
||||
self.revoker.revoke_from_menu()
|
||||
|
||||
row0 = self.certs[0].get_row()
|
||||
row1 = self.certs[1].get_row()
|
||||
|
||||
self.assertEqual(self._get_rows(), [row1])
|
||||
|
||||
self.assertFalse(self._backups_exist(row0))
|
||||
self.assertTrue(self._backups_exist(row1))
|
||||
|
||||
self.assertEqual(mock_acme.call_count, 1)
|
||||
self.assertEqual(mock_display.more_info_cert.call_count, 1)
|
||||
|
||||
@mock.patch("letsencrypt.revoker.logger")
|
||||
@mock.patch("acme.client.Client.revoke")
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
def test_revoke_by_menu_delete_all(self, mock_display, mock_acme, mock_log):
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
mock_display.display_certs.return_value = (display_util.OK, 0)
|
||||
|
||||
self.revoker.revoke_from_menu()
|
||||
|
||||
self.assertEqual(self._get_rows(), [])
|
||||
|
||||
# Everything should be deleted...
|
||||
for i in xrange(2):
|
||||
self.assertFalse(self._backups_exist(self.certs[i].get_row()))
|
||||
|
||||
self.assertEqual(mock_acme.call_count, 2)
|
||||
# Info is called when there aren't any certs left...
|
||||
self.assertTrue(mock_log.info.called)
|
||||
|
||||
@mock.patch("letsencrypt.revoker.revocation")
|
||||
@mock.patch("letsencrypt.revoker.Revoker._acme_revoke")
|
||||
@mock.patch("letsencrypt.revoker.logger")
|
||||
def test_safe_revoke_acme_fail(self, mock_log, mock_revoke, mock_display):
|
||||
# pylint: disable=protected-access
|
||||
mock_revoke.side_effect = errors.Error
|
||||
mock_display().confirm_revocation.return_value = True
|
||||
|
||||
self.revoker._safe_revoke(self.certs)
|
||||
self.assertTrue(mock_log.error.called)
|
||||
|
||||
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
|
||||
def test_acme_revoke_failure(self, mock_load_privatekey):
|
||||
# pylint: disable=protected-access
|
||||
mock_load_privatekey.side_effect = OpenSSL.crypto.Error
|
||||
self.assertRaises(
|
||||
errors.Error, self.revoker._acme_revoke, self.certs[0])
|
||||
|
||||
def test_remove_certs_from_list_bad_certs(self):
|
||||
# pylint: disable=protected-access
|
||||
from letsencrypt.revoker import Cert
|
||||
|
||||
new_cert = Cert(self.paths[0])
|
||||
|
||||
# This isn't stored in the db
|
||||
new_cert.idx = 10
|
||||
new_cert.backup_path = self.paths[0]
|
||||
new_cert.backup_key_path = self.key_path
|
||||
new_cert.orig = Cert.PathStatus("false path", "not here")
|
||||
new_cert.orig_key = Cert.PathStatus("false path", "not here")
|
||||
|
||||
self.assertRaises(errors.RevokerError,
|
||||
self.revoker._remove_certs_from_list, [new_cert])
|
||||
|
||||
def _backups_exist(self, row):
|
||||
# pylint: disable=protected-access
|
||||
cert_path, key_path = self.revoker._row_to_backup(row)
|
||||
return os.path.isfile(cert_path) and os.path.isfile(key_path)
|
||||
|
||||
|
||||
class RevokerInstallerTest(RevokerBase):
|
||||
def setUp(self):
|
||||
super(RevokerInstallerTest, self).setUp()
|
||||
|
||||
self.installs = [
|
||||
["installation/path0a", "installation/path0b"],
|
||||
["installation/path1"],
|
||||
]
|
||||
|
||||
self.certs_keys = [
|
||||
(self.paths[0], self.key_path, self.installs[0][0]),
|
||||
(self.paths[0], self.key_path, self.installs[0][1]),
|
||||
(self.paths[1], self.key_path, self.installs[1][0]),
|
||||
]
|
||||
|
||||
self._store_certs()
|
||||
|
||||
def _get_revoker(self, installer):
|
||||
from letsencrypt.revoker import Revoker
|
||||
return Revoker(installer, self.mock_config)
|
||||
|
||||
def test_no_installer_get_installed_locations(self):
|
||||
# pylint: disable=protected-access
|
||||
revoker = self._get_revoker(None)
|
||||
self.assertEqual(revoker._get_installed_locations(), {})
|
||||
|
||||
def test_get_installed_locations(self):
|
||||
# pylint: disable=protected-access
|
||||
mock_installer = mock.MagicMock()
|
||||
mock_installer.get_all_certs_keys.return_value = self.certs_keys
|
||||
|
||||
revoker = self._get_revoker(mock_installer)
|
||||
sha_vh = revoker._get_installed_locations()
|
||||
|
||||
self.assertEqual(len(sha_vh), 2)
|
||||
for i, cert in enumerate(self.certs):
|
||||
self.assertTrue(cert.get_fingerprint() in sha_vh)
|
||||
self.assertEqual(
|
||||
sha_vh[cert.get_fingerprint()], self.installs[i])
|
||||
|
||||
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_certificate")
|
||||
def test_get_installed_load_failure(self, mock_load_certificate):
|
||||
mock_installer = mock.MagicMock()
|
||||
mock_installer.get_all_certs_keys.return_value = self.certs_keys
|
||||
|
||||
mock_load_certificate.side_effect = OpenSSL.crypto.Error
|
||||
|
||||
revoker = self._get_revoker(mock_installer)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
self.assertEqual(revoker._get_installed_locations(), {})
|
||||
|
||||
def test_get_installed_load_failure_open(self):
|
||||
tmp = tempfile.mkdtemp()
|
||||
mock_installer = mock.MagicMock()
|
||||
mock_installer.get_all_certs_keys.return_value = [(
|
||||
os.path.join(tmp, 'missing'), None, None)]
|
||||
revoker = self._get_revoker(mock_installer)
|
||||
# pylint: disable=protected-access
|
||||
self.assertEqual(revoker._get_installed_locations(), {})
|
||||
os.rmdir(tmp)
|
||||
|
||||
|
||||
class RevokerClassMethodsTest(RevokerBase):
|
||||
def setUp(self):
|
||||
super(RevokerClassMethodsTest, self).setUp()
|
||||
self.mock_config = mock.MagicMock(cert_key_backup=self.backup_dir)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.backup_dir)
|
||||
|
||||
def _call(self, cert_path, key_path):
|
||||
from letsencrypt.revoker import Revoker
|
||||
Revoker.store_cert_key(cert_path, key_path, self.mock_config)
|
||||
|
||||
def test_store_two(self):
|
||||
from letsencrypt.revoker import Revoker
|
||||
self._call(self.paths[0], self.key_path)
|
||||
self._call(self.paths[1], self.key_path)
|
||||
|
||||
self.assertTrue(os.path.isfile(self.list_path))
|
||||
rows = self._get_rows()
|
||||
|
||||
for i, row in enumerate(rows):
|
||||
# pylint: disable=protected-access
|
||||
self.assertTrue(os.path.isfile(
|
||||
Revoker._get_backup(self.backup_dir, i, self.paths[i])))
|
||||
self.assertTrue(os.path.isfile(
|
||||
Revoker._get_backup(self.backup_dir, i, self.key_path)))
|
||||
self.assertEqual([str(i), self.paths[i], self.key_path], row)
|
||||
|
||||
self.assertEqual(len(rows), 2)
|
||||
|
||||
def test_store_one_mixed(self):
|
||||
from letsencrypt.revoker import Revoker
|
||||
self._write_rows(
|
||||
[["5", "blank", "blank"], ["18", "dc", "dc"], ["21", "b", "b"]])
|
||||
self._call(self.paths[0], self.key_path)
|
||||
|
||||
self.assertEqual(
|
||||
self._get_rows()[3], ["22", self.paths[0], self.key_path])
|
||||
|
||||
# pylint: disable=protected-access
|
||||
self.assertTrue(os.path.isfile(
|
||||
Revoker._get_backup(self.backup_dir, 22, self.paths[0])))
|
||||
self.assertTrue(os.path.isfile(
|
||||
Revoker._get_backup(self.backup_dir, 22, self.key_path)))
|
||||
|
||||
|
||||
class CertTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.paths, self.certs, self.key_path = create_revoker_certs()
|
||||
|
||||
def test_failed_load(self):
|
||||
from letsencrypt.revoker import Cert
|
||||
self.assertRaises(errors.RevokerError, Cert, self.key_path)
|
||||
|
||||
def test_failed_load_open(self):
|
||||
tmp = tempfile.mkdtemp()
|
||||
from letsencrypt.revoker import Cert
|
||||
self.assertRaises(
|
||||
errors.RevokerError, Cert, os.path.join(tmp, 'missing'))
|
||||
os.rmdir(tmp)
|
||||
|
||||
def test_no_row(self):
|
||||
self.assertEqual(self.certs[0].get_row(), None)
|
||||
|
||||
def test_meta_moved_files(self):
|
||||
from letsencrypt.revoker import Cert
|
||||
fake_path = "/not/a/real/path/r72d3t6"
|
||||
self.certs[0].add_meta(
|
||||
0, fake_path, fake_path, self.paths[0], self.key_path)
|
||||
|
||||
self.assertEqual(self.certs[0].orig.status, Cert.DELETED_MSG)
|
||||
self.assertEqual(self.certs[0].orig_key.status, Cert.DELETED_MSG)
|
||||
|
||||
def test_meta_changed_files(self):
|
||||
from letsencrypt.revoker import Cert
|
||||
self.certs[0].add_meta(
|
||||
0, self.paths[1], self.paths[1], self.paths[0], self.key_path)
|
||||
|
||||
self.assertEqual(self.certs[0].orig.status, Cert.CHANGED_MSG)
|
||||
self.assertEqual(self.certs[0].orig_key.status, Cert.CHANGED_MSG)
|
||||
|
||||
def test_meta_no_status(self):
|
||||
self.certs[0].add_meta(
|
||||
0, self.paths[0], self.key_path, self.paths[0], self.key_path)
|
||||
|
||||
self.assertEqual(self.certs[0].orig.status, "")
|
||||
self.assertEqual(self.certs[0].orig_key.status, "")
|
||||
|
||||
def test_print_meta(self):
|
||||
"""Just make sure there aren't any major errors."""
|
||||
self.certs[0].add_meta(
|
||||
0, self.paths[0], self.key_path, self.paths[0], self.key_path)
|
||||
# Changed path and deleted file
|
||||
self.certs[1].add_meta(
|
||||
1, self.paths[0], "/not/a/path", self.paths[1], self.key_path)
|
||||
self.assertTrue(self.certs[0].pretty_print())
|
||||
self.assertTrue(self.certs[1].pretty_print())
|
||||
|
||||
def test_print_no_meta(self):
|
||||
self.assertTrue(self.certs[0].pretty_print())
|
||||
self.assertTrue(self.certs[1].pretty_print())
|
||||
|
||||
|
||||
def create_revoker_certs():
|
||||
"""Create a few revoker.Cert objects."""
|
||||
cert0_path = test_util.vector_path("cert.pem")
|
||||
cert1_path = test_util.vector_path("cert-san.pem")
|
||||
key_path = test_util.vector_path("rsa512_key.pem")
|
||||
|
||||
from letsencrypt.revoker import Cert
|
||||
cert0 = Cert(cert0_path)
|
||||
cert1 = Cert(cert1_path)
|
||||
|
||||
return [cert0_path, cert1_path], [cert0, cert1], key_path
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -16,7 +16,7 @@ fi
|
|||
|
||||
cover () {
|
||||
if [ "$1" = "letsencrypt" ]; then
|
||||
min=97
|
||||
min=96
|
||||
elif [ "$1" = "acme" ]; then
|
||||
min=100
|
||||
elif [ "$1" = "letsencrypt_apache" ]; then
|
||||
|
|
|
|||
Loading…
Reference in a new issue