mirror of
https://github.com/certbot/certbot.git
synced 2026-06-08 16:22:18 -04:00
* Switching from old branch (issue-4109) and addressing changes requested
in last iteration of review:
80aa857fd2
Requested changes that were addressed:
- fixed outdated docstring for `cert_path_to_lineage`
- removed `full_archive_dir_from_renewal_conf` amd replaced with `full_archive_path` (and `_full_archive_path` -> `full_archive_path`)
- matching on `cert` instead of `chain` in `cert_manager.cert_path_to_lineage`
- fixed the two coding wrongs make a right issue
Requested changes which were not addressed:
- moving `cert_path_to_lineage` from `cert_manager` to `storage`,
as it would introduce a hard to resolve circular dependency.
* Update integration tests to handle default deletion after revoke.
* Swapping test domains.
* Addressing PR feedback:
- calling storage.full_archive_path with a ConfigObj instead of None
- Removing lambda x: x.chain_path as an option to match against
* Addressing PR feedback: it's expected that len(pattern) is 0, so handle that case properly.
* Testing of conflicting values of --cert-name and --cert-path non-interactive mode.
* Silly test for when neither certname nor cert-path were specified.
* Changing archive_files to a private function, because mocking nested functions seems impossible.
* Tests for storage.cert_path_for_cert_name
* Splitting out _acceptable_matches
* Some tests for cert_manager.cert_path_to_lineage
* Offerings to the Lint God
* Cleaner way of dealing with files in archive dirs
* Handling the two different use cases of match_and_check_overlaps a bit better
* late night syntax errors
* Test for when multiple lineages share an archive dir
* Tests for certbot.cert_manager.match_and_check_overlaps
* Removing unneeded nesting
* Lint errors that Travis caught that didn't show up locally
* Adding two integration tests (matching & mismatched --cert-path, --cert-name) based on feedback.
* Asking the user if they want to delete in interactive mode.
This commit is contained in:
parent
356471cdf6
commit
3087b436f3
8 changed files with 601 additions and 49 deletions
|
|
@ -3,6 +3,7 @@ import datetime
|
|||
import logging
|
||||
import os
|
||||
import pytz
|
||||
import re
|
||||
import traceback
|
||||
import zope.component
|
||||
|
||||
|
|
@ -142,6 +143,129 @@ def find_duplicative_certs(config, domains):
|
|||
|
||||
return _search_lineages(config, update_certs_for_domain_matches, (None, None))
|
||||
|
||||
def _archive_files(candidate_lineage, filetype):
|
||||
""" In order to match things like:
|
||||
/etc/letsencrypt/archive/example.com/chain1.pem.
|
||||
|
||||
Anonymous functions which call this function are eventually passed (in a list) to
|
||||
`match_and_check_overlaps` to help specify the acceptable_matches.
|
||||
|
||||
:param `.storage.RenewableCert` candidate_lineage: Lineage whose archive dir is to
|
||||
be searched.
|
||||
:param str filetype: main file name prefix e.g. "fullchain" or "chain".
|
||||
|
||||
:returns: Files in candidate_lineage's archive dir that match the provided filetype.
|
||||
:rtype: list of str or None
|
||||
"""
|
||||
archive_dir = candidate_lineage.archive_dir
|
||||
pattern = [os.path.join(archive_dir, f) for f in os.listdir(archive_dir)
|
||||
if re.match("{0}[0-9]*.pem".format(filetype), f)]
|
||||
if len(pattern) > 0:
|
||||
return pattern
|
||||
else:
|
||||
return None
|
||||
|
||||
def _acceptable_matches():
|
||||
""" Generates the list that's passed to match_and_check_overlaps. Is its own function to
|
||||
make unit testing easier.
|
||||
|
||||
:returns: list of functions
|
||||
:rtype: list
|
||||
"""
|
||||
return [lambda x: x.fullchain_path, lambda x: x.cert_path,
|
||||
lambda x: _archive_files(x, "cert"), lambda x: _archive_files(x, "fullchain")]
|
||||
|
||||
def cert_path_to_lineage(cli_config):
|
||||
""" If config.cert_path is defined, try to find an appropriate value for config.certname.
|
||||
|
||||
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments
|
||||
|
||||
:returns: a lineage name
|
||||
:rtype: str
|
||||
|
||||
:raises `errors.Error`: If the specified cert path can't be matched to a lineage name.
|
||||
:raises `errors.OverlappingMatchFound`: If the matched lineage's archive is shared.
|
||||
"""
|
||||
acceptable_matches = _acceptable_matches()
|
||||
match = match_and_check_overlaps(cli_config, acceptable_matches,
|
||||
lambda x: cli_config.cert_path[0], lambda x: x.lineagename)
|
||||
return match[0]
|
||||
|
||||
def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func):
|
||||
""" Searches through all lineages for a match, and checks for duplicates.
|
||||
If a duplicate is found, an error is raised, as performing operations on lineages
|
||||
that have their properties incorrectly duplicated elsewhere is probably a bad idea.
|
||||
|
||||
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments
|
||||
:param list acceptable_matches: a list of functions that specify acceptable matches
|
||||
:param function match_func: specifies what to match
|
||||
:param function rv_func: specifies what to return
|
||||
|
||||
"""
|
||||
def find_matches(candidate_lineage, return_value, acceptable_matches):
|
||||
"""Returns a list of matches using _search_lineages."""
|
||||
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
|
||||
acceptable_matches_rv = []
|
||||
for item in acceptable_matches:
|
||||
if isinstance(item, list):
|
||||
acceptable_matches_rv += item
|
||||
else:
|
||||
acceptable_matches_rv.append(item)
|
||||
match = match_func(candidate_lineage)
|
||||
if match in acceptable_matches_rv:
|
||||
return_value.append(rv_func(candidate_lineage))
|
||||
return return_value
|
||||
|
||||
matched = _search_lineages(cli_config, find_matches, [], acceptable_matches)
|
||||
if not matched:
|
||||
raise errors.Error("No match found for cert-path {0}!".format(cli_config.cert_path[0]))
|
||||
elif len(matched) > 1:
|
||||
raise errors.OverlappingMatchFound()
|
||||
else:
|
||||
return matched
|
||||
|
||||
def human_readable_cert_info(config, cert, skip_filter_checks=False):
|
||||
""" Returns a human readable description of info about a RenewableCert object"""
|
||||
certinfo = []
|
||||
checker = ocsp.RevocationChecker()
|
||||
|
||||
if config.certname and cert.lineagename != config.certname and not skip_filter_checks:
|
||||
return ""
|
||||
if config.domains and not set(config.domains).issubset(cert.names()):
|
||||
return ""
|
||||
now = pytz.UTC.fromutc(datetime.datetime.utcnow())
|
||||
|
||||
reasons = []
|
||||
if cert.is_test_cert:
|
||||
reasons.append('TEST_CERT')
|
||||
if cert.target_expiry <= now:
|
||||
reasons.append('EXPIRED')
|
||||
if checker.ocsp_revoked(cert.cert, cert.chain):
|
||||
reasons.append('REVOKED')
|
||||
|
||||
if reasons:
|
||||
status = "INVALID: " + ", ".join(reasons)
|
||||
else:
|
||||
diff = cert.target_expiry - now
|
||||
if diff.days == 1:
|
||||
status = "VALID: 1 day"
|
||||
elif diff.days < 1:
|
||||
status = "VALID: {0} hour(s)".format(diff.seconds // 3600)
|
||||
else:
|
||||
status = "VALID: {0} days".format(diff.days)
|
||||
|
||||
valid_string = "{0} ({1})".format(cert.target_expiry, status)
|
||||
certinfo.append(" Certificate Name: {0}\n"
|
||||
" Domains: {1}\n"
|
||||
" Expiry Date: {2}\n"
|
||||
" Certificate Path: {3}\n"
|
||||
" Private Key Path: {4}".format(
|
||||
cert.lineagename,
|
||||
" ".join(cert.names()),
|
||||
valid_string,
|
||||
cert.fullchain,
|
||||
cert.privkey))
|
||||
return "".join(certinfo)
|
||||
|
||||
###################
|
||||
# Private Helpers
|
||||
|
|
@ -183,44 +307,8 @@ def _report_lines(msgs):
|
|||
def _report_human_readable(config, parsed_certs):
|
||||
"""Format a results report for a parsed cert"""
|
||||
certinfo = []
|
||||
checker = ocsp.RevocationChecker()
|
||||
for cert in parsed_certs:
|
||||
if config.certname and cert.lineagename != config.certname:
|
||||
continue
|
||||
if config.domains and not set(config.domains).issubset(cert.names()):
|
||||
continue
|
||||
now = pytz.UTC.fromutc(datetime.datetime.utcnow())
|
||||
|
||||
reasons = []
|
||||
if cert.is_test_cert:
|
||||
reasons.append('TEST_CERT')
|
||||
if cert.target_expiry <= now:
|
||||
reasons.append('EXPIRED')
|
||||
if checker.ocsp_revoked(cert.cert, cert.chain):
|
||||
reasons.append('REVOKED')
|
||||
|
||||
if reasons:
|
||||
status = "INVALID: " + ", ".join(reasons)
|
||||
else:
|
||||
diff = cert.target_expiry - now
|
||||
if diff.days == 1:
|
||||
status = "VALID: 1 day"
|
||||
elif diff.days < 1:
|
||||
status = "VALID: {0} hour(s)".format(diff.seconds // 3600)
|
||||
else:
|
||||
status = "VALID: {0} days".format(diff.days)
|
||||
|
||||
valid_string = "{0} ({1})".format(cert.target_expiry, status)
|
||||
certinfo.append(" Certificate Name: {0}\n"
|
||||
" Domains: {1}\n"
|
||||
" Expiry Date: {2}\n"
|
||||
" Certificate Path: {3}\n"
|
||||
" Private Key Path: {4}".format(
|
||||
cert.lineagename,
|
||||
",".join(cert.names()),
|
||||
valid_string,
|
||||
cert.fullchain,
|
||||
cert.privkey))
|
||||
certinfo.append(human_readable_cert_info(config, cert))
|
||||
return "\n".join(certinfo)
|
||||
|
||||
def _describe_certs(config, parsed_certs, parse_failures):
|
||||
|
|
@ -244,11 +332,17 @@ def _describe_certs(config, parsed_certs, parse_failures):
|
|||
disp = zope.component.getUtility(interfaces.IDisplay)
|
||||
disp.notification("\n".join(out), pause=False, wrap=False)
|
||||
|
||||
def _search_lineages(cli_config, func, initial_rv):
|
||||
def _search_lineages(cli_config, func, initial_rv, *args):
|
||||
"""Iterate func over unbroken lineages, allowing custom return conditions.
|
||||
|
||||
Allows flexible customization of return values, including multiple
|
||||
return values and complex checks.
|
||||
|
||||
:param `configuration.NamespaceConfig` cli_config: parsed command line arguments
|
||||
:param function func: function used while searching over lineages
|
||||
:param initial_rv: initial return value of the function (any type)
|
||||
|
||||
:returns: Whatever was specified by `func` if a match is found.
|
||||
"""
|
||||
configs_dir = cli_config.renewal_configs_dir
|
||||
# Verify the directory is there
|
||||
|
|
@ -262,5 +356,5 @@ def _search_lineages(cli_config, func, initial_rv):
|
|||
logger.debug("Renewal conf file %s is broken. Skipping.", renewal_file)
|
||||
logger.debug("Traceback was:\n%s", traceback.format_exc())
|
||||
continue
|
||||
rv = func(candidate_lineage, rv)
|
||||
rv = func(candidate_lineage, rv, *args)
|
||||
return rv
|
||||
|
|
|
|||
|
|
@ -32,6 +32,8 @@ class HookCommandNotFound(Error):
|
|||
class SignalExit(Error):
|
||||
"""A Unix signal was received while in the ErrorHandler context manager."""
|
||||
|
||||
class OverlappingMatchFound(Error):
|
||||
"""Multiple lineages matched what should have been a unique result."""
|
||||
|
||||
class LockError(Error):
|
||||
"""File locking error."""
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import logging.handlers
|
|||
import os
|
||||
import sys
|
||||
|
||||
import configobj
|
||||
import zope.component
|
||||
|
||||
from acme import jose
|
||||
|
|
@ -26,6 +27,7 @@ from certbot import interfaces
|
|||
from certbot import log
|
||||
from certbot import renewal
|
||||
from certbot import reporter
|
||||
from certbot import storage
|
||||
from certbot import util
|
||||
|
||||
from certbot.display import util as display_util, ops as display_ops
|
||||
|
|
@ -385,6 +387,92 @@ def _determine_account(config):
|
|||
return acc, acme
|
||||
|
||||
|
||||
def _delete_if_appropriate(config): # pylint: disable=too-many-locals,too-many-branches
|
||||
"""Does the user want to delete their now-revoked certs? If run in non-interactive mode,
|
||||
deleting happens automatically, unless if both `--cert-name` and `--cert-path` were
|
||||
specified with conflicting values.
|
||||
|
||||
:param `configuration.NamespaceConfig` config: parsed command line arguments
|
||||
|
||||
:raises `error.Errors`: If anything goes wrong, including bad user input, if an overlapping
|
||||
archive dir is found for the specified lineage, etc ...
|
||||
"""
|
||||
display = zope.component.getUtility(interfaces.IDisplay)
|
||||
reporter_util = zope.component.getUtility(interfaces.IReporter)
|
||||
|
||||
msg = ("Would you like to delete the cert(s) you just revoked?")
|
||||
attempt_deletion = display.yesno(msg, yes_label="Yes (recommended)", no_label="No",
|
||||
force_interactive=True, default=True)
|
||||
|
||||
if not attempt_deletion:
|
||||
reporter_util.add_message("Not deleting revoked certs.", reporter_util.LOW_PRIORITY)
|
||||
return
|
||||
|
||||
if not (config.certname or config.cert_path):
|
||||
raise errors.Error('At least one of --cert-path or --cert-name must be specified.')
|
||||
|
||||
if config.certname and config.cert_path:
|
||||
# first, check if certname and cert_path imply the same certs
|
||||
implied_cert_name = cert_manager.cert_path_to_lineage(config)
|
||||
|
||||
if implied_cert_name != config.certname:
|
||||
cert_path_implied_cert_name = cert_manager.cert_path_to_lineage(config)
|
||||
cert_path_implied_conf = storage.renewal_file_for_certname(config,
|
||||
cert_path_implied_cert_name)
|
||||
cert_path_cert = storage.RenewableCert(cert_path_implied_conf, config)
|
||||
cert_path_info = cert_manager.human_readable_cert_info(config, cert_path_cert,
|
||||
skip_filter_checks=True)
|
||||
|
||||
cert_name_implied_conf = storage.renewal_file_for_certname(config, config.certname)
|
||||
cert_name_cert = storage.RenewableCert(cert_name_implied_conf, config)
|
||||
cert_name_info = cert_manager.human_readable_cert_info(config, cert_name_cert)
|
||||
|
||||
msg = ("You specified conflicting values for --cert-path and --cert-name. "
|
||||
"Which did you mean to select?")
|
||||
choices = [cert_path_info, cert_name_info]
|
||||
try:
|
||||
code, index = display.menu(msg,
|
||||
choices, ok_label="Select", force_interactive=True)
|
||||
except errors.MissingCommandlineFlag:
|
||||
error_msg = ('To run in non-interactive mode, you must either specify only one of '
|
||||
'--cert-path or --cert-name, or both must point to the same certificate lineages.')
|
||||
raise errors.Error(error_msg)
|
||||
|
||||
if code != display_util.OK or not index in range(0, len(choices)):
|
||||
raise errors.Error("User ended interaction.")
|
||||
|
||||
if index == 0:
|
||||
config.certname = cert_path_implied_cert_name
|
||||
else:
|
||||
config.cert_path = storage.cert_path_for_cert_name(config, config.certname)
|
||||
|
||||
elif config.cert_path:
|
||||
config.certname = cert_manager.cert_path_to_lineage(config)
|
||||
|
||||
else: # if only config.certname was specified
|
||||
config.cert_path = storage.cert_path_for_cert_name(config, config.certname)
|
||||
|
||||
# don't delete if the archive_dir is used by some other lineage
|
||||
archive_dir = storage.full_archive_path(
|
||||
configobj.ConfigObj(storage.renewal_file_for_certname(config, config.certname)),
|
||||
config, config.certname)
|
||||
try:
|
||||
cert_manager.match_and_check_overlaps(config, [lambda x: archive_dir],
|
||||
lambda x: x.archive_dir, lambda x: x)
|
||||
except errors.OverlappingMatchFound:
|
||||
msg = ('Not deleting revoked certs due to overlapping archive dirs. More than '
|
||||
'one lineage is using {0}'.format(archive_dir))
|
||||
reporter_util.add_message(''.join(msg), reporter_util.MEDIUM_PRIORITY)
|
||||
return
|
||||
except Exception as e:
|
||||
msg = ('config.default_archive_dir: {0}, config.live_dir: {1}, archive_dir: {2},'
|
||||
'original exception: {3}')
|
||||
msg = msg.format(config.default_archive_dir, config.live_dir, archive_dir, e)
|
||||
raise errors.Error(msg)
|
||||
|
||||
cert_manager.delete(config)
|
||||
|
||||
|
||||
def _init_le_client(config, authenticator, installer):
|
||||
if authenticator is not None:
|
||||
# if authenticator was given, then we will need account...
|
||||
|
|
@ -582,6 +670,7 @@ def revoke(config, unused_plugins): # TODO: coop with renewal config
|
|||
|
||||
try:
|
||||
acme.revoke(jose.ComparableX509(cert), config.reason)
|
||||
_delete_if_appropriate(config)
|
||||
except acme_errors.ClientError as e:
|
||||
return str(e)
|
||||
|
||||
|
|
|
|||
|
|
@ -49,6 +49,21 @@ def renewal_file_for_certname(config, certname):
|
|||
"{1}).".format(certname, path))
|
||||
return path
|
||||
|
||||
|
||||
def cert_path_for_cert_name(config, cert_name):
|
||||
""" If `--cert-name` was specified, but you need a value for `--cert-path`.
|
||||
|
||||
:param `configuration.NamespaceConfig` config: parsed command line arguments
|
||||
:param str cert_name: cert name.
|
||||
|
||||
"""
|
||||
cert_name_implied_conf = renewal_file_for_certname(config, cert_name)
|
||||
fullchain_path = configobj.ConfigObj(cert_name_implied_conf)["fullchain"]
|
||||
with open(fullchain_path) as f:
|
||||
cert_path = (fullchain_path, f.read())
|
||||
return cert_path
|
||||
|
||||
|
||||
def config_with_defaults(config=None):
|
||||
"""Merge supplied config, if provided, on top of builtin defaults."""
|
||||
defaults_copy = configobj.ConfigObj(constants.RENEWER_DEFAULTS)
|
||||
|
|
@ -246,7 +261,7 @@ def _relpath_from_file(archive_dir, from_file):
|
|||
"""Path to a directory from a file"""
|
||||
return os.path.relpath(archive_dir, os.path.dirname(from_file))
|
||||
|
||||
def _full_archive_path(config_obj, cli_config, lineagename):
|
||||
def full_archive_path(config_obj, cli_config, lineagename):
|
||||
"""Returns the full archive path for a lineagename
|
||||
|
||||
Uses cli_config to determine archive path if not available from config_obj.
|
||||
|
|
@ -271,7 +286,7 @@ def delete_files(config, certname):
|
|||
"""
|
||||
renewal_filename = renewal_file_for_certname(config, certname)
|
||||
# file exists
|
||||
full_default_archive_dir = _full_archive_path(None, config, certname)
|
||||
full_default_archive_dir = full_archive_path(None, config, certname)
|
||||
full_default_live_dir = _full_live_path(config, certname)
|
||||
try:
|
||||
renewal_config = configobj.ConfigObj(renewal_filename)
|
||||
|
|
@ -323,7 +338,7 @@ def delete_files(config, certname):
|
|||
|
||||
# archive directory
|
||||
try:
|
||||
archive_path = _full_archive_path(renewal_config, config, certname)
|
||||
archive_path = full_archive_path(renewal_config, config, certname)
|
||||
shutil.rmtree(archive_path)
|
||||
logger.debug("Removed %s", archive_path)
|
||||
except OSError:
|
||||
|
|
@ -450,7 +465,7 @@ class RenewableCert(object):
|
|||
@property
|
||||
def archive_dir(self):
|
||||
"""Returns the default or specified archive directory"""
|
||||
return _full_archive_path(self.configuration,
|
||||
return full_archive_path(self.configuration,
|
||||
self.cli_config, self.lineagename)
|
||||
|
||||
def relative_archive_dir(self, from_file):
|
||||
|
|
@ -992,7 +1007,7 @@ class RenewableCert(object):
|
|||
# lineagename will now potentially be modified based on which
|
||||
# renewal configuration file could actually be created
|
||||
lineagename = lineagename_for_filename(config_filename)
|
||||
archive = _full_archive_path(None, cli_config, lineagename)
|
||||
archive = full_archive_path(None, cli_config, lineagename)
|
||||
live_dir = _full_live_path(cli_config, lineagename)
|
||||
if os.path.exists(archive):
|
||||
raise errors.CertStorageError(
|
||||
|
|
|
|||
|
|
@ -478,5 +478,95 @@ class DuplicativeCertsTest(storage_test.BaseRenewableCertTest):
|
|||
self.assertEqual(result, (None, None))
|
||||
|
||||
|
||||
class CertPathToLineageTest(storage_test.BaseRenewableCertTest):
|
||||
"""Tests for certbot.cert_manager.cert_path_to_lineage"""
|
||||
|
||||
def setUp(self):
|
||||
super(CertPathToLineageTest, self).setUp()
|
||||
self.config_file.write()
|
||||
self._write_out_ex_kinds()
|
||||
self.fullchain = os.path.join(self.config.config_dir, 'live', 'example.org',
|
||||
'fullchain.pem')
|
||||
self.config.cert_path = (self.fullchain, '')
|
||||
|
||||
def _call(self, cli_config):
|
||||
from certbot.cert_manager import cert_path_to_lineage
|
||||
return cert_path_to_lineage(cli_config)
|
||||
|
||||
def _archive_files(self, cli_config, filetype):
|
||||
from certbot.cert_manager import _archive_files
|
||||
return _archive_files(cli_config, filetype)
|
||||
|
||||
def test_basic_match(self):
|
||||
self.assertEqual('example.org', self._call(self.config))
|
||||
|
||||
def test_no_match_exists(self):
|
||||
bad_test_config = self.config
|
||||
bad_test_config.cert_path = os.path.join(self.config.config_dir, 'live',
|
||||
'SailorMoon', 'fullchain.pem')
|
||||
self.assertRaises(errors.Error, self._call, bad_test_config)
|
||||
|
||||
@mock.patch('certbot.cert_manager._acceptable_matches')
|
||||
def test_options_fullchain(self, mock_acceptable_matches):
|
||||
mock_acceptable_matches.return_value = [lambda x: x.fullchain_path]
|
||||
self.config.fullchain_path = self.fullchain
|
||||
self.assertEqual('example.org', self._call(self.config))
|
||||
|
||||
@mock.patch('certbot.cert_manager._acceptable_matches')
|
||||
def test_options_cert_path(self, mock_acceptable_matches):
|
||||
mock_acceptable_matches.return_value = [lambda x: x.cert_path]
|
||||
test_cert_path = os.path.join(self.config.config_dir, 'live', 'example.org',
|
||||
'cert.pem')
|
||||
self.config.cert_path = (test_cert_path, '')
|
||||
self.assertEqual('example.org', self._call(self.config))
|
||||
|
||||
@mock.patch('certbot.cert_manager._acceptable_matches')
|
||||
def test_options_archive_cert(self, mock_acceptable_matches):
|
||||
# Also this and the next test check that the regex of _archive_files is working.
|
||||
self.config.cert_path = (os.path.join(self.config.config_dir, 'archive', 'example.org',
|
||||
'cert11.pem'), '')
|
||||
mock_acceptable_matches.return_value = [lambda x: self._archive_files(x, 'cert')]
|
||||
self.assertEqual('example.org', self._call(self.config))
|
||||
|
||||
@mock.patch('certbot.cert_manager._acceptable_matches')
|
||||
def test_options_archive_fullchain(self, mock_acceptable_matches):
|
||||
self.config.cert_path = (os.path.join(self.config.config_dir, 'archive',
|
||||
'example.org', 'fullchain11.pem'), '')
|
||||
mock_acceptable_matches.return_value = [lambda x:
|
||||
self._archive_files(x, 'fullchain')]
|
||||
self.assertEqual('example.org', self._call(self.config))
|
||||
|
||||
|
||||
class MatchAndCheckOverlaps(storage_test.BaseRenewableCertTest):
|
||||
"""Tests for certbot.cert_manager.match_and_check_overlaps w/o overlapping archive dirs."""
|
||||
# A test with real overlapping archive dirs can be found in tests/boulder_integration.sh
|
||||
def setUp(self):
|
||||
super(MatchAndCheckOverlaps, self).setUp()
|
||||
self.config_file.write()
|
||||
self._write_out_ex_kinds()
|
||||
self.fullchain = os.path.join(self.config.config_dir, 'live', 'example.org',
|
||||
'fullchain.pem')
|
||||
self.config.cert_path = (self.fullchain, '')
|
||||
|
||||
def _call(self, cli_config, acceptable_matches, match_func, rv_func):
|
||||
from certbot.cert_manager import match_and_check_overlaps
|
||||
return match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func)
|
||||
|
||||
def test_basic_match(self):
|
||||
from certbot.cert_manager import _acceptable_matches
|
||||
self.assertEqual(['example.org'], self._call(self.config, _acceptable_matches(),
|
||||
lambda x: self.config.cert_path[0], lambda x: x.lineagename))
|
||||
|
||||
@mock.patch('certbot.cert_manager._search_lineages')
|
||||
def test_no_matches(self, mock_search_lineages):
|
||||
mock_search_lineages.return_value = []
|
||||
self.assertRaises(errors.Error, self._call, self.config, None, None, None)
|
||||
|
||||
@mock.patch('certbot.cert_manager._search_lineages')
|
||||
def test_too_many_matches(self, mock_search_lineages):
|
||||
mock_search_lineages.return_value = ['spider', 'dance']
|
||||
self.assertRaises(errors.OverlappingMatchFound, self._call, self.config, None, None, None)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -263,8 +263,11 @@ class RevokeTest(test_util.TempDirTestCase):
|
|||
from certbot.main import revoke
|
||||
revoke(config, plugins)
|
||||
|
||||
@mock.patch('certbot.main._delete_if_appropriate')
|
||||
@mock.patch('certbot.main.client.acme_client')
|
||||
def test_revoke_with_reason(self, mock_acme_client):
|
||||
def test_revoke_with_reason(self, mock_acme_client,
|
||||
mock_delete_if_appropriate):
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
mock_revoke = mock_acme_client.Client().revoke
|
||||
expected = []
|
||||
for reason, code in constants.REVOCATION_REASONS.items():
|
||||
|
|
@ -274,8 +277,10 @@ class RevokeTest(test_util.TempDirTestCase):
|
|||
expected.append(mock.call(mock.ANY, code))
|
||||
self.assertEqual(expected, mock_revoke.call_args_list)
|
||||
|
||||
def test_revocation_success(self):
|
||||
@mock.patch('certbot.main._delete_if_appropriate')
|
||||
def test_revocation_success(self, mock_delete_if_appropriate):
|
||||
self._call()
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
|
||||
|
||||
def test_revocation_error(self):
|
||||
|
|
@ -284,6 +289,198 @@ class RevokeTest(test_util.TempDirTestCase):
|
|||
self.assertRaises(acme_errors.ClientError, self._call)
|
||||
self.mock_success_revoke.assert_not_called()
|
||||
|
||||
@mock.patch('certbot.main._delete_if_appropriate')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@test_util.patch_get_utility()
|
||||
def test_revocation_with_prompt(self, mock_get_utility,
|
||||
mock_delete, mock_delete_if_appropriate):
|
||||
mock_get_utility().yesno.return_value = False
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
self._call()
|
||||
self.assertFalse(mock_delete.called)
|
||||
|
||||
class DeleteIfAppropriateTest(unittest.TestCase):
|
||||
"""Tests for certbot.main._delete_if_appropriate """
|
||||
|
||||
def setUp(self):
|
||||
self.config = mock.Mock()
|
||||
self.config.namespace = mock.Mock()
|
||||
self.config.namespace.noninteractive_mode = False
|
||||
|
||||
def _call(self, mock_config):
|
||||
from certbot.main import _delete_if_appropriate
|
||||
_delete_if_appropriate(mock_config)
|
||||
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@test_util.patch_get_utility()
|
||||
def test_delete_opt_out(self, mock_get_utility, mock_delete):
|
||||
util_mock = mock_get_utility()
|
||||
util_mock.yesno.return_value = False
|
||||
self._call(self.config)
|
||||
mock_delete.assert_not_called()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.cert_path_to_lineage')
|
||||
@test_util.patch_get_utility()
|
||||
def test_overlapping_archive_dirs(self, mock_get_utility,
|
||||
mock_cert_path_to_lineage, mock_archive,
|
||||
mock_match_and_check_overlaps, mock_delete,
|
||||
mock_renewal_file_for_certname):
|
||||
# pylint: disable = unused-argument
|
||||
config = self.config
|
||||
config.cert_path = "/some/reasonable/path"
|
||||
config.certname = ""
|
||||
mock_cert_path_to_lineage.return_value = "example.com"
|
||||
mock_match_and_check_overlaps.side_effect = errors.OverlappingMatchFound()
|
||||
self._call(config)
|
||||
mock_delete.assert_not_called()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@mock.patch('certbot.storage.cert_path_for_cert_name')
|
||||
@test_util.patch_get_utility()
|
||||
def test_cert_name_only(self, mock_get_utility,
|
||||
mock_cert_path_for_cert_name, mock_delete, mock_archive,
|
||||
mock_overlapping_archive_dirs, mock_renewal_file_for_certname):
|
||||
# pylint: disable = unused-argument
|
||||
config = self.config
|
||||
config.certname = "example.com"
|
||||
config.cert_path = ""
|
||||
mock_cert_path_for_cert_name.return_value = "/some/reasonable/path"
|
||||
mock_overlapping_archive_dirs.return_value = False
|
||||
self._call(config)
|
||||
mock_delete.assert_called_once()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@mock.patch('certbot.cert_manager.cert_path_to_lineage')
|
||||
@test_util.patch_get_utility()
|
||||
def test_cert_path_only(self, mock_get_utility,
|
||||
mock_cert_path_to_lineage, mock_delete, mock_archive,
|
||||
mock_overlapping_archive_dirs, mock_renewal_file_for_certname):
|
||||
# pylint: disable = unused-argument
|
||||
config = self.config
|
||||
config.cert_path = "/some/reasonable/path"
|
||||
config.certname = ""
|
||||
mock_cert_path_to_lineage.return_value = "example.com"
|
||||
mock_overlapping_archive_dirs.return_value = False
|
||||
self._call(config)
|
||||
mock_delete.assert_called_once()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.cert_path_to_lineage')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@test_util.patch_get_utility()
|
||||
def test_noninteractive_deletion(self, mock_get_utility, mock_delete,
|
||||
mock_cert_path_to_lineage, mock_full_archive_dir,
|
||||
mock_match_and_check_overlaps, mock_renewal_file_for_certname):
|
||||
# pylint: disable = unused-argument
|
||||
config = self.config
|
||||
config.namespace.noninteractive_mode = True
|
||||
config.cert_path = "/some/reasonable/path"
|
||||
config.certname = ""
|
||||
mock_cert_path_to_lineage.return_value = "example.com"
|
||||
mock_full_archive_dir.return_value = ""
|
||||
mock_match_and_check_overlaps.return_value = ""
|
||||
self._call(config)
|
||||
mock_delete.assert_called_once()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@mock.patch('certbot.cert_manager.cert_path_to_lineage')
|
||||
@test_util.patch_get_utility()
|
||||
def test_certname_and_cert_path_match(self, mock_get_utility,
|
||||
mock_cert_path_to_lineage, mock_delete, mock_archive,
|
||||
mock_overlapping_archive_dirs, mock_renewal_file_for_certname):
|
||||
# pylint: disable = unused-argument
|
||||
config = self.config
|
||||
config.certname = "example.com"
|
||||
config.cert_path = "/some/reasonable/path"
|
||||
mock_cert_path_to_lineage.return_value = config.certname
|
||||
mock_overlapping_archive_dirs.return_value = False
|
||||
self._call(config)
|
||||
mock_delete.assert_called_once()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@mock.patch('certbot.cert_manager.human_readable_cert_info')
|
||||
@mock.patch('certbot.storage.RenewableCert')
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.cert_path_to_lineage')
|
||||
@test_util.patch_get_utility()
|
||||
def test_certname_and_cert_path_mismatch(self, mock_get_utility,
|
||||
mock_cert_path_to_lineage, mock_renewal_file_for_certname,
|
||||
mock_RenewableCert, mock_human_readable_cert_info,
|
||||
mock_delete, mock_archive, mock_overlapping_archive_dirs):
|
||||
# pylint: disable=unused-argument
|
||||
config = self.config
|
||||
config.certname = "example.com"
|
||||
config.cert_path = "/some/reasonable/path"
|
||||
mock_cert_path_to_lineage = "something else"
|
||||
mock_RenewableCert.return_value = mock.Mock()
|
||||
mock_human_readable_cert_info.return_value = ""
|
||||
mock_overlapping_archive_dirs.return_value = False
|
||||
from certbot.display import util as display_util
|
||||
util_mock = mock_get_utility()
|
||||
util_mock.menu.return_value = (display_util.OK, 0)
|
||||
self._call(config)
|
||||
mock_delete.assert_called_once()
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
@mock.patch('certbot.cert_manager.match_and_check_overlaps')
|
||||
@mock.patch('certbot.storage.full_archive_path')
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@mock.patch('certbot.cert_manager.human_readable_cert_info')
|
||||
@mock.patch('certbot.storage.RenewableCert')
|
||||
@mock.patch('certbot.storage.renewal_file_for_certname')
|
||||
@mock.patch('certbot.cert_manager.cert_path_to_lineage')
|
||||
@test_util.patch_get_utility()
|
||||
def test_noninteractive_certname_cert_path_mismatch(self, mock_get_utility,
|
||||
mock_cert_path_to_lineage, mock_renewal_file_for_certname,
|
||||
mock_RenewableCert, mock_human_readable_cert_info,
|
||||
mock_delete, mock_archive, mock_overlapping_archive_dirs):
|
||||
# pylint: disable=unused-argument
|
||||
config = self.config
|
||||
config.certname = "example.com"
|
||||
config.cert_path = "/some/reasonable/path"
|
||||
mock_cert_path_to_lineage.return_value = "some-reasonable-path.com"
|
||||
mock_RenewableCert.return_value = mock.Mock()
|
||||
mock_human_readable_cert_info.return_value = ""
|
||||
mock_overlapping_archive_dirs.return_value = False
|
||||
# Test for non-interactive mode
|
||||
util_mock = mock_get_utility()
|
||||
util_mock.menu.side_effect = errors.MissingCommandlineFlag("Oh no.")
|
||||
self.assertRaises(errors.Error, self._call, config)
|
||||
mock_delete.assert_not_called()
|
||||
|
||||
@mock.patch('certbot.cert_manager.delete')
|
||||
@test_util.patch_get_utility()
|
||||
def test_no_certname_or_cert_path(self, mock_get_utility, mock_delete):
|
||||
# pylint: disable=unused-argument
|
||||
config = self.config
|
||||
config.certname = None
|
||||
config.cert_path = None
|
||||
self.assertRaises(errors.Error, self._call, config)
|
||||
mock_delete.assert_not_called()
|
||||
|
||||
|
||||
class DetermineAccountTest(test_util.ConfigTestCase):
|
||||
"""Tests for certbot.main._determine_account."""
|
||||
|
|
@ -1032,8 +1229,11 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
self.assertTrue(
|
||||
'dry run' in mock_get_utility().add_message.call_args[0][0])
|
||||
|
||||
@mock.patch('certbot.main._delete_if_appropriate')
|
||||
@mock.patch('certbot.main.client.acme_client')
|
||||
def test_revoke_with_key(self, mock_acme_client):
|
||||
def test_revoke_with_key(self, mock_acme_client,
|
||||
mock_delete_if_appropriate):
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
server = 'foo.bar'
|
||||
self._call_no_clientmock(['--cert-path', SS_CERT_PATH, '--key-path', RSA2048_KEY_PATH,
|
||||
'--server', server, 'revoke'])
|
||||
|
|
@ -1053,8 +1253,11 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
|
|||
['--cert-path', CERT, '--key-path', KEY,
|
||||
'--server', server, 'revoke'])
|
||||
|
||||
@mock.patch('certbot.main._delete_if_appropriate')
|
||||
@mock.patch('certbot.main._determine_account')
|
||||
def test_revoke_without_key(self, mock_determine_account):
|
||||
def test_revoke_without_key(self, mock_determine_account,
|
||||
mock_delete_if_appropriate):
|
||||
mock_delete_if_appropriate.return_value = False
|
||||
mock_determine_account.return_value = (mock.MagicMock(), None)
|
||||
_, _, _, client = self._call(['--cert-path', CERT, 'revoke'])
|
||||
with open(CERT) as f:
|
||||
|
|
|
|||
|
|
@ -884,6 +884,25 @@ class DeleteFilesTest(BaseRenewableCertTest):
|
|||
self.config.live_dir, "example.org")))
|
||||
self.assertFalse(os.path.exists(archive_dir))
|
||||
|
||||
class CertPathForCertNameTest(BaseRenewableCertTest):
|
||||
"""Test for certbot.storage.cert_path_for_cert_name"""
|
||||
def setUp(self):
|
||||
super(CertPathForCertNameTest, self).setUp()
|
||||
self.config_file.write()
|
||||
self._write_out_ex_kinds()
|
||||
self.fullchain = os.path.join(self.config.config_dir, 'live', 'example.org',
|
||||
'fullchain.pem')
|
||||
self.config.cert_path = (self.fullchain, '')
|
||||
|
||||
def _call(self, cli_config, certname):
|
||||
from certbot.storage import cert_path_for_cert_name
|
||||
return cert_path_for_cert_name(cli_config, certname)
|
||||
|
||||
def test_simple_cert_name(self):
|
||||
self.assertEqual(self._call(self.config, 'example.org'), (self.fullchain, 'fullchain'))
|
||||
|
||||
def test_no_such_cert_name(self):
|
||||
self.assertRaises(errors.CertStorageError, self._call, self.config, 'fake-example.org')
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
|
|
@ -372,7 +372,7 @@ common revoke --cert-path "$root/conf/live/le2.wtf/cert.pem" \
|
|||
common unregister
|
||||
|
||||
out=$(common certificates)
|
||||
subdomains="le le2 dns.le newname.le must-staple.le"
|
||||
subdomains="le dns.le newname.le must-staple.le"
|
||||
for subdomain in $subdomains; do
|
||||
domain="$subdomain.wtf"
|
||||
if ! echo $out | grep "$domain"; then
|
||||
|
|
@ -381,6 +381,46 @@ for subdomain in $subdomains; do
|
|||
fi
|
||||
done
|
||||
|
||||
# Testing that revocation also deletes by default
|
||||
subdomains="le1 le2"
|
||||
for subdomain in $subdomains; do
|
||||
domain="$subdomain.wtf"
|
||||
if echo $out | grep "$domain"; then
|
||||
echo "Revoked $domain in certificates output! Should not be!"
|
||||
exit 1;
|
||||
fi
|
||||
done
|
||||
|
||||
# Test that revocation raises correct error if --cert-name and --cert-path don't match
|
||||
common --domains le1.wtf
|
||||
common --domains le2.wtf
|
||||
out=$(common revoke --cert-path "$root/conf/live/le1.wtf/fullchain.pem" --cert-name "le2.wtf" 2>&1) || true
|
||||
if ! echo $out | grep "or both must point to the same certificate lineages."; then
|
||||
echo "Non-interactive revoking with mismatched --cert-name and --cert-path "
|
||||
echo "did not raise the correct error!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Revoking by matching --cert-name and --cert-path deletes
|
||||
common --domains le1.wtf
|
||||
common revoke --cert-path "$root/conf/live/le1.wtf/fullchain.pem" --cert-name "le1.wtf"
|
||||
out=$(common certificates)
|
||||
if echo $out | grep "le1.wtf"; then
|
||||
echo "Cert le1.wtf should've been deleted! Was revoked via matching --cert-path & --cert-name"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Test that revocation doesn't delete if multiple lineages share an archive dir
|
||||
common --domains le1.wtf
|
||||
common --domains le2.wtf
|
||||
sed -i "s|^archive_dir = .*$|archive_dir = $root/conf/archive/le1.wtf|" "$root/conf/renewal/le2.wtf.conf"
|
||||
#common update_symlinks # not needed, but a bit more context for what this test is about
|
||||
out=$(common revoke --cert-path "$root/conf/live/le1.wtf/cert.pem")
|
||||
if ! echo $out | grep "Not deleting revoked certs due to overlapping archive dirs"; then
|
||||
echo "Deleted a cert that had an overlapping archive dir with another lineage!"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
cert_name="must-staple.le.wtf"
|
||||
common delete --cert-name $cert_name
|
||||
archive="$root/conf/archive/$cert_name"
|
||||
|
|
|
|||
Loading…
Reference in a new issue