mirror of
https://github.com/certbot/certbot.git
synced 2026-06-03 13:59:02 -04:00
Fixes #6207.
As noted by Erica:
- we no longer need to check if it exists before linking to it, because we delete properly.
- the previously excisting check on if server is in `LE_REUSE_SERVERS` before unlinking is nice, but probably not necessary, especially since we don't officially support people doing weird things with symlinks in our directories, and because we rmdir which will fail if it's not empty anyway.
* Create single account symlink.
* refactor _delete_accounts_dir_for_server_path
* add symlinked account dir deletion
* add tests
(cherry picked from commit 9b0d2714c1)
This commit is contained in:
parent
95e271bfcd
commit
9974963887
2 changed files with 74 additions and 27 deletions
|
|
@ -1,5 +1,6 @@
|
|||
"""Creates ACME accounts for server."""
|
||||
import datetime
|
||||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
|
|
@ -191,6 +192,11 @@ class AccountFileStorage(interfaces.AccountStorage):
|
|||
def find_all(self):
|
||||
return self._find_all_for_server_path(self.config.server_path)
|
||||
|
||||
def _symlink_to_account_dir(self, prev_server_path, server_path, account_id):
|
||||
prev_account_dir = self._account_dir_path_for_server_path(account_id, prev_server_path)
|
||||
new_account_dir = self._account_dir_path_for_server_path(account_id, server_path)
|
||||
os.symlink(prev_account_dir, new_account_dir)
|
||||
|
||||
def _symlink_to_accounts_dir(self, prev_server_path, server_path):
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
if os.path.islink(accounts_dir):
|
||||
|
|
@ -207,7 +213,12 @@ class AccountFileStorage(interfaces.AccountStorage):
|
|||
prev_server_path = constants.LE_REUSE_SERVERS[server_path]
|
||||
prev_loaded_account = self._load_for_server_path(account_id, prev_server_path)
|
||||
# we didn't error so we found something, so create a symlink to that
|
||||
self._symlink_to_accounts_dir(prev_server_path, server_path)
|
||||
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
|
||||
# If accounts_dir isn't empty, make an account specific symlink
|
||||
if os.listdir(accounts_dir):
|
||||
self._symlink_to_account_dir(prev_server_path, server_path, account_id)
|
||||
else:
|
||||
self._symlink_to_accounts_dir(prev_server_path, server_path)
|
||||
return prev_loaded_account
|
||||
else:
|
||||
raise errors.AccountNotFound(
|
||||
|
|
@ -250,49 +261,65 @@ class AccountFileStorage(interfaces.AccountStorage):
|
|||
:param account_id: id of account which should be deleted
|
||||
|
||||
"""
|
||||
# Step 1: remove the account itself
|
||||
account_dir_path = self._account_dir_path(account_id)
|
||||
if not os.path.isdir(account_dir_path):
|
||||
raise errors.AccountNotFound(
|
||||
"Account at %s does not exist" % account_dir_path)
|
||||
shutil.rmtree(account_dir_path)
|
||||
# Step 1: Delete account specific links and the directory
|
||||
self._delete_account_dir_for_server_path(account_id, self.config.server_path)
|
||||
|
||||
# Step 2: remove the directory if it's empty, and linked directories
|
||||
# Step 2: Remove any accounts links and directories that are now empty
|
||||
if not os.listdir(self.config.accounts_dir):
|
||||
self._delete_accounts_dir_for_server_path(self.config.server_path)
|
||||
|
||||
def _delete_account_dir_for_server_path(self, account_id, server_path):
|
||||
link_func = functools.partial(self._account_dir_path_for_server_path, account_id)
|
||||
nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
|
||||
shutil.rmtree(nonsymlinked_dir)
|
||||
|
||||
def _delete_accounts_dir_for_server_path(self, server_path):
|
||||
accounts_dir_path = self.config.accounts_dir_for_server_path(server_path)
|
||||
link_func = self.config.accounts_dir_for_server_path
|
||||
nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func)
|
||||
os.rmdir(nonsymlinked_dir)
|
||||
|
||||
def _delete_links_and_find_target_dir(self, server_path, link_func):
|
||||
"""Delete symlinks and return the nonsymlinked directory path.
|
||||
|
||||
:param str server_path: file path based on server
|
||||
:param callable link_func: callable that returns possible links
|
||||
given a server_path
|
||||
|
||||
:returns: the final, non-symlinked target
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
dir_path = link_func(server_path)
|
||||
|
||||
# does an appropriate directory link to me? if so, make sure that's gone
|
||||
reused_servers = {}
|
||||
for k in constants.LE_REUSE_SERVERS:
|
||||
reused_servers[constants.LE_REUSE_SERVERS[k]] = k
|
||||
|
||||
# is there a next one up? call that and be done
|
||||
if server_path in reused_servers:
|
||||
next_server_path = reused_servers[server_path]
|
||||
next_accounts_dir_path = self.config.accounts_dir_for_server_path(next_server_path)
|
||||
if os.path.islink(next_accounts_dir_path) \
|
||||
and os.readlink(next_accounts_dir_path) == accounts_dir_path:
|
||||
self._delete_accounts_dir_for_server_path(next_server_path)
|
||||
return
|
||||
# is there a next one up?
|
||||
possible_next_link = True
|
||||
while possible_next_link:
|
||||
possible_next_link = False
|
||||
if server_path in reused_servers:
|
||||
next_server_path = reused_servers[server_path]
|
||||
next_dir_path = link_func(next_server_path)
|
||||
if os.path.islink(next_dir_path) and os.readlink(next_dir_path) == dir_path:
|
||||
possible_next_link = True
|
||||
server_path = next_server_path
|
||||
dir_path = next_dir_path
|
||||
|
||||
# if there's not a next one up to delete, then delete me
|
||||
# and whatever I link to if applicable
|
||||
if os.path.islink(accounts_dir_path):
|
||||
# save my info then delete me
|
||||
target = os.readlink(accounts_dir_path)
|
||||
os.unlink(accounts_dir_path)
|
||||
# then delete whatever I linked to, if appropriate
|
||||
if server_path in constants.LE_REUSE_SERVERS:
|
||||
prev_server_path = constants.LE_REUSE_SERVERS[server_path]
|
||||
prev_accounts_dir_path = self.config.accounts_dir_for_server_path(prev_server_path)
|
||||
if target == prev_accounts_dir_path:
|
||||
self._delete_accounts_dir_for_server_path(prev_server_path)
|
||||
else:
|
||||
# just delete me
|
||||
os.rmdir(accounts_dir_path)
|
||||
# and whatever I link to
|
||||
while os.path.islink(dir_path):
|
||||
target = os.readlink(dir_path)
|
||||
os.unlink(dir_path)
|
||||
dir_path = target
|
||||
|
||||
return dir_path
|
||||
|
||||
def _save(self, account, acme, regr_only):
|
||||
account_dir_path = self._account_dir_path(account.id)
|
||||
|
|
|
|||
|
|
@ -249,6 +249,14 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
account = self.storage.load(self.acc.id)
|
||||
self.assertEqual(prev_account, account)
|
||||
|
||||
def test_upgrade_load_single_account(self):
|
||||
self._set_server('https://acme-staging.api.letsencrypt.org/directory')
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
prev_account = self.storage.load(self.acc.id)
|
||||
self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
account = self.storage.load(self.acc.id)
|
||||
self.assertEqual(prev_account, account)
|
||||
|
||||
def test_load_ioerror(self):
|
||||
self.storage.save(self.acc, self.mock_client)
|
||||
mock_open = mock.mock_open()
|
||||
|
|
@ -307,6 +315,18 @@ class AccountFileStorageTest(test_util.ConfigTestCase):
|
|||
self._test_delete_folders('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self._assert_symlinked_account_removed()
|
||||
|
||||
def _set_server_and_stop_symlink(self, server_path):
|
||||
self._set_server(server_path)
|
||||
with open(os.path.join(self.config.accounts_dir, 'foo'), 'w') as f:
|
||||
f.write('bar')
|
||||
|
||||
def test_delete_shared_account_up(self):
|
||||
self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory')
|
||||
|
||||
def test_delete_shared_account_down(self):
|
||||
self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory')
|
||||
self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory')
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
Loading…
Reference in a new issue