Re-land proper webroot directory cleanup (#5577)

* fix(webroot): clean up directories properly

* fix(webroot): undo umask in finally

* Fix for MacOS
This commit is contained in:
sydneyli 2018-02-15 15:55:08 -08:00 committed by Brad Warren
parent 09b5927e6a
commit d5efefd979
4 changed files with 90 additions and 30 deletions

View file

@ -6,6 +6,24 @@ from certbot import util
logger = logging.getLogger(__name__)
def get_prefixes(path):
"""Retrieves all possible path prefixes of a path, in descending order
of length. For instance,
/a/b/c/ => ['/a/b/c/', '/a/b/c', '/a/b', '/a', '/']
:param str path: the path to break into prefixes
:returns: all possible path prefixes of given path in descending order
:rtype: `list` of `str`
"""
prefix = path
prefixes = []
while len(prefix) > 0:
prefixes.append(prefix)
prefix, _ = os.path.split(prefix)
# break once we hit '/'
if prefix == prefixes[-1]:
break
return prefixes
def path_surgery(cmd):
"""Attempt to perform PATH surgery to find cmd

View file

@ -5,6 +5,14 @@ import unittest
import mock
class GetPrefixTest(unittest.TestCase):
"""Tests for certbot.plugins.get_prefixes."""
def test_get_prefix(self):
from certbot.plugins.util import get_prefixes
self.assertEqual(get_prefixes("/a/b/c/"), ['/a/b/c/', '/a/b/c', '/a/b', '/a', '/'])
self.assertEqual(get_prefixes("/"), ["/"])
self.assertEqual(get_prefixes("a"), ["a"])
class PathSurgeryTest(unittest.TestCase):
"""Tests for certbot.plugins.path_surgery."""

View file

@ -18,6 +18,7 @@ from certbot import interfaces
from certbot.display import util as display_util
from certbot.display import ops
from certbot.plugins import common
from certbot.plugins import util
logger = logging.getLogger(__name__)
@ -65,6 +66,8 @@ to serve all files under specified web root ({0})."""
super(Authenticator, self).__init__(*args, **kwargs)
self.full_roots = {}
self.performed = collections.defaultdict(set)
# stack of dirs successfully created by this authenticator
self._created_dirs = []
def prepare(self): # pylint: disable=missing-docstring
pass
@ -161,27 +164,26 @@ to serve all files under specified web root ({0})."""
# Umask is used instead of chmod to ensure the client can also
# run as non-root (GH #1795)
old_umask = os.umask(0o022)
try:
# This is coupled with the "umask" call above because
# os.makedirs's "mode" parameter may not always work:
# https://stackoverflow.com/questions/5231901/permission-problems-when-creating-a-dir-with-os-makedirs-python
os.makedirs(self.full_roots[name], 0o0755)
# Set owner as parent directory if possible
try:
stat_path = os.stat(path)
os.chown(self.full_roots[name], stat_path.st_uid,
stat_path.st_gid)
except OSError as exception:
logger.info("Unable to change owner and uid of webroot directory")
logger.debug("Error was: %s", exception)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise errors.PluginError(
"Couldn't create root for {0} http-01 "
"challenge responses: {1}", name, exception)
stat_path = os.stat(path)
for prefix in sorted(util.get_prefixes(self.full_roots[name]), key=len):
try:
# This is coupled with the "umask" call above because
# os.mkdir's "mode" parameter may not always work:
# https://docs.python.org/3/library/os.html#os.mkdir
os.mkdir(prefix, 0o0755)
self._created_dirs.append(prefix)
# Set owner as parent directory if possible
try:
os.chown(prefix, stat_path.st_uid, stat_path.st_gid)
except OSError as exception:
logger.info("Unable to change owner and uid of webroot directory")
logger.debug("Error was: %s", exception)
except OSError as exception:
if exception.errno not in (errno.EEXIST, errno.EISDIR):
raise errors.PluginError(
"Couldn't create root for {0} http-01 "
"challenge responses: {1}".format(name, exception))
finally:
os.umask(old_umask)
@ -217,16 +219,17 @@ to serve all files under specified web root ({0})."""
os.remove(validation_path)
self.performed[root_path].remove(achall)
for root_path, achalls in six.iteritems(self.performed):
if not achalls:
try:
os.rmdir(root_path)
logger.debug("All challenges cleaned up, removing %s",
root_path)
except OSError as exc:
logger.info(
"Unable to clean up challenge directory %s", root_path)
logger.debug("Error was: %s", exc)
not_removed = []
while len(self._created_dirs) > 0:
path = self._created_dirs.pop()
try:
os.rmdir(path)
except OSError as exc:
not_removed.insert(0, path)
logger.info("Challenge directory %s was not empty, didn't remove", path)
logger.debug("Error was: %s", exc)
self._created_dirs = not_removed
logger.debug("All challenges cleaned up")
class _WebrootMapAction(argparse.Action):

View file

@ -36,6 +36,8 @@ class AuthenticatorTest(unittest.TestCase):
def setUp(self):
from certbot.plugins.webroot import Authenticator
self.path = tempfile.mkdtemp()
self.partial_root_challenge_path = os.path.join(
self.path, ".well-known")
self.root_challenge_path = os.path.join(
self.path, ".well-known", "acme-challenge")
self.validation_path = os.path.join(
@ -199,6 +201,35 @@ class AuthenticatorTest(unittest.TestCase):
self.auth.cleanup([self.achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertFalse(os.path.exists(self.root_challenge_path))
self.assertFalse(os.path.exists(self.partial_root_challenge_path))
def test_perform_cleanup_existing_dirs(self):
os.mkdir(self.partial_root_challenge_path)
self.auth.prepare()
self.auth.perform([self.achall])
self.auth.cleanup([self.achall])
# Ensure we don't "clean up" directories that previously existed
self.assertFalse(os.path.exists(self.validation_path))
self.assertFalse(os.path.exists(self.root_challenge_path))
def test_perform_cleanup_multiple_challenges(self):
bingo_achall = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.HTTP01(token=b"bingo"), "pending"),
domain="thing.com", account_key=KEY)
bingo_validation_path = "YmluZ28"
os.mkdir(self.partial_root_challenge_path)
self.auth.prepare()
self.auth.perform([bingo_achall, self.achall])
self.auth.cleanup([self.achall])
self.assertFalse(os.path.exists(bingo_validation_path))
self.assertTrue(os.path.exists(self.root_challenge_path))
self.auth.cleanup([bingo_achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertFalse(os.path.exists(self.root_challenge_path))
def test_cleanup_leftovers(self):
self.auth.prepare()