Merge pull request #2480 from filipochnik/remove-acme-challenge

Remove acme-challenge after cleaning up all challenges
This commit is contained in:
Peter Eckersley 2016-02-18 12:43:13 -08:00
commit 9b21efc6b8
2 changed files with 62 additions and 9 deletions

View file

@ -2,8 +2,10 @@
import errno
import logging
import os
from collections import defaultdict
import zope.interface
import six
from acme import challenges
@ -44,6 +46,7 @@ to serve all files under specified web root ({0})."""
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.full_roots = {}
self.performed = defaultdict(set)
def prepare(self): # pylint: disable=missing-docstring
path_map = self.conf("map")
@ -97,7 +100,7 @@ to serve all files under specified web root ({0})."""
assert self.full_roots, "Webroot plugin appears to be missing webroot map"
return [self._perform_single(achall) for achall in achalls]
def _path_for_achall(self, achall):
def _get_root_path(self, achall):
try:
path = self.full_roots[achall.domain]
except KeyError:
@ -106,27 +109,48 @@ to serve all files under specified web root ({0})."""
if not os.path.exists(path):
raise errors.PluginError("Mysteriously missing path {0} for domain: {1}"
.format(path, achall.domain))
return os.path.join(path, achall.chall.encode("token"))
return path
def _get_validation_path(self, root_path, achall):
return os.path.join(root_path, achall.chall.encode("token"))
def _perform_single(self, achall):
response, validation = achall.response_and_validation()
path = self._path_for_achall(achall)
logger.debug("Attempting to save validation to %s", path)
root_path = self._get_root_path(achall)
validation_path = self._get_validation_path(root_path, achall)
logger.debug("Attempting to save validation to %s", validation_path)
# Change permissions to be world-readable, owner-writable (GH #1795)
old_umask = os.umask(0o022)
try:
with open(path, "w") as validation_file:
with open(validation_path, "w") as validation_file:
validation_file.write(validation.encode())
finally:
os.umask(old_umask)
self.performed[root_path].add(achall)
return response
def cleanup(self, achalls): # pylint: disable=missing-docstring
for achall in achalls:
path = self._path_for_achall(achall)
logger.debug("Removing %s", path)
os.remove(path)
root_path = self._get_root_path(achall)
validation_path = self._get_validation_path(root_path, achall)
logger.debug("Removing %s", validation_path)
os.remove(validation_path)
self.performed[root_path].remove(achall)
for root_path, achalls in six.iteritems(self.performed):
if not achalls:
try:
os.rmdir(root_path)
logger.debug("All challenges cleaned up, removing %s",
root_path)
except OSError as exc:
if exc.errno == errno.ENOTEMPTY:
logger.debug("Challenges cleaned up but %s not empty",
root_path)
else:
raise

View file

@ -30,8 +30,10 @@ class AuthenticatorTest(unittest.TestCase):
def setUp(self):
from letsencrypt.plugins.webroot import Authenticator
self.path = tempfile.mkdtemp()
self.root_challenge_path = os.path.join(
self.path, ".well-known", "acme-challenge")
self.validation_path = os.path.join(
self.path, ".well-known", "acme-challenge",
self.root_challenge_path,
"ZXZhR3hmQURzNnBTUmIyTEF2OUlaZjE3RHQzanV4R0orUEN0OTJ3citvQQ")
self.config = mock.MagicMock(webroot_path=self.path,
webroot_map={"thing.com": self.path})
@ -137,6 +139,33 @@ class AuthenticatorTest(unittest.TestCase):
self.auth.cleanup([self.achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertFalse(os.path.exists(self.root_challenge_path))
def test_cleanup_leftovers(self):
self.auth.prepare()
self.auth.perform([self.achall])
leftover_path = os.path.join(self.root_challenge_path, 'leftover')
os.mkdir(leftover_path)
self.auth.cleanup([self.achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertTrue(os.path.exists(self.root_challenge_path))
os.rmdir(leftover_path)
@mock.patch('os.rmdir')
def test_cleanup_oserror(self, mock_rmdir):
self.auth.prepare()
self.auth.perform([self.achall])
os_error = OSError()
os_error.errno = errno.EACCES
mock_rmdir.side_effect = os_error
self.assertRaises(OSError, self.auth.cleanup, [self.achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertTrue(os.path.exists(self.root_challenge_path))
if __name__ == "__main__":