diff --git a/certbot/account.py b/certbot/account.py index f2ed5cfd5..59ceb42e0 100644 --- a/certbot/account.py +++ b/certbot/account.py @@ -1,5 +1,6 @@ """Creates ACME accounts for server.""" import datetime +import functools import hashlib import logging import os @@ -191,6 +192,11 @@ class AccountFileStorage(interfaces.AccountStorage): def find_all(self): return self._find_all_for_server_path(self.config.server_path) + def _symlink_to_account_dir(self, prev_server_path, server_path, account_id): + prev_account_dir = self._account_dir_path_for_server_path(account_id, prev_server_path) + new_account_dir = self._account_dir_path_for_server_path(account_id, server_path) + os.symlink(prev_account_dir, new_account_dir) + def _symlink_to_accounts_dir(self, prev_server_path, server_path): accounts_dir = self.config.accounts_dir_for_server_path(server_path) if os.path.islink(accounts_dir): @@ -207,7 +213,12 @@ class AccountFileStorage(interfaces.AccountStorage): prev_server_path = constants.LE_REUSE_SERVERS[server_path] prev_loaded_account = self._load_for_server_path(account_id, prev_server_path) # we didn't error so we found something, so create a symlink to that - self._symlink_to_accounts_dir(prev_server_path, server_path) + accounts_dir = self.config.accounts_dir_for_server_path(server_path) + # If accounts_dir isn't empty, make an account specific symlink + if os.listdir(accounts_dir): + self._symlink_to_account_dir(prev_server_path, server_path, account_id) + else: + self._symlink_to_accounts_dir(prev_server_path, server_path) return prev_loaded_account else: raise errors.AccountNotFound( @@ -250,49 +261,65 @@ class AccountFileStorage(interfaces.AccountStorage): :param account_id: id of account which should be deleted """ - # Step 1: remove the account itself account_dir_path = self._account_dir_path(account_id) if not os.path.isdir(account_dir_path): raise errors.AccountNotFound( "Account at %s does not exist" % account_dir_path) - shutil.rmtree(account_dir_path) + # Step 1: Delete account specific links and the directory + self._delete_account_dir_for_server_path(account_id, self.config.server_path) - # Step 2: remove the directory if it's empty, and linked directories + # Step 2: Remove any accounts links and directories that are now empty if not os.listdir(self.config.accounts_dir): self._delete_accounts_dir_for_server_path(self.config.server_path) + def _delete_account_dir_for_server_path(self, account_id, server_path): + link_func = functools.partial(self._account_dir_path_for_server_path, account_id) + nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func) + shutil.rmtree(nonsymlinked_dir) + def _delete_accounts_dir_for_server_path(self, server_path): - accounts_dir_path = self.config.accounts_dir_for_server_path(server_path) + link_func = self.config.accounts_dir_for_server_path + nonsymlinked_dir = self._delete_links_and_find_target_dir(server_path, link_func) + os.rmdir(nonsymlinked_dir) + + def _delete_links_and_find_target_dir(self, server_path, link_func): + """Delete symlinks and return the nonsymlinked directory path. + + :param str server_path: file path based on server + :param callable link_func: callable that returns possible links + given a server_path + + :returns: the final, non-symlinked target + :rtype: str + + """ + dir_path = link_func(server_path) # does an appropriate directory link to me? if so, make sure that's gone reused_servers = {} for k in constants.LE_REUSE_SERVERS: reused_servers[constants.LE_REUSE_SERVERS[k]] = k - # is there a next one up? call that and be done - if server_path in reused_servers: - next_server_path = reused_servers[server_path] - next_accounts_dir_path = self.config.accounts_dir_for_server_path(next_server_path) - if os.path.islink(next_accounts_dir_path) \ - and os.readlink(next_accounts_dir_path) == accounts_dir_path: - self._delete_accounts_dir_for_server_path(next_server_path) - return + # is there a next one up? + possible_next_link = True + while possible_next_link: + possible_next_link = False + if server_path in reused_servers: + next_server_path = reused_servers[server_path] + next_dir_path = link_func(next_server_path) + if os.path.islink(next_dir_path) and os.readlink(next_dir_path) == dir_path: + possible_next_link = True + server_path = next_server_path + dir_path = next_dir_path # if there's not a next one up to delete, then delete me - # and whatever I link to if applicable - if os.path.islink(accounts_dir_path): - # save my info then delete me - target = os.readlink(accounts_dir_path) - os.unlink(accounts_dir_path) - # then delete whatever I linked to, if appropriate - if server_path in constants.LE_REUSE_SERVERS: - prev_server_path = constants.LE_REUSE_SERVERS[server_path] - prev_accounts_dir_path = self.config.accounts_dir_for_server_path(prev_server_path) - if target == prev_accounts_dir_path: - self._delete_accounts_dir_for_server_path(prev_server_path) - else: - # just delete me - os.rmdir(accounts_dir_path) + # and whatever I link to + while os.path.islink(dir_path): + target = os.readlink(dir_path) + os.unlink(dir_path) + dir_path = target + + return dir_path def _save(self, account, acme, regr_only): account_dir_path = self._account_dir_path(account.id) diff --git a/certbot/tests/account_test.py b/certbot/tests/account_test.py index e0ec3d5f8..701478336 100644 --- a/certbot/tests/account_test.py +++ b/certbot/tests/account_test.py @@ -249,6 +249,14 @@ class AccountFileStorageTest(test_util.ConfigTestCase): account = self.storage.load(self.acc.id) self.assertEqual(prev_account, account) + def test_upgrade_load_single_account(self): + self._set_server('https://acme-staging.api.letsencrypt.org/directory') + self.storage.save(self.acc, self.mock_client) + prev_account = self.storage.load(self.acc.id) + self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory') + account = self.storage.load(self.acc.id) + self.assertEqual(prev_account, account) + def test_load_ioerror(self): self.storage.save(self.acc, self.mock_client) mock_open = mock.mock_open() @@ -307,6 +315,18 @@ class AccountFileStorageTest(test_util.ConfigTestCase): self._test_delete_folders('https://acme-staging-v02.api.letsencrypt.org/directory') self._assert_symlinked_account_removed() + def _set_server_and_stop_symlink(self, server_path): + self._set_server(server_path) + with open(os.path.join(self.config.accounts_dir, 'foo'), 'w') as f: + f.write('bar') + + def test_delete_shared_account_up(self): + self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory') + self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory') + + def test_delete_shared_account_down(self): + self._set_server_and_stop_symlink('https://acme-staging-v02.api.letsencrypt.org/directory') + self._test_delete_folders('https://acme-staging.api.letsencrypt.org/directory') if __name__ == "__main__": unittest.main() # pragma: no cover