From 9c0133e1feb1204fe9b7252170da3e4f086325ed Mon Sep 17 00:00:00 2001 From: Adrien Ferrand Date: Fri, 20 Nov 2020 13:46:36 +0100 Subject: [PATCH] Forbid os.readlink() --- certbot/certbot/_internal/account.py | 5 +++-- certbot/certbot/_internal/storage.py | 9 +++++---- certbot/certbot/compat/filesystem.py | 4 +++- certbot/certbot/compat/os.py | 11 +++++++++++ certbot/tests/cert_manager_test.py | 2 +- certbot/tests/storage_test.py | 4 ++-- 6 files changed, 25 insertions(+), 10 deletions(-) diff --git a/certbot/certbot/_internal/account.py b/certbot/certbot/_internal/account.py index 8cfe5ea11..013c4a609 100644 --- a/certbot/certbot/_internal/account.py +++ b/certbot/certbot/_internal/account.py @@ -20,6 +20,7 @@ from certbot import interfaces from certbot import util from certbot._internal import constants from certbot.compat import os +from certbot.compat import filesystem logger = logging.getLogger(__name__) @@ -324,7 +325,7 @@ class AccountFileStorage(interfaces.AccountStorage): if server_path in reused_servers: next_server_path = reused_servers[server_path] next_dir_path = link_func(next_server_path) - if os.path.islink(next_dir_path) and os.readlink(next_dir_path) == dir_path: + if os.path.islink(next_dir_path) and filesystem.realpath(next_dir_path) == dir_path: possible_next_link = True server_path = next_server_path dir_path = next_dir_path @@ -332,7 +333,7 @@ class AccountFileStorage(interfaces.AccountStorage): # if there's not a next one up to delete, then delete me # and whatever I link to while os.path.islink(dir_path): - target = os.readlink(dir_path) + target = filesystem.realpath(dir_path) os.unlink(dir_path) dir_path = target diff --git a/certbot/certbot/_internal/storage.py b/certbot/certbot/_internal/storage.py index b6c37a5ba..5351a4f8a 100644 --- a/certbot/certbot/_internal/storage.py +++ b/certbot/certbot/_internal/storage.py @@ -214,7 +214,7 @@ def get_link_target(link): """ try: - target = os.readlink(link) + target = filesystem.realpath(link) except OSError: raise errors.CertStorageError( "Expected {0} to be a symlink".format(link)) @@ -223,6 +223,7 @@ def get_link_target(link): target = os.path.join(os.path.dirname(link), target) return os.path.abspath(target) + def _write_live_readme_to(readme_path, is_base_dir=False): prefix = "" if is_base_dir: @@ -665,7 +666,7 @@ class RenewableCert(interfaces.RenewableCert): current_link = getattr(self, kind) if os.path.lexists(current_link): os.unlink(current_link) - os.symlink(os.readlink(previous_link), current_link) + os.symlink(filesystem.realpath(previous_link), current_link) for _, link in previous_symlinks: if os.path.exists(link): @@ -846,7 +847,7 @@ class RenewableCert(interfaces.RenewableCert): link = getattr(self, kind) filename = "{0}{1}.pem".format(kind, version) # Relative rather than absolute target directory - target_directory = os.path.dirname(os.readlink(link)) + target_directory = os.path.dirname(filesystem.realpath(link)) # TODO: it could be safer to make the link first under a temporary # filename, then unlink the old link, then rename the new link # to the old link; this ensures that this process is able to @@ -1121,7 +1122,7 @@ class RenewableCert(interfaces.RenewableCert): # The behavior below keeps the prior key by creating a new # symlink to the old key or the target of the old key symlink. if os.path.islink(old_privkey): - old_privkey = os.readlink(old_privkey) + old_privkey = filesystem.realpath(old_privkey) else: old_privkey = "privkey{0}.pem".format(prior_version) logger.debug("Writing symlink to old private key, %s.", old_privkey) diff --git a/certbot/certbot/compat/filesystem.py b/certbot/certbot/compat/filesystem.py index 0c8a7514b..09c15f80b 100644 --- a/certbot/certbot/compat/filesystem.py +++ b/certbot/certbot/compat/filesystem.py @@ -4,6 +4,7 @@ from __future__ import absolute_import import errno import os # pylint: disable=os-module-forbidden import stat +import sys from acme.magic_typing import List @@ -361,7 +362,8 @@ def realpath(file_path): """ original_path = file_path - if POSIX_MODE: + # Since Python 3.8, os.path.realpath also resolves symlinks on Windows. + if POSIX_MODE or sys.version_info >= (3, 8): path = os.path.realpath(file_path) if os.path.islink(path): # If path returned by realpath is still a link, it means that it failed to diff --git a/certbot/certbot/compat/os.py b/certbot/certbot/compat/os.py index b4aea054f..ed1311c67 100644 --- a/certbot/certbot/compat/os.py +++ b/certbot/certbot/compat/os.py @@ -152,3 +152,14 @@ def fstat(*unused_args, **unused_kwargs): raise RuntimeError('Usage of os.fstat() is forbidden. ' 'Use certbot.compat.filesystem functions instead ' '(eg. has_min_permissions, has_same_ownership).') + + +# Method os.readlink has a significant behavior change with Python 3.8+. Starting +# with this version, it will return the resolved path in its "extended-style" form +# unconditionally, which allows to use more than 259 characters, and its string +# representation is prepended with "\\?\". Problem is that it does it for any path, +# and will make equality comparison fail with paths that will use the simple form. +def readlink(*unused_args, **unused_kwargs): + """Method os.readlink() is forbidden""" + raise RuntimeError('Usage of os.readlink() is forbidden. ' + 'Use certbot.compat.filesystem.realpath() instead.') diff --git a/certbot/tests/cert_manager_test.py b/certbot/tests/cert_manager_test.py index a551e4400..cb8ecc5a1 100644 --- a/certbot/tests/cert_manager_test.py +++ b/certbot/tests/cert_manager_test.py @@ -99,7 +99,7 @@ class UpdateLiveSymlinksTest(BaseCertManagerTest): for kind in ALL_FOUR: os.chdir(os.path.dirname(self.config_files[domain][kind])) self.assertEqual( - filesystem.realpath(os.readlink(self.config_files[domain][kind])), + filesystem.realpath(self.config_files[domain][kind]), filesystem.realpath(archive_paths[domain][kind])) finally: os.chdir(prev_dir) diff --git a/certbot/tests/storage_test.py b/certbot/tests/storage_test.py index b67c4cbce..0ea38df54 100644 --- a/certbot/tests/storage_test.py +++ b/certbot/tests/storage_test.py @@ -330,7 +330,7 @@ class RenewableCertTests(BaseRenewableCertTest): self.test_rc._update_link_to("chain", 3000) # However, current_version doesn't allow querying the resulting # version (because it's a broken link). - self.assertEqual(os.path.basename(os.readlink(self.test_rc.chain)), + self.assertEqual(os.path.basename(filesystem.realpath(self.test_rc.chain)), "chain3000.pem") def test_version(self): @@ -514,7 +514,7 @@ class RenewableCertTests(BaseRenewableCertTest): # privkey. for i in (6, 7, 8): self.assertTrue(os.path.islink(self.test_rc.version("privkey", i))) - self.assertEqual("privkey3.pem", os.path.basename(os.readlink( + self.assertEqual("privkey3.pem", os.path.basename(filesystem.realpath( self.test_rc.version("privkey", i)))) for kind in ALL_FOUR: